ActiveMQ

Java高知 2019-10-20

1、消息中间件

1、通讯方式

1、点对点

2、发布订阅

2、JMS

1、jms

JMS是java消息服务,JMS的客户端之间可以通过JMS服务进行异步的消息传输。

2、消费模型

1、点对点

1、模型:

生产者---消息队列---消费者

2、发布订阅

1、模型:

生产者---主题---消费者

3、ActiveMQ使用

1、依赖

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-core</artifactId>
    <version>5.7.0</version>
  </dependency>

2、生产者

ActiveMQ

 3、消费者

ActiveMQ

4、ActiveMQ持久化

producer.setDeliveryMode(DeliveryMode.PERSISTENT)

5、JMS可靠消息

1、ActiveMQ消息签收机制

客戶端成功接收一条消息的标志是一条消息被签收,成功应答。

2、ActiveMQ可靠消息保证

1、自动签收(拿到消息就告诉MQ签收成功)---无事务机制

2、事务消息

生产者:完成消息发送后,提交事务至队列。

消费者:获取到事务消息,如果消费者没有提交事务,默认表示没有进行消费。

3、手动签收

手动签收消息

3、代码

1、自动签收

Session session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);

2、手动签收

Session session = conn.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);

消费者手动签收消息:

  msg.acknowledge();

3、事务消息

Session session = conn.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

生产者:

  session.commit()

消费者:

  session.commit()

6、SpringBoot整合ActiveMQ

1、依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
  </dependency>

2、生产者

1、配置文件

spring:
    activemq:
      broker-url: tcp://192.168.52.128:61616
      user: admin
      password: admin
  queue: springboot-queue
  server:
    port: 8080

2、配置类

@Configuration
public class QueueConfig {
@Value("${queue}")
private String queue;

@Bean
public Queue logQueue() {
return new ActiveMQQueue(queue);
}

@Bean
public JmsTemplate jmsTemplate(ActiveMQConnectionFactory activeMQConnectionFactory, Queue queue) {
JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setDeliveryMode(DeliveryMode.PERSISTENT);// 进行持久化配置 1表示非持久化,2表示持久化</span>
jmsTemplate.setConnectionFactory(activeMQConnectionFactory);
jmsTemplate.setDefaultDestination(queue); // 此处可不设置默认,在发送消息时也可设置队列
jmsTemplate.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);// 客户端签收模式
return jmsTemplate;
}

// 定义一个消息监听器连接工厂,这里定义的是点对点模式的监听器连接工厂
@Bean(name = "jmsQueueListener")
public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(
ActiveMQConnectionFactory activeMQConnectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(activeMQConnectionFactory);
// 设置连接数
factory.setConcurrency("1-10");
// 重连间隔时间
factory.setRecoveryInterval(1000L);
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
return factory;
}


}

3、生产者类

@Component
@EnableScheduling
public class Producer {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;

@Autowired
private Queue queue;

@Scheduled(fixedDelay = 5000)
public void send() {
jmsMessagingTemplate.convertAndSend(queue, "测试消息队列" + System.currentTimeMillis());
}

}

3、消费者

@Component
public class Consumer {

@JmsListener(destination="${queue}")
public void receive(String msg) {
System.out.println("接收到消息:" + msg);
}
}

7、ActiveMQ注意事项

1、消费者抛出异常,默认会自动重试

2、若消费者代码问题,记录日志及报文信息,采用人工补偿,不进行ActiveMQ自动重试

3、消费者幂等性问题:使用全局ID区分消息

相关推荐

wuddny的blog / 0评论 2012-11-15