RabbitMq三种Exchange模式—订阅、路由、通配符

zhihuijiao 2019-11-11

三种Exchange模式——订阅、路由、通配符模式

一、订阅模式(Fanout Exchange)

一个生产者绑定一个交换机,每个消费者绑定一个队列。生产者将消息通过交换器分发给所有在线的消费者。交换机没有消息存储的能力,只能向当前在线的消费者发送消息。因此未接收到消息的消费者,即使重新连接rabbitmq也无法获取到已发送的消息

示例代码:
生产者:

[Java] 纯文本查看 复制代码
?

<font style="color:rgb(79, 79, 79)"><font face="&quot;"><font style="font-size:16px">public class Send {/size/color

private final static String EXCHANGE_NAME = "test_exchange_fanout";

public static void main(String[] argv) throws Exception {
    // 获取到连接以及mq通道
    Connection connection = ConnectionUtil.getConnection();
    //从连接中创建通道
    Channel channel = connection.createChannel();

    // 声明exchange
    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

    // 消息内容
    String message = "商品已经新增,id = 1000";
    channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
     
    System.out.println(" [x] Sent '" + message + "'");

    channel.close();
    connection.close();
}

}
</font></font></font>

消费者:

[Java] 纯文本查看 复制代码
?

public class Recv {

private final static String QUEUE_NAME = "test_queue_fanout_1";

private final static String EXCHANGE_NAME = "test_exchange_fanout";

public static void main(String[] argv) throws Exception {

    // 获取到连接以及mq通道
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();

    // 声明队列
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);

    // 绑定队列到交换机
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

    // 同一时刻服务器只会发一条消息给消费者
    channel.basicQos(1);

    // 定义队列的消费者
    QueueingConsumer consumer = new QueueingConsumer(channel);
    // 监听队列,手动返回完成
    channel.basicConsume(QUEUE_NAME, true, consumer);

    // 获取消息
    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        System.out.println(" 前台系统: '" + message + "'");
        Thread.sleep(10);

        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
}

}

二、路由模式(Direct Exchange)
这种模式添加了一个路由键,生产者发布消息的时候添加路由键,消费者绑定队列到交换机时添加键值,这样就可以接收到需要接收的消息。

示例代码:
生产者:

[Java] 纯文本查看 复制代码
?

<font style="color:rgb(79, 79, 79)"><font face="&quot;"><font style="font-size:16px">public class Send {/size/color

private final static String EXCHANGE_NAME = "test_exchange_direct";

public static void main(String[] argv) throws Exception {
    // 获取到连接以及mq通道
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();

    // 声明exchange
    channel.exchangeDeclare(EXCHANGE_NAME, "direct");

    // 消息内容
    String message = "删除商品, id = 1001";
    channel.basicPublish(EXCHANGE_NAME, "delete", null, message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");

    channel.close();
    connection.close();
}

}
</font></font></font>

消费者1:接收更新和删除消息

[Java] 纯文本查看 复制代码
?

public class Recv {

private final static String QUEUE_NAME = "test_queue_direct_1";

private final static String EXCHANGE_NAME = "test_exchange_direct";

public static void main(String[] argv) throws Exception {

    // 获取到连接以及mq通道
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();

    // 声明队列
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);

    // 绑定队列到交换机
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");

    // 同一时刻服务器只会发一条消息给消费者
    channel.basicQos(1);

    // 定义队列的消费者
    QueueingConsumer consumer = new QueueingConsumer(channel);
    // 监听队列,手动返回完成
    channel.basicConsume(QUEUE_NAME, false, consumer);

    // 获取消息
    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        System.out.println(" 前台系统: '" + message + "'");
        Thread.sleep(10);

        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
}

}

消费者2:接收insert,update,delete的消息

[Java] 纯文本查看 复制代码
?

public class Recv2 {

private final static String QUEUE_NAME = "test_queue_direct_2";

private final static String EXCHANGE_NAME = "test_exchange_direct";

public static void main(String[] argv) throws Exception {

    // 获取到连接以及mq通道
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();

    // 声明队列
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);

    // 绑定队列到交换机
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");

    // 同一时刻服务器只会发一条消息给消费者
    channel.basicQos(1);

    // 定义队列的消费者
    QueueingConsumer consumer = new QueueingConsumer(channel);
    // 监听队列,手动返回完成
    channel.basicConsume(QUEUE_NAME, false, consumer);

    // 获取消息
    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        System.out.println(" 搜索系统: '" + message + "'");
        Thread.sleep(10);

        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
}

}

如果生产者发布了insert消息,那么消费者2可以收到,消费者 1收不到,如果发布了update或者delete消息,两个消费者都可以收到。如果发布ABC消息两个消费者都收不到,因为没有绑定这个键值。这种模式基本满足了我们的需求,但是还不够灵活,下面介绍另外一个模式。

三、通配符模式(Topic Exchange)

基本思想和路由模式是一样的,只不过路由键支持模糊匹配,符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词

示例代码:
生产者:

[Java] 纯文本查看 复制代码
?

<font style="color:rgb(79, 79, 79)"><font face="&quot;"><font style="font-size:16px">public class Send {/size/color

private final static String EXCHANGE_NAME = "test_exchange_topic";

public static void main(String[] argv) throws Exception {
    // 获取到连接以及mq通道
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();

    // 声明exchange
    channel.exchangeDeclare(EXCHANGE_NAME, "topic");

    // 消息内容
    String message = "删除商品,id = 1001";
    channel.basicPublish(EXCHANGE_NAME, "item.delete", null, message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");

    channel.close();
    connection.close();
}

}
</font></font></font>

消费者1:

[Java] 纯文本查看 复制代码
?

public class Recv {

private final static String QUEUE_NAME = "test_queue_topic_1";

private final static String EXCHANGE_NAME = "test_exchange_topic";

public static void main(String[] argv) throws Exception {

    // 获取到连接以及mq通道
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();

    // 声明队列
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);

    // 绑定队列到交换机
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");

    // 同一时刻服务器只会发一条消息给消费者
    channel.basicQos(1);

    // 定义队列的消费者
    QueueingConsumer consumer = new QueueingConsumer(channel);
    // 监听队列,手动返回完成
    channel.basicConsume(QUEUE_NAME, false, consumer);

    // 获取消息
    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        System.out.println(" 前台系统: '" + message + "'");
        Thread.sleep(10);

        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
}

}

消费者2:

[Java] 纯文本查看 复制代码
?

public class Recv2 {

private final static String QUEUE_NAME = "test_queue_topic_2";

private final static String EXCHANGE_NAME = "test_exchange_topic";

public static void main(String[] argv) throws Exception {

    // 获取到连接以及mq通道
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();

    // 声明队列
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);

    // 绑定队列到交换机
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.#");

    // 同一时刻服务器只会发一条消息给消费者
    channel.basicQos(1);

    // 定义队列的消费者
    QueueingConsumer consumer = new QueueingConsumer(channel);
    // 监听队列,手动返回完成
    channel.basicConsume(QUEUE_NAME, false, consumer);

    // 获取消息
    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        System.out.println(" 搜索系统: '" + message + "'");
        Thread.sleep(10);

        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
}

}

消费者1是按需索取,并没有使用通配符模式,而是用的完全匹配,消费者2使用通配符模式,这样以item.开头的消息都会全部接收。

1.jpg (5.42 KB, 下载次数: 0)

1.jpg

相关推荐