OnMyHeart 2019-12-28
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.2.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.yuan</groupId> <artifactId>rabbitmq03</artifactId> <version>0.0.1-SNAPSHOT</version> <name>rabbitmq03</name> <packaging>pom</packaging> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <modules> <module>rabbitmq-provider</module> <module>rabbitmq-consumer</module> </modules> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.10</version> <scope>provided</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.yuan</groupId>
        <artifactId>rabbitmq03</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <artifactId>rabbitmq-provider</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rabbitmq-provider</name>
    <description>子模块-生产者</description>
    <packaging>jar</packaging>
</project>

QueueDelayConfig

package com.yuan.rabbitmqprovider.rabbitmq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.lang.model.element.NestingKind;
import java.util.HashMap;
import java.util.Map;

/**
 * Declares the queues, exchanges and bindings used to build a delay queue:
 * messages expire in the normal queue and are then re-routed through the
 * dead-letter exchange into the delay queue.
 */
@Configuration
public class QueueDelayConfig {

    /** Names of the normal queue, exchange and routing key. */
    public static final String NORMAL_QUEUE = "normal-queue";
    public static final String NORMAL_EXCHANGE = "normal-exchange";
    public static final String NORMAL_ROUTINGKEY = "normal-routingkey";

    /** Names of the dead-letter queue, exchange and routing key. */
    public static final String DELAY_QUEUE = "delay-queue";
    public static final String DELAY_EXCHANGE = "delay-exchange";
    public static final String DELAY_ROUTINGKEY = "delay-routingkey";

    /**
     * Declares the normal queue together with its three dead-letter related
     * arguments: message TTL, dead-letter exchange and dead-letter routing key.
     *
     * @return the durable, non-exclusive, non-auto-delete normal queue
     */
    @Bean
    public Queue normalQueue() {
        Map<String, Object> arguments = new HashMap<>();
        // A message may live in this queue for at most 15 seconds.
        arguments.put("x-message-ttl", 15000);
        // Dead-letter exchange (DLX) of this queue.
        arguments.put("x-dead-letter-exchange", DELAY_EXCHANGE);
        // Routing key used when a message is handed to the DLX.
        arguments.put("x-dead-letter-routing-key", DELAY_ROUTINGKEY);
        return new Queue(NORMAL_QUEUE, true, false, false, arguments);
    }

    /**
     * @return the durable, non-auto-delete direct exchange for normal messages
     */
    @Bean
    public DirectExchange normalExchange() {
        return new DirectExchange(NORMAL_EXCHANGE, true, false);
    }

    /**
     * @return the binding of the normal queue to the normal exchange
     */
    @Bean
    public Binding normalRoutingkey() {
        return BindingBuilder.bind(normalQueue())
                .to(normalExchange())
                .with(NORMAL_ROUTINGKEY);
    }

    /**
     * Declares the dead-letter queue.
     *
     * @return the durable dead-letter queue
     */
    @Bean
    public Queue delayQueue() {
        return new Queue(DELAY_QUEUE, true);
    }

    /**
     * @return the direct exchange that receives dead-lettered messages
     */
    @Bean
    public DirectExchange delayExchange() {
        return new DirectExchange(DELAY_EXCHANGE);
    }

    /**
     * @return the binding of the dead-letter queue to the dead-letter exchange
     */
    @Bean
    public Binding delayRoutingkey() {
        return BindingBuilder.bind(delayQueue())
                .to(delayExchange())
                .with(DELAY_ROUTINGKEY);
    }
}

SendController

package com.yuan.rabbitmqprovider.controller;

import com.yuan.rabbitmqprovider.rabbitmq.QueueDelayConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;

/**
 * REST endpoint that publishes a sample message to the normal exchange.
 */
@RestController
@Slf4j
public class SendController {

    /**
     * Layout of the {@code createdate} entry in the payload. Obtained from the
     * static factory directly; the original reached it through the unrelated
     * {@code BASIC_ISO_DATE} instance, which compiles but is misleading.
     */
    private static final DateTimeFormatter CREATED_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends one sample payload to the normal exchange with the normal routing key.
     *
     * @return a map with {@code msg} = "OK" and {@code code} = "1"
     */
    @RequestMapping("/sender")
    public Map<String, Object> sender() {
        Map<String, Object> data = this.createData();
        rabbitTemplate.convertAndSend(
                QueueDelayConfig.NORMAL_EXCHANGE, QueueDelayConfig.NORMAL_ROUTINGKEY, data);

        Map<String, Object> result = new HashMap<>();
        result.put("msg", "OK");
        result.put("code", "1");
        return result;
    }

    /**
     * Builds the sample payload: a greeting, a success flag and the current
     * local time formatted as {@code yyyy-MM-dd HH:mm:ss}.
     *
     * @return the payload map
     */
    private Map<String, Object> createData() {
        Map<String, Object> payload = new HashMap<>();
        String date = LocalDateTime.now().format(CREATED_FORMAT);
        payload.put("msg", "hello rabbitmq!!");
        payload.put("success", true);
        payload.put("createdate", date);
        return payload;
    }
}
server:
  port: 8081
  servlet:
    context-path: /rabbitmq-provider
spring:
  rabbitmq:
    virtual-host: /
    username: guest
    password: guest
    host: 192.168.238.129
    port: 5672
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>com.yuan</groupId> <artifactId>rabbitmq03</artifactId> <version>0.0.1-SNAPSHOT</version> </parent> <artifactId>rabbitmq-consumer</artifactId> <version>0.0.1-SNAPSHOT</version> <name>rabbitmq-consumer</name> <description>子模块-消费者</description> <packaging>jar</packaging> </project>
package com.yuan.rabbitmqconsumer.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * Consumer-side listener on the {@code delay-queue} dead-letter queue: any
 * message that arrives in that queue is consumed by this component.
 */
@Component
@Slf4j
@RabbitListener(queues = "delay-queue")
public class QueueRecevier {

    /**
     * Logs the received payload at info level.
     *
     * @param data the message body delivered from the queue
     */
    @RabbitHandler
    public void handlerMessage(Map<String, Object> data) {
        log.info("QueueRecevier.handlerMessage,data={}", data);
    }
}

标红处的log使用需要下载一个插件Lombok 直接右边install, 然后重启idea

yml文件配置

server:
  port: 8082
  servlet:
    context-path: /rabbitmq-consumer
spring:
  rabbitmq:
    virtual-host: /
    username: guest
    password: guest
    host: 192.168.238.129
    port: 5672
启动消费端,消费端会根据我们设定的监听去监听队列中是否有消息,有则会被消费掉。
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>com.yuan</groupId> <artifactId>rabbitmq03</artifactId> <version>0.0.1-SNAPSHOT</version> </parent> <artifactId>common-vo</artifactId> <version>0.0.1-SNAPSHOT</version> <name>common-vo</name> <packaging>jar</packaging> <description>公共子模块</description> </project> 创建一个model的Package,创建一个Order package com.yuan.commonvo.model; import lombok.Data; import java.lang.reflect.ParameterizedType; import java.util.Date; @Data public class Order { private long orderId; private String orderNo; private Date createdate; } vo包下创建一个OrderVo package com.yuan.commonvo.vo; import com.yuan.commonvo.model.Order; public class OrderVo extends Order { } 完了之后在父模块中添加common-vo子模块的一个pom依赖 <modules> <module>rabbitmq-provider</module> <module>rabbitmq-consumer</module> <module>common-vo</module> </modules> <dependency> <groupId>com.yuan</groupId> <artifactId>common-vo</artifactId> <version>0.0.1-SNAPSHOT</version></dependency> 修改生产者SendController @RequestMapping("/sender") public Map<String, Object> sender(){ // Map<String, Object> data = this.createData(); OrderVo orderVo = new OrderVo(); orderVo.setOrderId(1); orderVo.setOrderNo("P001"); rabbitTemplate.convertAndSend(QueueDelayConfig.NORMAL_EXCHANGE, QueueDelayConfig.NORMAL_ROUTINGKEY,orderVo); Map<String, Object> result = new HashMap<String, Object>(); result.put("msg","OK"); result.put("code","1"); return result; } 添加QueueProviderMessageConvert package com.yuan.rabbitmqprovider.rabbitmq;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;import org.springframework.context.annotation.Bean;import 
org.springframework.context.annotation.Configuration;@Configurationpublic class QueueProviderMessageConvert { @Bean public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate=new RabbitTemplate(); rabbitTemplate.setConnectionFactory(connectionFactory); rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter()); return rabbitTemplate; } @Bean public Jackson2JsonMessageConverter jackson2JsonMessageConverter(){ return new Jackson2JsonMessageConverter(); }} 修改消费端QueueRecevier package com.yuan.rabbitmqconsumer.controller; import com.yuan.commonvo.vo.OrderVo; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @Slf4j @RabbitListener(queues = {"delay-queue"}) //消费端监听队列,如果delay-queue死信队列中有消息过来就会被消费掉 public class QueueRecevier { @RabbitHandler public void handlerMessage(OrderVo orderVo){ log.info("QueueRecevier.handlerMessage,data={}",orderVo); } } 添加消费端QueueRecevierMessageConvert package com.yuan.rabbitmqconsumer.controller;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class QueueRecevierMessageConvert { @Bean public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate=new RabbitTemplate(); rabbitTemplate.setConnectionFactory(connectionFactory); rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter()); return rabbitTemplate; } @Bean public Jackson2JsonMessageConverter jackson2JsonMessageConverter(){ return new Jackson2JsonMessageConverter(); }}