落羽成舟 2019-06-27
在分布式系统中消息通信技术主要包括以下几种:
JMS
(Java Message Service)、AMQP
(Advanced Message Queuing Protocol)和STOMP
(Streaming Text Oriented Messaging Protocol)。其中JMS是Java平台上的面向接口的消息规范,是一套API标准,并有考虑异构系统。AMQP是一个面向协议的,跟语言平台无关的消息传递应用层协议规范。STOMP是流文本定向消息协议,是一种为MOM设计的简单文本协议,在使用websocket通信时可以使用该协议来中继消息中间件功能。AMQP和STOMP都是跟http处于同一层的协议,在 AMQP 模型中,消息的 producer 将 Message 发送给 Exchange,Exchange 负责交换/路由,将消息正确地转发给相应的 Queue。消息的 Consumer 从 Queue 中读取消息。这些通信方式中MOM消息中间件一般是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构,是大型分布式系统不可缺少的中间件。目前在生产环境,使用较多的消息队列有ActiveMQ
,RabbitMQ
,ZeroMQ
,Kafka
,RocketMQ
等,本文介绍其中的一种ActiveMQ
消息中间件来展开消息中间件的使用过程。
ActiveMQ
是一种开源的,实现了JMS规范的,面向消息(MOM)的中间件,为应用程序提供高效的、可扩展的、稳定的和安全的企业级消息通信。它可以部署于代理模式和P2P模式。完全支持JMS1.1和J2EE 1.4规范。跨平台的,多种语言和协议编写客户端,Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire, Stomp REST, WS Notification, XMPP, AMQP。如需配置ActiveMQ则需要在目标机器上安装Java环境。支持集群,同等网络,自动检测,TCP,SSL,广播,持久化,XA,多个消息也可以组成原子事务。
授权协议: Apache
开发语言: Java
操作系统: 跨平台
要理解这个问题得从一个实际项目业务说起,举个例子。
业务场景:某财务系统A需要给某甲方公司开发合并报表业务,其中A系统的合并报表业务的基础是需要凭证作为基准源数据(A系统不生产凭证,只是凭证的搬运工)。但是甲方公司历史已存在一套凭证系统B专门做凭证业务(B系统历史凭证数量大概1000万,同时每天会产生2万数量的新凭证),外加合并报表业务处理的实时性不是非常严格,一般一个月出一次报表即可。因此就衍生出如下业务需求。
业务需求描述:B系统需要把历史存在的凭证数据和每天新产生的凭证都需要推送给A系统,然后A系统根据传过来的凭证做凭证的后续合并报表业务处理。
系统方案设计:为了解决该业务需求,A系统按约定开发了一套基于WebService
技术的接口服务来专门负责接收B系统的凭证数据,A系统接收凭证后需要做凭证数据合法性校验、重复数据校验、保存等及一系列业务处理。
初步处理方案:A系统针对每个B系统传来的接口请求线程的处理都包括凭证校验、保存等处理都采用同步通信处理完后即时反馈处理结果给B系统的。这种方式的处理造成单个请求线程处理耗时较长,平均算下来基本1条数据的处理大致得花费1秒时间。而这种处理方式生产过程中存在非常大的性能问题,在B系统海量数据推送的情况下,按1秒1条的话,B系统当天推送的凭证A系统当天来不及处理完,这样就会堆积在第二天继续处理,但是B系统第二天又会传新的一批大量凭证,这样就导致A系统处理的还是第一天B系统传输的凭证数据,第二天的数据一直压着消费不了,时间一久系统的业务可用性大大降低。所以该方案需要废弃优化。
优化处理方案:B系统每个接口请求线程同步推送值A接口服务中,A系统接收后即时将消息转发至消息中间件ActiveMQ的队列中然后结束本次接口调用。系统A在内部单独提供消息中间件的多个消费者来异步消费中间件队列里面的凭证数据,每消费一条消息就校验保存操作,其中不满足业务的记录将错误系统通过与B系统约定的另一个接口将处理成功、处理错误信息反馈结果推送到B接口服务中,保存后的数据再通过计划任务或者手工触发来异步做合并业务处理,这样再经过一系列细节优化后,最终实现了业务处理的高可用性。
通过上述例子我们可以看出消息中间件在处理业务起到了很大的作用,其作用取决于其拥有的特点,对于传统的通信一般分为同步通信(比如:消息中间件)和异步通信(比如:RPC)。对于如今的分布式系统,消息队列已经演变为独立的消息中间件产品,相比于RPC同步通信的方式来说有几个明显的优势:
但是异步通信也存在程序设计和编程方面的复杂,同时对于实时性要求较高的业务也不能采用异步通信,所以要根据业务具体分析。
消息中间件ActiveMQ的JMS支持两种消息传送模型:点对点消息通信模型和发布订阅模型。
Queue
模式,特定的一条消息只能被一个消费者消费。生产者将消息发送到指定的Queue当中,Broker(中间件)针对消息是否需要持久化进行持久化存储后通知消费者进行处理,消费者处理完毕后发送一个回执(Acknowledge)给Broker,Broker认为该消息已被正常消费,于是从持久化存储中删除该条消息,回执的发送逻辑内嵌在MQ的API中,无需主动调用。消费者通常可以通过两种方式获取新消息:Push和Pull。Push方式:由ActiveMQ收到消息后主动调用消费者的新消息通知接口,需要消耗ActiveMQ宝贵的线程资源,同时消费者只能被动等待消息通知。Pull方式:由消费者轮询调用 ActiveMQ API 去获取消息,不消耗ActiveMQ 线程,消费者更加主动,虽然消费者的处理逻辑变得稍稍复杂。两种方式的根本区别在于线程消耗问题,由于ActiveMQ 的线程资源相对客户端更加宝贵,Push方式会占用ActiveMQ 过多的线程从而难以适应高并发的消息场景。同时当某一消费者离线一段时间再次上线后,大量积压消息处理会消耗大量ActiveMQ 线程从而拖累其它消费者的消息处理,所以Pull方式相对来说更好(Kafka消息中间件已经抛弃了PUSH模式,全面拥抱PULL模式)。两种模式选择可在具体业务场景下选择合适的模式来开发,业务要求实时性非常高的不建议使用消息中间件,一切以实际业务场景出发,避免乱使用造成数据紊乱引发骚动哈哈。
ActiveMQ依赖于Java,所以需要先安装JDK,然后在安装ActiveMQ消息中间件,然后编写代码实现。
官网地址:http://www.oracle.com/technet...
将下载的jdk-10_linux-x64_bin.tar.gz
文件拷贝到/usr/local/src
目录.
解压文件到/usr/local/bin
,先进入解压目录下cd /usr/local/bin
,然后使用命令 tar -zxvf /usr/local/src/jdk-10_linux-x64_bin.tar.gz
执行解压。
配置JAVA_HOME环境变量。
使用命令:vim /etc/profile
编辑配置文件,在vim中插入数据按键盘上的i或者insert,然后添加如下JAVA_HOME内容:
export JAVA_HOME=/usr/local/bin/jdk-10 export CLASSPATH=$JAVA_HOME/lib/ export PATH=$PATH:$JAVA_HOME/bin export PATH JAVA_HOME CLASSPATH
按esc退出insert模式,再按:输入wq,保存并且退出文件编辑。
重启服务器或者执行配置立即生效命令,刷新配置文件生效命令(我用的这个方法):source /etc/profile
,或者执行重启服务器命令:sudo shutdown -r now
,然后执行java -version
验证安装是否成功。
出现版本号即表示安装成功,JDK安装成功后接下来安装ActiveMQ。
官网地址: http://activemq.apache.org/
将下载的apache-activemq-5.15.3-bin.tar.gz
文件拷贝到/usr/local/src
目录。
解压文件到/usr/local/bin
,先进入解压目录下cd /usr/local/bin
,然后使用命令 tar -zxvf /usr/local/src/apache-activemq-5.15.3-bin.tar.gz
执行解压。
然后进入cd /usr/local/bin/apache-activemq-5.15.3/bin
目录下执行使用后端启动模式命令: ./activemq start
运行activemq,执行后使用命令 ps -ef | grep -i activemq
查看进程。
启动后可以看到activemq默认使用了8161
(监控平台)和61616
(tcp通信服务)端口。
默认端口配置可以自己修改,修改ActiveMQ的tcp通信服务端口,修改服务地址和端口:打开conf/activemq.xml
文件,找到如下部分,修改红色部分即可:
修改监控平台地址和端口:打开conf/jetty.xml
文件,找到如下部分,修改红色部分即可:
进入目录cd /etc/sysconfig
执行命令 vim iptables
开启8161, 61616防火墙端口。
然后刷新服务。
由于我使用的是阿里云的服务器部署的,所以需要登陆ECS云服务器上配置安全组策略配置8161, 61616入口规则才能在外网访问。
配置完后使用浏览器然后访问http://47.93.63.64:8161/admin/进入管理平台,默认登陆用户名和密码都为admin。
配置了域名解析后也可以使用域名访问.
经过以上部署后准备单点服务的简单版工作已做完,下面编写代码实现消息的生产和消费功能。
原始的 ActiveMQ API 处理流程图:
由于Spring Boot提供了大量的自动配置和注解功能,已经把这部分代码封装好了,所以开发起来很方便。
Gradle构建工具依赖添加:
// 集成Active MQ消息中间件 compile group: 'org.springframework.boot', name: 'spring-boot-starter-activemq', version: '2.0.0.RELEASE'
application.properties
属性配置##################################---Active MQ消息中间件---############################################## spring.activemq.broker-url=tcp://www.javalsj.com:61616 spring.activemq.user=admin spring.activemq.password=admin
ActiveMQConfiguration
配置类代码:package com.javalsj.blog.activemq; import javax.jms.ConnectionFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jms.config.JmsListenerContainerFactory; import org.springframework.jms.config.SimpleJmsListenerContainerFactory; /** * @description ActiveMQ消息队列配置类 * @author WANGJIHONG * @date 2018年3月25日 下午10:52:26 * @Copyright 版权所有 (c) www.javalsj.com * @memo 无备注说明 */ @Configuration public class ActiveMQConfiguration { /** * 在Queue模式中,对消息的监听需要对containerFactory进行配置 */ @Bean(ActiveMQQueueConst.BEAN_NAME_JMSLISTENERCONTAINERFACTORY) public JmsListenerContainerFactory<?> queueJmsListenerContainerFactory(ConnectionFactory connectionFactory){ SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setPubSubDomain(false); return factory; } /** * 在Topic模式中,对消息的监听需要对containerFactory进行配置 */ @Bean(ActiveMQTopicConst.BEAN_NAME_JMSLISTENERCONTAINERFACTORY) public JmsListenerContainerFactory<?> topicJmsListenerContainerFactory(ConnectionFactory connectionFactory){ SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setPubSubDomain(true); return factory; } }
ActiveMQQueueConst
队列常量类代码:package com.javalsj.blog.activemq; /** * @description ActiveMQ队列常量 * @author WANGJIHONG * @date 2018年3月25日 下午10:59:47 * @Copyright 版权所有 (c) www.javalsj.com * @memo 无备注说明 */ public class ActiveMQQueueConst { /** * 在Queue模式中,对消息的监听需要对containerFactory进行配置,工厂标识 */ public static final String BEAN_NAME_JMSLISTENERCONTAINERFACTORY = "queueJmsListenerContainerFactory"; /** * 队列消息标识_WebSocket的Java老司机聊天室 */ public static final String QUEUE_NAME_WEBSOCKET_CHATROOM_JAVALSJ = "queue.websocket.chatroom.javalsj"; }
ActiveMQQueueProducer
队列生产者代码:package com.javalsj.blog.activemq; import javax.jms.Destination; import org.apache.activemq.command.ActiveMQQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsMessagingTemplate; import org.springframework.stereotype.Component; /** * @description ActiveMQ消息生产者 * @author WANGJIHONG * @date 2018年3月25日 下午10:57:54 * @Copyright 版权所有 (c) www.javalsj.com * @memo 无备注说明 */ @Component public class ActiveMQQueueProducer { private final static Logger logger = LoggerFactory.getLogger(ActiveMQQueueProducer.class); @Autowired private JmsMessagingTemplate jmsMessagingTemplate; /** * 发送队列消息 * @param destinationName 消息目的地标识 * @param message 消息文本 */ public void sendMsg(String destinationName, String message) { logger.info("发布了一条队列{}消息{}。", destinationName, message); Destination destination = new ActiveMQQueue(destinationName); jmsMessagingTemplate.convertAndSend(destination, message); } }
ActiveMQQueueConsumer
队列消费者代码:package com.javalsj.blog.activemq; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component; /** * @description ActiveMQ队列消息消费者 * @author WANGJIHONG * @date 2018年3月25日 下午10:59:10 * @Copyright 版权所有 (c) www.javalsj.com * @memo 无备注说明 */ @Component public class ActiveMQQueueConsumer { private final static Logger logger = LoggerFactory.getLogger(ActiveMQQueueConsumer.class); /** * WebSocket的Java老司机聊天室队列消息消费者 */ @JmsListener(destination = ActiveMQQueueConst.QUEUE_NAME_WEBSOCKET_CHATROOM_JAVALSJ, containerFactory = ActiveMQQueueConst.BEAN_NAME_JMSLISTENERCONTAINERFACTORY) public void receiveQueueWebSocketJavalsjChatroomMsg(String message) { logger.info("消费了一条队列{}消息{}。", ActiveMQQueueConst.QUEUE_NAME_WEBSOCKET_CHATROOM_JAVALSJ, message); } }
ActiveMQTopicConst
主题消息常量类package com.javalsj.blog.activemq; /** * @description ActiveMQ主题常量 * @author WANGJIHONG * @date 2018年3月25日 下午11:24:09 * @Copyright 版权所有 (c) www.javalsj.com * @memo 无备注说明 */ public class ActiveMQTopicConst { /** * 在Topic模式中,对消息的监听需要对containerFactory进行配置,工厂标识 */ public static final String BEAN_NAME_JMSLISTENERCONTAINERFACTORY = "topicJmsListenerContainerFactory"; /** * 主题消息标识_WebSocket的系统公告 */ public static final String TOPIC_NAME_WEBSOCKET_SYSTEM_NOTICE = "topic.websocket.system.notice"; }
ActiveMQTopicPublisher
主题消息发布者代码:package com.javalsj.blog.activemq; import javax.jms.Destination; import org.apache.activemq.command.ActiveMQTopic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsMessagingTemplate; import org.springframework.stereotype.Component; /** * @description ActiveMQ主题消息发布者 * @author WANGJIHONG * @date 2018年3月25日 下午11:19:45 * @Copyright 版权所有 (c) www.javalsj.com * @memo 无备注说明 */ @Component public class ActiveMQTopicPublisher { private final static Logger logger = LoggerFactory.getLogger(ActiveMQTopicPublisher.class); @Autowired private JmsMessagingTemplate jmsMessagingTemplate; /** * 发布主题消息 */ public void publishMsg(String destinationName, String message) { logger.info("发布了一条主题{}消息{}。", destinationName, message); Destination destination = new ActiveMQTopic(destinationName); jmsMessagingTemplate.convertAndSend(destination, message); } }
ActiveMQTopicSubscriber
主题消息订阅者代码:package com.javalsj.blog.activemq; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component; /** * @description ActiveMQ主题消息订阅者 * @author WANGJIHONG * @date 2018年3月25日 下午11:22:50 * @Copyright 版权所有 (c) www.javalsj.com * @memo 无备注说明 */ @Component public class ActiveMQTopicSubscriber { private final static Logger logger = LoggerFactory.getLogger(ActiveMQTopicSubscriber.class); @JmsListener(destination = ActiveMQTopicConst.TOPIC_NAME_WEBSOCKET_SYSTEM_NOTICE, containerFactory = ActiveMQTopicConst.BEAN_NAME_JMSLISTENERCONTAINERFACTORY) public void subscribeTopicWebsocketSystemNoticeMsg(String message) { logger.info("消费了一条主题{}消息{}。", ActiveMQTopicConst.TOPIC_NAME_WEBSOCKET_SYSTEM_NOTICE, message); } }
ActiveMQTest
测试类package com.javalsj.blog.activemq; import java.time.Instant; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; /** * @description 测试 * @author WANGJIHONG * @date 2018年3月25日 下午11:41:03 * @Copyright 版权所有 (c) www.javalsj.com * @memo 无备注说明 */ @Component @EnableScheduling public class ActiveMQTest { @Autowired private ActiveMQQueueProducer activeMQQueueProducer; @Autowired private ActiveMQTopicPublisher activeMQTopicPublisher; @Scheduled(fixedRate = 10000, initialDelay = 3000) public void test() { activeMQQueueProducer.sendMsg(ActiveMQQueueConst.QUEUE_NAME_WEBSOCKET_CHATROOM_JAVALSJ, "队列message" + Instant.now().toString()); activeMQTopicPublisher.publishMsg(ActiveMQTopicConst.TOPIC_NAME_WEBSOCKET_SYSTEM_NOTICE, "主题message" + Instant.now().toString()); } }
重启服务验证功能使用情况,至此简单的ActiveMQ实例demo已完成。
本文只是简单的介绍了下Apache ActiveMQ消息中间件简单使用,实际生产环境开发比这要复杂些,比如为了高可用性一般都需要把 ActiveMQ 部署为集群,其中消费过程也需要做业务上的去重等等一系列细节处理,感兴趣的可以网上查查资料了解下高可用消息中间件的部署和使用。也可以了解下其他类型的消息中间件产品,每种都有合适的适用场景。写了两个小时,困了,帮我关下灯,嘿嘿。