liym 2018-06-26
public class Producer {
private static final String EXCHANGE_name="rabbit_test_exchange";
private static final String EXCHANGE_ROOTING_KEY="rabbit_test_routingkey";
public static void main(String[] args)throws Exception{
//先和RabbitMQ Server建立連接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("duanzx");
factory.setPassword("duanzx");
factory.setVirtualHost("/duanzx_host");
Connection connection = factory.newConnection();
//创建出连接通道
Channel channel = connection.createChannel();
//声明交换机名称和类型(直接根据路由键)
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
//向RabbitMQ发送消息
channel.basicPublish(EXCHANGE_NAME,EXCHANGE_ROOTING_KEY, MessageProperties.TEXT_PLAIN,"just for test".getBytes());
//关闭和RabbitMQ Server之间的通道
channel.close();
//关闭连接
connection.close();
}
}
public class ConsumerTest {
private static final String EXCHANGE_name="rabbit_test_exchange";
private static final String EXCHANGE_ROOTING_KEY="rabbit_test_routingkey";
private static final String QUEUE_name="rabbit_test_queue";
public static void main(String[] args)throws Exception{
//先和RabbitMQ Server建立連接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("duanzx");
factory.setPassword("duanzx");
factory.setVirtualHost("/duanzx_host");
Connection connection = factory.newConnection();
//创建出连接通道
Channel channel = connection.createChannel();
//声明交换机名称和类型(直接根据路由键)
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//声明队列名称
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//将队列绑定到交换机上
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROOTING_KEY);
//读取队列里的数据并进行处理
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
try {
System.out.println(new String(body,"UTF-8"));
} catch (Exception e) {
e.printStackTrace();
}finally {
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
channel.basicConsume(QUEUE_NAME,false,consumer);
}
}