zhoucheng0 2020-05-10
一。rabbitmq基本知识
exchange: 交换器,接收生产者发送的消息并路由给对应的队列。三种常用的交换器类型:1.direct(发布订阅,完全匹配) 2。广播型 3.topic(主题,规则匹配)
queue: 消息队列,用来保存消息直到发送给消费者。消息一直在队列中,知道消费者链接到队列将它取走
binding: 绑定。用于消息队列和交换器之间的关联,一个绑定就是基于路由键将两者连接起来的路由规则。
routingkey: 路由键。1.队列通过路由键和交换器绑定。2.消息带着路由键发送到交换器,交换器根据路由键发到匹配的队列
二。代码示例
2.1 使用amqp
(1).pom依赖
<!-- rabbitMQ的依赖。rabbitmq已经被spring-boot做了整合访问实现。 spring cloud也对springboot做了整合逻辑。所以rabbitmq的依赖可以在spring cloud中直接使用。 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
(2).消费端
//1. 只指定队列@RabbitListener(queues = "myQueue") //2.自动创建队列 //@RabbitListener(queuesToDeclare = @org.springframework.amqp.rabbit.annotation.Queue("myQueue")) //3.自动创建, exchange和queue绑定 @RabbitListener(bindings = @QueueBinding( exchange = @Exchange("myOrder"), key = "fruit", value = @Queue("fruitQueue") )) public void process(String message) { log.info("fruitProcess receive mq message: {}", message); } /** * Producer(生产者): 将消息发送到Exchange Exchange(交换器):将从生产者接收到的消息路由到Queue Queue(队列):存放供消费者消费的消息 BindingKey(绑定键):建立Exchange与Queue之间的关系(个人看作是一种规则,也就是Exchange将什么样的消息路由到Queue) RoutingKey(路由键):Producer发送消息与路由键给Exchange,Exchange将判断RoutingKey是否符合BindingKey,如何则将该消息路由到绑定的Queue Consumer(消费者):从Queue中获取消息 */ @RabbitListener(bindings = @QueueBinding( exchange = @Exchange("myOrder"), key = "computer", value = @Queue("computerQueue") )) public void computerProcess(String message) { log.info("computerProcess receive mq message: {}", message); }
(3).生产者
@SpringBootTest public class MQSenderTest { @Autowired private AmqpTemplate amqpTemplate; @Test public void send() { amqpTemplate.convertAndSend("myQueue", "hello mq!!"); } @Test public void send3() { amqpTemplate.convertAndSend("myOrder", "computer", "computerMsgsg"); } }
2.2 使用stream
2.2.1 生产者
(1) pom依赖
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
(2)application.yml定义
spring: cloud: stream: bindings: #连接源定义的input和output messagesSend: #交换器名称 destination: msg-topic backRec: destination: back-topic ##消费者组名,这个组下只有一个queue,多台应用情况下只有一台能收到消息 group: back-group rabbitmq: addresses: 127.0.0.1:5672 username: guest password: guest
(3)定义mq连接源
import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.SubscribableChannel; public interface StreamClient { public static final String INPUT = "messagesSend"; public static final String BACK = "backRec"; // 发送 @Output(INPUT) MessageChannel output(); // 回调接收 @Input(BACK) SubscribableChannel input(); }
(4)发送类, @EnableBinding(StreamClient.class)声明
@RestController @EnableBinding(StreamClient.class) public class SendMessageController { @Autowired StreamClient streamClient; @RequestMapping("/sendMsg") public void sendMsg() { OrderDTO order = new OrderDTO(); order.setOrderId("123"); streamClient.output().send(MessageBuilder.withPayload(order).build()); } }
(5)接收返回的mq消息
@Component @Slf4j @EnableBinding(StreamClient.class) public class StreamReciever { // 接收对象 @StreamListener(StreamClient.BACK) public void process(Object message) { log.info("StreamReciever:{}", message); } }
2.2.2 消费者
(1) pom依赖
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
(2)application.yml定义
spring: cloud: stream: bindings: messageRec: #交换器exchange名称 destination: msg-topic ##消费者组名,这个组下只有一个queue,多台应用情况下只有一台能收到消息 group: msg-group backSend: destination: back-topic #我没有配置mq信息,自动用的默认配置 rabbitmq: addresses: 127.0.0.1:5672 username: guest password: guest
(3)连接源
public interface StreamServer { public static final String INPUT = "messageRec"; public static final String BACK = "backSend"; @Input(INPUT) SubscribableChannel input(); @Output(BACK) MessageChannel output(); }
(4)接收端。并范松返回mq消息
@Component @Slf4j @EnableBinding(StreamServer.class) public class StreamReciever { // 接收对象 @StreamListener(StreamServer.INPUT) @SendTo(StreamServer.BACK) public String process(Object message) { log.info("StreamReciever:{}", message); // 返回的消息 return "Revieved.."; } }