wbingyang 2020-05-26








<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
</dependency>public class Producer {
static final String QUEUE_NAME = "simple_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//主机地址
connectionFactory.setHost("192.168.47.132");
//连接端口: 默认5672
connectionFactory.setPort(5672);
//虚拟主机名: 默认 /
connectionFactory.setVirtualHost("/yellowstreak");
//连接用户名: 默认guest
connectionFactory.setUsername("yellowstreak");
//连接密码: 默认guest
connectionFactory.setPassword("123456");
//创建连接
Connection connection = connectionFactory.newConnection();
//创建频道 - 根据频道通信
Channel channel = connection.createChannel();
/**
* 参数1: 队列名称
* 参数2:是否定义持久化队列
* 参数3:是否独占本次连接
* 参数4:是否在不使用的时候自动删除队列
* 参数5:队列其它参数
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//要发送的消息
String message = "Hello World";
/**
* 参数1: 交换机exchange名称: 没有则使用默认Default Exchange
* 参数2: 路由key, 简单模式下可以传递队列名称
* 参数3: 消息的其他属性
* 参数4: 参数内容, 要转换成字节数组
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("已发送消息");
// 关闭资源
channel.close();
connection.close();
}
}
public class ConnectionUtil {
public static Connection getConnection() throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setPort(5672);
connectionFactory.setHost("192.168.47.132");
connectionFactory.setVirtualHost("/yellowstreak");
connectionFactory.setUsername("yellowstreak");
connectionFactory.setPassword("123456");
return connectionFactory.newConnection();
}
}
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
//创建频道
Channel channel = connection.createChannel();
//去连通生产者的队列.
channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);
//创建消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
/**
* @param consumerTag 消息者标签,在channel.basicConsume时候可以指定
* @param envelope 消息包的内容,可从中获取消息id,路由Key,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
* @param properties 属性信息
* @param body 消息
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key (这里是队列名)
System.out.println(envelope.getRoutingKey());
//消息id
System.out.println(envelope.getDeliveryTag());
//收到的消息
System.out.println(new String(body, "utf-8"));
}
};//监听消息
/**
* 参数1:队列名称
* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了, mq接收到回复会删除消息, 设置为false则需要手动确认
* 参数3:消息接收到后回调
*/
channel.basicConsume(Producer.QUEUE_NAME, true, consumer);
//这里不关闭资源, 应该一直监听着..
//这样只要生产者发出消息, 消费者就能收到并消费.
}
}