GatewayWorker手册 基于workerman容器的分布式即时通讯系统,可用于聊天室、物联网等即时通讯领域的服务。
GatewayWorker
基于Workerma
n开发的一个项目框架,用于快速开发TCP长连接应用
,例如app推送服务端、即时IM服务端、游戏服务端、物联网、智能家居等等.
工作原理
- Register、Gateway、BusinessWorker进程启动
- Gateway、BusinessWorker进程启动后向Register服务进程发起长连接注册自己
- Register服务收到Gateway的注册后,把所有Gateway的通讯地址保存在内存中
- Register服务收到BusinessWorker的注册后,把内存中所有的Gateway的通讯地址发给BusinessWorker
- BusinessWorker进程得到所有的Gateway内部通讯地址后尝试连接Gateway
- 如果运行过程中有新的Gateway服务注册到Register(一般是分布式部署加机器),则将新的Gateway内部通讯地址列表将广播给所有BusinessWorker,BusinessWorker收到后建立连接
- 如果有Gateway下线,则Register服务会收到通知,会将对应的内部通讯地址删除,然后广播新的内部通讯地址列表给所有BusinessWorker,BusinessWorker不再连接下线的Gateway
- 至此Gateway与BusinessWorker通过Register已经建立起长连接
- 客户端的事件及数据全部由Gateway转发给BusinessWorker处理,BusinessWorker默认调用Events.php中的onConnect onMessage onClose处理业务逻辑。
- BusinessWorker的业务逻辑入口全部在Events.php中,包括onWorkerStart进程启动事件(进程事件)、onConnect连接事件(客户端事件)、onMessage消息事件(客户端事件)、onClose连接关闭事件(客户端事件)、onWorkerStop进程退出事件(进程事件)
Gateway/Worker 的进程模型
特点:
从图上我们可以看出Gateway负责接收客户端的连接以及连接上的数据,然后Worker接收Gateway发来的数据做处理,然后再经由Gateway把结果转发给其它客户端。每个客户端都有很多的路由到达另外一个客户端,例如client⑦与client①可以经由蓝色路径完成数据通讯
优点:
- 可以方便的实现客户端之间的通讯
- Gateway与Worker之间是基于socket长连接通讯,也就是说Gateway、Worker可以部署在不同的服务器上,非常容易实现分布式部署,扩容服务器
- Gateway进程只负责网络IO,业务实现都在Worker进程上,可以reload Worker进程,实现在不影响用户的情况下完成代码热更新。
注意
- 业务开发只需要关注
Applications/项目/Events.php
一个文件即可。 - 客户端只能连接
Gateway端口
,不要连接Register端口
。 - 长连接应用切记需要开启应用层心跳(GatewayWorker提供了设置,参加心跳检测),心跳间隔20-30秒最佳,为了避免长连接因为长时间不通讯被节点防火墙断开。
目录结构
├── Applications // 这里是所有开发者应用项目
│ └── YourApp // 其中一个项目目录,目录名可以自定义
│ ├── Events.php // 开发者只需要关注这个文件
│ ├── start_gateway.php // gateway进程启动脚本,包括端口号等设置
│ ├── start_businessworker.php // businessWorker进程启动脚本
│ └── start_register.php // 注册服务启动脚本
│
├── start.php // 全局启动脚本,此脚本会依次加载Applications/项目/start_*.php启动脚本
│
└── vendor // GatewayWorker框架和Workerman框架源码目录,此目录开发者不用关心
适用范围:适用于客户端与客户端需要实时通讯的项目。
Register类的使用
- register端口千万不能开放给外网,否则可能遭受攻击
- register服务只能开一个进程,不能开启多个进程
- Register类只能定制监听的ip和端口,并且目前只能使用text协议
Gateway类的使用
- Gateway类用于初始化Gateway进程。
- Gateway进程是暴露给客户端的让其连接的进程。
- 所有客户端的请求都是由Gateway接收然后分发给BusinessWorker处理,同样BusinessWorker也会将要发给客户端的响应通过Gateway转发出去。
- 可以给Gateway对象的onWorkerStart、onWorkerStop、onConnect、onClose设置回调函数,但是无法给设置onMessage回调。Gateway的onMessage行为固定为将客户端数据转发给
初始化
$gateway = new Gateway('protocol://ip:port');
Gateway类可以定制的内容
协议 : websocket协议;text协议; Frame协议; tcp,直接裸tcp,不推荐,见通讯协议作用。
name: 和Worker一样,可以设置Gateway进程的名称,方便status命令中查看统计
count: 和Worker一样,可以设置Gateway进程的数量,一般一台服务器设置1-2个足够,设置多了对性能有一定影响
lanIp: lanIp是Gateway所在服务器的内网IP,默认填写127.0.0.1即可。多服务器分布式部署的时候需要填写真实的内网ip,不能填写127.0.0.1。注意:lanIp只能填写真实ip,不能填写域名或者其它字符串,无论如何都不能写0.0.0.0 .
startPort: Gateway进程启动后会监听一个本机端口,用来给BusinessWorker提供链接服务,然后Gateway与BusinessWorker之间就通过这个连接通讯。这里设置的是Gateway监听本机端口的起始端口;则每个Gateway进程分别启动的本地端口一般为2000、2001、2002、2003。
registerAddress: 注册服务地址,格式类似于 ‘127.0.0.1:1236’。如果是部署了多个register服务则格式是数组,类似[‘192.168.0.1:1236’,’192.168.0.2:1236’]
onWorkerStart: 和Worker一样,可以设置Gateway进程启动后的回调函数,一般在这个回调里面初始化一些全局数据
onWorkerStop: 和Worker一样,可以设置Gateway进程关闭的回调函数,一般在这个回调里面做数据清理或者保存数据工作
BusinessWorker类使用
- name: 和Worker一样,可以设置BusinessWorker进程的名称,方便status命令中查看统计
- count: 和Worker一样,可以设置BusinessWorker进程的数量,以便充分利用多cpu资源
- registerAddress: 注册服务地址,格式类似于 ‘127.0.0.1:1236’。如果是部署了多个register服务则格式是数组,类似[‘192.168.0.1:1236’,’192.168.0.2:1236’]
- onWorkerStart: 和Worker一样,可以设置BusinessWorker启动后的回调函数,一般在这个回调里面初始化一些全局数据
- onWorkerStop: 和Worker一样,可以设置BusinessWorker关闭的回调函数,一般在这个回调里面做数据清理或者保存数据工作
- eventHandler: 设置使用哪个类来处理业务,默认值是
Events
,即默认使用Events.php中的Events类来处理业务。业务类至少要实现onMessage静态方法,onConnect和onClose静态方法可以不用实现。
Events类的回调属性
onWorkerStart
void Event::onWorkerStart(BusinessWorker $businessWorker);
当businessWorker进程
启动时触发。每个进程生命周期内都只会触发一次。
注意:$businessworker->onWorkerStart
和Event::onWorkerStart
不会互相覆盖,如果两个回调都设置则都会运行。
参数 $businessWorker
: businessWorker进程实例
onWorkerStart范例
1 | use \GatewayWorker\Lib\Gateway; |
onConnect
void Event::onConnect($client_id);
当客户端连接上gateway进程时(TCP三次握手完毕时)触发的回调函数。
参数 $client_id
- 用来全局标记一个socket连接,每个客户端连接都会被分配一个全局唯一的client_id
- 如果client_id对应的客户端连接断开了,那么这个client_id也就失效了。当这个客户端再次连接到Gateway时,将会获得一个新的client_id。也就是说client_id和客户端的socket连接生命周期是一致的.
- client_id一旦被使用过,将不会被再次使用,也就是说client_id是不会重复的,即使分布式部署也不会重复.
- 只要有client_id,并且对应的客户端在线,就可以调用
Gateway::sendToClient($client_id, $data)
等方法向这个客户端发送数据。
注意
- $client_id是服务端自动生成的并且无法自定义。
- 如果开发者有自己的id系统,可以用过
Gateway::bindUid($client_id, $uid)
把自己系统的id与client_id绑定,绑定后就可以通过Gateway::sendToUid($uid)
发送数据,通过Gateway::isUidOnline($uid)
用户是否在线了。
1 | use \GatewayWorker\Lib\Gateway; |
onWebSocketConnect
void Events::onWebSocketConnect(string $client_id, array $data);
当客户端连接上gateway
完成websocket
握手时触发的回调函数。
注意:此回调只有gateway
为websocket协议
并且gateway
没有设置onWebSocketConnec
t时才有效。
参数$client_id
每个客户端连接都会被分配一个全局唯一的client_id$data
websocket握手时的http头数据,包含get、server等变量
onMessage
void Events::onMessage(string $client_id, mixed $recv_data);
当客户端发来数据(Gateway进程收到数据)后触发的回调函数
参数$client_id
每个客户端连接都会被分配一个全局唯一的client_id$recv_data
完整的客户端请求数据,数据类型取决于Gateway
所使用协议的decode
方法返的回值类型
onClose
void Events::onClose(string $client_id);
客户端与Gateway进程的连接断开时触发。不管是客户端主动断开还是服务端主动断开,都会触发这个回调。一般在这里做一些数据清理工作。
onWorkerStop
void Event::onWorkerStop(BusinessWorker $businessWorker);
当businessWorker进程退出时触发。每个进程生命周期内都只会触发一次。
可以在这里为每一个businessWorker进程做一些清理工作,例如保存一些重要数据等。
Lib\Gateway类提供的接口
sendToAll
void Gateway::sendToAll(string $send_data [, array $client_id_array = null [, array $exclude_client_id = null [, bool $raw = false]]]);
向所有客户端或者client_id_array指定的客户端发送$send_data
数据。如果指定的$client_id_array中的client_id不存在则自动丢弃
sendToClient
void Gateway::sendToClient(string $client_id, string $send_data);
向客户端client_id发送$send_data
数据。如果client_id对应的客户端不存在或者不在线则自动丢弃发送数据
closeClient
void Gateway::closeClient(string $client_id);
断开与client_id对应的客户端的连接
isOnline
int Gateway::isOnline(string $client_id);
判断$client_id是否还在线,在线返回1,不在线返回0
是否在线取决于对应client_id是否触发过onClose回调。
bindUid
void Gateway::bindUid(string $client_id, string $uid);
将client_id与uid绑定,以便通过Gateway::sendToUid($uid)
发送数据,通过Gateway::isUidOnline($uid)
用户是否在线。
uid解释:这里uid泛指用户id或者设备id,用来唯一确定一个客户端用户或者设备。
注意:
- uid与client_id是一对多的关系,系统允许一个uid下有多个client_id。
- 但是一个client_id只能绑定一个uid,如果绑定多次uid,则只有最后一次绑定有效。
- client_id下线(连接断开)时会自动执行解绑,开发者无需调用Gateway::unbindUid解绑。
- 如果某个uid对应的所有client_id都下线了,则调用Gateway::isUidOnline($uid)将返回0,即uid不在线
- uid和client_id映射关系存储在Gateway进程内存中。
unbindUid
void Gateway::unbindUid(string $client_id, mixed $uid);
将client_id与uid解绑。
注意:当client_id下线(连接断开)时会自动与uid解绑,开发者无需在onClose事件调用Gateway::unbindUid。
isUidOnline
int Gateway::isUidOnline(mixed $uid);
判断$uid是否在线,此方法需要配合Gateway::bindUid($client_uid, $uid)使用。
uid在线返回1,不在线返回0
getAllUidList
array Gateway::getAllUidList(void);
获取全局所有在线uid列表。
返回值
返回uid为key同时uid为值的数组。
1 | array( |
getClientIdByUid
array Gateway::getClientIdByUid(mixed $uid);
返回一个数组,数组元素为与uid绑定的所有在线的client_id。如果没有在线的client_id则返回一个空数组。
getUidByClientId
string Gateway::getUidByClientId(string $client_id);
返回client_id绑定的uid,如果client_id没有绑定uid,则返回null。
sendToUid
void Gateway::sendToUid(mixed $uid, string $message);
向uid绑定的所有在线client_id发送数据。(默认uid与client_id是一对多的关系)
joinGroup
void Gateway::joinGroup(string $client_id, mixed $group);
将client_id加入某个组,以便通过Gateway::sendToGroup发送数据。
可以通过Gateway::getClientSessionsByGroup($group)
获得该组所有在线成员数据。
可以通过Gateway::getClientCountByGroup($group)
获得该组所有在线连接数(人数)。
该方法对于分组发送数据例如房间广播非常有用。
leaveGroup
void Gateway::leaveGroup(string $client_id, mixed $group);
将client_id从某个组中删除,不再接收该分组广播(Gateway::sendToGroup)发送的数据。
ungroup
void Gateway::ungroup(mixed $group);
取消分组,或者说解散分组。
取消分组后所有属于这个分组的用户的连接将被移出分组,此分组将不再存在,除非再次调用
sendToGroup
void Gateway::sendToGroup(mixed $group, string $message [, array $exclude_client_id = null [, bool $raw = false]]);
向某个分组的所有在线client_id发送数据。
getClientIdCountByGroup
int Gateway::getClientIdCountByGroup(mixed $group);
获取某分组当前在线成连接数(多少client_id在线)。
getClientSessionsByGroup
array Gateway::getClientSessionsByGroup(mixed $group);
获取某个分组所有在线client_id信息。
返回值
返回值为client_id
为key
,client_id
对应的$_SESSION
为值的数组。
类似下面的格式
1 | array( |
getAllClientIdCount
int Gateway::getAllClientIdCount(void);
获取当前在线连接总数(多少client_id在线)。
getAllClientSessions
array Gateway::getAllClientSessions(void);
获取当前所有在线client_id信息。
类似下面的格式
1 | array( |
心跳检测
心跳检测的原理是什么?
客户端定时每X秒(推荐小于60秒)向服务端发送特定数据(任意数据都可),服务端设定为X秒没有收到客户端心跳则认为客户端掉线,并关闭连接触发onClose回调。这样即通过心跳检测请求维持了连接(避免连接因长时间不活跃而被网关防火墙关闭),也能让服务端比较及时的知道客户端是否异常掉线。
客户端定时发送心跳(推荐)
客户端定时向服务端发送心跳。服务端类似以下配置:
1 | $gateway = new Gateway("Websocket://0.0.0.0:8585"); |
以上配置含义是客户端连接 pingInterval*pingNotResponseLimit=55
秒内没有任何数据传输给服务端则服务端认为对应客户端已经掉线,服务端关闭连接并触发onClose
回调。
由于心跳是周期性检测,实际执行onClose的时间一般会大于pingInterval*pingNotResponseLimit=55,误差在pingInterval内。
客户端的心跳周期应该要比服务端设置的$gateway->pingInterval=55小一些,例如客户端50秒发送一段数给服务端
服务端主动发送心跳
GatewayWorer支持服务端向客户端发送心跳检测,可以像下面这样设置。
1 | $gateway = new Gateway("Websocket://0.0.0.0:8585"); |
以上服务端会定时55秒给客户端发心跳数据{"type":"ping"}
,而客户端不需要定时向服务端发送心跳数据。
其中pingNotResponseLimit = 0
代表服务端允许客户端不发送心跳,服务端不会因为客户端长时间没发送数据而断开连接。
如果pingNotResponseLimit = 1
,则代表客户端必须定时发送数据给服务端,否则pingNotResponseLimit*pingInterval=55秒内没有任何数据发来则关闭对应连接,并触发onClose。
开启多少进程
Gateway进程数不是开得越多越好,Gateway进程增多会导致进程间通讯开销变大。
每个Gateway进程可以轻松处理5000-10000连接的请求转发,业务同时在线连接数少于10000时可以只开2个Gateway进程。每增加10000个连接增加一个Gateway进程,依次类推。
BusinessWorker进程中根据业务是否有阻塞式IO设置进程数为CPU核数的1倍-4倍即可。 即start_businessworker.php
中$worker->count = cpu
核数的1-4倍;
返回 php 系列