OnMyHeart 2020-05-27
1,安装依赖库composer require php-amqplib/php-amqplib
地址:https://github.com/php-amqplib/php-amqplib
2,mq生产者.php
include(__DIR__ . ‘../../public/config.php‘); use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; /** * Created by PhpStorm. * User: pandeng * Date: 2017-07-26 * Time: 21:51 */ class MessageQueue { const exchange = ‘router‘; const queue = ‘msgs‘; public static function pushMessage($data) { $connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST); $channel = $connection->channel(); $channel->queue_declare(self::queue, false, true, false, false); $channel->exchange_declare(self::exchange, ‘direct‘, false, true, false); $channel->queue_bind(self::queue, self::exchange); $messageBody = $data; $message = new AMQPMessage($messageBody, array(‘content_type‘ => ‘text/plain‘, ‘delivery_mode‘ => AMQPMessage::DELIVERY_MODE_PERSISTENT)); $channel->basic_publish($message, self::exchange); $channel->close(); $connection->close(); return "ok"; } }
3.消费者.php
namespace app\index\controller; include(__DIR__ . ‘../../../../public/config.php‘); use PhpAmqpLib\Connection\AMQPStreamConnection; use think\Controller; use think\Log; use think\Request; use think\Db; class MessageConsume extends Controller { const exchange = ‘router‘; const queue = ‘msgs‘; const consumerTag = ‘consumer‘; function shutdown($channel, $connection) { $channel->close(); $connection->close(); write_log("closed",3); } function process_message($message) { if ($message->body !== ‘quit‘) { $obj = json_decode($message->body); if (!isset($obj->id)) { echo ‘error data\n‘; write_log("error data:" . $message->body, 2); } else { try { write_log("data:" . json_encode($message)); } catch (\Think\Exception $e) { write_log($e->getMessage(), 2); write_log(json_encode($message), 2); } catch (\PDOException $pe) { write_log($pe->getMessage(), 2); write_log(json_encode($message), 2); } } } $message->delivery_info[‘channel‘]->basic_ack($message->delivery_info[‘delivery_tag‘]); // Send a message with the string "quit" to cancel the consumer. if ($message->body === ‘quit‘) { $message->delivery_info[‘channel‘]->basic_cancel($message->delivery_info[‘consumer_tag‘]); } } /** * 启动 * * @return \think\Response */ public function start() { $connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST); $channel = $connection->channel(); $channel->queue_declare(self::queue, false, true, false, false); $channel->exchange_declare(self::exchange, ‘direct‘, false, true, false); $channel->queue_bind(self::queue, self::exchange); $channel->basic_consume(self::queue, self::consumerTag, false, false, false, false, array($this, ‘process_message‘)); register_shutdown_function(array($this, ‘shutdown‘), $channel, $connection); while (count($channel->callbacks)) { $channel->wait(); } write_log("starting",3); } }
nohup php index.php index/Message_Consume/start &
原文链接:https://www.jianshu.com/p/89dc541c6362