theScoreONE 2019-06-27
前面几节都是讲解pcntl扩展实现的多进程程序。本节给大家介绍swoole扩展的swoole_process
模块。
swoole_process 是swoole提供的进程管理模块,用来替代PHP的pcntl扩展。
首先,确保安装的swoole版本大于1.7.2:
$ php --ri swoole swoole swoole support => enabled Version => 1.10.1
注意:swoole_process在最新的1.8.0版本已经禁止在Web环境中使用了,所以也只能支持命令行。
swoole提供的多进程扩展基本功能和pcntl提供的一样,但swoole更易简单上手,并且提供了:
swoole_process 模块提供的方法(Method)主要分为四部分:
swoole_process::__construct swoole_process->start swoole_process->name swoole_process->exec swoole_process->close swoole_process->exit swoole_process::kill swoole_process::wait swoole_process::daemon swoole_process::setAffinity
swoole_process->write swoole_process->read swoole_process->setTimeout swoole_process->setBlocking
swoole_process->useQueue swoole_process->statQueue swoole_process->freeQueue swoole_process->push swoole_process->pop
swoole_process::signal swoole_process::alarm
本例实现的是tcp server,特性:
<?php class TcpServer{ const MAX_PROCESS = 3;//最大进程数 private $pids = []; //存储子进程pid private $socket; private $mpid; public function run(){ $process = new swoole_process(function(){ $this->mpid = $id = getmypid(); echo time()." Master process, pid {$id}\n"; //创建tcp server $this->socket = stream_socket_server("tcp://0.0.0.0:9201", $errno, $errstr); if(!$this->socket) exit("start server err: $errstr --- $errno"); for($i=0; $i<self::MAX_PROCESS;$i++){ $this->start_worker_process(); } echo "waiting client...\n"; //Master进程等待子进程退出,必须是死循环 while(1){ foreach($this->pids as $k=>$pid){ if($pid){ $res = swoole_process::wait(false); if ( $res ){ echo time()." Worker process $pid exit, will start new... \n"; $this->start_worker_process(); unset($this->pids[$k]); } } } sleep(1);//让出1s时间给CPU } }, false, false); //不启用管道通信 swoole_process::daemon(); //守护进程 $process->start();//注意:start之后的变量子进程里面是获取不到的 } /** * 创建worker进程,接受客户端连接 */ private function start_worker_process(){ $process = new swoole_process(function(swoole_process $worker){ $this->acceptClient($worker); }, false, false); $pid = $process->start(); $this->pids[] = $pid; } private function acceptClient(&$worker) { //子进程一直等待客户端连接,不能退出 while(1){ $conn = stream_socket_accept($this->socket, -1); if($this->onConnect) call_user_func($this->onConnect, $conn); //回调连接事件 //开始循环读取消息 $recv = ''; //实际收到消息 $buffer = ''; //缓冲消息 while(1){ $this->checkMpid($worker); $buffer = fread($conn, 20); //没有收到正常消息 if($buffer === false || $buffer === ''){ if($this->onClose) call_user_func($this->onClose, $conn); //回调断开连接事件 break;//结束读取消息,等待下一个客户端连接 } $pos = strpos($buffer, "\n"); //消息结束符 if($pos === false){ $recv .= $buffer; }else{ $recv .= trim(substr($buffer, 0, $pos+1)); if($this->onMessage) call_user_func($this->onMessage, $conn, $recv); //回调收到消息事件 //客户端强制关闭连接 if($recv == "quit"){ echo "client close conn\n"; fclose($conn); break; } $recv = ''; //清空消息,准备下一次接收 } } } } //检查主进程是否存在,若不存在子进程在干完手头活后退出 public function checkMpid(&$worker){ if(!swoole_process::kill($this->mpid,0)){ $worker->exit(); // 这句提示,实际是看不到的.需要写到日志中 echo "Master process exited, I [{$worker['pid']}] also quit\n"; } } function __destruct() { @fclose($this->socket); } } $server = new TcpServer(); $server->onConnect = function($conn){ echo "onConnect -- accepted " . stream_socket_get_name($conn,true) . "\n"; fwrite($conn,"conn success\n"); }; $server->onMessage = function($conn,$msg){ echo "onMessage --" . $msg . "\n"; fwrite($conn,"received ".$msg."\n"); }; $server->onClose = function($conn){ echo "onClose --" . stream_socket_get_name($conn,true) . "\n"; fwrite($conn,"onClose "."\n"); }; $server->run();
运行后可以使用telnet连接:
telnet 127.0.0.1 9201
由于设置了最大三个子进程,最多只能接受3个客户端连接。
前面讲解的例子里,主进程和子进程直接是没有直接的数据交互的。如果主进程需要得到的来自子进程的反馈,或者子进程接受来自主进程的数据,那么就需要进程间通信了。
swoole内置了管道通信和消息队列通信。
管道通信主要是数据传输:一个进程需要将数据发送给另外一个进程。
这个swoole封装后,使用非常简单:
<?php $workers = []; for ($i=0; $i<3; $i++) { $process = new swoole_process(function(swoole_process $worker){ //子进程逻辑 $cmd = $worker->read(); ob_start(); passthru($cmd);//执行外部程序并且显示未经处理的、原始输出,会直接打印输出。 $return = ob_get_clean() ? : ' '; $return = trim($return).". worker pid:".$worker->pid."\n"; // $worker->write($return);//写入数据到管道 echo $return;//写入数据到管道。注意:子进程里echo也是写入到管道 }, true); //第二个参数为true,启用管道通信 $pid = $process->start(); $workers[$pid] = $process; } foreach($workers as $pid=>$worker){ $worker->write('whoami'); //通过管道发数据到子进程。管道是单向的:发出的数据必须由另一端读取。不能读取自己发出去的 $recv = $worker->read();//同步阻塞读取管道数据 echo "recv result: $recv"; } //回收子进程 while(count($workers)){ // echo time(). "\n"; foreach($workers as $pid=>$worker){ $ret = swoole_process::wait(false); if($ret){ echo "worker exit: $pid\n"; unset($workers[$pid]); } } }
运行:
$ php swoole_process_pipe.php recv result: Linux recv result: 2018年 06月 24日 星期日 16:18:01 CST recv result: yjc worker exit: 14519 worker exit: 14522 worker exit: 14525
注意点:
1、管道数据读取是同步阻塞的;上面的例子里如果子进程里再加一句$worker->read()
,会一直阻塞。可以使用swoole_event_add
将管道加入到事件循环中,变为异步模式。
2、子进程里的输出(例如echo)与write效果相同。
3、通过管道发数据到子进程。管道是单向的:发出的数据必须由另一端读取。不能读取自己发出去的。
这里额外讲解一下swoole_process::wait()
:
1、swoole_process::wait()
默认是阻塞的, swoole_process::wait(false)
则是非阻塞的;
2、swoole_process::wait()
阻塞模式调用一次仅能回收一个子进程,非阻塞模式调用一次不一定能当前就能回收子进程;
3、如果不加swoole_process::wait()
,主进程又是死循环,主进程退出后会变成僵尸进程。
ps -A -o stat,ppid,pid,cmd | grep -e '^[Zz]'
可以查询僵尸进程。
防盗版声明:本文系原创文章,发布于公众号飞鸿影的博客
(fhyblog)及博客园,转载需作者同意。
消息队列与管道有些不一样:消息队列是全局的,所有进程都可以发送、读取。你可以把它看做redis list结构。
消息队列更常见的用途是主进程分配任务,子进程消费执行。
<?php $workers = []; for ($i=0; $i<3; $i++) { $process = new swoole_process(function(swoole_process $worker){ //子进程逻辑 sleep(1); //防止父进程还未往消息队列中加入内容直接退出 while($cmd = $worker->pop()){ // echo "recv from master: $cmd\n"; ob_start(); passthru($cmd);//执行外部程序并且显示未经处理的、原始输出,会直接打印输出。 $return = ob_get_clean() ? : ' '; $return = "res: ".trim($return).". worker pid: ".$worker->pid."\n"; echo $return; // sleep(1); } $worker->exit(0); }, false, false); //不创建管道 $process->useQueue(1, 2 | swoole_process::IPC_NOWAIT); //使用消息队列 $pid = $process->start(); $workers[$pid] = $process; } //由于所有进程是共享使用一个消息队列,所以只需向一个子进程发送消息即可 $worker = current($workers); for ($i=0; $i<3; $i++) { $worker->push('whoami'); //发送消息 } //回收子进程 while(count($workers)){ foreach($workers as $pid=>$worker){ $ret = swoole_process::wait(); if($ret){ echo "worker exit: $pid\n"; unset($workers[$pid]); } } }
运行结果:
$ php swoole_process_quene.php res: yjc. worker pid: 15885 res: yjc. worker pid: 15886 res: yjc. worker pid: 15887 worker exit: 15885 worker exit: 15886 worker exit: 15887
注意点:
1、所有进程共享使用一个消息队列;
2、消息队列的读取操作是阻塞的,可以在useQueue
的时候第2个参数mode改为2 | swoole_process::IPC_NOWAIT
,则是异步的。mode仅仅设置为2
是阻塞的,示例里去掉swoole_process::IPC_NOWAIT
后读取消息的while会死循环。
3、子进程前面加了个sleep(1);
,这是为了防止父进程还未往消息队列中加入内容直接退出。
4、子进程末尾也加了sleep,这是为了防止一个进程把所有消息都消费完了,实际应用需要去掉。
swoole_process::alarm
支持微秒定时器:
<?php function ev_timer(){ static $i = 0; echo "#{$i}\talarm\n"; $i++; if ($i > 5) { //清除定时器 swoole_process::alarm(-1); //退出进程 swoole_process::kill(getmypid()); } } //安装信号 swoole_process::signal(SIGALRM, 'ev_timer'); //触发定时器信号:单位为微秒。如果为负数表示清除定时器 swoole_process::alarm(100 * 1000);//100ms echo getmypid()."\n"; //该句会顺序执行,后续无需使用while循环防止进程直接退出
运行:
$ php swoole_process_alarm.php 13660 #0 alarm #1 alarm #2 alarm #3 alarm #4 alarm #5 alarm
注:alarm不能和Swoole\Timer
同时使用。
1、Process-Swoole-Swoole文档中心
https://wiki.swoole.com/wiki/...
欢迎关注公众号及时获取最新文章推送!
推荐!每月仅需$2.5,即可拥有配置SSD的VPS!
本文实例讲述了php多进程应用场景。分享给大家供大家参考,具体如下:。因此,不能再PHP Web开发中使用多进程。每个进程处理一个小文件,则不同进程发送完邮件的耗时为8、8、6,总耗时取最大值为8s。