OnMyHeart 2014-08-21
一、介绍
1、异步消息
异步消息是一个非常普通并且广泛使用的技术,例如Skype。这些服务都有如下特征:
2、AMQP
AMQP是一个异步消息传递所使用的应用层协议规范,是一个抽象的协议。AMQP当中有四个概念非常重要:虚拟主机(virtual host)、交换机(exchange)、队列(queue)和绑定(binding)。一个虚拟主机持有一组交换机、队列和绑定。RabbitMQ当中,用户只能在虚拟主机的粒度进行权限控制。
队列(queue)是你的消息的终点,可以理解成消息的容器。消息就一直在里面,直到有客户端连接到这个队列并且将其取走为止。
交换机可以理解成具有路由表的路由程序,仅此而已。每个消息都有一个称为路由键的属性,就是一个简单的字符串。交换机当中有一系列的绑定,即路由规则。每个交换机在自己独立的进程当中执行,因此增加多个交换机就是增加多个进程,可以充分利用服务器上的CPU核以便达到更高的效率。例如,在一个8核的服务器上,可以创建5个交换机用5个核,另外3个核留下来做消息处理。
一个绑定就是一个基于路由键将交换机和队列连接起来的路由规则。交换机的类型:
RabbitMQ是一个Erlang编写的AMQP服务器。他的核心原理非常简单:接收和发送消息。你可以把它想象成一个邮局:你把信件放入邮箱,邮递员就会把信件投递到你的收件人处。这个比喻中,RabbitMQ是一个邮箱、邮局、邮递员。RabbitMQ和邮局的主要区别是,他处理的不是纸,而是接收、存储和发送二进制的数据--消息。
3、RabbitMQ特点
支持持久化:如果RabbitMQ死掉了,消息并不会丢失,当队列重启,一切都会恢复。
RabbitMQ支持消息的持久化,也就是数据写在磁盘上,为了数据安全考虑,大多数用户都会选择持久化。消息队列持久化包括3个部分:
如果exchange和queue都是持久化的,那么他们之间的binding也是持久化的。
如果exchange和queue两者间有一个持久化,一个非持久化,就不允许建立绑定。
RabbitMQ的集群节点包括内存节点、磁盘节点。顾名思义内存节点就是将所有数据放在内存,磁盘节点将数据放到磁盘。不过,如果在投递消息时,打开了消息的持久化,那么即使是内存节点,数据还是安全的放在磁盘。
良好的设计架构可以如下:在一个集群里,有3台以上机器,其中1台使用磁盘模式,其他使用内存模式。其他几台为内存模式的节点,无疑速度更快,因此客户端连接访问它们。而磁盘模式的 节点,由于磁盘IO相对较慢,因此仅作为数据备份使用。
二、原理
一个用作发送消息,另一个接受消息并打印消息内容
其中:p为生产者;hello表示队列名称;c为消费者;首先要做的事情就是建立一个到RabbitMQ服务器的连接,在发送消息之前我们要确认队列是存在的,如果我们把消息发送到一个不存在的队列,RabbitMQ会丢弃这条消息。
在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。
三、RabbitMQ安装
RabbitMQ使用的是AMQP协议。要使用它就必须需要一个使用同样协议的库。几乎所有的编程语言都有可选择的库。python也一样,可以从以下几个库中选择:py-amqplib、txAMQP、pika。
下载RabbitMQ:http://www.rabbitmq.com/releases/rabbitmq-server/v3.2.1/rabbitmq-server-3.2.1-1. noarch.rpm
安装:rpm -ivh rabbitmq-server-3.2.1-1.noarch.rpm
四、结构图
Exchange:消息交换机,它指定消息按什么规则,路由到那个队列。
Queue:消息队列载体,每个消息都会被插入到一个或多个队列。
Channel:消息通道,在客户端的每个链接里,可建立多个Channel,每个Channel代表一个会话任务。
routing key:路由关键字,Exchange根据这个关键字进行消息投递。
消息队列的使用过程大概如下:
五、具体应用
1、服务器段设置
创建用户myuser和密码mypassword: $ rabbitmqctl add_user myuser mypassword 创建虚拟主机名myvhost: $ rabbitmqctl add_vhost myvhost 设置权限: $ rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*" 启动: ./rabbitmq-server 停止: ./rabbitmqctl stop
2、客户端部分代码
from amqplib import client_0_8 as amqp conn = amqp.Connection(host="localhost:5672 ", userid="guest", password="guest", virtual_host="/", insist=False) chan = conn.channel()
AMQP支持在一个TCP连接上启用多个MQ通讯Channel,每个channel都可以被应用作为通讯流。每个AMQP程序都至少有一个连接和一个channel。
每个channel都被分配一个整数标示,自动由connection()类的.channel()方法维护。或者你可以使用.channel(x)来指定channel标示,其中x是你想要使用的channel标示。通常情况下,推荐使用.channel()方法来自从分配channel标示,以便防止冲突。
前面已经有了一个可用的连接和channel。现在代码将分成两个应用,生产者和消费者。先创建一个消费者程序,包含一个叫做po_box的队列和一个叫sorting_room的交换机。
chan.queue_declare(queue="po_box", durable=True,exclusive=False, auto_delete=False) chan.exchange_declare(exchange="sorting_room", type="direct", durable=True,auto_delete=False,)
首先,创建了一个名叫po_box的队列,它是durable的(重启之后会重新建立),并且最后一个消费者断开的时候不会自动删除(auto_delete=false)。在常见durable的队列的时候,将auto_delete设置为FALSE是很重要的,否则队列将会在最后一个消费者断开的时候消失,与durable与否无关。如果将durable和auto_delete都设置成TRUE,只有尚有消费者活动的队列可以在rabbitMQ意外崩溃的时候自动恢复。
另外一个标志exclusive。如果设置为TRUE,只有创建这个队列的消费者程序才允许连接该队列。这种队列对于这个消费者程序是私有的。
还有另外一个交换机声明,创建了一个名字叫”sorting_room“的交换机。auto_delete和durable的含义和队列是一样的。但是.excange_declare()还有另外一个参数type,用来指定要创建的交换机的类型:fanout、direct和topic。
到此为止,已经有了一个可以接受消息的队列和一个可以发送消息的交换机。不过需要创建一个绑定,把他们连接起来:
chan.queue_bind(queue=”po_box”, exchange=”sorting_room”, routing_key=”jason”)
这个绑定的过程非常直接。任何送到交换机“sorting_room”的具有路由键“jason” 的消息都被路由到名为“po_box” 的队列。
现在有两种方法从队列当中取出消息。第一个是调用chan.basic_get(),主动从队列当中拉出下一个消息(如果队列当中没有消息,chan.basic_get()会返回None, 因此下面代码当中print msg.body 会在没有消息的时候崩掉):
msg = chan.basic_get("po_box") print msg.body chan.basic_ack(msg.delivery_tag)
但是如果你想要应用程序在消息到达的时候立即得到通知怎么办?这种情况下不能使用chan.basic_get(),你需要用chan.basic_consume()注册一个新消息到达的回调:
def recv_callback(msg): print 'Received: ' + msg.body chan.basic_consume(queue='po_box', no_ack=True, callback=recv_callback, consumer_tag="testtag") while True: chan.wait() chan.basic_cancel("testtag")
chan.wait() 放在一个无限循环里面,这个函数会等待在队列上,直到下一个消息到达队列。chan.basic_cancel() 用来注销该回调函数。参数consumer_tag 当中指定的字符串和chan.basic_consume() 注册的一致。在这个例子当中chan.basic_cancel() 不会被调用到,因为上面是个无限循环…… 不过你需要知道这个调用,所以我把它放在了代码里。
需要注意的另一个东西是no_ack参数。这个参数可以传给chan.basic_get()和chan.basic_consume(),默认是false。当从队列当中取出一个消息的时候,RabbitMQ需要应用显式地回馈说已经获取到了该消息。如果一段时间内不回馈,RabbitMQ会将该消息重新分配给另外一个绑定在该队列上的消费者。另一种情况是消费者断开连接,但是获取到的消息没有回馈,则RabbitMQ同样重新分配。如果将no_ack 参数设置为true,则py-amqplib会为下一个AMQP请求添加一个no_ack属性,告诉AMQP服务器不需要等待回馈。但是,大多数时候,你也许想要自己手工发送回馈,例如,需要在回馈之前将消息存入数据库。回馈通常是通过调用chan.basic_ack()方法,使用消息的delivery_tag属性作为参数。
下面的代码示例表明如何将一个简单消息发送到交换区“sorting_room”,并且标记为路由键“jason” :
msg = amqp.Message("Test message!") msg.properties["delivery_mode"] = 2 chan.basic_publish(msg,exchange="sorting_room",routing_key="jason")
你也许注意到我们设置消息的delivery_mode属性为2,因为队列和交换机都设置为durable的,这个设置将保证消息能够持久化,也就是说,当它还没有送达消费者之前如果RabbitMQ重启则它能够被恢复。
剩下的最后一件事情(生产者和消费者都需要调用的)是关闭channel和连接:
chan.close() conn.close()
结果:
生产者: python amqp_publisher.py thank python amqp_publisher.py think1 消费者: Received:thank from channel #1 Received:thank1 from channel #1
六、队列模式
1、一个队列对应一个消费者:
2、一个队列对应多个消费者
3、一个交换队列对应两个队列,注意要先建立交换机和队列的绑定,才可以发送消息:
4、一个交换机和一个队列进行绑定,交换机类型为direct
5、一个交换机和一个队列进行多个绑定,交换机类型为topic
6、远程程序调用