Soongp 2020-06-07
MQ是消息队列
,也叫做消息中间件
,通过生产者与消费者模型,生产者不断的向消息队列发送消息,消费者不断的从消息队列获取消息,因为发送消息和获取消息都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,可以轻松实现解耦。
MQ的实现有很多种,比如Kafka、ActiveMQ、RabbitMQ、RocketMQ等
# 1.ActiveMQ ActiveMQ是比较老的MQ,API丰富,但是性能没有其他几个MQ好,适合小型企业 # 2.Kafka Kafka追求高吞吐量和性能,但是不支持事务,以及对消息的重复和丢失没有严格要求,一开始主要用于日志收集和传输,适合大量数据收集 # 3.RocketMQ RocketMQ是阿里的,纯java开发,具有高可用、高吞吐、适合大规模系统,思路起源于Kafka,但是对消息的可靠性以及事务做了优化,但是开源版的RocketMQ并没有支持事务 # 4.RabbitMQ RabbitMQ使用Erlang语言开发,基于AMQP协议实现,AMQP主要特征是面向消息、队列、可靠性、安全。对数据的一致性、稳定性、可靠性要求很高,对性能和吞吐量其次,目前也是最欢迎的,而且可以和Spring框架无缝整合
RabbitMQ是基于
AMQP
协议实现的,采用Erlang语言开发,是最受广泛的消息中间件之一
AMQP
:AMQP是一个提供统一消息服务的应用层消息队列协议,可以说是为面向消息的中间件设计的,基于该协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。
需要先关闭防火墙或者开放5672和15672端口,5672是rabbitmq的服务端口,15673是rabbitmq的管理页面端口
使用rpm包安装
# 1.下载rabbitMQ依赖包 yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz # 2.下载安装包 wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm # 3.安装 rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm # 4.修改配置文件 cp /usr/share/doc/rabbitmq-server-3.6.5/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config # 将rabbitmq.config的 61行改为 {loopback_users, []} vim rabbitmq.config # 5.启动/停止/重启服务 /etc/init.d/rabbitmq-server start /etc/init.d/rabbitmq-server stop /etc/init.d/rabbitmq-server restart /etc/init.d/rabbitmq-server status # 6.安装rabbitMQ管理页面,需要先启动服务再安装,安装后重启服务 rabbitmq-plugins enable rabbitmq_management # 7.访问localhost:15672,使用guest,guest登录
使用Docker安装
# 1.下载rabbitMQ,3.7版本 docker pull rabbitmq:3.7-management # 2.运行 docker run -d --hostname rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.7-management # 3.访问localhost:15672,使用guest,guest登录
先看以下使用RabbitMQ的流程,下图只是演示了RabbleMQ的一种模型,我们也可以不通过exchanges交换机,生产者可以直接生产消息到Quene队列中
对上图解析:
建立连接
:生产者与RabbitMQ服务连接,每一个生产者对应一个虚拟主机,每个虚拟主机对应一个项目,这样多个应用操作RabbitMQ服务的时候就不互受影响;每个虚拟机对应一个特定的用户生产消息
:通过通道生产消息到虚拟主机中消费消息
:在写代码之前首先需要在web管理页面创建虚拟机和添加访问虚拟主机的用户
1.创建虚拟主机
2.创建用户
3.给用户添加虚拟主机权限
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.7.2</version> </dependency>
对上图解析:
public class Provider { private static final String QUEUE_NAME = "hello"; @Test public void provide() throws IOException, TimeoutException { //1.创建连接工厂对象 ConnectionFactory factory = new ConnectionFactory(); //连接rabbitmq服务 ip factory.setHost("112.126.82.110"); //连接rabbitmq服务 端口号 factory.setPort(5672); //连接哪个虚拟主机 factory.setVirtualHost("/ems"); //连接虚拟主机的用户名和密码 factory.setUsername("ems"); factory.setPassword("ems"); //2.获取连接对象 Connection connection = factory.newConnection(); //3.获取连接通道,通道中存的是消息,要与队列绑定,因为它要知道往哪个消息队列发送 Channel channel = connection.createChannel(); //通道与队列建立连接 //参数1:队列名称,如果队列不存在会自动创建 //参数2:持久化队列,如果为true,即使重启也会存在 //参数3:独占队列,如果为true,那么该连接不允许有其他队列 //参数4:自动删除,如果为true,服务器将在不使用它时删除 //参数5:其他的参数 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //4.发送消息 //参数1:交换机名称 //参数2:队列名称 //参数3:传递消息额外设置 //参数4:消息的具体内容,字节数组 channel.basicPublish("", QUEUE_NAME, null, "hello rabbitmq2".getBytes()); //5.关闭连接和通道 channel.close(); connection.close(); } }
一共分为五步: