Yii2结合Workerman的websocket

1、安装workerman

1
composer require workerman/workerman

2、启动workerman

创建commands/WorkermanWebSocketController.php文件
创建actionIndex()函数,用来启动,代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public function actionIndex()
{
if ('start' == $this->send) {
try {
$this->start($this->daemon);
} catch (\Exception $e) {
$this->stderr($e->getMessage() . "\n", Console::FG_RED);
}
} else if ('stop' == $this->send) {
$this->stop();
} else if ('restart' == $this->send) {
$this->restart();
} else if ('reload' == $this->send) {
$this->reload();
} else if ('status' == $this->send) {
$this->status();
} else if ('connections' == $this->send) {
$this->connections();
}
}

添加初始化模块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public function initWorker()
{
$ip = isset($this->config['ip']) ? $this->config['ip'] : $this->ip;
$port = isset($this->config['port']) ? $this->config['port'] : $this->port;
$wsWorker = new Worker("websocket://{$ip}:{$port}");

// 4 processes
$wsWorker->count = 4;

// Emitted when new connection come
$wsWorker->onConnect = function ($connection) {
echo "New connection\n";
};

// Emitted when data received
$wsWorker->onMessage = function ($connection, $data) {
// Send hello $data
$connection->send('hello ' . $data);
};

// Emitted when connection closed
$wsWorker->onClose = function ($connection) {
echo "Connection closed\n";
};
}

添加启动模块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* workman websocket start
*/
public function start()
{
$this->initWorker();
// 重置参数以匹配Worker
global $argv;
$argv[0] = $argv[1];
$argv[1] = 'start';
if ($this->daemon) {
$argv[2] = '-d';
}

// Run worker
Worker::runAll();
}

添加停止模块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* workman websocket stop
*/
public function stop()
{
$this->initWorker();
// 重置参数以匹配Worker
global $argv;
$argv[0] = $argv[1];
$argv[1] = 'stop';
if ($this->gracefully) {
$argv[2] = '-g';
}

// Run worker
Worker::runAll();
}

添加重启模块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* workman websocket restart
*/
public function restart()
{
$this->initWorker();
// 重置参数以匹配Worker
global $argv;
$argv[0] = $argv[1];
$argv[1] = 'restart';
if ($this->daemon) {
$argv[2] = '-d';
}

if ($this->gracefully) {
$argv[2] = '-g';
}

// Run worker
Worker::runAll();
}

添加重载模块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* workman websocket reload
*/
public function reload()
{
$this->initWorker();
// 重置参数以匹配Worker
global $argv;
$argv[0] = $argv[1];
$argv[1] = 'reload';
if ($this->gracefully) {
$argv[2] = '-g';
}

// Run worker
Worker::runAll();
}

添加状态模块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* workman websocket status
*/
public function status()
{
$this->initWorker();
// 重置参数以匹配Worker
global $argv;
$argv[0] = $argv[1];
$argv[1] = 'status';
if ($this->daemon) {
$argv[2] = '-d';
}

// Run worker
Worker::runAll();
}

添加链接数模块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* workman websocket connections
*/
public function connections()
{
$this->initWorker();
// 重置参数以匹配Worker
global $argv;
$argv[0] = $argv[1];
$argv[1] = 'connections';

// Run worker
Worker::runAll();
}

3、前端调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<script>
// Create WebSocket connection.
const ws = new WebSocket('ws://{{ app.request.hostName }}:2347/'); // 这里是获取的网站的域名,测试的时候可以改为自己的本地的ip地址

// Connection opened
ws.addEventListener('open', function (event) {
ws.send('Hello Server!');
});

// Listen for messages
ws.addEventListener('message', function (event) {
console.log('Message from server ', event.data);
});

setTimeout(function() {
ws.send('ssssss');
}, 10000);

</script>

4、config参数配置

修改console.php并添加如下代码

1
2
3
4
5
6
7
8
9
10
'controllerMap' => [
'workerman-web-socket' => [
'class' => 'app\commands\WorkermanWebSocketController',
'config' => [
'ip' => '127.0.0.1',
'port' => '2346',
'daemonize' => true,
],
],
],

5、nginx配置

为什么会用 nginx, 我们正常部署上线是不可能直接使用ip的,这个户存在安全隐患,最好是绑定一个域名

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
server {
charset utf-8;
client_max_body_size 128M;

listen 2347;

server_name www.gowhich.com; # 这里改为自己的域名

access_log /xxx.workerman.access.log; # 换成自己服务器的nginx日志路径
error_log /xxx.workerman.error.log; # 换成自己服务器的nginx日志路径

location / {
proxy_pass http://127.0.0.1:2346; # 代理2346 也可以根据项目配置为自己的端口

proxy_set_header X-Real-IP $remote_addr;
proxy_set_header Host $host;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}
}

重新nginx

1
nginx -s relad 或者 sudo nginx -s reload

然后将第3步的代码加入自己做的视图中,如果没有问题的话,websocket启动后就能正常通讯了。

6、启动workerman websocket

1
2
// 启动
./yii workerman-web-socket -s start -d

如果没有问题的话会得到类似如下的结果

1
2
3
4
5
6
7
8
9
$ ./yii workerman-web-socket -s start -d
Workerman[workerman-web-socket] start in DAEMON mode
----------------------- WORKERMAN -----------------------------
Workerman version:3.5.13 PHP version:7.1.16
------------------------ WORKERS -------------------------------
user worker listen processes status
durban none websocket://127.0.0.1:2346 4 [OK]
----------------------------------------------------------------
Input "php workerman-web-socket stop" to stop. Start success.

7、其他

commands/WorkermanWebSocketController.php 完整代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
<?php
/**
* WorkmanWebSocket 服务相关
*/

namespace app\commands;

use Workerman\Worker;
use yii\console\Controller;
use yii\helpers\Console;

/**
*
* WorkermanWebSocket
*
* @author durban.zhang <[email protected]>
*/

class WorkermanWebSocketController extends Controller
{
public $send;
public $daemon;
public $gracefully;

// 这里不需要设置,会读取配置文件中的配置
public $config = [];
private $ip = '127.0.0.1';
private $port = '2346';

public function options($actionID)
{
return ['send', 'daemon', 'gracefully'];
}

public function optionAliases()
{
return [
's' => 'send',
'd' => 'daemon',
'g' => 'gracefully',
];
}

public function actionIndex()
{
if ('start' == $this->send) {
try {
$this->start($this->daemon);
} catch (\Exception $e) {
$this->stderr($e->getMessage() . "\n", Console::FG_RED);
}
} else if ('stop' == $this->send) {
$this->stop();
} else if ('restart' == $this->send) {
$this->restart();
} else if ('reload' == $this->send) {
$this->reload();
} else if ('status' == $this->send) {
$this->status();
} else if ('connections' == $this->send) {
$this->connections();
}
}

public function initWorker()
{
$ip = isset($this->config['ip']) ? $this->config['ip'] : $this->ip;
$port = isset($this->config['port']) ? $this->config['port'] : $this->port;
$wsWorker = new Worker("websocket://{$ip}:{$port}");

// 4 processes
$wsWorker->count = 4;

// Emitted when new connection come
$wsWorker->onConnect = function ($connection) {
echo "New connection\n";
};

// Emitted when data received
$wsWorker->onMessage = function ($connection, $data) {
// Send hello $data
$connection->send('dddd hello ' . $data);
};

// Emitted when connection closed
$wsWorker->onClose = function ($connection) {
echo "Connection closed\n";
};
}

/**
* workman websocket start
*/
public function start()
{
$this->initWorker();
// 重置参数以匹配Worker
global $argv;
$argv[0] = $argv[1];
$argv[1] = 'start';
if ($this->daemon) {
$argv[2] = '-d';
}

// Run worker
Worker::runAll();
}

/**
* workman websocket restart
*/
public function restart()
{
$this->initWorker();
// 重置参数以匹配Worker
global $argv;
$argv[0] = $argv[1];
$argv[1] = 'restart';
if ($this->daemon) {
$argv[2] = '-d';
}

if ($this->gracefully) {
$argv[2] = '-g';
}

// Run worker
Worker::runAll();
}

/**
* workman websocket stop
*/
public function stop()
{
$this->initWorker();
// 重置参数以匹配Worker
global $argv;
$argv[0] = $argv[1];
$argv[1] = 'stop';
if ($this->gracefully) {
$argv[2] = '-g';
}

// Run worker
Worker::runAll();
}

/**
* workman websocket reload
*/
public function reload()
{
$this->initWorker();
// 重置参数以匹配Worker
global $argv;
$argv[0] = $argv[1];
$argv[1] = 'reload';
if ($this->gracefully) {
$argv[2] = '-g';
}

// Run worker
Worker::runAll();
}

/**
* workman websocket status
*/
public function status()
{
$this->initWorker();
// 重置参数以匹配Worker
global $argv;
$argv[0] = $argv[1];
$argv[1] = 'status';
if ($this->daemon) {
$argv[2] = '-d';
}

// Run worker
Worker::runAll();
}

/**
* workman websocket connections
*/
public function connections()
{
$this->initWorker();
// 重置参数以匹配Worker
global $argv;
$argv[0] = $argv[1];
$argv[1] = 'connections';

// Run worker
Worker::runAll();
}
}

workerman websocket支持的其他命令

重启

1
2
3
4
5
6
7
8
9
10
11
$ ./yii workerman-web-socket -s restart -d
Workerman[workerman-web-socket] restart
Workerman[workerman-web-socket] is stopping ...
Workerman[workerman-web-socket] stop success
----------------------- WORKERMAN -----------------------------
Workerman version:3.5.13 PHP version:7.1.16
------------------------ WORKERS -------------------------------
user worker listen processes status
durban none websocket://127.0.0.1:2346 4 [OK]
----------------------------------------------------------------
Input "php workerman-web-socket stop" to stop. Start success.

重载

1
2
$ ./yii workerman-web-socket -s reload   
Workerman[workerman-web-socket] reload

状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
$ ./yii workerman-web-socket -s status -g
Workerman[workerman-web-socket] status
----------------------------------------------GLOBAL STATUS----------------------------------------------------
Workerman version:3.5.13 PHP version:7.1.16
start time:2018-09-10 11:22:15 run 0 days 0 hours
load average: 1.79, 2, 2 event-loop:\Workerman\Events\Swoole
1 workers 4 processes
worker_name exit_status exit_count
none 0 12
----------------------------------------------PROCESS STATUS---------------------------------------------------
pid memory listening worker_name connections send_fail timers total_request qps status
8283 4M websocket://127.0.0.1:2346 none 0 0 0 0 0 [idle]
8284 4M websocket://127.0.0.1:2346 none 0 0 0 0 0 [idle]
8285 4M websocket://127.0.0.1:2346 none 0 0 0 0 0 [idle]
8286 4M websocket://127.0.0.1:2346 none 0 0 0 0 0 [idle]
----------------------------------------------PROCESS STATUS---------------------------------------------------
Summary 16M - - 0 0 0 0 0 [Summary]

连接数

1
2
3
4
 ./yii workerman-web-socket -s connections
Workerman[workerman-web-socket] connections
--------------------------------------------------------------------- WORKERMAN CONNECTION STATUS --------------------------------------------------------------------------------
PID Worker CID Trans Protocol ipv4 ipv6 Recv-Q Send-Q Bytes-R Bytes-W Status Local Address Foreign Address

我这里暂时连接的,所以没有连接的信息

停止

1
2
3
4
$ ./yii workerman-web-socket -s stop          
Workerman[workerman-web-socket] stop
Workerman[workerman-web-socket] is stopping ...
Workerman[workerman-web-socket] stop success