PHP与RabbitMQ(下)

lishijian 2019-12-17

RabbitMQ是用来在多个异构系统之间进行数据交换的,生产者无需知道消费者的存在,消费者也无需关注生产者的行为,从而进行解耦,提高系统性能。

PHP与RabbitMQ(下)

这里生产者只需要将数据送到队列里面,就完成了任务,而消费者只需要去某个队列消费,就接收到了数据。那么生产者是如何将数据送到指定的队列呢?这里就用到了交换机。

PHP与RabbitMQ(下)

RabbitMQ中有四种交换机:

1.Direct(默认)

Direct交换机是路由键匹配的话,就将消息投递到相应的队列:

protected function getConn()
{
    $conf = Yii::$app->params[‘rabbitMQ‘];
    $conn = new AMQPStreamConnection($conf[‘host‘], $conf[‘port‘], $conf[‘user‘], $conf[‘password‘], $conf[‘vhost‘]);

    return $conn;
}
$conn = $this->getConn();
$channel = $conn->channel();

/*
 * 声明一个white队列和一个green队列
 * 参数介绍:
 * passive = false 没有此队列时不会报错
 * durable = true 队列持久化
 * exclusive = false 当一个连接关闭时队列不会删除
 * auto_delete = false 当最后一个消费者断开连接后队列不会自动删除
 * nowait = false 不返回执行结果
*/
$channel->queue_declare(‘white‘, false, true, false, false, false);
$channel->queue_declare(‘green‘, false, true, false, false, false);

//声明direct交换机
$channel->exchange_declare(‘direct_exchange‘, ‘direct‘, false, true, false, false, false);

/**
 * 将white队列绑定到direct交换机上
 * 此步骤可以省略,默认队列会有一个同名路由键,所以上一章中生产者是这样发送消息的:
 * $channel->basic_publish($msg, ‘‘, ‘white‘);
 */
$channel->queue_bind(‘white‘, ‘direct_exchange‘, ‘color.white‘);

$msgBody = ‘Hello White Rabbit!‘;

//delivery_mode = 2,消息持久化
$msg = new AMQPMessage($msgBody, [‘content_type‘ => ‘text/plain‘, ‘delivery_mode‘ => AMQPMessage::DELIVERY_MODE_PERSISTENT]);

$channel->basic_publish($msg, ‘direct_exchange‘, ‘color.white‘);

2.Fanout

Fanout交换机不匹配路由,会将消息投递给所有绑定该交换机的队列:

//声明fanout交换机
$channel->exchange_declare(‘fanout_exchange‘, ‘fanout‘, false, true, false, false, false);

//绑定white和green队列,fanout交换机无需路由键
$channel->queue_bind(‘white‘, ‘fanout_exchange‘);
$channel->queue_bind(‘green‘, ‘fanout_exchange‘);

//以下一行代码会同时给white和green队列发送一条消息
$channel->basic_publish($msg, ‘fanout_exchange‘);

3.Topic

Topic交换机和Direct交换机类似,只不过会根据定义的规则进行投递:

//声明topic交换机
$channel->exchange_declare(‘topic_exchange‘, ‘topic‘, false, true, false, false, false);

/*
 * 绑定white和green队列
 * "#"表示0个或多个关键字, "*"表示匹配一个关键字
 */
$channel->queue_bind(‘white‘, ‘topic_exchange‘, ‘color.*‘);
$channel->queue_bind(‘green‘, ‘topic_exchange‘, ‘color.#‘);

//以下一行代码会给green队列发送一条消息
$channel->basic_publish($msg, ‘topic_exchange‘, ‘color.white.green‘);

4.Headers

Headers交换机会根据消息的头部进行投递:

//声明headers交换机
$channel->exchange_declare(‘headers_exchange‘, ‘headers‘, false, true, false, false, false);

$channel->queue_bind(‘white‘, ‘headers_exchange‘, ‘‘, false, new AMQPTable([
    ‘x-match‘ => ‘all‘, //要求所有参数都要符合
    ‘author‘ => ‘74percent‘,
    ‘type‘ => ‘color‘
]));

$channel->queue_bind(‘green‘, ‘headers_exchange‘, ‘‘, false, new AMQPTable([
    ‘x-match‘ => ‘any‘, //要求任意参数符合即可
    ‘author‘ => ‘74percent‘,
]));

$msg->set(‘application_headers‘, new AMQPTable([
    ‘author‘ => ‘74percent‘
]));

//以下一行代码会给green队列发送一条消息
$channel->basic_publish($msg, ‘headers_exchange‘);

相关推荐