OnMyHeart 2019-10-24
在这一项中,我们创建一个工作队列,用于在多个工作者之间分配耗时的任务。
Work Queues的主要思想是,避免立即执行资源密集的任务而不得不等待其执行完成。我们将任务封装为消息并将其发送到队列中,在后台运行的一个工作进程将会弹出任务并最终执行该任务,当你管理许多工作节点时,任务就会在他们之间共享。
这个概念在web应用程序中尤其有用,因为在一个短HTTP请求窗口中不可能处理复杂的任务
下面我们发送一个特殊的String,用Thread.sleep()辅助,来模拟一些耗时的工作,用点来简单表示一个任务的复杂度,例如Hello.表示此任务需要两秒进行处理。
我们将在原来生产者的基础上修改一些代码,文件名叫send_work.py:
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost'))#默认端口5672,可不写 #创建通道,声明一个管道,在管道里发送消息 channel = connection.channel() #在管道里声明queue channel.queue_declare(queue='hello') message = ''.join(sys.argv[1:]) or "Hello World" #一条消息永远不能直接发送到队列,它总需要经过一个交换exchange channel.basic_publish(exchange='', routing_key='hello', body=message)#设置routing_key(消息队列的名称)和body(发送的内容) print("[x] Sent %r" % message) connection.close()#关闭连接,队列关闭
我们将在原来消费者的基础上修改一些代码,文件名叫receive_work.py:
#receiving(消费者接收者) import pika import time #创建一个连接 connection = pika.BlockingConnection( pika.ConnectionParameters('localhost'))#默认端口5672,可不写 #创建通道,声明一个管道,在管道里发送消息 channel = connection.channel() #把消息队列的名字为hello,把消费者和queue绑定起来,生产者和queue的也是hello #为什么又声明了一个hello队列 #如果确定已经声明了,可以不声明。但是你不知道那个机器先运行,所以要声明两次 channel.queue_declare(queue='hello') #回调函数get消息体 def callback(ch,method,properties,body):#四个参数为标准格式 #管道内存对象,内容相关信息 print("打印看下是什么:",ch,method,properties) #打印看下是什么 print(" [x] Received %r" % body) time.sleep(body.count(b'.')) print("[x] Done") #消费消息 channel.basic_consume( queue='hello',#你要从那个队列里收消息 on_message_callback=callback,#如果收到消息,就调用callback函数来处理消息 auto_ack=True #写的话,如果接收消息,机器宕机消息就丢了 #一般不写,宕机则生产者检测到发给其他消费者 ) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming() #创建死循环,监听消息队列,可使用CTRL+C结束监听
对于上面的,开启两个控制台的消费者,开启一个生产者的控制台,运行的结果如下:
默认情况下,RabbitMQ将按顺序将每个消息发送给下一个消费者,平均每个消费者将得到相同数量的消息。这种分发消息的方式称为循环(平均分配)
完成一项任务可能需要一定的时间。你可能会想,如果一个消费者开始一项长时间的任务,并且只完成了一部分,那么会发生什么。在我们当前的代码中,一旦RabbitMQ向客户发送一条消息,它立即将其标记为删除。在这种情况下。如果我们强制关闭了一个工作节点。我们将丢失它正在处理的消息。我们还将丢失发送给这个特定工作者的所有消息,但是还没有处理
通常我们不希望因为一个节点挂掉而丢失任何消息,而希望能够将这些消息传递给其他存活节点进行处理
RabbitMQ支持message acknowledgments(消息确认),一条特定的消息被接收后,返回一个ack告诉RabbitMQ可以随意地进行删除
如果一个消费者挂了(通道关闭,TCP连接关闭)就不能发送ack给RabbitMQ,此时RabbitMQ就会意识到,某条消息没有被处理完成,那么就会将其重新发送到其他消费者。这种处理流程保证了信息不会丢失,即使偶尔有消费者挂掉
RabbitMQ将在某个消费者挂掉后重新传递消息,即使处理消息需要很长时间
<Contextpath="/recon"reloadable="true"docBase="D:\workspace\seashell-recon\seashell-appcontroller\sr