RabbitMQ消息确认机制

liym 2019-03-21

应用RabbitMQ可靠性传输机制实现Redis缓存的实时更新

消息中间件集群崩溃,如何保证百万生产数据不丢失?

RabbitMQ暂时放在了自己的内存中,还没来得及投递给下游的仓储服务呢,此时RabbitMQ突然宕机了,会怎么样?

答案其实很简单,默认情况下,按照我们目前的代码和配置,这个数据就会丢失了。

持久化

队列持久化:

//queuechannel.queue_declare(queue='hello2',durable=True)

channel.queueDeclare(
 "warehouse_schedule_delivery",
 true, 
 false,
 false,
 null);

核心在于第二个参数,第二个参数是true。意思就是说,这个创建的queue是durable的,也就是支持持久化的。

这样,RabbitMQ会把这queue的相关信息持久化的存储到磁盘上去,即使RabbitMQ宕机后重启,也会恢复之前创建好的这个queue。

消息持久化:

现在你的queue的信息可以持久化了,RabbitMQ宕机重启后会自动恢复queue。但是,你的queue里的message数据呢?

queue里都是订单服务发送过去的订单消息数据,如果RabbitMQ还没来得及投递queue里的订单消息到仓储服务,结果RabbitMQ就宕机了。

那此时RabbitMQ重启之后,他可以恢复queue的信息,但是queue的message数据是没法恢复了。

所以就需要在你的订单服务发送消息到RabbitMQ的时候,【定义这条消息也是durable】,即持久化的。

#channel.basic_publish(exchange='',routing_key='hello2',body='Hello World!',properties=pika.BasicProperties(delivery_mode=2))


/* 参数:

 * 向server发布一条消息
 * 参数1:exchange名字,若为空则使用默认的exchange
 * 参数2:routing key
 * 参数3:其他的属性
 * 参数4:消息体

 * 【RabbitMQ默认有一个exchange,叫default exchange,它用一个空字符串表示,它是direct exchange类型,

 * 任何发往这个exchange的消息都会被路由到routing key的名字对应的队列上,如果没有对应的队列,则消息会被丢弃】
 */

   

channel.basicPublish("", "warehouse_schedule_delivery",MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

 通过上面的方式来发送消息,就可以让发送出去的消息是持久化的。

一旦标记了消息是持久化之后,就会让RabbitMQ把消息持久化写入到磁盘上去,此时如果RabbitMQ还没投递数据到仓储服务,结果就突然宕机了。那么再次重启的时候,就会把磁盘上持久化的消息给加载出来。

但是这里要注意一点,RabbitMQ的消息持久化,是不承诺100%的消息不丢失的。

因为有可能【RabbitMQ接收到了消息,但是还没来得及持久化到磁盘,他自己就宕机了】,【这个时候消息还是会丢失的】。

如果要完全100%保证写入RabbitMQ的数据必须落地磁盘,不会丢失,需要依靠其他的机制。

RabbitMQ消息确认机制

Message acknowledgment

消费者消息确认机制

为了保证消息从队列可靠地到达消费者,RabbitMQ提供了消费者消息确认机制(message acknowledgement)。采用消息确认机制之后,消费者就有足够的时间来处理消息,不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为RabbitMQ会一直等待并持有消息,直到消费者确认了该消息。

当有Consumer需要大量的运算时,RabbitMQ Server需要一定的分发机制来balance每个Consumer的load。

【默认情况下,RabbitMQ会顺序的分发每个Message。当每个【收到ack后,会将该Message删除】,然后将下一个Message分发到下一个Consumer】这种分发方式叫做round-robin。这种分发还有问题。

每个Consumer可能需要一段时间才能处理完收到的数据。如果在这个过程中,Consumer出错了,异常退出了,而数据还没有处理完成,那么非常不幸,这段数据就丢失了。因为我们采用no-ack的方式进行确认,也就是说,【每次Consumer接到数据后,而不管是否处理完成,RabbitMQ Server会立即把这个Message标记为完成,然后从queue中删除了】。

如果一个Consumer异常退出了,它处理的数据能够被另外的Consumer处理,这样数据在这种情况下就不会丢失了。

为了保证数据不被丢失,RabbitMQ支持消息确认机制,即acknowledgments。【为了保证数据能被正确处理而不仅仅是被Consumer收到,那么我们不能采用no-ack】。而应该是在【处理完数据后发送ack】。(在处理数据后发送的ack,就是告诉RabbitMQ数据已经被接收,处理完成,RabbitMQ可以去安全的删除它了,【如果Consumer退出了但是没有发送ack,那么RabbitMQ就会把这个Message发送到下一个Consumer】。这样就保证了在Consumer异常退出的情况下数据也不会丢失)

在实际应用中,可能会发生消费者收到Queue中的消息,但没有处理完成就宕机(或出现其他意外)的情况,这种情况下就可能会导致消息丢失。为了避免这种情况发生,我们可以要求消费者在消费完消息后发送一个回执给RabbitMQ,RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从Queue中移除;【如果RabbitMQ没有收到回执并检测到消费者的RabbitMQ连接断开,则RabbitMQ会将该消息发送给其他消费者(如果存在多个消费者)进行处理】。【这里不存在timeout概念】,【一个消费者处理消息时间再长也不会导致该消息被发送给其他消费者,除非它的RabbitMQ连接断开】。

这里会产生另外一个问题,如果我们的开发人员在处理完业务逻辑后,忘记发送回执给RabbitMQ,这将会导致严重的bug——Queue中堆积的消息会越来越多;消费者重启后会重复消费这些消息并重复执行业务逻辑。

RPC

MQ本身是基于异步的消息处理,前面的示例中所有的生产者(P)将消息发送到RabbitMQ后不会知道消费者(C)处理成功或者失败(甚至连有没有消费者来处理这条消息都不知道)。 

但实际的应用场景中,我们很可能需要一些同步处理,需要同步等待服务端将我的消息处理完成后再进行下一步处理。这相当于RPC(Remote Procedure Call,远程过程调用)。在RabbitMQ中也支持RPC。

【RabbitMQ开启手动ack机制保证消费端数据不丢失的时候,prefetch机制对消费者的吞吐量以及内存消耗的影响。

通过分析,我们知道了prefetch过大容易导致内存溢出,prefetch过小又会导致消费吞吐量过低,所以在实际项目中需要慎重测试和设置。

------------------------------------------------------------------------------------------------------

生产者消息确认机制

当消息发送出去之后,我们如何知道消息有没有正确到达exchange呢?如果在这个过程中,消息丢失了,我们根本不知道发生了什么,也不知道是什么原因导致消息发送失败了

为解决这个问题,主要有如下两种方案:

【通过事务】机制实现

【通过生产者消息确认机制】(publisher confirm)实现

但是使用【事务机制实现会严重降低RabbitMQ的消息吞吐量】,我们采用一种轻量级的方案——生产者消息确认机制

  

什么是生产者消息确认机制?

简而言之,就是:生产者发送的消息一旦被投递到所有匹配的队列之后,就会发送一个确认消息给生产者,这就使得生产者知晓消息已经正确到达了目的地。

如果消息和队列是持久化存储的,那么确认消息会在消息写入磁盘之后发出。

再补充一个Mandatory参数:【当Mandatory参数设为true时,如果目的不可达,会发送消息给生产者,生产者通过一个回调函数来获取该信息。】

相关推荐