SpringBoot整合ActiveMQ

xinglun 2020-02-16

1.生产者

1.1 导入依赖 

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- spring boot web支持:mvc,aop... -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>



1.2 创建application.yml配置文件

server:
port: 8080
#activemq配置
spring:
activemq:
broker-url: tcp://127.0.0.1:61616
user: admin
password: admin



1.3 创建生产者,通过JMSTemplate模板发送消息

@Component
public class ActiveMQProvider {
//注入JMSTemplate模板
@Resource
private JmsTemplate jmsTemplate;
//创建方法
public void sendMessage(){
//点对点,创建队列
ActiveMQQueue queue=new ActiveMQQueue("boot_queue");
//发送消息
jmsTemplate.convertAndSend(queue,"生产者产生的消息~");
}
}


1.4 创建客户端访问的方法

@RestController
public class ProController {
@Resource
private ActiveMQProvider activeMQProvider;

@RequestMapping("/sendMessage")
public String sendMessage(){
activeMQProvider.sendMessage();
return "success";
}
}

1.5 创建消费者
1.5.1 创建配置文件

server:
port: 8081
spring:
activemq:
broker-url: tcp://127.0.0.1:61616
user: admin
password: admin


1.5.2 创建消费者

//消费者消费
@JmsListener(destination = "boot_queue")
public void getMessage(TextMessage message) throws JMSException {
System.out.println("消费者获取到消息:"+message.getText());
}


2.SpringBoot发送主题
2.1 生产者需要制定当前发送的主题,需要开启

@Component
public class ActiveMQProvider {
//注入JMSTemplate模板
@Resource
private JmsTemplate jmsTemplate;
//创建方法
public void sendMessage(){
//点对点,创建队列
//ActiveMQQueue queue=new ActiveMQQueue("boot_queue");
//发布订阅:创建主题
ActiveMQTopic topic=new ActiveMQTopic("boot_topic");
//springboot默认是queue
jmsTemplate.setPubSubDomain(true);
//发送消息
jmsTemplate.convertAndSend(topic,"生产者产生主题的消息~");

}
}


2.2 消费者消费同样开启

//springboot默认只配置queue类型消息,如果要使用topic类型的消息,则需要配置该bean
@Bean
public JmsListenerContainerFactory jmsTopicListenerContainerFactory(ConnectionFactory connectionFactory){
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
//这里必须设置为true,false则表示是queue类型
factory.setPubSubDomain(true);
return factory;
}


//消费者消费 destination队列或者主题的名字

@JmsListener(destination = "boot_topic",containerFactory = "jmsTopicListenerContainerFactory")
public void getMessage(TextMessage message) throws JMSException {

System.out.println("消费者获取到消息:"+message.getText());
}


3.SpringBoot整合ActiveMQ开启持久化
3.1.开启队列持久化

jmsTemplate.setDeliveryMode(2);
jmsTemplate.setExplicitQosEnabled(true);
jmsTemplate.setDeliveryPersistent(true);


3.2.开启主题持久化
不会进行数据消费的,但是数据可以持久化

@Bean(name = "topicListenerFactory")
public JmsListenerContainerFactory<DefaultMessageListenerContainer> topicListenerFactory(ConnectionFactory connectionFactory){
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();

factory.setSubscriptionDurable(true);// Set this to "true" to register a durable subscription,

factory.setClientId("B");

factory.setConnectionFactory(connectionFactory);
return factory;

}

//消费者消费 destination队列或者主题的名字
@JmsListener(destination = "boot_topic",containerFactory = "topicListenerFactory")
public void getMessage(TextMessage message,Session session) throws JMSException {
System.out.println("消费者获取到消息:"+message.getText());
}

相关推荐

xinglun / 0评论 2020-06-14