1、消息中间件
1、通讯方式
1、点对点
2、发布订阅
2、JMS
1、jms
JMS是java消息服务,JMS的客户端之间可以通过JMS服务进行异步的消息传输。
2、消费模型
1、点对点
1、模型:
生产者---消息队列---消费者
2、发布订阅
1、模型:
生产者---主题---消费者
3、ActiveMQ使用
1、依赖
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>
2、生产者
3、消费者
4、ActiveMQ持久化
producer.setDeliveryMode(DeliveryMode.PERSISTENT)
5、JMS可靠消息
1、ActiveMQ消息签收机制
客戶端成功接收一条消息的标志是一条消息被签收,成功应答。
2、ActiveMQ可靠消息保证
1、自动签收(拿到消息就告诉MQ签收成功)---无事务机制
2、事务消息
生产者:完成消息发送后,提交事务至队列。
消费者:获取到事务消息,如果消费者没有提交事务,默认表示没有进行消费。
3、手动签收
手动签收消息
3、代码
1、自动签收
Session session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
2、手动签收
Session session = conn.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);
消费者手动签收消息:
msg.acknowledge();
3、事务消息
Session session = conn.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
生产者:
session.commit()
消费者:
session.commit()
6、SpringBoot整合ActiveMQ
1、依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
2、生产者
1、配置文件
spring:
activemq:
broker-url: tcp://192.168.52.128:61616
user: admin
password: admin
queue: springboot-queue
server:
port: 8080
2、配置类
@Configuration public class QueueConfig { @Value("${queue}") private String queue; @Bean public Queue logQueue() { return new ActiveMQQueue(queue); }
@Bean public JmsTemplate jmsTemplate(ActiveMQConnectionFactory activeMQConnectionFactory, Queue queue) { JmsTemplate jmsTemplate = new JmsTemplate(); jmsTemplate.setDeliveryMode(DeliveryMode.PERSISTENT);// 进行持久化配置 1表示非持久化,2表示持久化</span> jmsTemplate.setConnectionFactory(activeMQConnectionFactory); jmsTemplate.setDefaultDestination(queue); // 此处可不设置默认,在发送消息时也可设置队列 jmsTemplate.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);// 客户端签收模式 return jmsTemplate; } // 定义一个消息监听器连接工厂,这里定义的是点对点模式的监听器连接工厂 @Bean(name = "jmsQueueListener") public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory( ActiveMQConnectionFactory activeMQConnectionFactory) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(activeMQConnectionFactory); // 设置连接数 factory.setConcurrency("1-10"); // 重连间隔时间 factory.setRecoveryInterval(1000L); factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE); return factory; } }
|
3、生产者类
@Component @EnableScheduling public class Producer { @Autowired private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired private Queue queue;
@Scheduled(fixedDelay = 5000) public void send() { jmsMessagingTemplate.convertAndSend(queue, "测试消息队列" + System.currentTimeMillis()); } } |
3、消费者
@Component public class Consumer {
@JmsListener(destination="${queue}") public void receive(String msg) { System.out.println("接收到消息:" + msg); } } |
7、ActiveMQ注意事项
1、消费者抛出异常,默认会自动重试
2、若消费者代码问题,记录日志及报文信息,采用人工补偿,不进行ActiveMQ自动重试
3、消费者幂等性问题:使用全局ID区分消息