OnMyHeart 2019-12-28
<?xml version="1.0" encoding="UTF-8"?>
<!-- Parent / aggregator POM of the rabbitmq03 multi-module build. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Inherits from the Spring Boot starter parent, version 2.2.2.RELEASE. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.2.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<!-- Coordinates that the child modules reference in their own <parent> section. -->
<groupId>com.yuan</groupId>
<artifactId>rabbitmq03</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>rabbitmq03</name>
<!-- "pom" packaging: this project produces no jar of its own. -->
<packaging>pom</packaging>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<!-- Child modules built together with this POM. -->
<modules>
<module>rabbitmq-provider</module>
<module>rabbitmq-consumer</module>
</modules>
<!-- Dependencies declared here are inherited by every module that names this POM as its parent. -->
<dependencies>
<!-- Spring AMQP / RabbitMQ starter. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Test-scoped JUnit; no version is given here, presumably it comes from the Spring Boot parent's dependency management (confirm). -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<!-- Spring MVC starter. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Lombok, pinned to 1.18.10 with "provided" scope. -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.10</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Spring Boot Maven plugin, declared once here for the whole build. -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
<?xml version="1.0" encoding="UTF-8"?>
<!-- POM of the rabbitmq-provider child module (the message producer). -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Points at the rabbitmq03 parent POM; no dependencies are declared in this file itself. -->
<parent>
<groupId>com.yuan</groupId>
<artifactId>rabbitmq03</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<artifactId>rabbitmq-provider</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>rabbitmq-provider</name>
<description>子模块-生产者</description>
<!-- Packaged as a jar. -->
<packaging>jar</packaging>
</project>
QueueDelayConfig
package com.yuan.rabbitmqprovider.rabbitmq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.lang.model.element.NestingKind;
import java.util.HashMap;
import java.util.Map;
/**
 * Declares the RabbitMQ queues, exchanges and bindings used by the provider.
 *
 * <p>Two sets are declared: a "normal" queue whose arguments set a message TTL plus a
 * dead-letter exchange and routing key, and a "delay" queue bound to that dead-letter
 * exchange.
 */
@Configuration
public class QueueDelayConfig {

    /** Name of the normal queue. */
    public static final String NORMAL_QUEUE = "normal-queue";
    /** Name of the exchange the normal queue is bound to. */
    public static final String NORMAL_EXCHANGE = "normal-exchange";
    /** Routing key of the normal binding. */
    public static final String NORMAL_ROUTINGKEY = "normal-routingkey";

    /** Name of the dead-letter queue. */
    public static final String DELAY_QUEUE = "delay-queue";
    /** Name of the dead-letter exchange. */
    public static final String DELAY_EXCHANGE = "delay-exchange";
    /** Routing key used when messages are dead-lettered. */
    public static final String DELAY_ROUTINGKEY = "delay-routingkey";

    /**
     * Declares the normal queue with its TTL and dead-letter arguments.
     *
     * @return a durable, non-exclusive, non-auto-delete queue named {@link #NORMAL_QUEUE}
     */
    @Bean
    public Queue normalQueue() {
        final Map<String, Object> arguments = new HashMap<>();
        // Maximum time a message may stay in this queue: 15 seconds.
        arguments.put("x-message-ttl", 15000);
        // Dead-letter exchange (DLX) configured for this queue.
        arguments.put("x-dead-letter-exchange", DELAY_EXCHANGE);
        // Routing key applied when a message is handed to the DLX.
        arguments.put("x-dead-letter-routing-key", DELAY_ROUTINGKEY);

        final boolean durable = true;
        final boolean exclusive = false;
        final boolean autoDelete = false;
        return new Queue(NORMAL_QUEUE, durable, exclusive, autoDelete, arguments);
    }

    /**
     * Declares the exchange producers publish to.
     *
     * @return a direct exchange named {@link #NORMAL_EXCHANGE} (durable, not auto-delete)
     */
    @Bean
    public DirectExchange normalExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new DirectExchange(NORMAL_EXCHANGE, durable, autoDelete);
    }

    /**
     * Binds the normal queue to the normal exchange.
     *
     * @return the binding keyed by {@link #NORMAL_ROUTINGKEY}
     */
    @Bean
    public Binding normalRoutingkey() {
        final Queue queue = normalQueue();
        final DirectExchange exchange = normalExchange();
        return BindingBuilder.bind(queue).to(exchange).with(NORMAL_ROUTINGKEY);
    }

    /**
     * Declares the dead-letter queue.
     *
     * @return a durable queue named {@link #DELAY_QUEUE}
     */
    @Bean
    public Queue delayQueue() {
        final boolean durable = true;
        return new Queue(DELAY_QUEUE, durable);
    }

    /**
     * Declares the dead-letter exchange.
     *
     * @return a direct exchange named {@link #DELAY_EXCHANGE}
     */
    @Bean
    public DirectExchange delayExchange() {
        return new DirectExchange(DELAY_EXCHANGE);
    }

    /**
     * Binds the dead-letter queue to the dead-letter exchange.
     *
     * @return the binding keyed by {@link #DELAY_ROUTINGKEY}
     */
    @Bean
    public Binding delayRoutingkey() {
        final Queue queue = delayQueue();
        final DirectExchange exchange = delayExchange();
        return BindingBuilder.bind(queue).to(exchange).with(DELAY_ROUTINGKEY);
    }
}
SendController
package com.yuan.rabbitmqprovider.controller;
import com.yuan.rabbitmqprovider.rabbitmq.QueueDelayConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
@RestController
@Slf4j
public class SendController {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
 * Publishes the demo payload built by {@code createData()} to the normal exchange
 * with the normal routing key, then reports success.
 *
 * @return a map containing {@code "msg" -> "OK"} and {@code "code" -> "1"}
 */
@RequestMapping("/sender")
public Map<String, Object> sender() {
    final Map<String, Object> payload = this.createData();
    rabbitTemplate.convertAndSend(
            QueueDelayConfig.NORMAL_EXCHANGE,
            QueueDelayConfig.NORMAL_ROUTINGKEY,
            payload);

    final Map<String, Object> response = new HashMap<>();
    response.put("msg", "OK");
    response.put("code", "1");
    return response;
}
/**
 * Builds the demo message payload.
 *
 * @return a new mutable map with the keys {@code "msg"}, {@code "success"} and
 *         {@code "createdate"}; the latter holds the current local date-time
 *         formatted as {@code yyyy-MM-dd HH:mm:ss}
 */
private Map<String, Object> createData() {
    // ofPattern is a static factory: call it on the class. Going through
    // BASIC_ISO_DATE (as before) had no effect on the pattern and only misled readers.
    String date = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));

    Map<String, Object> map = new HashMap<>();
    map.put("msg", "hello rabbitmq!!");
    map.put("success", true);
    map.put("createdate", date);
    return map;
}
}
server:
  port: 8081
  servlet:
    context-path: /rabbitmq-provider
spring:
  rabbitmq:
    virtual-host: /
    username: guest
    password: guest
    host: 192.168.238.129
    port: 5672
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.yuan</groupId>
<artifactId>rabbitmq03</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<artifactId>rabbitmq-consumer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>rabbitmq-consumer</name>
<description>子模块-消费者</description>
<packaging>jar</packaging>
</project>
package com.yuan.rabbitmqconsumer.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
 * Consumer-side listener for the {@code delay-queue} dead-letter queue: any message
 * arriving on that queue is consumed here.
 */
@Slf4j
@Component
@RabbitListener(queues = "delay-queue")
public class QueueRecevier {

    /**
     * Handles a message whose payload is a map by logging it at INFO level.
     *
     * @param data the message payload
     */
    @RabbitHandler
    public void handlerMessage(Map<String, Object> data) {
        log.info("QueueRecevier.handlerMessage,data={}", data);
    }
}
标红处的 log 需要在 IDE 中安装 Lombok 插件后才能被识别。
直接点击右侧的 Install,然后重启 IDEA。
yml文件配置
# Embedded web server settings of the consumer application.
server:
  port: 8082
  servlet:
    context-path: /rabbitmq-consumer
# RabbitMQ broker connection settings.
spring:
  rabbitmq:
    virtual-host: /
    username: guest
    password: guest
    host: 192.168.238.129
    port: 5672


启动消费端后,消费端会根据我们设定的监听器监听队列中是否有消息,有则将其消费掉。

<?xml version="1.0" encoding="UTF-8"?>
<!-- POM of the common-vo child module, which holds the classes shared by the other modules. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Points at the rabbitmq03 parent POM; no dependencies are declared in this file itself. -->
<parent>
<groupId>com.yuan</groupId>
<artifactId>rabbitmq03</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<artifactId>common-vo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>common-vo</name>
<!-- Packaged as a jar. -->
<packaging>jar</packaging>
<description>公共子模块</description>
</project>
创建一个model的Package,创建一个Order
package com.yuan.commonvo.model;
import lombok.Data;
import java.lang.reflect.ParameterizedType;
import java.util.Date;
/**
 * Order model kept in the common-vo module.
 *
 * <p>Annotated with Lombok's {@code @Data}; the accessors used elsewhere in this
 * tutorial (for example {@code setOrderId} and {@code setOrderNo}) are generated by
 * Lombok rather than written by hand.
 */
@Data
public class Order {
/** Numeric identifier of the order. */
private long orderId;
/** Order number, e.g. {@code "P001"}. */
private String orderNo;
/** Presumably the creation time of the order; it is not populated by the code shown here (confirm). */
private Date createdate;
}
vo包下创建一个OrderVo
package com.yuan.commonvo.vo;
import com.yuan.commonvo.model.Order;
/**
 * View object for {@link Order}. It currently adds no members of its own and
 * exposes exactly what it inherits from {@code Order}.
 */
public class OrderVo extends Order {
}
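完了之后,在父模块的 pom 中注册 common-vo 子模块,并添加对 common-vo 的依赖(注意:如果把该依赖直接写进父 pom 的 <dependencies>,common-vo 自身也会继承到它,从而依赖自己;更稳妥的做法是把它加到 rabbitmq-provider 和 rabbitmq-consumer 各自的 pom 中)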
<modules>
<module>rabbitmq-provider</module>
<module>rabbitmq-consumer</module>
<module>common-vo</module>
</modules>
<dependency> <groupId>com.yuan</groupId> <artifactId>common-vo</artifactId> <version>0.0.1-SNAPSHOT</version></dependency>
修改生产者SendController
/**
 * Publishes a sample {@code OrderVo} (orderId 1, orderNo "P001") to the normal
 * exchange with the normal routing key, then reports success.
 *
 * @return a map containing {@code "msg" -> "OK"} and {@code "code" -> "1"}
 */
@RequestMapping("/sender")
public Map<String, Object> sender() {
    // The payload is now an OrderVo instead of the map produced by createData().
    final OrderVo order = new OrderVo();
    order.setOrderId(1);
    order.setOrderNo("P001");

    rabbitTemplate.convertAndSend(
            QueueDelayConfig.NORMAL_EXCHANGE,
            QueueDelayConfig.NORMAL_ROUTINGKEY,
            order);

    final Map<String, Object> response = new HashMap<>();
    response.put("msg", "OK");
    response.put("code", "1");
    return response;
}
添加QueueProviderMessageConvert
package com.yuan.rabbitmqprovider.rabbitmq;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Provider-side configuration that makes {@link RabbitTemplate} use a
 * {@link Jackson2JsonMessageConverter} for outgoing messages.
 */
@Configuration
public class QueueProviderMessageConvert {

    /**
     * Creates the {@link RabbitTemplate} used for publishing.
     *
     * @param connectionFactory the connection factory the template sends through
     * @return a template whose message converter is the bean returned by
     *         {@link #jackson2JsonMessageConverter()}
     */
    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
        return rabbitTemplate;
    }

    /**
     * Declares the Jackson-based message converter.
     *
     * @return a new {@link Jackson2JsonMessageConverter} with default settings
     */
    @Bean
    public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}
修改消费端QueueRecevier
package com.yuan.rabbitmqconsumer.controller;
import com.yuan.commonvo.vo.OrderVo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Consumer-side listener for the {@code delay-queue} dead-letter queue: any message
 * arriving on that queue is consumed here.
 */
@Slf4j
@Component
@RabbitListener(queues = "delay-queue")
public class QueueRecevier {

    /**
     * Handles a message whose payload is an {@code OrderVo} by logging it at INFO level.
     *
     * @param orderVo the message payload
     */
    @RabbitHandler
    public void handlerMessage(OrderVo orderVo) {
        log.info("QueueRecevier.handlerMessage,data={}", orderVo);
    }
}
添加消费端QueueRecevierMessageConvert
package com.yuan.rabbitmqconsumer.controller;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Consumer-side configuration that declares a {@link Jackson2JsonMessageConverter}
 * and a {@link RabbitTemplate} using it.
 */
@Configuration
public class QueueRecevierMessageConvert {

    /**
     * Creates a {@link RabbitTemplate} bound to the given connection factory.
     *
     * @param connectionFactory the connection factory the template uses
     * @return a template whose message converter is the bean returned by
     *         {@link #jackson2JsonMessageConverter()}
     */
    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
        return rabbitTemplate;
    }

    /**
     * Declares the Jackson-based message converter.
     *
     * @return a new {@link Jackson2JsonMessageConverter} with default settings
     */
    @Bean
    public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}