返回 php 系列

GatewayWorker手册 基于workerman容器的分布式即时通讯系统,可用于聊天室、物联网等即时通讯领域的服务。

GatewayWorker基于Workerman开发的一个项目框架,用于快速开发TCP长连接应用,例如app推送服务端、即时IM服务端、游戏服务端、物联网、智能家居等等.

工作原理

  1. Register、Gateway、BusinessWorker进程启动
  2. Gateway、BusinessWorker进程启动后向Register服务进程发起长连接注册自己
  3. Register服务收到Gateway的注册后,把所有Gateway的通讯地址保存在内存中
  4. Register服务收到BusinessWorker的注册后,把内存中所有的Gateway的通讯地址发给BusinessWorker
  5. BusinessWorker进程得到所有的Gateway内部通讯地址后尝试连接Gateway
  6. 如果运行过程中有新的Gateway服务注册到Register(一般是分布式部署加机器),则将新的Gateway内部通讯地址列表将广播给所有BusinessWorker,BusinessWorker收到后建立连接
  7. 如果有Gateway下线,则Register服务会收到通知,会将对应的内部通讯地址删除,然后广播新的内部通讯地址列表给所有BusinessWorker,BusinessWorker不再连接下线的Gateway
  8. 至此Gateway与BusinessWorker通过Register已经建立起长连接
  9. 客户端的事件及数据全部由Gateway转发给BusinessWorker处理,BusinessWorker默认调用Events.php中的onConnect onMessage onClose处理业务逻辑。
  10. BusinessWorker的业务逻辑入口全部在Events.php中,包括onWorkerStart进程启动事件(进程事件)、onConnect连接事件(客户端事件)、onMessage消息事件(客户端事件)、onClose连接关闭事件(客户端事件)、onWorkerStop进程退出事件(进程事件)

Gateway/Worker 的进程模型

特点:
从图上我们可以看出Gateway负责接收客户端的连接以及连接上的数据,然后Worker接收Gateway发来的数据做处理,然后再经由Gateway把结果转发给其它客户端。每个客户端都有很多的路由到达另外一个客户端,例如client⑦与client①可以经由蓝色路径完成数据通讯

优点:

  1. 可以方便的实现客户端之间的通讯
  2. Gateway与Worker之间是基于socket长连接通讯,也就是说Gateway、Worker可以部署在不同的服务器上,非常容易实现分布式部署,扩容服务器
  3. Gateway进程只负责网络IO,业务实现都在Worker进程上,可以reload Worker进程,实现在不影响用户的情况下完成代码热更新。

注意

  • 业务开发只需要关注 Applications/项目/Events.php一个文件即可。
  • 客户端只能连接Gateway端口,不要连接Register端口
  • 长连接应用切记需要开启应用层心跳(GatewayWorker提供了设置,参加心跳检测),心跳间隔20-30秒最佳,为了避免长连接因为长时间不通讯被节点防火墙断开。

目录结构

├── Applications // 这里是所有开发者应用项目
│ └── YourApp // 其中一个项目目录,目录名可以自定义
│ ├── Events.php // 开发者只需要关注这个文件
│ ├── start_gateway.php // gateway进程启动脚本,包括端口号等设置
│ ├── start_businessworker.php // businessWorker进程启动脚本
│ └── start_register.php // 注册服务启动脚本

├── start.php // 全局启动脚本,此脚本会依次加载Applications/项目/start_*.php启动脚本

└── vendor // GatewayWorker框架和Workerman框架源码目录,此目录开发者不用关心

适用范围:适用于客户端与客户端需要实时通讯的项目。

Register类的使用

  • register端口千万不能开放给外网,否则可能遭受攻击
  • register服务只能开一个进程,不能开启多个进程
  • Register类只能定制监听的ip和端口,并且目前只能使用text协议

Gateway类的使用

  • Gateway类用于初始化Gateway进程。
  • Gateway进程是暴露给客户端的让其连接的进程。
  • 所有客户端的请求都是由Gateway接收然后分发给BusinessWorker处理,同样BusinessWorker也会将要发给客户端的响应通过Gateway转发出去。
  • 可以给Gateway对象的onWorkerStart、onWorkerStop、onConnect、onClose设置回调函数,但是无法给设置onMessage回调。Gateway的onMessage行为固定为将客户端数据转发给

初始化

$gateway = new Gateway('protocol://ip:port');

Gateway类可以定制的内容

  1. 协议 : websocket协议;text协议; Frame协议; tcp,直接裸tcp,不推荐,见通讯协议作用。

  2. name: 和Worker一样,可以设置Gateway进程的名称,方便status命令中查看统计

  3. count: 和Worker一样,可以设置Gateway进程的数量,一般一台服务器设置1-2个足够,设置多了对性能有一定影响

  4. lanIp: lanIp是Gateway所在服务器的内网IP,默认填写127.0.0.1即可。多服务器分布式部署的时候需要填写真实的内网ip,不能填写127.0.0.1。注意:lanIp只能填写真实ip,不能填写域名或者其它字符串,无论如何都不能写0.0.0.0 .

  5. startPort: Gateway进程启动后会监听一个本机端口,用来给BusinessWorker提供链接服务,然后Gateway与BusinessWorker之间就通过这个连接通讯。这里设置的是Gateway监听本机端口的起始端口;则每个Gateway进程分别启动的本地端口一般为2000、2001、2002、2003。

  6. registerAddress: 注册服务地址,格式类似于 ‘127.0.0.1:1236’。如果是部署了多个register服务则格式是数组,类似[‘192.168.0.1:1236’,’192.168.0.2:1236’]

  7. onWorkerStart: 和Worker一样,可以设置Gateway进程启动后的回调函数,一般在这个回调里面初始化一些全局数据

  8. onWorkerStop: 和Worker一样,可以设置Gateway进程关闭的回调函数,一般在这个回调里面做数据清理或者保存数据工作

BusinessWorker类使用

  1. name: 和Worker一样,可以设置BusinessWorker进程的名称,方便status命令中查看统计
  2. count: 和Worker一样,可以设置BusinessWorker进程的数量,以便充分利用多cpu资源
  3. registerAddress: 注册服务地址,格式类似于 ‘127.0.0.1:1236’。如果是部署了多个register服务则格式是数组,类似[‘192.168.0.1:1236’,’192.168.0.2:1236’]
  4. onWorkerStart: 和Worker一样,可以设置BusinessWorker启动后的回调函数,一般在这个回调里面初始化一些全局数据
  5. onWorkerStop: 和Worker一样,可以设置BusinessWorker关闭的回调函数,一般在这个回调里面做数据清理或者保存数据工作
  6. eventHandler: 设置使用哪个类来处理业务,默认值是Events,即默认使用Events.php中的Events类来处理业务。业务类至少要实现onMessage静态方法,onConnect和onClose静态方法可以不用实现。

Events类的回调属性

onWorkerStart

void Event::onWorkerStart(BusinessWorker $businessWorker);
businessWorker进程启动时触发。每个进程生命周期内都只会触发一次。
注意:$businessworker->onWorkerStartEvent::onWorkerStart不会互相覆盖,如果两个回调都设置则都会运行。
参数 $businessWorker : businessWorker进程实例
onWorkerStart范例

1
2
3
4
5
6
7
8
use \GatewayWorker\Lib\Gateway;
class Events
{
public static function onWorkerStart($businessWorker)
{
echo "WorkerStart\n";
}
}

onConnect

void Event::onConnect($client_id);
当客户端连接上gateway进程时(TCP三次握手完毕时)触发的回调函数。
参数 $client_id

  • 用来全局标记一个socket连接,每个客户端连接都会被分配一个全局唯一的client_id
  • 如果client_id对应的客户端连接断开了,那么这个client_id也就失效了。当这个客户端再次连接到Gateway时,将会获得一个新的client_id。也就是说client_id和客户端的socket连接生命周期是一致的.
  • client_id一旦被使用过,将不会被再次使用,也就是说client_id是不会重复的,即使分布式部署也不会重复.
  • 只要有client_id,并且对应的客户端在线,就可以调用Gateway::sendToClient($client_id, $data)等方法向这个客户端发送数据。

注意

  • $client_id是服务端自动生成的并且无法自定义。
  • 如果开发者有自己的id系统,可以用过Gateway::bindUid($client_id, $uid)把自己系统的id与client_id绑定,绑定后就可以通过Gateway::sendToUid($uid)发送数据,通过Gateway::isUidOnline($uid)用户是否在线了。
1
2
3
4
5
6
7
8
use \GatewayWorker\Lib\Gateway;
class Events
{
public static function onConnect($client_id)
{
Gateway::sendToCurrentClient("Your client_id is $client_id");
}
}

onWebSocketConnect

void Events::onWebSocketConnect(string $client_id, array $data);
当客户端连接上gateway完成websocket握手时触发的回调函数。
注意:此回调只有gatewaywebsocket协议并且gateway没有设置onWebSocketConnect时才有效。
参数
$client_id 每个客户端连接都会被分配一个全局唯一的client_id
$data websocket握手时的http头数据,包含get、server等变量

onMessage

void Events::onMessage(string $client_id, mixed $recv_data);
当客户端发来数据(Gateway进程收到数据)后触发的回调函数
参数
$client_id 每个客户端连接都会被分配一个全局唯一的client_id
$recv_data 完整的客户端请求数据,数据类型取决于Gateway所使用协议的decode方法返的回值类型

onClose

void Events::onClose(string $client_id);
客户端与Gateway进程的连接断开时触发。不管是客户端主动断开还是服务端主动断开,都会触发这个回调。一般在这里做一些数据清理工作。

onWorkerStop

void Event::onWorkerStop(BusinessWorker $businessWorker);
当businessWorker进程退出时触发。每个进程生命周期内都只会触发一次。
可以在这里为每一个businessWorker进程做一些清理工作,例如保存一些重要数据等。

Lib\Gateway类提供的接口

sendToAll

void Gateway::sendToAll(string $send_data [, array $client_id_array = null [, array $exclude_client_id = null [, bool $raw = false]]]);
向所有客户端或者client_id_array指定的客户端发送$send_data数据。如果指定的$client_id_array中的client_id不存在则自动丢弃

sendToClient

void Gateway::sendToClient(string $client_id, string $send_data);
向客户端client_id发送$send_data数据。如果client_id对应的客户端不存在或者不在线则自动丢弃发送数据

closeClient

void Gateway::closeClient(string $client_id);
断开与client_id对应的客户端的连接

isOnline

int Gateway::isOnline(string $client_id);
判断$client_id是否还在线,在线返回1,不在线返回0
是否在线取决于对应client_id是否触发过onClose回调。

bindUid

void Gateway::bindUid(string $client_id, string $uid);
将client_id与uid绑定,以便通过Gateway::sendToUid($uid)发送数据,通过Gateway::isUidOnline($uid)用户是否在线。
uid解释:这里uid泛指用户id或者设备id,用来唯一确定一个客户端用户或者设备。
注意:

  1. uid与client_id是一对多的关系,系统允许一个uid下有多个client_id。
  2. 但是一个client_id只能绑定一个uid,如果绑定多次uid,则只有最后一次绑定有效。
  3. client_id下线(连接断开)时会自动执行解绑,开发者无需调用Gateway::unbindUid解绑。
  4. 如果某个uid对应的所有client_id都下线了,则调用Gateway::isUidOnline($uid)将返回0,即uid不在线
  5. uid和client_id映射关系存储在Gateway进程内存中。

unbindUid

void Gateway::unbindUid(string $client_id, mixed $uid);
将client_id与uid解绑。
注意:当client_id下线(连接断开)时会自动与uid解绑,开发者无需在onClose事件调用Gateway::unbindUid。

isUidOnline

int Gateway::isUidOnline(mixed $uid);
判断$uid是否在线,此方法需要配合Gateway::bindUid($client_uid, $uid)使用。
uid在线返回1,不在线返回0

getAllUidList

array Gateway::getAllUidList(void);
获取全局所有在线uid列表。
返回值
返回uid为key同时uid为值的数组。

1
2
3
4
array(
'123' => '123',
'456' => '456'
)

getClientIdByUid

array Gateway::getClientIdByUid(mixed $uid);
返回一个数组,数组元素为与uid绑定的所有在线的client_id。如果没有在线的client_id则返回一个空数组。

getUidByClientId

string Gateway::getUidByClientId(string $client_id);
返回client_id绑定的uid,如果client_id没有绑定uid,则返回null。

sendToUid

void Gateway::sendToUid(mixed $uid, string $message);
向uid绑定的所有在线client_id发送数据。(默认uid与client_id是一对多的关系)

joinGroup

void Gateway::joinGroup(string $client_id, mixed $group);
将client_id加入某个组,以便通过Gateway::sendToGroup发送数据。
可以通过Gateway::getClientSessionsByGroup($group)获得该组所有在线成员数据。
可以通过Gateway::getClientCountByGroup($group)获得该组所有在线连接数(人数)。
该方法对于分组发送数据例如房间广播非常有用。

leaveGroup

void Gateway::leaveGroup(string $client_id, mixed $group);
将client_id从某个组中删除,不再接收该分组广播(Gateway::sendToGroup)发送的数据。

ungroup

void Gateway::ungroup(mixed $group);
取消分组,或者说解散分组。
取消分组后所有属于这个分组的用户的连接将被移出分组,此分组将不再存在,除非再次调用

sendToGroup

void Gateway::sendToGroup(mixed $group, string $message [, array $exclude_client_id = null [, bool $raw = false]]);
向某个分组的所有在线client_id发送数据。

getClientIdCountByGroup

int Gateway::getClientIdCountByGroup(mixed $group);
获取某分组当前在线成连接数(多少client_id在线)。

getClientSessionsByGroup

array Gateway::getClientSessionsByGroup(mixed $group);
获取某个分组所有在线client_id信息。
返回值
返回值为client_idkeyclient_id对应的$_SESSION为值的数组。
类似下面的格式

1
2
3
4
array(
'7f00000108fc00000008' => array(...),
'7f00000108fc00000009' => array(...),
)

getAllClientIdCount

int Gateway::getAllClientIdCount(void);
获取当前在线连接总数(多少client_id在线)。

getAllClientSessions

array Gateway::getAllClientSessions(void);
获取当前所有在线client_id信息。
类似下面的格式

1
2
3
4
array(
'7f00000108fc00000008' => array(...),
'7f00000108fc00000009' => array(...),
)

心跳检测

心跳检测的原理是什么?

客户端定时每X秒(推荐小于60秒)向服务端发送特定数据(任意数据都可),服务端设定为X秒没有收到客户端心跳则认为客户端掉线,并关闭连接触发onClose回调。这样即通过心跳检测请求维持了连接(避免连接因长时间不活跃而被网关防火墙关闭),也能让服务端比较及时的知道客户端是否异常掉线。

客户端定时发送心跳(推荐)

客户端定时向服务端发送心跳。服务端类似以下配置:

1
2
3
4
$gateway = new Gateway("Websocket://0.0.0.0:8585");
$gateway->pingInterval = 55;
$gateway->pingNotResponseLimit = 1;
$gateway->pingData = '';

以上配置含义是客户端连接 pingInterval*pingNotResponseLimit=55 秒内没有任何数据传输给服务端则服务端认为对应客户端已经掉线,服务端关闭连接并触发onClose回调。

由于心跳是周期性检测,实际执行onClose的时间一般会大于pingInterval*pingNotResponseLimit=55,误差在pingInterval内。
客户端的心跳周期应该要比服务端设置的$gateway->pingInterval=55小一些,例如客户端50秒发送一段数给服务端

服务端主动发送心跳

GatewayWorer支持服务端向客户端发送心跳检测,可以像下面这样设置。

1
2
3
4
5
$gateway = new Gateway("Websocket://0.0.0.0:8585");
$gateway->pingInterval = 55;
$gateway->pingNotResponseLimit = 0;
// 服务端定时向客户端发送的数据
$gateway->pingData = '{"type":"ping"}';

以上服务端会定时55秒给客户端发心跳数据{"type":"ping"},而客户端不需要定时向服务端发送心跳数据。
其中pingNotResponseLimit = 0代表服务端允许客户端不发送心跳,服务端不会因为客户端长时间没发送数据而断开连接。
如果pingNotResponseLimit = 1,则代表客户端必须定时发送数据给服务端,否则pingNotResponseLimit*pingInterval=55秒内没有任何数据发来则关闭对应连接,并触发onClose。

开启多少进程

Gateway进程数不是开得越多越好,Gateway进程增多会导致进程间通讯开销变大。
每个Gateway进程可以轻松处理5000-10000连接的请求转发,业务同时在线连接数少于10000时可以只开2个Gateway进程。每增加10000个连接增加一个Gateway进程,依次类推。

BusinessWorker进程中根据业务是否有阻塞式IO设置进程数为CPU核数的1倍-4倍即可。 即start_businessworker.php$worker->count = cpu核数的1-4倍;


返回 php 系列
avatar
懒觉猫先生
欢迎你们的到来!
关注我吧
最新文章
最新评论
正在加载中...
网站资讯
文章数目 :
175
已运行时间 :
本站总字数 :
118.7k
本站访客数 :
本站总访问量 :
最后更新时间 :