liym 2018-06-26
  public class Producer {
     private static final String EXCHANGE_name="rabbit_test_exchange";
     private static final String EXCHANGE_ROOTING_KEY="rabbit_test_routingkey";
     public static void main(String[] args)throws Exception{
         //先和RabbitMQ Server建立連接
        ConnectionFactory factory = new ConnectionFactory();
         factory.setHost("localhost");
         factory.setPort(5672);
         factory.setUsername("duanzx");
         factory.setPassword("duanzx");
         factory.setVirtualHost("/duanzx_host");
         Connection connection = factory.newConnection();
         //创建出连接通道
        Channel channel = connection.createChannel();
         //声明交换机名称和类型(直接根据路由键)
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");
         //向RabbitMQ发送消息
        channel.basicPublish(EXCHANGE_NAME,EXCHANGE_ROOTING_KEY, MessageProperties.TEXT_PLAIN,"just for test".getBytes());
         //关闭和RabbitMQ Server之间的通道
        channel.close();
         //关闭连接
        connection.close();
     }
 }
public class ConsumerTest {
     private static final String EXCHANGE_name="rabbit_test_exchange";
     private static final String EXCHANGE_ROOTING_KEY="rabbit_test_routingkey";
     private static final String QUEUE_name="rabbit_test_queue";
     public static void main(String[] args)throws Exception{
         //先和RabbitMQ Server建立連接
        ConnectionFactory factory = new ConnectionFactory();
         factory.setHost("localhost");
         factory.setPort(5672);
         factory.setUsername("duanzx");
         factory.setPassword("duanzx");
         factory.setVirtualHost("/duanzx_host");
         Connection connection = factory.newConnection();
         //创建出连接通道
        Channel channel = connection.createChannel();
         //声明交换机名称和类型(直接根据路由键)
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
         //声明队列名称
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
         //将队列绑定到交换机上
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROOTING_KEY);
         //读取队列里的数据并进行处理
        Consumer consumer = new DefaultConsumer(channel){
             @Override
            public void handleDelivery(String consumerTag,
                                        Envelope envelope,
                                        AMQP.BasicProperties properties,
                                        byte[] body)
                     throws IOException
             {
                 try {
                     System.out.println(new String(body,"UTF-8"));
                 } catch (Exception e) {
                     e.printStackTrace();
                 }finally {
                     channel.basicAck(envelope.getDeliveryTag(),false);
                 }
             }
         };
         channel.basicConsume(QUEUE_NAME,false,consumer);
     }
 }