cj0 2019-12-10
RabbitMQ 是一个用 Erlang 编写的开源的消息队列中间件,它实现了 AMQP 协议(其实还实现了 MTQQ 等消息协议)。和其他两个主流的消息队列中间件 Kafka 和 RocketMQ 相比,拥有更低的延迟、更高的稳定性、更完备的功能、更完善的文档支持以及较活跃的开源社区支持,但是在吞吐量上和分布式扩展能力上逊色一些。
AMQP(Advanced Message Queuing Protocol),高级消息队列协议,是一个语言无关的面向消息中间件的开放标准,它定义了一套消息中间件的模型架构,即生产者将消息发送给交换机,交换机根据路由键将消息路由到队列,消费者通过订阅队列来获取消息。从更低的层面来看,AMQP是一套应用层的通信协议,它跟 HTTP 这样的协议一样提供了 TCP 之上的报文封装定义,定义了协议命令的交互规则。RabbitMQ 就用 Erlang 实现了 AMQP。
通常消息队列只会有生产者、队列和消费者三个概念,而 AMQP 多引入了一个概念 Exchange 交换机(也译交换器),生产者会将消息发送到交换机,交换机再根据自身的路由策略和 routing key(路由键)将消息转发到合适的队列上。这里 Exchange 其实更应该叫 Router 也就是路由器而不是交换机,因为它的作用更像网络路由器根据 IP 将数据路由到指定的设备。
RabbitMQ 的 routing key 标识了消息的目标队列。它有两个含义,在发送消息的时候要指定一个 routing key,下文所说的 routing key 都是指发送消息指定的 routing key;在绑定交换机和路由器的时候也要指定一个 routing key,这时候这个 routing key 又叫 binding key,binding key 是可以用通配符表示的。下文描述“绑定时指定的 routing key”的时候都会用 binding key 替代。
绑定是随着交换机这个概念一起出现的概念,指的是将交换机和队列关联起来的动作或者说是关联关系。沿用上述的网络路由器的类比,绑定就类似于将路由器用网线和终端设备连接起来(上述的 binding key 相当于网线的命名),只有建立了绑定关系交换机才能把消息路由到匹配 routing key 的队列。
交换机将消息路由到哪些队列由两个因素共同决定,分别是 routing key 和交换机类型。RabbitMQ 的交换机有4种类型:
direct 类型的交换器会把消息路由到 binding key 和发送消息指定的 routing key 完全相同的队列上。如下图生产者P向交换机X发送了一个routing key为 error 的消息就会转发到两个队列上,如果 routing key 为 info就只会转发到下面的队列。
只有用到 topic 这种路由类型,binding 和 binding key 才真正有其产生的意义。topic 和 direct 的区别在于,direct 要求消息的 routing key 和 绑定的 binding key 完全相同,而 topic 支持 routing key 和 binding key 模糊匹配。如下图生产者P向交换机X发送了一个 routing key 为 a.orange.drink 的消息的话,交换机只会把消息路由到 Q1 队列,如果消息的 routing key 为 a.orange.rabbit 的话,交换机会把消息路由到 Q1 和 Q2 两个队列。这里"*"代表1个单词,"#"代表0到n个单词,这里说的“单词”指使用"."进行分隔的字符串单位。具体的匹配规则见 官方文档。
所有发送到 fanout 交换机的消息都会被路由到所有与该交换机绑定的队列上,routing key 和 binding key 不会被 fanout 交换机理会。fanout 可以实现广播消息的目的,不过 RabbitMQ 不支持单个队列层面的广播消费,Kafka 的消息存储在 topic 这个逻辑层面,广播消息只需要为每个消费者分别维护 topic 上的消息位移;而 RabbitMQ 的消息直接存储在队列这个逻辑层面,要实现广播消费就要为每个消费者创建一个队列,如果多个消费者订阅同一个队列,RabbitMQ 默认会采用负载均衡的策略将消息分摊给多个消费者,而不是所有消费者都收到所有消息。
这种交换机类型比较冷门不太实用,不说了。
这里使用 Docker 快速部署,并开启自带的web管理界面。
docker run -d -p 5672:5672 -p 15672:15672 --name my-rabbit rabbitmq:3-management
1、添加依赖,这里使用 Maven
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2、配置文件,这里使用 yml
spring: rabbitmq: username: guest # RabbitMQ默认用户名和密码,生产环境记得改 password: guest addresses: 127.0.0.1:5672
3、声明队列
@Configuration public class RabbitConfig { @Bean public Queue Queue() { //单参构造传入队列名称 return new Queue("myQueue"); } }
4、生产者
@Component public class DemoSender { @Autowired private RabbitTemplate rabbitTemplate; public void send() { String context = "hello " + new Date(); this.rabbitTemplate.convertAndSend("hello", context); } }
5、消费者
@Component public class DemoReceiver { //注解填写要消费的队列名称 @RabbitListener(queues = "myQueue") public void receive(String message) { System.out.println("Received " + message); } } }
6、测试
@RunWith(SpringRunner.class) @SpringBootTest public class RabbitDemoTest { @Autowired private DemoSender demoSender; @Test public void testSend() throws Exception { demoSender.send(); } }
配置无误的话运行上面的单元测试消费者会把消费到的消息打印出来,这里我们只是用一个 hello world级别的demo验证了 RabbitMQ 的正常运行。
@Configuration public class RabbitConfig { /** * 声明一个名为exchange.d的direct交换机 */ @Bean DirectExchange exchangeD() { return new DirectExchange("exchange.d"); } /** * 声明一个名为queue-d的队列 */ @Bean public Queue queueD() { return new Queue("queue.d"); } /** * 绑定队列queue.d和交换机exchange.d,使用队列名称作为binding key */ @Bean public Binding bindingDirect() { return BindingBuilder.bind(queueD()).to(exchangeD()).withQueueName(); } }
测试
@RunWith(SpringRunner.class) @SpringBootTest public class RabbitDemoTest { @Autowired private RabbitTemplate rabbitTemplate; //订阅queue.d的消费者可以消费到这条消息 @Test public void sendCorrect() { String context = "hello " + new Date(); rabbitTemplate.convertAndSend("exchange.d", "queue.d", context); } //订阅queue.d的消费者无法消费到到这条消息 @Test public void sendIncorrect() { String context = "hello " + new Date(); rabbitTemplate.convertAndSend("exchange.d", "queue.d.2", context); } }
@Configuration public class RabbitConfig { /** * 声明一个名为exchange.t的topic交换机 */ @Bean TopicExchange exchangeT() { return new TopicExchange("exchange.t"); } @Bean public Queue queueT1() { return new Queue("queue.t.1"); } @Bean public Queue queueT2() { return new Queue("queue.t.2"); } /** * 绑定队列queue.t.1和交换机exchange.t,使用queue.#作为binding key */ @Bean public Binding bindingT1() { return BindingBuilder.bind(queueT1()).to(exchangeT()).with("queue.#"); } /** * 绑定队列queue.t.2和交换机exchange.t,使用queue.t.2作为binding key */ @Bean public Binding bindingT2() { return BindingBuilder.bind(queueT2()).to(exchangeT()).with("queue.t.2"); } }
测试
@RunWith(SpringRunner.class) @SpringBootTest public class RabbitDemoTest { @Autowired private RabbitTemplate rabbitTemplate; //两个队列的消费者都能消费到这条消息 @Test public void send1() { String context = "hello " + new Date(); rabbitTemplate.convertAndSend("exchange.t", "queue.t.2", context); } //只有队列queue.t.1的消费者能消费到这条消息,因为它绑定的路由键queue.#能匹配queue.t.3 @Test public void send1() { String context = "hello " + new Date(); rabbitTemplate.convertAndSend("exchange.t", "queue.t.3", context); } }
@Configuration public class RabbitConfig { /** * 声明一个名为exchange.f的fanout交换机 */ @Bean FanoutExchange exchangeF() { return new TopicExchange("exchange.f"); } @Bean public Queue queueF1() { return new Queue("queue.f.1"); } @Bean public Queue queueF2() { return new Queue("queue.f.2"); } /** * 绑定队列queue.f.1和交换机exchange.f,不需要指定binding key */ @Bean public Binding bindingF1() { return BindingBuilder.bind(queueF1()).to(exchangeF()); } /** * 绑定队列queue.f.2和交换机exchange.f,不需要指定binding key */ @Bean public Binding bindingF2() { return BindingBuilder.bind(queueF2()).to(exchangeF()); } }
测试
@RunWith(SpringRunner.class) @SpringBootTest public class RabbitDemoTest { @Autowired private RabbitTemplate rabbitTemplate; //两个队列的消费者都能消费到这条消息 @Test public void send() { String context = "hello " + new Date(); //发送消息时不需要指定routing key,指定了也会被交换机忽略 rabbitTemplate.convertAndSend("exchange.f", "", context); } }
对于比较重要不能丢失的消息,消费确认机制非常重要。RabbitMQ 提供了消息确认机制,当开启自动确认时,RabbitMQ会自动把发送出去的消息置为确认,然后从内存或磁盘中删除,不管消费者是否真正消费到了消息;当关闭自动确认机制,RabbitMQ 会等待消费者主动回复确认信号才会移除消息。spring boot 可以在配置文件关闭自动确认,使用手动确认:
spring: rabbitmq: listener: simple: acknowledge-mode: manual
消费端手动确认消息:
@Component public class DemoReceiver { @RabbitListener(queues = "myQueue") public void receive(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) { System.out.println("Received " + message); channel.basicAck(deliveryTag, false); } }
对于比较重要不能丢失的消息,保证消息能够持久化是必要的,消息持久化能防止服务器宕机导致队列中的消息丢失。RabbitMQ 的持久化要求将交换机、队列和消息都配置为持久化。在 Spring/Spring Boot 中只需要配置交换机和队列为持久化就行。其实上面的配置中队列和交换机都是默认持久化的,以第一个demo为例,显式的配置如下:
@Configuration public class RabbitConfig { @Bean DirectExchange exchangeD() { //第二个参数为true表示持久化该交换机 return new DirectExchange("exchange.d", true, false); } @Bean public Queue queueD() { //第二个参数为true表示持久化该队列 return new Queue("queue.d", true); } ... }
配置持久化之后,先不启动消费者,生产者将消息发送到RabbitMQ 后,关闭 RabbitMQ 服务并重启,再启动消费者,消费者仍能收到消息,证明 RabbitMQ 宕机后消息仍然存在,持久化是生效的。并不是所有消息都应该开启持久化,由于持久化需要 RabbitMQ 将消息写入磁盘来实现,过多的持久化必定影响 RabbitMQ 的性能,类似日志这样的非关键数据就不建议持久化,当然这个都要看业务。
为防止消息长期积压在队列中导致队列甚至是磁盘被占满,应该给消息设置过期时间。
@Bean public Queue queue() { Map<String, Object> args = new HashMap<>(1); //设置该队列的消息的过期时间为1小时 args.put("x-message-ttl", 3600 * 1000); return new Queue("queue.d", true, false, false, args); }
当消息过了过期时间还没有被消费,消息就会变成死信(Dead message),除了消息过期这种情况,当消息要入队的队列已达到最大长度,或者消息被消费者拒绝且不允许重发,消息也会变成死信。为了死信不被丢弃删除,防止没有被成功消费的消息无迹可寻,RabbitMQ 允许设置一种特殊的交换机————DLX 死信交换机(Dead-Letter-Exchange),使变成死信的消息重新发送到死信交换机上。我们可以另外声明一个队列并绑定该死信交换机,这种绑定死信交换机的队列就叫死信队列。通过订阅死信队列,我们就可以收集到消费失败的消息,进行问题的排查分析。配置死信队列的方式也很简单:
@Bean DirectExchange exchangeNormal() { //声明一个交换机用于常规消息的分发 return new DirectExchange("normal_exchange", true, false); } @Bean DirectExchange exchangeDLX() { //声明一个交换机作为死信交换机 return new DirectExchange("dlx_exchange", true, false); } @Bean public Queue queue() { Map<String, Object> args = new HashMap<>(2); args.put("x-message-ttl", 3600 * 1000); //将dlx_exchange设置为该队列的死信交换机 args.put("x-dead-letter-exchange", "dlx_exchange"); return new Queue("queue.normal", true, false, false, args); } @Bean public Binding bindingNormal() { //常规业务队列和业务交换机绑定 return BindingBuilder.bind(queue()).to(exchangeNormal()).withQueueName(); } @Bean public Queue queueDLX() { //声明一个死信队列专门消费死信交换机转发的死信 return new Queue("queue.dlx", false); } @Bean public Binding bindingDLX() { //绑定死信队列和死信交换机 return BindingBuilder.bind(queueDLX()).to(exchangeDLX()).withQueueName(); }
生产环境下没人用单点的 RabbitMQ ,其实任何基础服务和核心服务都不应该是单点的,否则单点故障会影响整个系统。RabbitMQ 提供两种集群模式以实现高可用性。
普通的 RabbitMQ 集群允许消费者和生产者在 RabbitMQ 出现单点故障的时候仍然能够进行生产和消费,通过添加集群节点可以线性地提高消息的吞吐量。这里演示搭建2个节点的集群:
1、启动2个 RabbitMQ 节点:
docker run -d -p 5672:5672 -p 15672:15672 --name rabbit-node1 --hostname host1 -e RABBITMQ_ERLANG_COOKIE='myrabbitcookie' rabbitmq:3-management docker run -d -p 5673:5672 -p 15673:15672 --name rabbit-node2 --hostname host2 -e RABBITMQ_ERLANG_COOKIE='myrabbitcookie' --link rabbit-node1:host1 rabbitmq:3-management
这里需要注意的是设置了环境变量 RABBITMQ_ERLANG_COOKIE,相当于写 /var/lib/rabbitmq/.erlang.cookie 文件,所有节点的这个值需要保证相同,才能够协商加入集群。
2、节点2加入节点1组成集群:
docker exec -it rabbit-node2 bash rabbitmqctl join_cluster --ram
这里需要注意的是 --ram 这个设置,它将节点2指定为内存节点,内存节点将队列、交换机、用户等元信息放在内存里;如果指定--disc 就是磁盘节点,顾名思义它将这些元信息存在磁盘。配置内存节点有助于提高性能,但是集群中至少需要一个磁盘节点。
普通集群其实并不够高可用,虽然集群的所有节点都会同步所有元数据,但是消息数据只保存在于一个节点上。一旦该节点宕机,其他节点可以重新声明队列继续进行生产消费,但是宕机节点的数据在节点恢复前无法恢复。如果宕机节点刚好保存了某个持久化队列的消息数据,我们称这个节点是主节点,那么这个队列甚至无法在其他节点上被重新使用,必须等到主节点恢复。因此,对于重要的业务消息,在将其设置为持久化的同时,我们还要使用另一种真正高可用的集群模式:镜像队列集群。
RabbitMQ 的镜像队列机制,可以将队列镜像到集群的所有节点中,如果一个节点失效了,消息通常不会丢失还能切换到其他节点消费到。可以这样配置镜像队列:
rabbitmqctl set_policy ha-all "hello" '{"ha-mode":"all"}'
这里指定了完整队列名"hello"将其设为镜像队列,只有设置了镜像模式的队列才会被镜像到其他节点。也可以使用通配符,"^ha." 表示将所有ha.开头的队列设置为镜像队列,或者使用"^" 指定所有队列。ha-all 和 "ha-mode":"all" 指定镜像到所有节点,也可以镜像到特定节点:
rabbitmqctl set_policy ha-nodes "^ha\." '{"ha-mode":"nodes","ha-params":["", ""]}
配置了集群和镜像队列之后,在应用的配置文件指定集群的节点:
spring: rabbitmq: addresses: 127.0.0.1:5672,127.0.0.1:5673
关闭任意节点进行验证即可,消费者和生产者都会自动切换到其他节点。
跟设置持久化的原则一样,并不是所有队列都设置镜像最好,在集群的每个节点镜像备份相同的数据必然对集群性能有一定的负担。如果是不需要持久化的非重要消息,可以考虑不设置镜像。
本文根据个人经验介绍了 RabbitMQ 从概念到应用的相关知识,尽可能把关键的点都写出来,但是由于能力有限追求全面的同时就很难兼顾详细的配置说明和截图验证,实际上生产环境中的配置当然比这里更多也更严谨,真正技术落地还是要参考官方文档才是!