SpringCloud(4) rabbitmq使用

zhoucheng0 2020-05-10

一。rabbitmq基本知识

exchange: 交换器,接收生产者发送的消息并路由给对应的队列。三种常用的交换器类型:1.direct(发布订阅,完全匹配) 2。广播型 3.topic(主题,规则匹配)

queue: 消息队列,用来保存消息直到发送给消费者。消息一直在队列中,知道消费者链接到队列将它取走

binding: 绑定。用于消息队列和交换器之间的关联,一个绑定就是基于路由键将两者连接起来的路由规则。

 routingkey: 路由键。1.队列通过路由键和交换器绑定。2.消息带着路由键发送到交换器,交换器根据路由键发到匹配的队列

SpringCloud(4) rabbitmq使用

二。代码示例

2.1 使用amqp

(1).pom依赖

<!-- rabbitMQ的依赖。rabbitmq已经被spring-boot做了整合访问实现。
    spring cloud也对springboot做了整合逻辑。所以rabbitmq的依赖可以在spring cloud中直接使用。
 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

(2).消费端

//1. 只指定队列@RabbitListener(queues = "myQueue")
    //2.自动创建队列
    //@RabbitListener(queuesToDeclare = @org.springframework.amqp.rabbit.annotation.Queue("myQueue"))
    //3.自动创建, exchange和queue绑定
    @RabbitListener(bindings = @QueueBinding(
                exchange = @Exchange("myOrder"),
                key = "fruit",
                value = @Queue("fruitQueue")
            ))
    public void process(String message) {
        log.info("fruitProcess receive mq message: {}", message);
    }
    
    /**
     * Producer(生产者): 将消息发送到Exchange
    Exchange(交换器):将从生产者接收到的消息路由到Queue
    Queue(队列):存放供消费者消费的消息
    BindingKey(绑定键):建立Exchange与Queue之间的关系(个人看作是一种规则,也就是Exchange将什么样的消息路由到Queue)
    RoutingKey(路由键):Producer发送消息与路由键给Exchange,Exchange将判断RoutingKey是否符合BindingKey,如何则将该消息路由到绑定的Queue
    Consumer(消费者):从Queue中获取消息
     */
    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange("myOrder"),
            key = "computer",
            value = @Queue("computerQueue")
        ))
    public void computerProcess(String message) {
        log.info("computerProcess receive mq message: {}", message);
    }

(3).生产者

@SpringBootTest
public class MQSenderTest {
    
    @Autowired
    private AmqpTemplate amqpTemplate;
    
    @Test
    public void send() {
        amqpTemplate.convertAndSend("myQueue", "hello mq!!");
    }
    
    @Test
    public void send3() {
        amqpTemplate.convertAndSend("myOrder", "computer", "computerMsgsg");
    }
}

2.2 使用stream

2.2.1 生产者

(1) pom依赖

<dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

(2)application.yml定义

spring:
  cloud:
    stream:
      bindings:
        #连接源定义的input和output        
        messagesSend:          #交换器名称
          destination: msg-topic
        backRec:
          destination: back-topic          ##消费者组名,这个组下只有一个queue,多台应用情况下只有一台能收到消息
          group:
            back-group
  rabbitmq:
    addresses: 127.0.0.1:5672
    username: guest
    password: guest

(3)定义mq连接源

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface StreamClient {
    public static final String INPUT = "messagesSend"; 
    public static final String BACK = "backRec";
    
        // 发送
    @Output(INPUT)
    MessageChannel output();
    
        // 回调接收
    @Input(BACK)
    SubscribableChannel input();
}

(4)发送类, @EnableBinding(StreamClient.class)声明

@RestController
@EnableBinding(StreamClient.class)
public class SendMessageController {
    @Autowired
    StreamClient streamClient;
    
    @RequestMapping("/sendMsg")
    public void sendMsg() {
        OrderDTO order = new OrderDTO();
        order.setOrderId("123");
        streamClient.output().send(MessageBuilder.withPayload(order).build());
    }
}

(5)接收返回的mq消息

@Component
@Slf4j
@EnableBinding(StreamClient.class)
public class StreamReciever {
            // 接收对象
            @StreamListener(StreamClient.BACK) 
            public void process(Object message) {
              log.info("StreamReciever:{}", message); 
            }
}

2.2.2 消费者

(1) pom依赖

<dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

(2)application.yml定义

spring:
  cloud:
    stream:
      bindings:
        messageRec:
          #交换器exchange名称
          destination: msg-topic
          ##消费者组名,这个组下只有一个queue,多台应用情况下只有一台能收到消息
          group: msg-group
        backSend:
          destination: back-topic  #我没有配置mq信息,自动用的默认配置
  rabbitmq:
    addresses: 127.0.0.1:5672
    username: guest
    password: guest

(3)连接源

public interface StreamServer {
    public static final String INPUT = "messageRec";
    public static final String BACK = "backSend";
    @Input(INPUT)
    SubscribableChannel input();
      
    @Output(BACK)
    MessageChannel output();
}

(4)接收端。并范松返回mq消息

@Component
@Slf4j
@EnableBinding(StreamServer.class)
public class StreamReciever {
       // 接收对象
        @StreamListener(StreamServer.INPUT)
        @SendTo(StreamServer.BACK)
        public String process(Object message) {
          log.info("StreamReciever:{}", message);          // 返回的消息
          return "Revieved..";
        }     
}

SpringCloud(4) rabbitmq使用

SpringCloud(4) rabbitmq使用

相关推荐