OnMyHeart 2020-05-15
RabbitMQ的路由模式,可以简单理解为,根据exchange绑定的key,将消息路由到不同的queue,模型图如下:
上图中几个关键点:
Publisher:消息的生产者
Exchange:路由,类型为direct
Queue:队列,存储消息的地方
Consumer:消费者
生产者不直接直接发送消息到队列的,只发送到相应的exchange上,代码如下:
static string EXCHANGE_NAME = "topic_exchange_direct"; static void TopicPublisher() { var conn = RabbitMQHelper.GetConnection(); var channel = conn.CreateModel(); channel.ExchangeDeclare(exchange: EXCHANGE_NAME,type:ExchangeType.Direct); string routingKey = "error"; var msg = $"hello topic {routingKey}!"; var body = Encoding.UTF8.GetBytes(msg); channel.BasicPublish(exchange: EXCHANGE_NAME, routingKey: routingKey, basicProperties: null, body: body); Console.WriteLine($"消息:{msg} ,发送完成!"); channel.Close(); conn.Close(); }
发送成功,图如下:
我们去rabbitmq的管理页面,可以看到消息已经发送成功,但是还没有消费者绑定到这个exchange上:
要消费topic类型的消息,消费者需要绑定自己需要的路由键(rountingKey),一个消费者可以绑定多个key
static string EXCHANGE_NAME = "topic_exchange_direct"; static string QUEUE_NAME = "topic_queue_direct1"; static void TopicConsumer1() { var conn = RabbitMQHelper.GetConnection(); var channel = conn.CreateModel(); channel.ExchangeDeclare(exchange: EXCHANGE_NAME, type: ExchangeType.Direct); channel.QueueDeclare(queue: QUEUE_NAME); // 绑定routingKey string routingKey = "error"; channel.QueueBind(queue: QUEUE_NAME, exchange: EXCHANGE_NAME, routingKey: routingKey); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body.ToArray()); Console.WriteLine($"Topic Consumer1 收到消息: {message},时间:{DateTime.Now}"); }; channel.BasicConsume(queue: QUEUE_NAME, autoAck: true, consumer: consumer); Console.ReadKey(); channel.Close(); conn.Close(); }
static string EXCHANGE_NAME = "topic_exchange_direct"; static string QUEUE_NAME = "topic_queue_direct2"; static void TopicConsumer2() { var conn = RabbitMQHelper.GetConnection(); var channel = conn.CreateModel(); channel.ExchangeDeclare(exchange: EXCHANGE_NAME, type: ExchangeType.Direct); channel.QueueDeclare(queue: QUEUE_NAME); string routingKeyInfo = "info"; string routingKeyError = "error"; string routingKeyWarning = "warning"; // 可以绑定多个routingKey channel.QueueBind(queue: QUEUE_NAME, exchange: EXCHANGE_NAME, routingKey: routingKeyInfo); channel.QueueBind(queue: QUEUE_NAME, exchange: EXCHANGE_NAME, routingKey: routingKeyError); channel.QueueBind(queue: QUEUE_NAME, exchange: EXCHANGE_NAME, routingKey: routingKeyWarning); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body.ToArray()); Console.WriteLine($"Topic Consumer2 收到消息: {message},时间:{DateTime.Now}"); }; channel.BasicConsume(queue: QUEUE_NAME, autoAck: true, consumer: consumer); Console.ReadKey(); channel.Close(); conn.Close(); }
(1) 启动两个消费者后,我们可以看到一共有四个key绑定到交换机上;
消费者1的队列(topic_queue_direct2)绑定了一个key:error,消费者2的队列(topic_queue_direct2)绑定了三个key:error、info、warning
(2)发送一个rountingKey为error的消息,再发送一个rountingKey为info的消息,我们查看客户端接收消息的情况:
消费者1只接收到了一条消息,图如下:
消费者2两条消息都接收到了,图如下:
至此,我们的Topic模式已经验证完毕。
1、Routing模式需要绑定到交换机,类型为direct
2、routingKey为分发消息的依据,生产者发送消息时要指定routingKey;消费者可以绑定多个routingKey;
参考资料:https://www.rabbitmq.com/tutorials/tutorial-four-dotnet.html