白话RabbitMQ(五): 主题路由器(Topic Exchange)

nxin的小抄本 2019-06-26

推广

RabbitMQ专题讲座
CoolMQ开源项目

我们利用消息队列实现了分布式事务的最终一致性解决方案,请大家围观。可以参考源码:https://github.com/vvsuperman…,项目支持网站: http://rabbitmq.org.cn,最新文章或实现会更新在上面

前言

在之前的建立路由中我们改进了日志系统。我们摒弃无脑发送消息的广播路由器,而使用能够根据绑定键(binding key)来发送消息的,从而能有有选择的后去logs.

尽管使用直达路由器大大的改进了我们系统,但也存在局限性 - 无法加入更多条件。比如我们希望能够加入更多的维度,我们希望不仅是基于严重程度,而且是基于来源,如果你对linux tool工具有了解的话,它不仅仅是基于严重程度(info/warn/crit...) 而且有来源(auth/cron/kern...),这个给到我们更大的灵活性-我们需要监听所有来自'cron'的errors消息,以及来自'kern'的所有log。所以我们需要的是一个更复杂的主题交换机

主题交换机

发送到主题交换机的消息并不会有一个确定的路由键-而是一长串字符列表,以"."来分割,而这个字符串列表表明了路由信息,比如"stock.usd.nyse","nyse.vmw","quick.orange.rabbit",字符串的最大长度限制在255bytes。

同时,在队列绑定交换机时也需要指定模式,而符合模式的消息将会被发送至该队列,模式可以由通配符组成:
'*' 可以表示一个词
'#' 表示0个或多个词
可以通过如下的例子来说明

白话RabbitMQ(五): 主题路由器(Topic Exchange)

请看例子,以发送动物的消息为例,我们会发送包含三个词的路由键(两个".")。第一个是速度,第二个是颜色,而第三个是种族

同时,我们建立了三个绑定,Q1绑定了键".orange.",Q2绑定了键"..rabbit"以及"lazy.#"。可以做如下的解释,Q1用来接受所有orange的动物,Q2用来接受所有rabbits,以及lazy的动物

一个路由为"quick.orange.rabbit"的消息将会被同时发送给这两个队列,消息"lazy.orange.elephant"也会被同时发给它们;"quick.orange.fox"只会发给第一个队列;"lazy.brown.fox"会发到第二个;"lazy.pink.rabbit"将只会发送给第二个;"quick.brown.fox"会被丢弃因为匹配不上任何一个。

如果我们发送四个词的呢?比如"oragne"或者"quick.orange.male.rabbit"?这些没有任何匹配的队列将会丢失。但比如"quick.orange.male.rabbit"会匹配到第二个队列。

主题交换机也可以当成其它交换机来使用,假如队列绑定到了 "#",那么它会接收所有的消息,就像广播路由器一样;而如果未使用"*","#",那么就跟直达路由器一样了。

整合所有的代码

我们用主题交换机替换掉之前的直达交换机,用如同"<facility>.<severity>"的形式, EmitLogTopic.java

import com.rabbitmq.client.*;

import java.io.IOException;

public class EmitLogTopic {

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv)
                  throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        String routingKey = getRouting(argv);
        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
        System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");

        connection.close();
    }
    //...
}

ReceiveLogsTopic.java的代码片段

import com.rabbitmq.client.*;

import java.io.IOException;

public class ReceiveLogsTopic {
  private static final String EXCHANGE_NAME = "topic_logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    String queueName = channel.queueDeclare().getQueue();

    if (argv.length < 1) {
      System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
      System.exit(1);
    }

    for (String bindingKey : argv) {
      channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
    }

    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope,
                                 AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
      }
    };
    channel.basicConsume(queueName, true, consumer);
  }
}

编译这段代码

javac -cp $CP ReceiveLogsTopic.java EmitLogTopic.java

接受所有的logs

java -cp $CP ReceiveLogsTopic "#"

接受来自"kern"的消息

java -cp $CP ReceiveLogsTopic "kern.*"

接受来自"critical"的消息

java -cp $CP ReceiveLogsTopic "*.critical"

创建多个绑定

java -cp $CP ReceiveLogsTopic "kern.*" "*.critical"

发送消息

java -cp $CP EmitLogTopic "kern.critical" "A critical kernel error"

你可以尝试更多的参数,以此来熟悉这个知识

相关推荐