JMS与ActiveMQ消息数据持久化

胡献根 2019-10-21

认识JMS
JMS是Java消息服务(Java message service),是java平台关于面向消息中间件(MOM)的API。用于两个应用程序或分布式系统应用中,消息发送,消息异步处理。即应用程序间通信通过JMS服务,进行消息转发。应用程序间消息异步处理可解除应用程序的耦合。
JMS有消息异步处理、程序间解耦、消息可靠的优势。

JMS消息模型
Queue点对点(Point to Point
Queue队列是点对点消费,发送者发送一条消息,只有唯一的一个消费者能对消息进行消费。消息生产者都将消息发送到消息的队列(Queue)中。队列的消息可以是持久的,保证消息服务出现故障仍然能够传递消息。
特点:
1).每个消息只有一个消费者(Customer),消息一旦被消费,消息就不在队列(Queue)了。
2).消息发送者和接收者没有时间依赖,消息发送者只管消息发送,不管消息的消费者是否有接收消息。
3).消息被接收到消息后,会发送消息确认(ACK)通知给消息队列(Queue)。
Queue模型图:
JMS与ActiveMQ消息数据持久化
发布/订阅(Publish/Subscribe)
消息发布者发布消息,消息通过主题(Topic)传递给所有接收者,消息发布者和订阅者彼此不相干。主题(Topic)主要用于保存和传递消息。
发布/订阅模型中,应用程序有Topic、发布者(Publish)、订阅者(Subscribe)组成。
特点:
1).每个消息可有多个消费订阅者。
2).发布者和订阅者无时间依赖性。某个主题(Topic)的订阅者,必须先创建一个订阅者后才能消费发布者的消息,且为消费消息,订阅者必须保持运行状态。
3).可持久化订阅。
Topic模型图:
JMS与ActiveMQ消息数据持久化

ActiveMQ消息数据持久化
为了避免机器意外宕机,消息丢失。可进行对消息持久化操作,机器重启后恢复消息队列。ActiveMQ消息持久化机制有JDBC、AMQ、KahaDB和LevelDB。消息发送者将消息发送出去后,消息中心将消息数据存储到本地文件、本地内存或者数据库等。消息持久化操作后,消息中心启动后会先检查指定存储位置,如有未发送成功的消息,则继续将消息发送出去。
1).JDBC持久化方式
此持久化方式会在数据库创建3个表:activemq_msgs、activemq_acks、activemq_lock。activemq_msgs用于存储消息,Queue和Topic都存储于这个表中。
配置方式:
修改安装目录下的conf/activemq.xml文件。persistenceAdapter节点中配置。
</persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false" /> 
</persistenceAdapter>

2).AMQ方式
性能高于JDBC,消息会按顺序追加方式写入日志文件中,性能较高。为了提升性能,会创建消息主键索引。缺点是索引文件很大,需占用大量磁盘空间。如果broker崩溃,重建索引速度非常慢。每个日志文件大小有限定(默认32M)。超过此大小,会重新建立一文件。当所有消息消费完成,系统删除这个文件或进行规定(取决于配置)。
配置:

</persistenceAdapter>
<amqPersistenceAdapter directory="${activemq.data}/activemq-data" maxFileLength="32mb"/>
</persistenceAdapter>

索引重建时间长,占用磁盘空间大,此方式不推荐。
3).KafaDB方式
KafaDB持久化是ActiveMQ默认的持久化方式。KafaDB持久化和AMQ一样都是基于日志文件,但KafaDB方式恢复时间远少于AMQ方式,且使用更少的数据文件。优于AMQ方式持久化。
配置:

</persistenceAdapter>
<kahaDB directory="${activemq.data}/activemq-data" journalMaxFileLength="16mb"/>
</persistenceAdapter>

directory:指定消息持久化的存储目录。
journalMaxFileLength:指定保存消息日志文件大小。
4).LevelDB方式
ActiveMQ5.6版本后推出的LevelDB方式持久化。不过LevelDB方式性能要高于KafaDB,后面很可能是这个趋势。
配置:

</persistenceAdapter>
<replicatedLevelDB directory="${activemq.data}/activemq-data" replicas="3" bind="tcp://0.0.0.0:0" zkAddress="127.0.0.1:2181" zkSessionTimeout="2s" hostname="127.0.0.1" zkPath="/activemq/leveldb-stores"/>
</persistenceAdapter>

相关推荐

xinglun / 0评论 2020-06-14