zhoucheng0 2019-10-23
本片文章续《Spring Boot 入门(七):集成 swagger2》,关于RabbitMQ的介绍请参考《java基础(六):RabbitMQ 入门》
1.增加依赖
<!--rabbitMq--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2.增加conf
2 import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback; import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.annotation.PostConstruct; /** * @program: * @description: Rabbit相关配置 * @author: DZ * @create: 2019-10-18 17:07 **/ @Slf4j @Configuration public class RabbitConfig implements ConfirmCallback, ReturnCallback { @Autowired private RabbitTemplate rabbitTemplate; //目前就声明了一个消息队列 // 队列名称 31 public String queue = "queue"; // 交换机名称 34 public String exchang="exchange"; // 关键字 37 public String key="key"; @PostConstruct public void init() { rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnCallback(this); } //此主要用于检查交换机(exChange),当 ack=false,交换机可能错误 @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { //在发送消息的时候correlationData传入的为进件编号 if (ack) { log.info("消息发送成功:correlationData = " + correlationData); } else { //如果有多个交换机,这里日志需要优化 log.error("消息发送失败,交换机可能错误:correlationData = " + correlationData + ",exchang:" + exchang); } } //次方法用于检查队列(queue),当此方法执行时,队列可能错误 @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { //如果有多个队列,这里日志需要优化 log.error("消息发送失败,队列可能错误:correlationData = " + message.getMessageProperties().getCorrelationId() + ",queue:" + queue); } // 声明队列 @Bean public Queue queue() { return new Queue(queue, true);//表示持久化 } // 声明交换机,注意交换机的类别 @Bean public FanoutExchange exchange() { return new FanoutExchange(exchang); //return new DirectExchange(exchang); //return new TopicExchange(exchang); } // 绑定交换机和队列,如果是fanout,就不需要key @Bean public Binding binding() { return BindingBuilder.bind(queue()).to(exchange()); //return BindingBuilder.bind(queue()).to(exchange()).with(key); } }
在实际开发过程中,mq的相关属性都配置在application.yml的配置文件中。
在绑定交换机的过程中,需要注意绑定方式以及key。
3.调用
@Autowired private RabbitTemplate rabbitTemplate; 6 @RequestMapping(value = "testRabbitMQ", method = RequestMethod.POST) public String testRabbitMQ() { String msg = "{\"id\":\"123\",\"msg\":\"555555\"}"; String id = "123456789"; CorrelationData correlationId = new CorrelationData(id); log.info("开始发送消息 : correlationId= " + correlationId + ",exchange=" + exchange + ",msg= " + msg); Object response = rabbitTemplate.convertSendAndReceive(exchange, "", msg, correlationId); log.info("开始发送结束 : correlationId= " + correlationId); return "testRabbitMQ"; }
由于本文中交换机的绑定方式为fanout,所以不需要key,这里在发送消息的时候rabbitTemplate.convertSendAndReceive(exchange, "", msg, correlationId);key直接传入一个空字符串即可。