网络编程与RabbitMQ消息通信

liym 2018-06-26

1.RabbitMQ简介,RabbitMQ是一个开源的消息中间件,负责接收Producer的消息,并将该消息放入队列中,然后Consumer从队列里取出消息并进行处理。
在这里Producer不需要关心Consumer如何处理消息,Consumer也不需要关心Producer的消息如何发送。两方实现了完全的解耦。代码实例:

  public class Producer {
    private static final String EXCHANGE_name="rabbit_test_exchange";
    private static final String EXCHANGE_ROOTING_KEY="rabbit_test_routingkey";

    public static void main(String[] args)throws Exception{
        //先和RabbitMQ Server建立連接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("duanzx");
        factory.setPassword("duanzx");
        factory.setVirtualHost("/duanzx_host");
        Connection connection = factory.newConnection();
        //创建出连接通道
        Channel channel = connection.createChannel();
        //声明交换机名称和类型(直接根据路由键)
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");
        //向RabbitMQ发送消息
        channel.basicPublish(EXCHANGE_NAME,EXCHANGE_ROOTING_KEY, MessageProperties.TEXT_PLAIN,"just for test".getBytes());
        //关闭和RabbitMQ Server之间的通道
        channel.close();
        //关闭连接
        connection.close();
    }
}

public class ConsumerTest {

    private static final String EXCHANGE_name="rabbit_test_exchange";
    private static final String EXCHANGE_ROOTING_KEY="rabbit_test_routingkey";
    private static final String QUEUE_name="rabbit_test_queue";

    public static void main(String[] args)throws Exception{
        //先和RabbitMQ Server建立連接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("duanzx");
        factory.setPassword("duanzx");
        factory.setVirtualHost("/duanzx_host");
        Connection connection = factory.newConnection();
        //创建出连接通道
        Channel channel = connection.createChannel();
        //声明交换机名称和类型(直接根据路由键)
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        //声明队列名称
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //将队列绑定到交换机上
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROOTING_KEY);
        //读取队列里的数据并进行处理
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException
            {
                try {
                    System.out.println(new String(body,"UTF-8"));
                } catch (Exception e) {
                    e.printStackTrace();
                }finally {
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        };
        channel.basicConsume(QUEUE_NAME,false,consumer);
    }
}

2.Socket 网络通信包括同步阻塞,异步阻塞,同步非阻塞,异步非阻塞。
      其中同步指的是应用程序与系统之间的通信方式,假设应用程序与系统需要完成两个动作,如果第一个动作完成后才能执行第二个动作。则该通信方式是同步的。如果在执行第一个动作后,重新启动了另一个线程执行,而在主线程里不需要等待第一个动作完成后,就继续执行第二个动作。则该通信方式是异步的。
      阻塞指的是应用程序读取IO的操作方式。传统的IO操作都是阻塞的,对IO的操作都是在流里面进行,如果文件过大的话,执行时间会变长,并且内存占用的也会很多。
jdk1.5之后的NIO操作是非阻塞的。在操作时候会先将数据放到Buffer里,等到缓冲区保存好数据后再通知接收方处理数据。
同步非阻塞Socket通信代码实例:
public class ClientClass {
    public static void main(String[] args) throws Exception {
        SocketChannel socketChannel = null;
        try {
            socketChannel = SocketChannel.open();
            socketChannel.connect(new InetSocketAddress("localhost", 8080));
            String message = "just for test";
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            byteBuffer.put(message.getBytes());
            byteBuffer.flip();
            socketChannel.write(byteBuffer);
            socketChannel.close();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (socketChannel != null) {
                socketChannel.close();
            }
        }
    }
}
 
public class SocketClass implements Runnable {

    private Selector selector;
    private ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

    public SocketClass(int port) {
        try {
            //声明Selector
            selector = Selector.open();
            //声明ServerSocketChannel
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            //声明ServerSocketChannel通信方式为非阻塞
            serverSocketChannel.configureBlocking(false);
            //ServerSocketChannel端口号码
            serverSocketChannel.bind(new InetSocketAddress(port));
            //注册到Selector
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("start server port:" + port);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        //一直执行该线程
        while (true) {
            try {
                //阻塞等待,直到声明READ事件的那些通道已经就绪。
                selector.select();
                //遍历所有已经注册的通道
                Iterator<SelectionKey> selectionKeyIterator = selector.selectedKeys().iterator();
                while (selectionKeyIterator.hasNext()) {
                    SelectionKey key = selectionKeyIterator.next();
                    selectionKeyIterator.remove();
                    if (key.isValid()) {
                        //如果是ACCEPT事件,代表ServerSocketChannel,此时应该接收请求
                        if (key.isAcceptable()) {
                            this.accept(key);
                        }
                        //如果是READ事件,代表SocketChannel,此时应该处理请求
                        if (key.isReadable()) {
                            this.read(key);
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void accept(SelectionKey key) {
        //获取服务端通道
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
        try {
            //接收客户端请求,并创建通道
            SocketChannel socketChannel = serverSocketChannel.accept();
            //声明SocketChannel通信方式为非阻塞
            socketChannel.configureBlocking(false);
            //注册到Selector
            socketChannel.register(selector, SelectionKey.OP_READ);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void read(SelectionKey key) {
        try {
            //获取客户端连接通道
            SocketChannel socketChannel = (SocketChannel) key.channel();
            //先清空缓冲区里的数据
            byteBuffer.clear();
            //读取通道里的数据并写入到缓冲区
            int count = socketChannel.read(byteBuffer);
            //如果通道里没有数据,关闭通道并取消SelectionKey,
            if (count == -1) {
                socketChannel.close();
                key.cancel();
                return;
            }
            //将缓冲区由写模式切换到读模式
            byteBuffer.flip();
            byte[] bytes = new byte[byteBuffer.remaining()];
            //读取缓冲区里的数据并放入byte数组里
            byteBuffer.get(bytes);
            System.out.println("has receive request:"+new String(bytes));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }


    public static void main(String[] args) throws Exception {
        new SocketClass(8080).run();
    }
}

相关推荐