Swoole之Process使用记录,Swoole自从发布之后,公司项目一直都只是基于http的情况使用,这次在脚本中应用了下,还是踩了些坑,先分享一个简单的
先看下一个简单的创建Process的流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
| class SwooleProcessDemo { public $mpid = 0; public $works = []; public $max_process = 1; public $processes = []; public $new_index = 0; public $ctime = 0;
public function __construct() { swoole_async_set(['enable_coroutine' => false]);
try { if (!preg_match('/Darwin/', php_uname())) { swoole_set_process_name(sprintf('php-ps:%s', 'master')); } $this->mpid = posix_getpid(); $this->run();
$process = current($this->processes);
swoole_timer_tick(1000, function () use ($process) { $data = '';
$process->push(implode(',', $data)); });
$this->processWait(); } catch (\Exception $e) { var_dump($e); } }
public function run() { for ($i = 0; $i < $this->max_process; $i++) { $this->createProcess($i); } }
public function createProcess($index = null) { $process = new swoole_process(function (swoole_process $worker) use ($index) { if (is_null($index)) { $index = $this->new_index; $this->new_index++; }
if (!preg_match('/Darwin/', php_uname())) { try { swoole_set_process_name(sprintf('php-ps:%s', $index)); } catch (\Exception $e) { var_dump('ALL ERROR:' . $e->getMessage()); } }
$data = $worker->pop();
if (!$data) { $worker->exit(0); }
if ($data) { $this->handleData($data); $this->checkMPid($worker); } unset($userId); }, false, false);
$customMsgKey = 1; $mod = 2 | swoole_process::IPC_NOWAIT; $process->useQueue($customMsgKey, $mod); $pid = $process->start(); $this->works[$index] = $pid; $this->processes[$pid] = $process;
return $pid; }
public function checkMPid(&$worker) { if (!swoole_process::kill($this->mpid, 0)) { $worker->exit(); } }
public function rebootProcess($ret) { $pid = $ret['pid']; $index = array_search($pid, $this->works);
if (false !== $index) { $index = intval($index); $new_pid = $this->createProcess($index); } }
public function processWait() { swoole_timer_tick(1000, function () { if (count($this->works)) { $ret = swoole_process::wait(); if ($ret) { $this->rebootProcess($ret); } } }); }
private function handleData($data) { } }
|
首先swoole_async_set([‘enable_coroutine’ => false]);这里我关闭了协程,原因是在进行processWait操作的时候,其swoole_timer_tick是不允许在其内部创建Process的,这个可以试着启用后看下报错信息
这个例子应该是一个比较完整的例子了,实践当中都有在使用,唯一的问题是在push上,之前一个例子比如发送短信,这个要求实时性,也就push了一些用户的ID,大小的话,可以忽略,但是当我向队列中push足够打的字符串的话,就会提示内存不足,原因可以到这里查看:https://wiki.swoole.com/wiki/page/290.html
1 2 3 4 5 6 7
| swoole_timer_tick(1000, function () use ($process) { $data = '';
$process->push(implode(',', $data)); });
|
解决方案如上代码,
之前的逻辑是for循环,然后直接执行push操作,在你push的时候,队列大小增加,最后直接内存满了,进程退出了,但是当我用上面代码的时候,这就是一个无限循环的永动机了。当然如果pop那边的操作延迟比较久的话,导致内存满了,也是push不进去的。
另外注意一点是swoole_timer_tick的函数的调用,不要使用while死循环,会导致swoole_timer_tick函数不起作用的,也就是
1 2 3 4 5 6 7 8 9 10 11
| public function processWait() { swoole_timer_tick(1000, function () { if (count($this->works)) { $ret = swoole_process::wait(); if ($ret) { $this->rebootProcess($ret); } } }); }
|
这个代码其实也可以用另外一个方式实现的,如下
1 2 3 4 5 6 7 8 9 10 11 12 13
| public function processWait() { while (1) { if (count($this->works)) { $ret = swoole_process::wait(); if ($ret) { $this->rebootProcess($ret); } } else { break; } } }
|
总结如下:
1、swoole_timer_tick使用时,不要使用while等类似的死循环阻塞swoole_timer_tick的执行
2、Process在进行push的时候,要注意队列的大小