nxin的小抄本 2019-06-26
我们利用消息队列实现了分布式事务的最终一致性解决方案,请大家围观。可以参考源码:https://github.com/vvsuperman…,项目支持网站: http://rabbitmq.org.cn,最新文章或实现会更新在上面
在之前的建立路由中我们改进了日志系统。我们摒弃无脑发送消息的广播路由器,而使用能够根据绑定键(binding key)来发送消息的,从而能有有选择的后去logs.
尽管使用直达路由器大大的改进了我们系统,但也存在局限性 - 无法加入更多条件。比如我们希望能够加入更多的维度,我们希望不仅是基于严重程度,而且是基于来源,如果你对linux tool工具有了解的话,它不仅仅是基于严重程度(info/warn/crit...) 而且有来源(auth/cron/kern...),这个给到我们更大的灵活性-我们需要监听所有来自'cron'的errors消息,以及来自'kern'的所有log。所以我们需要的是一个更复杂的主题交换机
发送到主题交换机的消息并不会有一个确定的路由键-而是一长串字符列表,以"."来分割,而这个字符串列表表明了路由信息,比如"stock.usd.nyse","nyse.vmw","quick.orange.rabbit",字符串的最大长度限制在255bytes。
同时,在队列绑定交换机时也需要指定模式,而符合模式的消息将会被发送至该队列,模式可以由通配符组成:
'*' 可以表示一个词
'#' 表示0个或多个词
可以通过如下的例子来说明
请看例子,以发送动物的消息为例,我们会发送包含三个词的路由键(两个".")。第一个是速度,第二个是颜色,而第三个是种族
同时,我们建立了三个绑定,Q1绑定了键".orange.",Q2绑定了键"..rabbit"以及"lazy.#"。可以做如下的解释,Q1用来接受所有orange的动物,Q2用来接受所有rabbits,以及lazy的动物
一个路由为"quick.orange.rabbit"的消息将会被同时发送给这两个队列,消息"lazy.orange.elephant"也会被同时发给它们;"quick.orange.fox"只会发给第一个队列;"lazy.brown.fox"会发到第二个;"lazy.pink.rabbit"将只会发送给第二个;"quick.brown.fox"会被丢弃因为匹配不上任何一个。
如果我们发送四个词的呢?比如"oragne"或者"quick.orange.male.rabbit"?这些没有任何匹配的队列将会丢失。但比如"quick.orange.male.rabbit"会匹配到第二个队列。
主题交换机也可以当成其它交换机来使用,假如队列绑定到了 "#",那么它会接收所有的消息,就像广播路由器一样;而如果未使用"*","#",那么就跟直达路由器一样了。
我们用主题交换机替换掉之前的直达交换机,用如同"<facility>.<severity>"的形式, EmitLogTopic.java
import com.rabbitmq.client.*; import java.io.IOException; public class EmitLogTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String routingKey = getRouting(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes()); System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'"); connection.close(); } //... }
ReceiveLogsTopic.java的代码片段
import com.rabbitmq.client.*; import java.io.IOException; public class ReceiveLogsTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1) { System.err.println("Usage: ReceiveLogsTopic [binding_key]..."); System.exit(1); } for (String bindingKey : argv) { channel.queueBind(queueName, EXCHANGE_NAME, bindingKey); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } }
编译这段代码
javac -cp $CP ReceiveLogsTopic.java EmitLogTopic.java
接受所有的logs
java -cp $CP ReceiveLogsTopic "#"
接受来自"kern"的消息
java -cp $CP ReceiveLogsTopic "kern.*"
接受来自"critical"的消息
java -cp $CP ReceiveLogsTopic "*.critical"
创建多个绑定
java -cp $CP ReceiveLogsTopic "kern.*" "*.critical"
发送消息
java -cp $CP EmitLogTopic "kern.critical" "A critical kernel error"
你可以尝试更多的参数,以此来熟悉这个知识