newbird0 2019-06-29
因公司业务需要,最近在设计一个通用队列功能模块,主体要求两大点:
消息队列的作用有:异步化、解耦和消除峰值等。目前异步化对于我来说使用最频繁,在很多业务场景下,我们可以将实时性要求较低的请求转为异步处理,减小系统负载压力,提高系统稳定性。在离线数据异步处理过程中,消息队列要满足以下要求:
以上是队列实现的说明,具体用MySql实现事务型消息队列可以参考文章
https://spockwangs.github.io/...
此次设计的表结构如下:
CREATE TABLE `comom_queue` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '自增id', `type` tinyint(4) NOT NULL DEFAULT '0' COMMENT '队列类型,代码业务备注', `conn_id` int(11) NOT NULL DEFAULT '0' COMMENT '消费者标识', `param_content` text COMMENT '队列入参', `callback` varchar(255) NOT NULL DEFAULT '' COMMENT '队列消费回调函数', `status` tinyint(2) NOT NULL DEFAULT '0' COMMENT '0新建 1消费中 2成功 3失败 4需重试', `create_time` int(11) NOT NULL DEFAULT '0' COMMENT '创建时间', `update_time` int(11) NOT NULL DEFAULT '0' COMMENT '状态变更时间', `preexec_time` int(11) NOT NULL DEFAULT '0' COMMENT '预消费时间', `p_key` varchar(100) NOT NULL DEFAULT '' COMMENT '业务唯一标识key,查询用', `mark` varchar(255) NOT NULL DEFAULT '' COMMENT '备注', PRIMARY KEY (`id`), KEY `indx_s` (`p_key`,`type`) USING BTREE, KEY `indx_exec` (`conn_id`,`status`) USING BTREE, KEY `indx_ty` (`type`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
说明下几个字段的设计:
此次php多进程的实现依赖pcntl,posix扩展,读者可自行检查是否安装了此拓展。queue队列服务设计和实现包括以下功能点:
不多说了,直接看代码,抽离出来的queue服务类代码如下:
<?php /** * Created by PhpStorm. * User: Javion * Date: 2018/12/7 * Time: 15:10 */ abstract class queue { protected $process = []; // 子进程数组 ['type' => 'process_num'] protected $child = []; // 子进程pid数组 protected $result = []; // 计算的结果 protected $overTime = 0; //主进程超时时间 protected $startTime; //主进程运行时间 protected $childOverTime = 3600; //子进程超时时间 protected $alarm_time = 2; public function __construct($process = [], $overTime = 0, $childOverTime = 3600) { if (!function_exists('pcntl_fork')) { die("pcntl_fork not existing"); } $this->process = $process; $this->overTime = $overTime; $this->childOverTime = $childOverTime; $this->startTime = time(); } /** * 设置子进程 */ public function setProcess($process) { $this->process = $process; } /** * 设置检测时间间隔 单位s */ public function setAlarmTime($time){ $this->alarm_time = $time; } /** * fork 子进程 */ protected function forkProcess() { //循环创建每个type 的消费子进程 $process = $this->process; foreach($process as $key => $num) { for ($i = 0; $i < $num; $i++){ $this->forkOneProcess($key); } } return $this; } /** * 创建子进程操作 * @param $key * @return $this */ private function forkOneProcess($key) { $pid = pcntl_fork(); if ($pid == 0) { $id = getmypid(); $this->processDo($id, $key); exit(0); } else if ($pid > 0) { //记录子进程信息 $childProcess = array( 'pid' => $pid, 'type' => $key, 'create_time' => time() ); $this->child[$pid] = $childProcess; } return $this; } /** * 子进程做的事情,消费者 */ abstract protected function processDo($id, $key); /** * 队列数量检测 */ abstract protected function checkQueueNum(); /** * 等待子进程结束 */ protected function waiteProcess() { while(count($this->child)) { foreach($this->child as $pid => $item){ $res = pcntl_waitpid($pid,$status,WNOHANG); pcntl_signal_dispatch(); if ( -1 == $res || $res > 0 ) { unset($this->child[$pid]); echo "pid $pid 退出", PHP_EOL; //判断主进程是否超时 未超时拉起新的子进程 $leftTime = time() - $this->startTime; if ($this->overTime > $leftTime){ $this->forkOneProcess($item['type']); echo "创建新进程", PHP_EOL; } }//判断子进程是否存在且超时,超过时限20分钟则强制退出 elseif (posix_kill($pid, 0) && (time() - $item['create_time'] - 20*60) > $this->childOverTime){ posix_kill($pid, SIGUSR1); echo "pid $pid 退出2", PHP_EOL; } } } return $this; } /** * 队列检测 */ protected function timeHandler(){ $this->checkQueueNum(); pcntl_alarm($this->alarm_time); } /** * 启动 */ public function runProcess() { //注册信号 pcntl_signal(SIGALRM, array($this, 'timeHandler')); pcntl_alarm($this->alarm_time); $leftTime = time() - $this->startTime; while(($this->overTime ==0 || $this->overTime > $leftTime)){ echo "新进程processlist", PHP_EOL; $this->forkProcess()->waiteProcess(); $leftTime = time() - $this->startTime; } } }
最后一个功能点:各个业务子进程数可配置正常拉起数和最大进程数,根据队列积压情况,子进程动态启动进程数 暂未实现。目前的queue服务设计如上,请各位看官多多指教!