jiangxuege 2019-11-04
title: 基于springboot实现rabbitmq消息通信
date: 2019-09-11 09:00:30
tags:
- [rabbitmq]
categories:
- [springboot]
permalink: zxh
ps : 这篇文章比较长,读者还是需要耐心的阅读的。干货多多。
在分布式项目中为了提高性能,也为了实现项目规范,我们都会在处理消息队列的时候引入消息中间件。中间件的作用一个是为了解耦,还有一个是性能提升。消息中间件我们每个人每天都在接触,相信大家都用过美团或者是听过美团。从程序员的角度看美团外卖涉及三方角色。【商家】【骑手】【顾客】。这三者的关系简单理解如下
下面案例会通过代码说明,如下是项目结构
rabbit-demo
今天我们着重介绍下订单的流程,那为什么选择下订单流程而不选择其他两个流程呢。因为派送流程不是消息队列。如果非要规划为消息块我只能认为是消息消费。而发订单呢流程和下订单是一样的。考虑读者应该都是平时点餐的那位。对下订单应该更加的了解。
我们想想在美团外卖推出之前我们点餐的过程。是顾客到商家店铺里进行买单消费的。这种模式存在什么弊端。这就必须商家的店铺够大,人手够多才能够让我们消费者不拥堵起来。但是这样就增加了商家的成本。所以在这种局限的条件下我们的线上交易诞生了。
有了美团我们就只需要在手机点餐,商家就会进行选择行接受,然后出货。这就是下单的一个流程。后面的就是骑手商家流程了。
我们的顾客相当于是在向消息队列中添加消息,消息体就是我们的订单。在美团外卖的系统中同一时刻可能多人点单这就导致商家一时间处理不过来,这是我们的mq就作为一中缓存机制先将顾客的订单本地化,mq按先进先出的顺序将消息有条不紊的派发到只能的商家手中。这就解决我们人多导致订单繁琐的问题了。
好了以上是我们用mq的好处。下面我们通过代码看看mq具体的发送消息的集中方式。
由于RabbitMQ是用Erlang语言编写的,因此需要先安装Erlang。
安装完Erlang之后,我们就可以安装RabbitMQ了。
安装好之后,RabbitMQ就作为一个服务按照默认方式进行启动了
运行命令rabbitmq-plugins enable rabbitmq_management 开启Web管理插件
通过浏览器访问http://localhost:15672,并通过默认用户guest进行登录,密码也是guest,登录后的页面:
关于springboot的搭建这里就不能细说了。我们直接在zxhtom框架中扩展了。只要在里面添加如下依赖就行了
<!-- rabbit mq --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
然后我们只需要创建一个RabbitConfig
类。这个类的作用就是让mq在项目启动初期检查并完成各个组件的创建。既然交给spring完成自然需要在类上加上@Configuration
注解。
只需要完成以上操作,我们的mq服务就算是配置完成了。剩下的就应该是生产消费了。
这里比较注意的是RabbitConfig
这个类,这个类里到底是什么东西,我们在看代码前我们先来了解下mq的原理。
Queue : 消息队列 , 生产者的直接接触对象。
Exchange : 交换机 , 正常情况消费者是直接接触交换机的。
Routing Key : 生产者和消费者接触的两个不同对象。这就需要routing key 将两者对象进行关联起来
Binding : 通过routing key进行绑定到一起。
下面是消费者、生产者、队列、交换机、rk、binding之间的关系
上面我们了解了mq的内部结构,其实还是很简单的。但是我们mq有的时候有广播模式,有的点对点模式。这些是这么来的呢。对了正是我们本节介绍的内容--交换机。
顾名思义就是【直连交换机】。要求发送的消息与一个特定的路由键完全的匹配。如下代码中我们queue1的队列和directExchange交换机通过DIRECTION绑定到了一起。那么我们发送消息是必须将消息标记为DIRECTION
@Bean public Binding binding1(){ return BindingBuilder.bind(queue1()).to(directExchange()).with(DIRECTIONKEY); }
public void directionSend() { rabbitTemplate.convertAndSend(RabbitConfig.DIRECTEXCHANGE, "directionKey", msg); }
fanout中文翻译为【输出】,在这里我们理解成广播。【广播交换机】
广播的意思就是所有人都能接受到,在这里需要重新定义为只要绑定在次交换机上的所有队列都能接收到。不想其他交换机需要key来进行暗号匹配。Fanout Exchange就是我们常用的广播模式,只要订阅就会收到消息。像一些房产中介就是这种模式,只要你留下手机号码楼盘动态就会实时的推送在你的手机上。
@Bean public Binding fanoutBinding1(){ return BindingBuilder.bind(queue1()).to(fanoutExchange()); } @Bean public Binding fanoutBinding2(){ return BindingBuilder.bind(queue2()).to(fanoutExchange()); } @Bean public Binding fanoutBinding3(){ return BindingBuilder.bind(queue3()).to(fanoutExchange()); } @Bean public Binding fanoutBinding4(){ return BindingBuilder.bind(queue4()).to(fanoutExchange()); }
下面代码就是只要有消息发送到Fanout Exchange上所有的队列就会收到消息
public void fanoutSend1() { rabbitTemplate.convertAndSend(RabbitConfig.FANOUTEXCHANGE, RabbitConfig.QUEUETHIRD, msg); }
将路由键和某种模式进行匹配,这里可以理解成Direction Exchange里的key可以通过某种正则匹配
# : 表示一个或多个词,
* : 表示一个词
zxh.# : 能够匹配 zxh ; zxh. ; zxh.abc ; zxh.ab.sd.s.a
zxh.* : 能够匹配 zxh.test ; zxh.demo
不处理路由键。而是根据发送的消息内容中的headers属性进行匹配。在绑定Queue与Exchange时指定一组键值对;当消息发送到RabbitMQ时会取到该消息的headers与Exchange绑定时指定的键值对进行匹配;如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers属性是一个键值对,可以是Hashtable,键值对的值可以是任何类型。而fanout,direct,topic 的路由键都需要要字符串形式的。
匹配规则x-match有下列两种类型:
x-match = all :表示所有的键值对都匹配才能接受到消息
x-match = any :表示只要有键值对匹配就能接受到消息
//请求数据中必须符合headerValues中任意一个参数 @Bean public Binding headBinding1(){ Map<String,Object> headerValues = new HashMap<>(); headerValues.put("type", "cash"); headerValues.put("aging", "fast"); return BindingBuilder.bind(queue1()).to(headersExchange()).whereAll(headerValues).match(); }
上面四中交换机介绍完了。我们这里还有一种叫做死信队列
还是美团外卖为列, 我们平时美团点单完之后,但是不确定是不是现在就要商家制作。这个时候我们就不会选择下单。美团就会提示我们30分钟之内完成付款,负责失效。想想也对美团不可能一直在哪里等着你下单。你这样会占用人家内存的。
@Bean public Queue deadLetterQueue(){ Map<String, Object> arguments = new HashMap<String, Object>(); //设置此死信队列消息过期后,消息转发到那个队列上 arguments.put("x-dead-letter-exchange", DIRECTEXCHANGE); arguments.put("x-dead-letter-routing-key", DIRECTIONKEY); return new Queue(QUEUEDEAD, true, false, false,arguments); }
@Bean public Binding deadBinding(){ return BindingBuilder.bind(deadLetterQueue()).to(directExchange()).with(QUEUEDEAD); }
public void deadSend() { MessagePostProcessor processor = new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setExpiration("60000"); return message; } }; log.info("延时消息开始发送:"); rabbitTemplate.convertAndSend(RabbitConfig.DIRECTEXCHANGE, RabbitConfig.QUEUEDEAD, msg,processor); }
通过上述代码我们就能实现发送一个消息,会出现延时的效果。这就是我们平时30分钟支付的效果代码。
这篇文章比较长,读者还是需要耐心的阅读的。干货多多。
加入战队