1、消息中间件
1、通讯方式
1、点对点
2、发布订阅
2、JMS
1、jms
JMS是java消息服务,JMS的客户端之间可以通过JMS服务进行异步的消息传输。
2、消费模型
1、点对点
1、模型:
生产者---消息队列---消费者
2、发布订阅
1、模型:
生产者---主题---消费者
3、ActiveMQ使用
1、依赖
<dependency>
			    <groupId>org.apache.activemq</groupId>
			    <artifactId>activemq-core</artifactId>
			    <version>5.7.0</version>
		  </dependency>
2、生产者

 3、消费者

4、ActiveMQ持久化
producer.setDeliveryMode(DeliveryMode.PERSISTENT)
5、JMS可靠消息
1、ActiveMQ消息签收机制
客戶端成功接收一条消息的标志是一条消息被签收,成功应答。
2、ActiveMQ可靠消息保证
1、自动签收(拿到消息就告诉MQ签收成功)---无事务机制
2、事务消息
生产者:完成消息发送后,提交事务至队列。
消费者:获取到事务消息,如果消费者没有提交事务,默认表示没有进行消费。
3、手动签收
手动签收消息
3、代码
1、自动签收
Session session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
2、手动签收
Session session = conn.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);
消费者手动签收消息:
  msg.acknowledge();
3、事务消息
Session session = conn.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
生产者:
  session.commit()
消费者:
  session.commit()
6、SpringBoot整合ActiveMQ
1、依赖
<dependency>
			    <groupId>org.springframework.boot</groupId>
			    <artifactId>spring-boot-starter-activemq</artifactId>
		  </dependency>
2、生产者
1、配置文件
spring:
      activemq:
          broker-url: tcp://192.168.52.128:61616
          user: admin
          password: admin
  queue: springboot-queue
  server:
      port: 8080
2、配置类
| @Configurationpublic class QueueConfig {
 @Value("${queue}")
 private String queue;
 @Beanpublic Queue logQueue() {
 return new ActiveMQQueue(queue);
 }
 
 @Bean
 public JmsTemplate jmsTemplate(ActiveMQConnectionFactory activeMQConnectionFactory, Queue queue) {
 JmsTemplate jmsTemplate = new JmsTemplate();
 jmsTemplate.setDeliveryMode(DeliveryMode.PERSISTENT);// 进行持久化配置 1表示非持久化,2表示持久化</span>
 jmsTemplate.setConnectionFactory(activeMQConnectionFactory);
 jmsTemplate.setDefaultDestination(queue); // 此处可不设置默认,在发送消息时也可设置队列
 jmsTemplate.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);// 客户端签收模式
 return jmsTemplate;
 }
 // 定义一个消息监听器连接工厂,这里定义的是点对点模式的监听器连接工厂@Bean(name = "jmsQueueListener")
 public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(
 ActiveMQConnectionFactory activeMQConnectionFactory) {
 DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
 factory.setConnectionFactory(activeMQConnectionFactory);
 // 设置连接数
 factory.setConcurrency("1-10");
 // 重连间隔时间
 factory.setRecoveryInterval(1000L);
 factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
 return factory;
 }
 }
 | 
3、生产者类
| @Component@EnableScheduling
 public class Producer {
 @Autowired
 private JmsMessagingTemplate jmsMessagingTemplate;
 
 @Autowired
 private Queue queue;
 
 @Scheduled(fixedDelay = 5000)
 public void send() {
 jmsMessagingTemplate.convertAndSend(queue, "测试消息队列" + System.currentTimeMillis());
 }
 } | 
3、消费者
| @Componentpublic class Consumer {
 
 @JmsListener(destination="${queue}")
 public void receive(String msg) {
 System.out.println("接收到消息:" + msg);
 }
 }
 | 
7、ActiveMQ注意事项
1、消费者抛出异常,默认会自动重试
2、若消费者代码问题,记录日志及报文信息,采用人工补偿,不进行ActiveMQ自动重试
3、消费者幂等性问题:使用全局ID区分消息