Swoole之Process使用记录-坑的解决方案

Swoole之Process使用记录,Swoole自从发布之后,公司项目一直都只是基于http的情况使用,这次在脚本中应用了下,还是踩了些坑,先分享一个简单的

先看下一个简单的创建Process的流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
class SwooleProcessDemo
{
public $mpid = 0;
public $works = [];
public $max_process = 1;
public $processes = [];
public $new_index = 0;
public $ctime = 0;

public function __construct()
{
swoole_async_set(['enable_coroutine' => false]); // Process中仅用协程

// 由于所有进程是共享使用一个消息队列,所以只需向一个子进程发送消息即可 - 注意队列大小限制
try {
if (!preg_match('/Darwin/', php_uname())) {
swoole_set_process_name(sprintf('php-ps:%s', 'master'));
}
$this->mpid = posix_getpid();
$this->run();

$process = current($this->processes);

swoole_timer_tick(1000, function () use ($process) {
$data = '';
// $data = $this->getData(); 这里是需要自己是实现的

// push data
$process->push(implode(',', $data));
});

$this->processWait();
} catch (\Exception $e) {
var_dump($e);
}
}

public function run()
{
for ($i = 0; $i < $this->max_process; $i++) {
$this->createProcess($i);
}
}

public function createProcess($index = null)
{
$process = new swoole_process(function (swoole_process $worker) use ($index) {
if (is_null($index)) {
$index = $this->new_index;
$this->new_index++;
}

if (!preg_match('/Darwin/', php_uname())) {
try {
swoole_set_process_name(sprintf('php-ps:%s', $index));
} catch (\Exception $e) {
var_dump('ALL ERROR:' . $e->getMessage());
}
}

$data = $worker->pop();

if (!$data) {
$worker->exit(0);
}

if ($data) {
$this->handleData($data);
$this->checkMPid($worker);
}
unset($userId);
}, false, false);

$customMsgKey = 1;
$mod = 2 | swoole_process::IPC_NOWAIT; //这里设置消息队列为非阻塞模式
$process->useQueue($customMsgKey, $mod);
$pid = $process->start();
$this->works[$index] = $pid;
$this->processes[$pid] = $process;

return $pid;
}

public function checkMPid(&$worker)
{
if (!swoole_process::kill($this->mpid, 0)) {
$worker->exit();
}
}

public function rebootProcess($ret)
{
$pid = $ret['pid'];
$index = array_search($pid, $this->works);

if (false !== $index) {
$index = intval($index);
$new_pid = $this->createProcess($index);
}
}

public function processWait()
{
swoole_timer_tick(1000, function () {
if (count($this->works)) {
$ret = swoole_process::wait();
if ($ret) {
$this->rebootProcess($ret);
}
}
});
}

private function handleData($data)
{
// 你自己的逻辑
}
}

首先swoole_async_set([‘enable_coroutine’ => false]);这里我关闭了协程,原因是在进行processWait操作的时候,其swoole_timer_tick是不允许在其内部创建Process的,这个可以试着启用后看下报错信息

这个例子应该是一个比较完整的例子了,实践当中都有在使用,唯一的问题是在push上,之前一个例子比如发送短信,这个要求实时性,也就push了一些用户的ID,大小的话,可以忽略,但是当我向队列中push足够打的字符串的话,就会提示内存不足,原因可以到这里查看:https://wiki.swoole.com/wiki/page/290.html

1
2
3
4
5
6
7
swoole_timer_tick(1000, function () use ($process) {
$data = '';
// $data = $this->getData(); 这里是需要自己是实现的

// push data
$process->push(implode(',', $data));
});

解决方案如上代码,

之前的逻辑是for循环,然后直接执行push操作,在你push的时候,队列大小增加,最后直接内存满了,进程退出了,但是当我用上面代码的时候,这就是一个无限循环的永动机了。当然如果pop那边的操作延迟比较久的话,导致内存满了,也是push不进去的。

另外注意一点是swoole_timer_tick的函数的调用,不要使用while死循环,会导致swoole_timer_tick函数不起作用的,也就是

1
2
3
4
5
6
7
8
9
10
11
public function processWait()
{
swoole_timer_tick(1000, function () {
if (count($this->works)) {
$ret = swoole_process::wait();
if ($ret) {
$this->rebootProcess($ret);
}
}
});
}

这个代码其实也可以用另外一个方式实现的,如下

1
2
3
4
5
6
7
8
9
10
11
12
13
public function processWait()
{
while (1) {
if (count($this->works)) {
$ret = swoole_process::wait();
if ($ret) {
$this->rebootProcess($ret);
}
} else {
break;
}
}
}

总结如下:

1、swoole_timer_tick使用时,不要使用while等类似的死循环阻塞swoole_timer_tick的执行

2、Process在进行push的时候,要注意队列的大小