Spring Boot 入门(八):集成RabbitMQ消息队列

zhoucheng0 2019-10-23

本片文章续《Spring Boot 入门(七):集成 swagger2》,关于RabbitMQ的介绍请参考《java基础(六):RabbitMQ 入门

1.增加依赖

<!--rabbitMq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2.增加conf

2 
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

/**
 * @program:
 * @description: Rabbit相关配置
 * @author: DZ
 * @create: 2019-10-18 17:07
 **/
@Slf4j
@Configuration
public class RabbitConfig implements ConfirmCallback, ReturnCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    //目前就声明了一个消息队列
    // 队列名称
31     public String queue = "queue";
    // 交换机名称
   34     public String exchang="exchange";
    // 关键字
    37     public String key="key";

    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    //此主要用于检查交换机(exChange),当 ack=false,交换机可能错误
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        //在发送消息的时候correlationData传入的为进件编号
        if (ack) {
            log.info("消息发送成功:correlationData = " + correlationData);
        } else {
            //如果有多个交换机,这里日志需要优化
            log.error("消息发送失败,交换机可能错误:correlationData = " + correlationData + ",exchang:" + exchang);
        }
    }

    //次方法用于检查队列(queue),当此方法执行时,队列可能错误
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        //如果有多个队列,这里日志需要优化
        log.error("消息发送失败,队列可能错误:correlationData = " + message.getMessageProperties().getCorrelationId() + ",queue:" + queue);
    }

    // 声明队列
    @Bean
    public Queue queue() {
        return new Queue(queue, true);//表示持久化
    }

    // 声明交换机,注意交换机的类别
    @Bean
    public FanoutExchange exchange() {
        return new FanoutExchange(exchang);
        //return new DirectExchange(exchang);
        //return new TopicExchange(exchang);
    }

    // 绑定交换机和队列,如果是fanout,就不需要key
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(exchange());
        //return BindingBuilder.bind(queue()).to(exchange()).with(key);
    }
}

在实际开发过程中,mq的相关属性都配置在application.yml的配置文件中。

在绑定交换机的过程中,需要注意绑定方式以及key。

3.调用

@Autowired
    private RabbitTemplate rabbitTemplate;
  6     @RequestMapping(value = "testRabbitMQ", method = RequestMethod.POST)
    public String testRabbitMQ() {
        String msg = "{\"id\":\"123\",\"msg\":\"555555\"}";
        String id = "123456789";
        CorrelationData correlationId = new CorrelationData(id);
        log.info("开始发送消息 : correlationId= " + correlationId + ",exchange=" + exchange + ",msg= " + msg);
        Object response = rabbitTemplate.convertSendAndReceive(exchange, "", msg, correlationId);
       
        log.info("开始发送结束 : correlationId= " + correlationId);
        return "testRabbitMQ";
    }

 由于本文中交换机的绑定方式为fanout,所以不需要key,这里在发送消息的时候rabbitTemplate.convertSendAndReceive(exchange, "", msg, correlationId);key直接传入一个空字符串即可。

相关推荐