官方手册

  • workerman手册 高性能PHP应用容器,提供强大的功能及扩展能力。

  • GatewayWorker手册 基于workerman容器的分布式即时通讯系统,可用于聊天室、物联网等即时通讯领域的服务。

  • webman手册 基于workerman容器的超高性能HTTP框架,可用于WEB站点、HTTP接口等互联网领域。

  • webman-admin手册 基于webman开发的admin管理后台,支持各种应用插件安装。

workerman

Workerman是什么?

  • Workerman,高性能PHP应用容器.
  • 是一个更底层更通用的服务框架,你可以用它开发tcp代理、梯子代理、做游戏服务器、邮件服务器、ftp服务器、甚至开发一个php版本的redis、php版本的数据库、php版本的nginx、php版本的php-fpm等等。Workerman可以说是PHP领域的一次创新,让开发者彻底摆脱了PHP只能做WEB的束缚。
  • 实际上Workerman类似一个PHP版本的nginx,核心也是多进程+Epoll+非阻塞IO。Workerman每个进程能维持上万并发连接。由于本身常驻内存,不依赖Apache、nginx、php-fpm这些容器,拥有超高的性能。同时支持TCP、UDP、UNIXSOCKET,支持长连接,支持Websocket、HTTP、WSS、HTTPS等通讯协议以及各种自定义协议。拥有定时器、异步socket客户端、异步Redis、异步Http、异步消息队列等众多高性能组件。
  • Workerman不同于传统MVC框架,Workerman不仅可以用于Web开发,同时还有更广阔的应用领域,例如即时通讯类、物联网、游戏、服务治理、其它服务器或者中间件,这无疑大大提高了PHP开发者的视野.

Workerman的一些应用方向如下:

  1. 即时通讯类
    例如网页即时聊天、即时消息推送、微信小程序、手机app消息推送、PC软件消息推送等等
    [示例 workerman-chat聊天室 、 web消息推送 、 小蝌蚪聊天室]

  2. 物联网类
    例如Workerman与打印机通讯、与单片机通讯、智能手环、智能家居、共享单车等等。
    [客户案例如 易联云、易泊时代等]

  3. 游戏服务器类
    例如棋牌游戏、MMORPG游戏等等。[示例 browserquest-php]

  4. HTTP服务
    例如 写高性能HTTP接口、高性能网站。如果想要做HTTP相关的服务或者站点强烈推荐 webman

  5. SOA服务化
    利用Workerman将现有业务不同功能单元封装起来,以服务的形式对外提供统一的接口,达到系统松耦合、易维护、高可用、易伸缩。[示例 workerman-json-rpc、 workerman-thrift]

  6. 其它服务器软件
    例如 GatewayWorker,PHPSocket.IO,http代理,sock5代理,分布式通讯组件,分布式变量共享组件,消息队列、DNS服务器、WebServer、CDN服务器、FTP服务器等等

  7. 组件
    例如异步redis,异步http客户端,物联网mqtt客户端,消息队列 workerman/redis-queue 、 workerman/stomp、workerman/rabbitmq ,文件监控组件,还有很多第三方开发的组件框架等等

workerman不依赖apache或者nginx
workerman本身已经是一个类似apache/nginx的容器,只要PHP环境OK workerman就可以运行。

不要使用exit die sleep语句
业务执行exit die语句会导致进程退出,并显示WORKER EXIT UNEXPECTED错误。
sleep语句会让进程睡眠,睡眠过程中不会执行任何业务,框架也会停止运行,会导致该进程的所有客户端请求都无法处理。

改代码要重启
workerman是常驻内存的框架,改代码要重启workerman才能看到新代码的效果。

支持更高并发
如果业务并发连接数超过1000同时在线,请务必优化linux内核,并安装event扩展。

简单的开发实例

  1. 使用HTTP协议对外提供Web服务

创建start.php文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
<?php
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
use Workerman\Protocols\Http\Request;
require_once __DIR__ . '/vendor/autoload.php';

// 创建一个Worker监听2345端口,使用http协议通讯
$http_worker = new Worker("http://0.0.0.0:2345");

// 启动4个进程对外提供服务
$http_worker->count = 4;

// 接收到浏览器发送的数据时回复hello world给浏览器
$http_worker->onMessage = function(TcpConnection $connection, Request $request)
{
// 向浏览器发送hello world
$connection->send('hello world');
};

// 运行worker
Worker::runAll();

命令行运行

1
php start.php start

测试
在浏览器中访问url http://127.0.0.1:2345

  1. 使用WebSocket协议对外提供服务
    创建ws_test.php文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<?php
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

// 注意:这里与上个例子不同,使用的是websocket协议
$ws_worker = new Worker("websocket://0.0.0.0:2000");

// 启动4个进程对外提供服务
$ws_worker->count = 4;

// 当收到客户端发来的数据后返回hello $data给客户端
$ws_worker->onMessage = function(TcpConnection $connection, $data)
{
// 向客户端发送hello $data
$connection->send('hello ' . $data);
};

// 运行worker
Worker::runAll();

命令行运行

1
php ws_test.php start

测试
在浏览器中访问url ws://127.0.0.1:2000

打开chrome浏览器,按F12打开调试控制台,在Console一栏输入(或者把下面代码放入到html页面用js运行)

1
2
3
4
5
6
7
8
9
10
// 假设服务端ip为127.0.0.1
ws = new WebSocket("ws://127.0.0.1:2000");
ws.onopen = function() {
alert("连接成功");
ws.send('tom');
alert("给服务端发送一个字符串:tom");
};
ws.onmessage = function(e) {
alert("收到服务端的消息:" + e.data);
};
  1. 直接使用TCP传输数据
    创建tcp_test.php文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<?php
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

// 创建一个Worker监听2347端口,不使用任何应用层协议
$tcp_worker = new Worker("tcp://0.0.0.0:2347");

// 启动4个进程对外提供服务
$tcp_worker->count = 4;

// 当客户端发来数据时
$tcp_worker->onMessage = function(TcpConnection $connection, $data)
{
// 向客户端发送hello $data
$connection->send('hello ' . $data);
};

// 运行worker
Worker::runAll();

命令行运行

1
php tcp_test.php start

测试 :命令行运行

(以下是linux命令行效果,与windows下效果有所不同) 多开几个终端可以发信息

1
2
3
4
5
6
telnet 127.0.0.1 2347
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
tom
hello tom

安装

  1. Linux环境检查脚本
    Linux用户可以运行以下脚本检查本地环境是否满足Workerman要求
    curl -Ss https://www.workerman.net/check | php
    如果脚本中全部提示ok,则代表满足Workerman运行环境。

  2. 安装说明
    Workerman实际上就是一个PHP代码包,如果你的PHP环境已经装好,只需要把Workerman源代码或者demo下载下来即可运行。
    Composer安装:
    composer require workerman/workerman

有些composer代理镜像不全,使用以上命令composer config -g –unset repos.packagist 移除代理

  1. 启动与停止
  • 启动
    以debug(调试)方式启动
    php start.php start

以daemon(守护进程)方式启动
php start.php start -d

  • 停止
    php start.php stop

  • 重启
    php start.php restart

  • 平滑重启
    php start.php reload

  • 查看状态
    php start.php status

  • 查看连接状态(需要Workerman版本>=3.5.0)
    php start.php connections

区分主进程和子进程

有必要注意下代码是运行在主进程还是子进程,一般来说在Worker::runAll();调用前运行的代码都是在主进程运行的,onXXX回调运行的代码都属于子进程。
注意写在Worker::runAll();后面的代码永远不会被执行。

目录

orkerman // workerman内核代码
├── Connection // socket连接相关
│ ├── ConnectionInterface.php// socket连接接口
│ ├── TcpConnection.php // Tcp连接类
│ ├── AsyncTcpConnection.php // 异步Tcp连接类
│ └── UdpConnection.php // Udp连接类
├── Events // 网络事件库
│ ├── EventInterface.php // 网络事件库接口
│ ├── Event.php // Libevent网络事件库
│ ├── Ev.php // Libev网络事件库
│ ├── Swoole.php // Swoole网络事件库
│ └── Select.php // Select网络事件库
├── Lib // 常用的类库
│ ├── Constants.php // 常量定义
│ └── Timer.php // 定时器
├── Protocols // 协议相关
│ ├── ProtocolInterface.php // 协议接口类
│ ├── Http // http协议相关
│ │ ├── Chunk.php // http chunk类
│ │ ├── Request.php // http 请求类
│ │ ├── Response.php // http响应类
│ │ ├── ServerSentEvents.php // SSE类
│ │ ├── Session
│ │ │ ├── FileSessionHandler.php // session文件存储
│ │ │ └── RedisSessionHandler.php // session redis存储
│ │ ├── Session.php // session类
│ │ └── mime.types // mime映射文件
│ ├── Http.php // http协议实现
│ ├── Text.php // Text协议实现
│ ├── Frame.php // Frame协议实现
│ └── Websocket.php // websocket协议的实现
├── Worker.php // Worker
├── WebServer.php // WebServer
└── Autoloader.php // 自动加载类

通讯协议

Workerman已经支持的协议

Workerman目前已经支持HTTP、websocket、text协议(见附录说明)、frame协议(见附录说明),ws协议(见附录说明),需要基于这些协议通讯时可以直接使用,使用方法为:在初始化Worker时指定协议,例如

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

// websocket://0.0.0.0:2345 表明用websocket协议监听2345端口
$websocket_worker = new Worker('websocket://0.0.0.0:2345');

// text协议
$text_worker = new Worker('text://0.0.0.0:2346');

// frame协议
$frame_worker = new Worker('frame://0.0.0.0:2347');

// tcp Worker,直接基于socket传输,不使用任何应用层协议
$tcp_worker = new Worker('tcp://0.0.0.0:2348');

// udp Worker,不使用任何应用层协议
$udp_worker = new Worker('udp://0.0.0.0:2349');

// unix domain Worker,不使用任何应用层协议
$unix_worker = new Worker('unix:///tmp/wm.sock');

Worker类

  • Workerman中有两个重要的类Worker与Connection。
  • Worker类用于实现端口的监听,并可以设置客户端连接事件、连接上消息事件、连接断开事件的回调函数,从而实现业务处理。
  • 可以设置Worker实例的进程数(count属性),Worker主进程会fork出count个子进程同时监听相同的端口,并行的接收客户端连接,处理连接上的事件。

属性

id

当前worker进程的id编号,范围为 0 到 $worker->count-1。
这个属性对于区分worker进程非常有用,例如1个worker实例有多个进程,开发者只想在其中一个进程中设置定时器,则可以通过识别进程编号id来做到这一点

  • 进程重启后id编号值是不变的。

  • 进程编号id的分配是基于每个worker实例的。每个worker实例都从0开始给自己的进程编号,所以worker实例间进程编号会有重复,但是一个worker实例中的进程编号不会重复。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
<?php
use Workerman\Worker;
require_once __DIR__ . '/vendor/autoload.php';

// worker实例1有4个进程,进程id编号将分别为0、1、2、3
$worker1 = new Worker('tcp://0.0.0.0:8787');
// 设置启动4个进程
$worker1->count = 4;
// 每个进程启动后打印当前进程id编号即 $worker1->id
$worker1->onWorkerStart = function($worker1)
{
echo "worker1->id={$worker1->id}\n";
};

// worker实例2有两个进程,进程id编号将分别为0、1
$worker2 = new Worker('tcp://0.0.0.0:80');
// 设置启动2个进程
$worker2->count = 2;
// 每个进程启动后打印当前进程id编号即 $worker2->id
$worker2->onWorkerStart = function($worker2)
{
echo "worker2->id={$worker2->id}\n";
};

// 运行worker
Worker::runAll();

count

设置当前Worker实例启动多少个进程,不设置时默认为1。
如何设置进程数,请参考这里 。
注意:此属性必须在Worker::runAll();运行前设置才有效

1
2
3
$worker = new Worker('websocket://0.0.0.0:8484');
// 启动8个进程,同时监听8484端口,以websocket协议提供服务
$worker->count = 8;

name

设置当前Worker实例的名称,方便运行status命令时识别进程。不设置时默认为none。

1
2
3
$worker = new Worker('websocket://0.0.0.0:8484');
// 启动8个进程,同时监听8484端口,以websocket协议提供服务
$worker->name = 'Worker实例的名称';

reusePort

  • 设置当前worker是否开启监听端口复用(socket的SO_REUSEPORT选项)。
  • 开启监听端口复用后允许多个无亲缘关系的进程监听相同的端口,并且由系统内核做负载均衡,决定将socket连接交给哪个进程处理,避免了惊群效应,可以提升多进程短连接应用的性能。

workerman多端口(多协议)监听:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker('text://0.0.0.0:2015');
$worker->count = 4;
// 每个进程启动后在当前进程新增一个监听
$worker->onWorkerStart = function($worker)
{
$inner_worker = new Worker('http://0.0.0.0:2016');
/**
* 多个进程监听同一个端口(监听套接字不是继承自父进程)
* 需要开启端口复用,不然会报Address already in use错误
*/
$inner_worker->reusePort = true;
$inner_worker->onMessage = 'on_message';
// 执行监听
$inner_worker->listen();
};

$worker->onMessage = 'on_message';

function on_message(TcpConnection $connection, $data)
{
$connection->send("hello\n");
}

// 运行worker
Worker::runAll();

connections

此属性中存储了当前进程的所有的客户端连接对象,其中id为connection的id编号,详情见手册TcpConnection的id属性
$connections 在广播时或者根据连接id获得连接对象非常有用。
如果得知connection的编号为$id,可以很方便的通过$worker->connections[$id]获得对应的connection对象,从而操作对应的socket连接,例如通过$worker->connections[$id]->send('...') 发送数据。
注意:如果连接关闭(触发onClose),对应的connection会从$connections数组里删除。
注意:开发者不要对这个属性做修改操作,否则可能造成不可预知的情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
use Workerman\Worker;
use Workerman\Timer;
require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker('text://0.0.0.0:8787');
// 进程启动时设置一个定时器,定时向所有客户端连接发送数据
$worker->onWorkerStart = function($worker)
{
// 定时,每10秒一次
Timer::add(10, function()use($worker)
{
// 遍历当前进程所有的客户端连接,发送当前服务器的时间
foreach($worker->connections as $connection)
{
$connection->send(time());
}
});
};
// 运行worker
Worker::runAll();

stdoutFile

此属性为全局静态属性,如果以守护进程方式(-d启动)运行,则所有向终端的输出(echo var_dump等)都会被重定向到stdoutFile指定的文件中。
如果不设置,并且是以守护进程方式运行,则所有终端输出全部重定向到/dev/null (也就是默认丢弃所有输出)

注意:/dev/null是linux下一个特殊文件,它实际上是一个黑洞,所有数据写入到这个文件都会被丢弃。如果不想丢弃输出,可以将Worker::$stdoutFile设置成一个正常文件路径。
注意:此属性必须在Worker::runAll();运行前设置才有效。windows系统不支持此特性。

1
2
3
4
5
6
7
8
9
10
11
12
13
use Workerman\Worker;
require_once __DIR__ . '/vendor/autoload.php';

Worker::$daemonize = true;
// 所有的打印输出全部保存在/tmp/stdout.log文件中
Worker::$stdoutFile = '/tmp/stdout.log';
$worker = new Worker('text://0.0.0.0:8484');
$worker->onWorkerStart = function($worker)
{
echo "Worker start\n";
};
// 运行worker
Worker::runAll();

logFile

用来指定workerman日志文件位置。
此文件记录了workerman自身相关的日志,包括启动、停止等。
如果没有设置,文件名默认为workerman.log,文件位置位于Workerman的上一级目录中。

这个日志文件中仅仅记录workerman自身相关启动停止等日志,不包含任何业务日志。
业务日志类可以利用file_put_contents 或者 error_log 等函数自行实现。

1
Worker::$logFile = '/tmp/workerman.log';

reloadable

执行php start.php reload时会向所有子进程发送reload信号(SIGUSR1)。
子进程收到reload信号后会自动退出然后主进程会自动拉起一个新的进程,一般用于更新业务代码。
当进程$reloadable为false时,收到reload信号后只会触发 onWorkerReload , 并不会重启当前进程。

例如Gateway/Worker模型中的gateway进程负责维持客户端连接工作,worker进程负责处理请求。
设置gateway进程的reloadable属性为false则在reload可以做到在不断开客户端连接的情况下更新业务代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
use Workerman\Worker;
require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker('websocket://0.0.0.0:8484');
// 设置此实例收到reload信号后是否重启
$worker->reloadable = false;
$worker->onWorkerStart = function($worker)
{
echo "Worker starting...\n";
};

// 执行reload后告诉所有客户端服务端执行了reload
$worker->onWorkerReload = function(Worker $worker)
{
echo "11111 starting...\n";
};
// 运行worker
Worker::runAll();

回调属性

  • onWorkerStart
    设置Worker子进程启动时的回调函数,每个子进程启动时都会执行。
    注意:onWorkerStart是在子进程启动时运行的,如果开启了多个子进程($worker->count > 1),每个子进程运行一次,则总共会运行$worker->count次。

  • onWorkerReload
    设置Worker收到reload信号后执行的回调。
    可以利用onWorkerReload回调做很多事情,例如在不需要重启进程的情况下重新加载业务配置文件。

  • onConnect
    当客户端与Workerman建立连接时(TCP三次握手完成后)触发的回调函数。每个连接只会触发一次onConnect回调。

注意:onConnect事件仅仅代表客户端与Workerman完成了TCP三次握手,这时客户端还没有发来任何数据,此时除了通过$connection->getRemoteIp()获得对方ip,没有其他可以鉴别客户端的数据或者信息,所以在onConnect事件里无法确认对方是谁

  • onMessage
    当客户端通过连接发来数据时(Workerman收到数据时)触发的回调函数
    回调函数的参数
    $connection
    连接对象,即TcpConnection实例,用于操作客户端连接,如发送数据,关闭连接等

$data
客户端连接上发来的数据,如果Worker指定了协议,则$data是对应协议decode(解码)了的数据。数据类型与协议decode()实现有关,websocket text frame 为字符串,HTTP协议为 Workerman\Protocols\Http\Request对象。

1
2
3
4
5
6
7
8
9
10
11
12
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker('websocket://0.0.0.0:8484');
$worker->onMessage = function(TcpConnection $connection, $data)
{
var_dump($data);
$connection->send('receive success');
};
// 运行worker
Worker::runAll();
  • onClose
    当客户端连接与Workerman断开时触发的回调函数。不管连接是如何断开的,只要断开就会触发onClose。每个连接只会触发一次onClose

注意:如果对端是由于断网或者断电等极端情况断开的连接,这时由于无法及时发送tcp的fin包给workerman,workerman就无法得知连接已经断开,也就无法及时触发onClose。
这种情况需要通过应用层心跳来解决,workerman中连接的心跳实现参见这里

-onBufferFull
每个连接都有一个单独的应用层发送缓冲区,如果客户端接收速度小于服务端发送速度,数据会在应用层缓冲区暂存,如果缓冲区满则会触发onBufferFull回调。

-onBufferDrain
该回调在应用层发送缓冲区数据全部发送完毕后触发。一般与onBufferFull配合使用,例如在onBufferFull时停止向对端继续send数据,在onBufferDrain恢复写入数据。

-onError
当客户端的连接上发生错误时触发。

接口

  1. runAll 运行所有Worker实例。
    Worker::runAll()执行后将永久阻塞,也就是说位于Worker::runAll()后面的代码将不会被执行。所有Worker实例化应该都在Worker::runAll()前进行。

  2. stipAll 停止当前进程并退出。

Worker::stopAll()用于停止当前进程,当前进程退出后主进程会立刻拉起一个新的进程。如果你想停止整个workerman服务,请调用posix_kill(posix_getppid(), SIGINT)

  1. listen 用于实例化Worker后执行监听。
    此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。

例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker();
// 4个进程
$worker->count = 2;
// 每个进程启动后在当前进程新增一个Worker监听
$worker->onWorkerStart = function($worker)
{
/**
* 4个进程启动的时候都创建2016端口的Worker
* 当执行到worker->listen()时会报Address already in use错误
* 如果worker->count=1则不会报错
*/
if($worker->id == 0) {
$inner_worker = new Worker('http://0.0.0.0:8787');
$inner_worker->onMessage = 'on_message';
// 执行监听。这里会报Address already in use错误

} elseif($worker->id == 1) {
$inner_worker = new Worker('http://0.0.0.0:80');
$inner_worker->onMessage = 'on_message';
// 执行监听。这里会报Address already in use错误
}
$inner_worker->listen();

};

$worker->onMessage = 'on_message';

function on_message(TcpConnection $connection, $data)
{
$connection->send("hello\n");
}

// 运行worker
Worker::runAll();

TcpConnection类

  • Workerman中有两个重要的类Worker与Connection。
  • 每个客户端连接对应一个Connection对象,可以设置对象的onMessage、onClose等回调,同时提供了向客户端发送数据send接口与关闭连接close接口,以及其它一些必要的接口。
  • 可以说Worker是一个监听容器,负责接受客户端连接,并把连接包装成connection对象式提供给开发者操作。

属性

worker

此属性为只读属性,即当前connection对象所属的worker实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker('websocket://0.0.0.0:8787');

// 当一个客户端发来数据时,转发给当前进程所维护的其它所有客户端
$worker->onMessage = function(TcpConnection $connection, $data)
{
foreach($connection->worker->connections as $con)
{
$con->send($data);
}
};
// 运行worker
Worker::runAll();

接口

  • send 向客户端发送数据。

返回值
true 表示数据已经成功写入到该连接的操作系统层的socket发送缓冲区
null 表示数据已经写入到该连接的应用层发送缓冲区,等待向系统层socket发送缓冲区写入
false 表示发送失败,失败原因可能是客户端连接已经关闭,或者该连接的应用层发送缓冲区已满

1
2
3
4
5
6
$worker = new Worker('websocket://0.0.0.0:8484');
$worker->onMessage = function(TcpConnection $connection, $data)
{
// 会自动调用\Workerman\Protocols\Websocket::encode打包成websocket协议数据后发送
$connection->send("hello\n");
};
  • getRemoteIp 获得该连接的客户端ip

  • getRemotePort 获得该连接的客户端端口.

  • close($data[可选参数]) 安全的关闭连接.调用close会等待发送缓冲区的数据发送完毕后才关闭连接,并触发连接的onClose回调。

  • destroy 立刻关闭连接。与close不同之处是,调用destroy后即使该连接的发送缓冲区还有数据未发送到对端,连接也会立刻被关闭,并立刻触发该连接的onClose回调。

  • pauseRecv 使当前连接停止接收数据。该连接的onMessage回调将不会被触发。此方法对于上传流量控制非常有用

  • resumeRecv 使当前连接继续接收数据。此方法与Connection::pauseRecv配合使用,对于上传流量控制非常有用

Http服务

响应-发送文件

发送文件需要使用Workerman\Protocols\Http\Response响应类。
发送文件时用以下方式:

1
2
$response = (new Response())->withFile($file);
$connection->send($response);
  • workerman支持发送超大文件
  • 对于大文件(超过2M),workerman不会将整个文件一次性读入内存,而是在合适的时机分段读取文件并发送
  • workerman会根据客户端接收速度来优化文件读取发送速度,保证最快速发送文件的同时将内存占用减少到最低
  • 数据发送是非阻塞的,不会影响其它请求处理
  • 发送文件时会自动加上Last-Modified头,以便下次请求时服务端判断是否发送304响应以节省文件传输提高性能
  • 发送的文件会自动使用合适的Content-Type头发送给浏览器
  • 如果文件不存在,会自动转为404响应
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
use Workerman\Protocols\Http\Request;
use Workerman\Protocols\Http\Response;
require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker('http://0.0.0.0:8080');
$worker->onMessage = function(TcpConnection $connection, Request $request)
{
$file = '/your/path/of/file';
// 检查if-modified-since头判断文件是否修改过
if (!empty($if_modified_since = $request->header('if-modified-since'))) {
$modified_time = date('D, d M Y H:i:s', filemtime($file)) . ' ' . \date_default_timezone_get();
// 文件未修改则返回304
if ($modified_time === $if_modified_since) {
$connection->send(new Response(304));
return;
}
}
// 文件修改过或者没有if-modified-since头则发送文件
$response = (new Response())->withFile($file);
$connection->send($response);
};

// 运行worker
Worker::runAll();

响应-SSE

SSE也就是Server-sent Events,是一种服务端推送技术。它的本质是客户端发送一个携带Accept: text/event-stream 头的http请求后,连接不关闭,服务端可以在这个连接上不断的给客户端推送数据。
它与websocket的区别是:

  • SSE只能服务端向客户端推;Websocket可以双向通讯。
  • SSE 默认支持断线重连;WebSocket 需要自己实现。
  • SSE 只能传输utf8文本,二进制数据需要编码成utf8后传送;WebSocket 默认支持传送utf8和二进制数据。
  • SSE 自带消息类型;WebSocket 需要自己实现。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
use Workerman\Protocols\Http\Request;
use Workerman\Protocols\Http\ServerSentEvents;
use Workerman\Protocols\Http\Response;
use Workerman\Timer;
require_once __DIR__ . '/vendor/autoload.php';
$worker = new Worker('http://0.0.0.0:80');
$worker->onMessage = function(TcpConnection $connection, Request $request)
{
// 如果Accept头是text/event-stream则说明是SSE请求
if ($request->header('accept') === 'text/event-stream') {
// 首先发送一个 Content-Type: text/event-stream 头的响应
$connection->send(new Response(200, ['Content-Type' => 'text/event-stream'], "\r\n"));
// 定时向客户端推送数据
$timer_id = Timer::add(2, function () use ($connection, &$timer_id){
// 连接关闭的时候要将定时器删除,避免定时器不断累积导致内存泄漏
if ($connection->getStatus() !== TcpConnection::STATUS_ESTABLISHED) {
Timer::del($timer_id);
return;
}
// 发送message事件,事件携带的数据为hello,消息id可以不传
$connection->send(new ServerSentEvents(['event' => 'message', 'data' => 'hello', 'id'=>1]));
});
return;
}
$connection->send('ok');
};

// 运行worker
Worker::runAll();

客户端javascript代码

1
2
3
4
5
var source = new EventSource('http://127.0.0.1:8080');
source.addEventListener('message', function (event) {
var data = event.data;
console.log(data); // 输出 hello
}, false);

常用组件

GlobalData变量共享组件

1
composer require workerman/globaldata

实现进程之间共享数据,实际变量存储在GlobalData服务端。例如当给客户端类设置一个不存在的属性时,会触发__set魔术方法,客户端类在__set方法中向GlobalData服务端发送请求,存入一个变量。当访问客户端类一个不存在的变量时,会触发类的__get方法,客户端会向GlobalData服务端发起请求,读取这个值,从而完成进程间变量共享。

GlobalData 组件服务端

实例化一个\GlobalData\Server服务

监听端口
new GlobalData\Server(‘127.0.0.1’, 2207);
参数
listen_ip:监听的本机ip地址,不传默认是0.0.0.0
listen_port:监听的端口,不传默认是2207

GlobalData 组件客户端

实例化一个\GlobalData\Client客户端对象。通过在客户端对象上赋值属性来进程间共享数据。

监听端口
// 初始化一个全局的global data client
global $global;
$global = new \GlobalData\Client(‘127.0.0.1:2207’);
参数
GlobalData server 服务端地址,格式<ip地址>:<端口>,例如127.0.0.1:2207
如果是GlobalData server集群,则传入一个地址数组,例如array('10.0.0.10:2207', '10.0.0.0.11:2207')

Channel分布式通讯组件

Channel是一个分布式通讯组件,用于完成进程间通讯或者服务器间通讯。
特点

  1. 基于订阅发布模型
  2. 非阻塞式IO

原理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 初始化一个Channel服务端
$channel_server = new Channel\Server('0.0.0.0', 8787);
// websocket服务端
$worker = new Worker('websocket://0.0.0.0:80');
$worker->count=2;
$worker->name = 'pusher';
$worker->onWorkerStart = function($worker)
{
// Channel客户端连接到Channel服务端
Channel\Client::connect('0.0.0.0', 8787);

// 订阅worker->id事件并注册事件处理函数
Channel\Client::on($event_name, function($event_data)use($worker){
// var_dump($event_data);
$to_connection_id = $event_data['to_connection_id'];
$message = $event_data['content'];
if(!isset($worker->connections[$to_connection_id]))
{
echo "connection not exists\n";
return;
}
$to_connection = $worker->connections[$to_connection_id];
$to_connection->send($message);
});

// 订阅广播事件
$event_name = '广播';
// 收到广播事件后向当前进程内所有客户端连接发送广播数据
Channel\Client::on($event_name, function($event_data)use($worker){
$msg = "workerID:{$worker->id} \n";
echo $msg;
$message = $event_data['content'];
foreach($worker->connections as $connection)
{
$connection->send($message);
}
});
}

通过以上代码解释:

  1. $worker->count 配置了多个子进程,在匿名函数$worker->onWorkerStart 毁进行 $worker->count 次遍历。
  2. 所有的子进程都共享一个 Channel服务端new Channel\Server('0.0.0.0', 8787);。
  3. 在子进程在时 onWorkerStart 遍历时,每一个子进程 的 Channel 客户端 Channel\Client::connect('0.0.0.0', 8787) 连接到 Channel服务端
  4. 在子进程在时 onWorkerStart 遍历时 ,每一个子进程 都会 订阅worker->id事件并注册事件处理函数。 如果想订阅事件根据进程id处理不同事务,可以if判断。订阅的事件名称 一般都是子进程id,也可不是 用其他标识可以替换进程id也可,内部信息传递也是指能处理子进程内的通讯。
  5. 订阅广播事件 向当前进程内所有客户端连接发送广播数据,其本质 对所有的子进程进行遍历,向每个子进程的客户端连接发送广播数据。发送完一个子进程的后在发送一个子进程的客户端连接。 即 每一个子进程都 订阅了一个 订阅广播事件 但这个 广播事件 不是所有子进程共享一个,而是每一个子进程 都有自己一个 订阅广播事件。

安装
composer require workerman/channel

Channel组件服务端

void \Channel\Server::__construct([string $listen_ip = '0.0.0.0', int $listen_port = 2206])
参数
listen_ip监听的本机ip地址,不传默认是0.0.0.0
listen_port监听的端口,不传默认是2206

channelClient

/Channel/Client 客户端

connect

void \Channel\Client::connect([string $listen_ip = '127.0.0.1', int $listen_port = 2206])
连接Channel/Server
参数
listen_ipChannel/Server 监听的ip地址,不传默认是127.0.0.1
listen_portChannel/Server监听的端口,不传默认是2206

1
2
3
4
5
6
7
8
9
use Workerman\Worker;
require_once __DIR__ . '/vendor/autoload.php';

$http_worker = new Worker('http://0.0.0.0:4237');
$http_worker->onWorkerStart = function()
{
Channel\Client::connect('127.0.0.1', 2206);
};
Worker::runAll();
on

void \Channel\Client::on(string $event_name, callback $callback_function)
订阅$event_name事件并注册事件发生时的回调$callback_function
回调函数的参数
$event_name订阅的事件名称,可以是任意的字符串。

$callback_function事件发生时触发的回调函数。函数原型为callback_function(mixed $event_data)$event_data是事件发布(publish)时传递的事件数据。
范例
多进程Worker(多服务器),一个客户端发消息,广播给所有客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

// 初始化一个Channel服务端
$channel_server = new Channel\Server('0.0.0.0', 8787);

// websocket服务端
$worker = new Worker('websocket://0.0.0.0:80');
$worker->name = 'websocket';
$worker->count = 6;
// 每个worker进程启动时
$worker->onWorkerStart = function($worker)
{
// Channel客户端连接到Channel服务端
Channel\Client::connect('127.0.0.1', 8787);
// 订阅broadcast事件,并注册事件回调
Channel\Client::on('broadcast', function($event_data)use($worker){
// 向当前worker进程的所有客户端广播消息
foreach($worker->connections as $connection)
{
$connection->send($event_data);
}
});
};

$worker->onMessage = function(TcpConnection $connection, $data)
{
// 将客户端发来的数据当做事件数据
$event_data = $data;
// 向所有worker进程发布broadcast事件
\Channel\Client::publish('broadcast', $event_data);
};

Worker::runAll();

测试
打开chrome浏览器,按F12打开调试控制台,在Console一栏输入(或者把下面代码放入到html页面用js运行)
接收消息的连接

1
2
3
4
ws = new WebSocket("ws://127.0.0.1:80");
ws.onmessage = function(e) {
alert("收到服务端的消息:" + e.data);
};

广播消息

1
ws.send('hello world');
publish

void \Channel\Client::publish(string $event_name, mixed $event_data)
$event_name事件发布$event_data数据
发布某个事件,所有这个事件的订阅者会收到这个事件并触发on($event_name, $callback)注册的回调$callback

unsubscribe

void \Channel\Client::unsubscribe(string $event_name)
取消订阅某个事件,这个事件发生时将不会再触发on($event_name, $callback)注册的回调$callback

集群推送-例子

基于Worker的多进程(分布式集群)推送系统,集群群发、集群广播。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

// 初始化一个Channel服务端
$channel_server = new Channel\Server('0.0.0.0', 8787);

// websocket服务端
$worker = new Worker('websocket://0.0.0.0:80');

$worker->count=2;
$worker->name = 'pusher';
$worker->onWorkerStart = function($worker)
{
// Channel客户端连接到Channel服务端
Channel\Client::connect('0.0.0.0', 8787);
// 以自己的进程id为事件名称
$event_name = $worker->id;
// 订阅worker->id事件并注册事件处理函数
Channel\Client::on($event_name, function($event_data)use($worker){
// var_dump($event_data);
$to_connection_id = $event_data['to_connection_id'];
$message = $event_data['content'];
if(!isset($worker->connections[$to_connection_id]))
{
echo "connection not exists\n";
return;
}
$to_connection = $worker->connections[$to_connection_id];
$to_connection->send($message);
});

// 订阅广播事件
$event_name = '广播';
// 收到广播事件后向当前进程内所有客户端连接发送广播数据
Channel\Client::on($event_name, function($event_data)use($worker){
$message = $event_data['content'];
foreach($worker->connections as $connection)
{
$connection->send($message);
}
});
};

$worker->onConnect = function(TcpConnection $connection)use($worker)
{
$msg = "workerID:{$worker->id} connectionID:{$connection->id} connected\n";
echo $msg;
$connection->send($msg);
};

$worker->onMessage = function(TcpConnection $connection, $data)
{
list($event_name, $to_connection_id, $content) = explode('|', $data);
if ($event_name == '广播') {
Channel\Client::publish($event_name, array(
'content' => $content
));
} else {

//$event_name 即 进程id
Channel\Client::publish($event_name, array(
'to_connection_id' => $to_connection_id,
'content' => $content
));
}
};

Worker::runAll();

测试

  1. 打开chrome浏览器,按F12打开调试控制台,在Console一栏输入(或者把下面代码放入到html页面用js运行)
    1
    2
    3
    4
    5
    6
    ws = new WebSocket("ws://127.0.0.1");
    ws.onmessage = function(e) {
    console.log("收到服务端的消息:" + e.data);
    //收到的信息 workerID|to_connection_id 即当前所属的那个一子进程的id,与客户端连接对象id
    };
    ws.send('workerID|to_connection_id|content');
  2. 打开另一个浏览器窗口,按F12打开调试控制台,在Console一栏输入(或者把下面代码放入到html页面用js运行)
    1
    2
    3
    4
    5
    6
    ws = new WebSocket("ws://127.0.0.1");
    ws.onmessage = function(e) {
    console.log("收到服务端的消息:" + e.data);
    //收到的信息 workerID|to_connection_id 即当前所属的那个一子进程的id,与客户端连接对象id
    };
    ws.send('workerID|to_connection_id|content');
    设计:把每个用户聊天登陆 记录,保存当前连接进程id与客户端id 然后与在线其他用户沟通, 发送消息内容 + 其他用户所属进程id + 客户端id 即完成内容沟通。

分组推送-例子

基于Worker的多进程分组推送系统

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

$channel_server = new Channel\Server('0.0.0.0', 2206);

$worker = new Worker('websocket://0.0.0.0:1234');
$worker->count = 8;
// 全局群组到连接的映射数组
$group_con_map = array();
$worker->onWorkerStart = function(){
// Channel客户端连接到Channel服务端
Channel\Client::connect('127.0.0.1', 2206);

// 监听全局分组发送消息事件
Channel\Client::on('send_to_group', function($event_data){
$group_id = $event_data['group_id'];
$message = $event_data['message'];
global $group_con_map;
var_dump(array_keys($group_con_map));
if (isset($group_con_map[$group_id])) {
foreach ($group_con_map[$group_id] as $con) {
$con->send($message);
}
}
});
};
$worker->onMessage = function(TcpConnection $con, $data){
// 加入群组消息{"cmd":"add_group", "group_id":"123"}
// 或者 群发消息{"cmd":"send_to_group", "group_id":"123", "message":"这个是消息"}
$data = json_decode($data, true);
var_dump($data);
$cmd = $data['cmd'];
$group_id = $data['group_id'];
switch($cmd) {
// 连接加入群组
case "add_group":
global $group_con_map;
// 将连接加入到对应的群组数组里
$group_con_map[$group_id][$con->id] = $con;
// 记录这个连接加入了哪些群组,方便在onclose的时候清理group_con_map对应群组的数据
$con->group_id = isset($con->group_id) ? $con->group_id : array();
$con->group_id[$group_id] = $group_id;
break;
// 群发消息给群组
case "send_to_group":
// Channel\Client给所有服务器的所有进程广播分组发送消息事件
Channel\Client::publish('send_to_group', array(
'group_id'=>$group_id,
'message'=>$data['message']
));
break;
}
};
// 这里很重要,连接关闭时把连接从全局群组数据中删除,避免内存泄漏
$worker->onClose = function(TcpConnection $con){
global $group_con_map;
// 遍历连接加入的所有群组,从group_con_map删除对应的数据
if (isset($con->group_id)) {
foreach ($con->group_id as $group_id) {
unset($group_con_map[$group_id][$con->id]);
if (empty($group_con_map[$group_id])) {
unset($group_con_map[$group_id]);
}
}
}
};

Worker::runAll();
```
**测试 (假设都是本机127.0.0.1运行)**
1. 运行服务端
```bash
php start.php start
Workerman[del.php] start in DEBUG mode
----------------------- WORKERMAN -----------------------------
Workerman version:3.4.2 PHP version:7.1.3
------------------------ WORKERS -------------------------------
user worker listen processes status
liliang ChannelServer frame://0.0.0.0:2206 1 [OK]
liliang none websocket://0.0.0.0:1234 12 [OK]
----------------------------------------------------------------
Press Ctrl-C to quit. Start success.
  1. 客户端连接服务端
    打开chrome浏览器,按F12打开调试控制台,在Console一栏输入(或者把下面代码放入到html页面用js运行)
    1
    2
    3
    4
    5
    6
    7
    // 假设服务端ip为127.0.0.1,测试时请改成实际服务端ip
    ws = new WebSocket('ws://127.0.0.1:1234');
    ws.onmessage = function(data){console.log(data.data)};
    ws.onopen = function() {
    ws.send('{"cmd":"add_group", "group_id":"123"}');
    ws.send('{"cmd":"send_to_group", "group_id":"123", "message":"这个是消息"}');
    };

常见问题

心跳

注意:长连接应用必须加心跳,否则连接可能由于长时间未通讯被路由节点强行断开。
心跳作用主要有两个:

  1. 客户端定时给服务端发送点数据,防止连接由于长时间没有通讯而被某些节点的防火墙关闭导致连接断开的情况。
  2. 服务端可以通过心跳来判断客户端是否在线,如果客户端在规定时间内没有发来任何数据,就认为客户端下线。这样可以检测到客户端由于极端情况(断电、断网等)下线的事件。
    心跳间隔建议值:
    建议客户端发送心跳间隔小于60秒,比如55秒。

    心跳的数据格式没有要求,服务端能识别即可。

心跳示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
<?php
use Workerman\Worker;
use Workerman\Timer;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

// 心跳间隔55秒
define('HEARTBEAT_TIME', 20);

$worker = new Worker('text://0.0.0.0:8787');

$worker->onMessage = function(TcpConnection $connection, $msg) {
// 给connection临时设置一个lastMessageTime属性,用来记录上次收到消息的时间
$connection->lastMessageTime = time();
// 其它业务逻辑...
};

// 进程启动后设置一个每10秒运行一次的定时器
$worker->onWorkerStart = function($worker) {
Timer::add(10, function()use($worker){
$time_now = time();
foreach($worker->connections as $connection) {
// 有可能该connection还没收到过消息,则lastMessageTime设置为当前时间
if (empty($connection->lastMessageTime)) {
echo "心跳检测\n";
$connection->lastMessageTime = $time_now;
continue;
}
// 上次通讯时间间隔大于心跳间隔,则认为客户端已经下线,关闭连接
if ($time_now - $connection->lastMessageTime > HEARTBEAT_TIME) {
echo "心跳检测发现客户端已经下线,关闭连接 被关闭的客户端ip:{$connection->getRemoteIp()}\n";
$connection->close();
}
}
});
};

Worker::runAll();

以上配置为如果客户端超过55秒没有发送任何数据给服务端,则服务端认为客户端已经掉线,服务端关闭连接并触发onClose。

断线重连(重要)

不管是客户端发送心跳还是服务端发送心跳,连接都有断开的可能。例如浏览器最小化js被暂停、浏览器切换到其它tab页面js被暂停、电脑进入睡眠等等、移动端切换网络、信号变弱、手机黑屏、手机应用切换到后台、路由故障、业务主动断开等。尤其是外网环境复杂,很多路由节点会清理1分钟内不活跃的连接,这也是为什么心跳间隔推荐小于1分钟的原因。

连接在外网环境很容易被断开,所以断线重连是长连接应用必须具备的功能(断线重连只能客户端做,服务端无法实现)。例如浏览器websocket需要监听onclose事件,当发生onclose时建立新的连接(为避免需崩可延建立连接)。更严格一点,服务端也应该定时发起心跳数据,并且客户端需要定时监测服务端的心跳数据是否超时,超过规定时间未收到服务端心跳数据应该认定连接已经断开,需要执行close关闭连接,并重新建立新的连接。

应该开启多少进程

进程数设置需要考虑以下条件

  1. cpu核数
  2. 内存大小
  3. 业务偏向IO密集还是CPU密集型 (workerman IO都是非阻塞 CPU密集型)

进程数设置原则

  1. 每个进程占用内存之和需要小于总内存(一般来说每个业务进程占用内存大概40M左右)
  2. 如果是IO密集型,也就是业务中涉及到一些阻塞式IO,比如一般的访问Mysql、Redis等存储都是阻塞式访问的,进程数可以开大一些,如配置成CPU核数的3倍。如果业务中涉及的阻塞等待非常多,可以再适当加大进程数,例如CPU核数的8倍甚至更高。注意非阻塞式IO属于CPU密集型,而不属于IO密集型。
  3. 如果是CPU密集型,也就是业务中没有阻塞式IO开销,例如使用异步IO读取网络资源,进程不会被业务代码阻塞的情况下,可以把进程数设置成和CPU核数一样

进程数设置参考值

如果业务代码偏向IO密集型,则根据IO密集程度设置进程数,例如CPU核数的3-8倍。
如果业务代码偏向CPU密集型,则可以将进程数设置成cpu核数。

Workerman支持多少并发

并发概念太模糊,这里以两种可以量化的指标并发连接数并发请求数来说明。

并发连接数是指服务器当前时刻一共维持了多少TCP连接,而这些连接上是否有数据通讯并不关注,例如一台消息推送服务器上可能维持了百万的设备连接,由于连接上很少有数据通讯,所以这台服务器上负载可能几乎为0,只要内存足够,还可以继续接受连接。

并发请求数一般用QPS(服务器每秒处理多少请求)来衡量,而当前时刻服务器上有多少个tcp连接并不十分关注。例如一台服务器只有10个客户端连接,每个客户端连接上每秒有1W个请求,那么要求服务端需要至少能支撑10*1W=10W每秒的吞吐量(QPS)。假设10W吞吐量每秒是这台服务器的极限,如果每个客户端每秒发送1个请求给服务端,那么这台服务器能够支撑10W个客户端。

并发连接数受限于服务器内存,一般24G内存workerman服务器可以支持大概120W并发连接。
并发请求数受限于服务器cpu处理能力,一台24核workerman服务器可以达到45W每秒的吞吐量(QPS),实际值根据业务复杂度以及代码质量有所变化。

注意
高并发场景必须安装event扩展,参考安装配置章节。另外需要优化linux内核,尤其是进程打开文件数限制,请参考附录内核调优章节。

Workerman中如何向某个特定客户端发送数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
<?php
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

// 初始化一个worker容器,监听1234端口
$worker = new Worker('websocket://workerman.net:1234');
// ====这里进程数必须必须必须设置为1====
$worker->count = 1;
// 新增加一个属性,用来保存uid到connection的映射(uid是用户id或者客户端唯一标识)
$worker->uidConnections = array();
// 当有客户端发来消息时执行的回调函数
$worker->onMessage = function(TcpConnection $connection, $data)
{
global $worker;
// 判断当前客户端是否已经验证,即是否设置了uid
if(!isset($connection->uid))
{
// 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
$connection->uid = $data;
/* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
* 实现针对特定uid推送数据
*/
$worker->uidConnections[$connection->uid] = $connection;
return $connection->send('login success, your uid is ' . $connection->uid);
}
// 其它逻辑,针对某个uid发送 或者 全局广播
// 假设消息格式为 uid:message 时是对 uid 发送 message
// uid 为 all 时是全局广播
list($recv_uid, $message) = explode(':', $data);
// 全局广播
if($recv_uid == 'all')
{
broadcast($message);
}
// 给特定uid发送
else
{
sendMessageByUid($recv_uid, $message);
}
};

// 当有客户端连接断开时
$worker->onClose = function(TcpConnection $connection)
{
global $worker;
if(isset($connection->uid))
{
// 连接断开时删除映射
unset($worker->uidConnections[$connection->uid]);
}
};

// 向所有验证的用户推送数据
function broadcast($message)
{
global $worker;
foreach($worker->uidConnections as $connection)
{
$connection->send($message);
}
}

// 针对uid推送数据
function sendMessageByUid($uid, $message)
{
global $worker;
if(isset($worker->uidConnections[$uid]))
{
$connection = $worker->uidConnections[$uid];
$connection->send($message);
}
}

// 运行所有的worker(其实当前只定义了一个)
Worker::runAll();

说明:
以上例子可以针对uid推送,虽然是单进程,但是支持个10W在线是没问题的。
注意这个例子只能单进程,也就是$worker->count 必须是1。
要支持多进程或者服务器集群的话需要Channel组件完成进程间通讯

如何实现异步任务

问:
如何异步处理繁重的业务,避免主业务被长时间阻塞。例如我要给1000用户发送邮件,这个过程很慢,可能要阻塞数秒,这个过程中因为主流程被阻塞,会影响后续的请求,如何将这样的繁重任务交给其它进程异步处理。

答:
可以在本机或者其它服务器甚至服务器集群预先建立一些任务进程处理繁重的业务,任务进程数可以开多一些,例如cpu的10倍,然后调用方利用AsyncTcpConnection将数据异步发送给这些任务进程异步处理,异步得到处理结果。

任务进程服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

// task worker,使用Text协议
$task_worker = new Worker('Text://0.0.0.0:8787');
// task进程数可以根据需要多开一些
$task_worker->count = 5;
$task_worker->name = 'TaskWorker';
$task_worker->onMessage = function(TcpConnection $connection, $task_data)
{
// 假设发来的是json数据
$task_data = json_decode($task_data, true);
// 根据task_data处理相应的任务逻辑.... 得到结果,这里省略....
$task_result = [
'success' => 'ok'
];
// 发送结果
$connection->send(json_encode($task_result));
};
Worker::runAll();

在workerman中调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
use Workerman\Worker;
use \Workerman\Connection\AsyncTcpConnection;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

// websocket服务
$worker = new Worker('websocket://0.0.0.0:80');

$worker->onMessage = function(TcpConnection $ws_connection, $message)
{
// 与远程task服务建立异步连接,ip为远程task服务的ip,如果是本机就是127.0.0.1,如果是集群就是lvs的ip
$task_connection = new AsyncTcpConnection('Text://127.0.0.1:8787');
// 任务及参数数据
$task_data = array(
'function' => 'send_mail',
'args' => array('from'=>'xxx', 'to'=>'xxx', 'contents'=>'xxx'),
);
// 发送数据
$task_connection->send(json_encode($task_data));
// 异步获得结果
$task_connection->onMessage = function(AsyncTcpConnection $task_connection, $task_result)use($ws_connection)
{
// 结果
var_dump($task_result);
// 获得结果后记得关闭异步连接
$task_connection->close();
// 通知对应的websocket客户端任务完成
$ws_connection->send('task complete');
};
// 执行异步连接
$task_connection->connect();
};

Worker::runAll();

传输加密-ssl/tls 与 创建https服务

通讯协议之上增加一层SSL加密层,比如wss、https协议都是基于SSL加密传输的,非常安全。

所以只需要在http协议的基础上开启SSL即可支持https协议。
让workerman支持https有两种通用方案,一种是workerman直接开启SSL,另外一种是用nginx代理SSL。两种方案选其一即可,不可同时设置。
Workerman开启ssl方法如下:

准备工作:

  1. Workerman版本不小于3.3.7
  2. PHP安装了openssl扩展
  3. 已经申请了证书(pem/crt文件及key文件)放在了/etc/nginx/conf.d/ssl下
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    <?php
    use Workerman\Worker;
    use Workerman\Connection\TcpConnection;
    require_once __DIR__ . '/vendor/autoload.php';

    // 证书最好是申请的证书
    $context = array(
    'ssl' => array(
    'local_cert' => '/etc/nginx/conf.d/ssl/server.pem', // 也可以是crt文件
    'local_pk' => '/etc/nginx/conf.d/ssl/server.key',
    'verify_peer' => false,
    'allow_self_signed' => true, //如果是自签名证书需要开启此选项
    )
    );
    // 这里设置的是websocket协议,也可以http协议或者其它协议
    $worker = new Worker('websocket://0.0.0.0:443', $context);
    // 设置transport开启ssl
    $worker->transport = 'ssl';
    $worker->onMessage = function(TcpConnection $con, $msg) {
    $con->send('ok');
    };

    Worker::runAll();

返回 php 系列
avatar
懒觉猫先生
欢迎你们的到来!
关注我吧
最新文章
最新评论
正在加载中...
网站资讯
文章数目 :
177
已运行时间 :
本站总字数 :
120.4k
本站访客数 :
本站总访问量 :
最后更新时间 :