RabbitMQ

Soongp 2020-06-07

1.MQ介绍

1.1 什么是MQ

MQ是消息队列,也叫做消息中间件,通过生产者与消费者模型,生产者不断的向消息队列发送消息,消费者不断的从消息队列获取消息,因为发送消息和获取消息都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,可以轻松实现解耦。

1.2 MQ有哪些

MQ的实现有很多种,比如Kafka、ActiveMQ、RabbitMQ、RocketMQ等

1.3 如何选择MQ

# 1.ActiveMQ
	ActiveMQ是比较老的MQ,API丰富,但是性能没有其他几个MQ好,适合小型企业
# 2.Kafka
	Kafka追求高吞吐量和性能,但是不支持事务,以及对消息的重复和丢失没有严格要求,一开始主要用于日志收集和传输,适合大量数据收集
# 3.RocketMQ
	RocketMQ是阿里的,纯java开发,具有高可用、高吞吐、适合大规模系统,思路起源于Kafka,但是对消息的可靠性以及事务做了优化,但是开源版的RocketMQ并没有支持事务
# 4.RabbitMQ
	RabbitMQ使用Erlang语言开发,基于AMQP协议实现,AMQP主要特征是面向消息、队列、可靠性、安全。对数据的一致性、稳定性、可靠性要求很高,对性能和吞吐量其次,目前也是最欢迎的,而且可以和Spring框架无缝整合

2.RabiitMQ介绍

RabbitMQ是基于AMQP协议实现的,采用Erlang语言开发,是最受广泛的消息中间件之一

AMQP:AMQP是一个提供统一消息服务的应用层消息队列协议,可以说是为面向消息的中间件设计的,基于该协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。

2.1 RabbitMQ安装

需要先关闭防火墙或者开放5672和15672端口,5672是rabbitmq的服务端口,15673是rabbitmq的管理页面端口

使用rpm包安装

# 1.下载rabbitMQ依赖包
yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz

# 2.下载安装包
wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm
wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm

# 3.安装
rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm 
rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm

# 4.修改配置文件
cp /usr/share/doc/rabbitmq-server-3.6.5/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
# 将rabbitmq.config的 61行改为 {loopback_users, []}
vim rabbitmq.config

# 5.启动/停止/重启服务
/etc/init.d/rabbitmq-server start
/etc/init.d/rabbitmq-server stop
/etc/init.d/rabbitmq-server restart
/etc/init.d/rabbitmq-server status

# 6.安装rabbitMQ管理页面,需要先启动服务再安装,安装后重启服务
rabbitmq-plugins enable rabbitmq_management

# 7.访问localhost:15672,使用guest,guest登录

使用Docker安装

# 1.下载rabbitMQ,3.7版本
docker pull rabbitmq:3.7-management

# 2.运行
docker run -d --hostname rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.7-management

# 3.访问localhost:15672,使用guest,guest登录

2.2 RabbitMQ流程

先看以下使用RabbitMQ的流程,下图只是演示了RabbleMQ的一种模型,我们也可以不通过exchanges交换机,生产者可以直接生产消息到Quene队列中

RabbitMQ

对上图解析:

  • 建立连接:生产者与RabbitMQ服务连接,每一个生产者对应一个虚拟主机,每个虚拟主机对应一个项目,这样多个应用操作RabbitMQ服务的时候就不互受影响;每个虚拟机对应一个特定的用户
  • 生产消息:通过通道生产消息到虚拟主机中
  • 消费消息

3. Hello World!

3.1. 创建虚拟主机和用户

在写代码之前首先需要在web管理页面创建虚拟机和添加访问虚拟主机的用户

1.创建虚拟主机

RabbitMQ

2.创建用户

RabbitMQ

3.给用户添加虚拟主机权限

RabbitMQ

3.2 引入依赖

<dependency>
			<groupId>com.rabbitmq</groupId>
			<artifactId>amqp-client</artifactId>
			<version>5.7.2</version>
	</dependency>

3.3 第一种模型(直连)

RabbitMQ

对上图解析:

  • P:生产者,用于向消息队列发送消息
  • C:消费者,用于从消息队列获取消息
  • Queue:消息队列,也就是图中红色的部分,用于存储消息

1.开发生产者

public class Provider {

    private static final String QUEUE_NAME = "hello";

    @Test
    public void provide() throws IOException, TimeoutException {
        //1.创建连接工厂对象
        ConnectionFactory factory = new ConnectionFactory();
        //连接rabbitmq服务  ip
        factory.setHost("112.126.82.110");
        //连接rabbitmq服务  端口号
        factory.setPort(5672);
        //连接哪个虚拟主机
        factory.setVirtualHost("/ems");
        //连接虚拟主机的用户名和密码
        factory.setUsername("ems");
        factory.setPassword("ems");

        //2.获取连接对象
        Connection connection = factory.newConnection();

        //3.获取连接通道,通道中存的是消息,要与队列绑定,因为它要知道往哪个消息队列发送
        Channel channel = connection.createChannel();
        //通道与队列建立连接
        //参数1:队列名称,如果队列不存在会自动创建
        //参数2:持久化队列,如果为true,即使重启也会存在
        //参数3:独占队列,如果为true,那么该连接不允许有其他队列
        //参数4:自动删除,如果为true,服务器将在不使用它时删除
        //参数5:其他的参数
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //4.发送消息
        //参数1:交换机名称
        //参数2:队列名称
        //参数3:传递消息额外设置
        //参数4:消息的具体内容,字节数组
        channel.basicPublish("", QUEUE_NAME, null, "hello rabbitmq2".getBytes());

        //5.关闭连接和通道
        channel.close();
        connection.close();
    }
}

一共分为五步:

  • 建立连接工厂对象,设置rabbitmq服务的主机和端口
  • 获取连接对象
  • 获取连接通道
  • 通道与队列建立连接,并发送消息
  • 关闭连接和通道

2.开发消费者

相关推荐