lishijian 2019-12-20
配置:
spring:
  rabbitmq:
    addresses: 192.168.108.128:5672
    connection-timeout: 15000
    username: guest
    password: guest
    publisher-confirms: true
    publisher-returns: true
依赖:
<!--rabbitmq --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId></dependency>
配置类:
package com.jds.rabbitmq;

import com.jds.common.constant.QueueEnum;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * Declares the exchanges, queues and bindings used for delayed order cancellation.
 *
 * <p>Two direct exchanges and two queues are declared. The "TTL" queue is configured with
 * {@code x-dead-letter-exchange} / {@code x-dead-letter-routing-key} arguments that point at the
 * order-cancel exchange and routing key, so messages that are dead-lettered from the TTL queue are
 * re-published towards the order-cancel queue.
 *
 * <p>All names come from {@link QueueEnum}, which is defined outside this file.
 */
@Configuration
public class MQConfig {

    public static final String RBA_QUEUE = "rba.queue";

    // ------------------------------------------------------------------------------------------
    // Disabled examples for the other exchange types (kept for reference, not compiled).
    // ------------------------------------------------------------------------------------------
    //
    // public static final String QUEUE = "queue";
    // public static final String TOPIC_QUEUE1 = "topic.queue1";
    // public static final String TOPIC_QUEUE2 = "topic.queue2";
    // public static final String HEADER_QUEUE = "header.queue";
    // public static final String TOPIC_EXCHANGE = "topicExchage";
    // public static final String FANOUT_EXCHANGE = "fanoutxchage";
    // public static final String HEADERS_EXCHANGE = "headersExchage";
    //
    // // Direct mode exchange
    // @Bean
    // public Queue queue() {
    //     return new Queue(QUEUE, true);
    // }
    //
    // // Topic mode exchange
    // @Bean
    // public Queue topicQueue1() {
    //     return new Queue(TOPIC_QUEUE1, true);
    // }
    // @Bean
    // public Queue topicQueue2() {
    //     return new Queue(TOPIC_QUEUE2, true);
    // }
    // @Bean
    // public TopicExchange topicExchage(){
    //     return new TopicExchange(TOPIC_EXCHANGE);
    // }
    // @Bean
    // public Binding topicBinding1() {
    //     return BindingBuilder.bind(topicQueue1()).to(topicExchage()).with("topic.key1");
    // }
    // @Bean
    // public Binding topicBinding2() {
    //     return BindingBuilder.bind(topicQueue2()).to(topicExchage()).with("topic.#");
    // }
    //
    // // Fanout mode exchange
    // @Bean
    // public FanoutExchange fanoutExchage(){
    //     return new FanoutExchange(FANOUT_EXCHANGE);
    // }
    // @Bean
    // public Binding FanoutBinding1() {
    //     return BindingBuilder.bind(topicQueue1()).to(fanoutExchage());
    // }
    // @Bean
    // public Binding FanoutBinding2() {
    //     return BindingBuilder.bind(topicQueue2()).to(fanoutExchage());
    // }
    //
    // // Header mode exchange
    // @Bean
    // public HeadersExchange headersExchage(){
    //     return new HeadersExchange(HEADERS_EXCHANGE);
    // }
    // @Bean
    // public Queue headerQueue1() {
    //     return new Queue(HEADER_QUEUE, true);
    // }
    // @Bean
    // public Binding headerBinding() {
    //     Map<String, Object> map = new HashMap<String, Object>();
    //     map.put("header1", "value1");
    //     map.put("header2", "value2");
    //     return BindingBuilder.bind(headerQueue1()).to(headersExchage()).whereAll(map).match();
    // }

    /**
     * Exchange that the queue actually consumed for order cancellation is bound to.
     */
    @Bean
    DirectExchange orderDirect() {
        return durableDirectExchange(QueueEnum.QUEUE_ORDER_CANCEL.getExchange());
    }

    /**
     * Exchange that the order delay (TTL) queue is bound to.
     */
    @Bean
    DirectExchange orderTtlDirect() {
        return durableDirectExchange(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange());
    }

    /**
     * Queue from which order-cancel messages are actually consumed.
     */
    @Bean
    public Queue orderQueue() {
        String queueName = QueueEnum.QUEUE_ORDER_CANCEL.getName();
        return new Queue(queueName);
    }

    /**
     * Order delay queue. Messages dead-lettered from this queue are forwarded to the
     * order-cancel exchange with the order-cancel routing key.
     */
    @Bean
    public Queue orderTtlQueue() {
        // Where dead-lettered messages are forwarded to.
        String deadLetterExchange = QueueEnum.QUEUE_ORDER_CANCEL.getExchange();
        String deadLetterRoutingKey = QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey();

        QueueBuilder ttlQueue = QueueBuilder.durable(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getName());
        ttlQueue.withArgument("x-dead-letter-exchange", deadLetterExchange);
        ttlQueue.withArgument("x-dead-letter-routing-key", deadLetterRoutingKey);
        return ttlQueue.build();
    }

    /**
     * Binds the order-cancel queue to the order-cancel exchange.
     *
     * <p>The parameter names match the bean method names above; with two beans of each type
     * declared in this class they must not be renamed casually.
     */
    @Bean
    Binding orderBinding(DirectExchange orderDirect, Queue orderQueue) {
        String routingKey = QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey();
        return BindingBuilder.bind(orderQueue).to(orderDirect).with(routingKey);
    }

    /**
     * Binds the order delay queue to the delay exchange.
     *
     * <p>The parameter names match the bean method names above; with two beans of each type
     * declared in this class they must not be renamed casually.
     */
    @Bean
    Binding orderTtlBinding(DirectExchange orderTtlDirect, Queue orderTtlQueue) {
        String routingKey = QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey();
        return BindingBuilder.bind(orderTtlQueue).to(orderTtlDirect).with(routingKey);
    }

    /** Builds a durable direct exchange with the given name. */
    private static DirectExchange durableDirectExchange(String exchangeName) {
        return (DirectExchange) ExchangeBuilder
                .directExchange(exchangeName)
                .durable(true)
                .build();
    }
}
发送者:
package com.jds.rabbitmq; /** * @program: red-bag-activity->CancelOrderSender * @description: * @author: cxy * @create: 2019-12-20 17:57 **/ import com.jds.common.constant.QueueEnum; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * 取消订单消息的发出者 * */ @Component public class CancelOrderSender { private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderSender.class); @Autowired private AmqpTemplate amqpTemplate; public void sendMessage(Long orderId,final long delayTimes){ //给延迟队列发送消息 amqpTemplate.convertAndSend(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange(), QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey(), orderId, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { //给消息设置延迟毫秒值 message.getMessageProperties().setExpiration(String.valueOf(delayTimes)); return message; } }); LOGGER.info("send delay message orderId:{}",orderId); } }
接收者:
package com.jds.rabbitmq;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Handles "cancel order" messages arriving on the {@code mall.order.cancel} queue.
 *
 * <p>NOTE(review): the queue name is a literal because annotation attributes must be
 * compile-time constants; it presumably has to equal {@code QueueEnum.QUEUE_ORDER_CANCEL.getName()}
 * used by the queue configuration. Verify the two stay in sync.
 */
@Component
@RabbitListener(queues = "mall.order.cancel")
public class CancelOrderReceiver {

    private static final Logger LOGGER = LoggerFactory.getLogger(CancelOrderReceiver.class);

    /**
     * Receives the id of an order whose cancel message was delivered to the queue.
     *
     * <p>Currently only logs the id; the logger's own timestamp shows when the message arrived,
     * so no extra console output is needed.
     *
     * @param orderId id carried by the message
     */
    @RabbitHandler
    public void handle(Long orderId) {
        LOGGER.info("receive delay message orderId:{}", orderId);
    }
}
配置调用类:
package com.jds.controller; import com.jds.rabbitmq.CancelOrderSender; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; /** * @program: red-bag-activity->DelayQController * @description: * @author: cxy * @create: 2019-12-20 18:00 **/ @RestController public class DelayQController { @Autowired CancelOrderSender cancelOrderSender; @RequestMapping(value = "/id", method = RequestMethod.GET) @ResponseBody public void detail() { long delayTimes = 3 * 1000; //发送延迟消息 cancelOrderSender.sendMessage(12333333L, delayTimes); System.out.println(12333333L); System.err.println(delayTimes); System.out.println(System.currentTimeMillis()); System.out.println(" 大傻逼"); } }
测试结果:
2019-12-20 21:48:44.932 |-INFO [http-nio-8082-exec-1] com.jds.rabbitmq.CancelOrderSender [40] -| send delay message orderId:12333333 12333333 1576849724932 大傻逼 3000 1576849727975 2019-12-20 21:48:47.975 |-INFO [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] com.jds.rabbitmq.CancelOrderReceiver [21] -| receive delay message orderId:12333333 1576849727975 大傻逼222222 12333333
调用:
访问 http://localhost:8082/id,从日志时间戳可以看到消息在发送约三秒后才被消费。
实现主要地方:
new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { //给消息设置延迟毫秒值 message.getMessageProperties().setExpiration(String.valueOf(delayTimes)); return message; }
即通过 MessagePostProcessor 给消息设置 expiration(过期时间,单位为毫秒);消息在延迟队列中过期后,会经死信交换机转发到实际消费队列,从而实现延迟消费。