workerman-todpole 执行流程(1)

时间:2023-03-09 19:40:13
workerman-todpole 执行流程(1)

该系列文章主要是彻底扒一下 workerman todpole 游戏的实现原理。

提前打个预防针:

由于 Worker 类的静态属性和子类对象的混用(当前类的静态属性存放当前类对象,静态方法循环静态属性遍历调用子类对象的方法),特别再加上 Master-Worker 进程组织模型,对一些刚接触的人来说,很容易造成理解上的误差,当造成理解上的混乱时,我们需要明确:

进程和类对象没有直接关系,进程内不管是静态方法、普通对象都属于该进程空间,静态属性也不会跨进程共享。当 fork 之后,我们所阅读的代码已经被复制到了另一个进程空间内(栈/堆/常量等),不要被那些 Worker 子类误导了。

另外,Master-Worker 进程模型中的 Worker 是子进程的表述,属于进程概念,跟 Worker 类不是一回事。

入口

入口文件 start.php,包含了 composer autoload,并执行下面的入口,创建相应的对象:

  • start_web.php(WebServer)
  • start_register.php(Register)
  • start_gateway.php(Gateway)
  • start_businessworker.php(BusinessWorker)

这 4 个子类其实都继承自 Worker 类(我们暂且把它们叫做 “角色”):

workerman-todpole 执行流程(1)

看一下 Worker 的构造方法:

public function __construct($socket_name = '', $context_option = array())
{
// Save all worker instances.
$this->workerId = spl_object_hash($this);
static::$_workers[$this->workerId] = $this;
static::$_pidMap[$this->workerId] = array();
// Get autoload root path.
$backtrace = debug_backtrace();
$this->_autoloadRootPath = dirname($backtrace[0]['file']);
// Context for socket.
if ($socket_name) {
$this->_socketName = $socket_name;
if (!isset($context_option['socket']['backlog'])) {
$context_option['socket']['backlog'] = static::DEFAULT_BACKLOG;
}
$this->_context = stream_context_create($context_option);
}
}

可以看到入口中创建对象时,其实是放到了当前类的静态属性 $_workers 中,这里的 workerId 用 get_class($this) 也是一样的效果,为了方便表述,暂且把 workerId 叫做 “角色”。另外,子进程的 pid 在静态属性 $_pidMap 中是以子类名进行归类的。

创建完对象,所有的子类对象都到了 $_workers 中,接下来就是执行静态方法 runAll

runAll

该静态方法把功能都拆了出去,子方法都为 protected static 方便 override 重写:

public static function runAll()
{
static::checkSapiEnv();
static::init();
static::parseCommand();
static::daemonize();
static::initWorkers();
static::installSignal();
static::saveMasterPid();
static::displayUI();
static::forkWorkers();
static::resetStd();
static::monitorWorkers();
}

看一下每个方法的大体功能:

checkSapiEnv()

主要是限制只能在 cli 模式下运行。

init()

完成一些初始化操作:指定 .pid 文件、创建日志文件、设置主进程状态为已开始、记录开始时间、设置主进程名称、调用静态方法 initId() 按照每个角色的数量创建 $_idMap 静态属性(用来占位),最后通过 Timer::init() 安装一个 alarm 闹钟信号处理(定时器)。

parseCommand()

该方法用来解析命令行参数,start 并没有做太多事情,只是判断了一下 .pid 文件是否存在,防止重复启动,并判断是否有 -d 确定是否在后台执行。

daemonize()

该方法是让一个进程成为守护进程的核心,由于 linux 作为分时多用户系统,需要标记每个登陆的用户,所以需要 “会话” 的概念,当用户通过终端登陆后,一般会存在以下几类进程:

workerman-todpole 执行流程(1)

看一下 daemonize() 的代码:

protected static function daemonize()
{
if (!static::$daemonize || static::$_OS !== 'linux') {
return;
}
umask(0);
$pid = pcntl_fork();
if (-1 === $pid) {
throw new Exception('fork fail');
} elseif ($pid > 0) {
exit(0);
}
if (-1 === posix_setsid()) {
throw new Exception("setsid fail");
}
// Fork again avoid SVR4 system regain the control of terminal.
$pid = pcntl_fork();
if (-1 === $pid) {
throw new Exception("fork fail");
} elseif (0 !== $pid) {
exit(0);
}
}

该方法的核心是 posix_setsid(),它可以创建一个会话,由于会话通常是独占终端的,导致当前会话期内的某个进程创建会话后无法 “抢到” 终端,进而变成后端进程,也就是我们所说的 “守护进程”。

另外,在 php 中可以通过 pcntl 扩展的 pcntl_fork() 方法从当前进程 fork 一个新的进程,daemonize() 方法之所以需要在创建会话前后各 fork 一次,是由于:

  • 组长进程无法创建会话,所以会在创建会话之前先 fork 一次,这样保证子进程一定不是组长,之后退出主进程,在子进程中创建会话
  • 在子进程中创建会话之后,此时的子进程已经是会话的首进程(session leader),为了避免当前的 leader 再次连接终端,因此需要再 fork 一次之后退出主进程

经过这些步骤之后,得到的子进程即 Master 进程。

initWorkers()

首先遍历所有 workers 对象,对一些名称和权限做下判断,接着如果端口非复用($reusePort 属性),会直接调用对象方法 listen() 开启监听:

if (!$worker->reusePort) {
$worker->listen();
}

注意这里的 listen() 是循环出的 Worker 对象调用的,例如当前有 2 个 WebServer、1 个 Register、4 个 Gateway、4 个 BusinessWorker 进程,如果端口是非复用,在 forkWorkers() 之前,只会创建 3 个 socket 监听(BusinessWorker 没有),这一点从端口号上就可以看出来(图片来自 WebSocket实战之————Workerman服务器的安装启动):

workerman-todpole 执行流程(1)

这些监听地址对应的是创建角色对象时的构造参数 $socket_name(后面我们遇到的内部监听地址基于 lanIp 属性)。

现在以 Gateway 为例,此时端口是非复用的,那为什么 4 个 Gateway 进程依然可以监听同一个端口?事实上这 4 个 socket 是在 forkWorkers() 之后子进程从父进程的 Gateway 对象那里继承来的。

这种情况下,当监听的端口有客户端请求过来时,就会造成 “惊群” 现象(Linux网络编程“惊群”问题总结),即这 4 个 Gateway 进程都会被唤醒,但经过内核调度之后,只有一个进程能够处理请求,其余的失败后继续休眠。

而开启监听端口复用后,可以允许多个无亲缘关系的进程监听相同的端口,并且由系统内核做负载均衡,决定将 socket 连接交给哪个进程处理,避免了惊群效应,可以提升多进程短连接应用的性能,listen() 中开启复用的代码:

if ($this->reusePort) {
stream_context_set_option($this->_context, 'socket', 'so_reuseport', 1);
}

这段代码其实是在下面的 forkWorkers() 子进程中开启端口复用的情况下才会被调用;另外,开启端口复用需要 php 版本 >=7.0。

installSignal()

接下来就是为当前主进程安装信号处理,handler 的代码:

public static function signalHandler($signal)
{
switch ($signal) {
// Stop.
case SIGINT:
static::$_gracefulStop = false;
static::stopAll();
break;
// Graceful stop.
case SIGTERM:
static::$_gracefulStop = true;
static::stopAll();
break;
// Reload.
case SIGQUIT:
case SIGUSR1:
if($signal === SIGQUIT){
static::$_gracefulStop = true;
}else{
static::$_gracefulStop = false;
}
static::$_pidsToRestart = static::getAllWorkerPids();
static::reload();
break;
// Show status.
case SIGUSR2:
static::writeStatisticsToStatusFile();
break;
// Show connection status.
case SIGIO:
static::writeConnectionsStatisticsToStatusFile();
break;
}
}

可以看到 SIGINT/SIGTERM 都是停止,SIGQUIT/SIGUSR1 为重启,SIGUSR2 为输出进程状态,SIGIO输出连接状态。

saveMasterPid()

创建当前主进程的 .pid 文件。

displayUI()

输出命令行下的界面。

forkWorkers()

这里只讨论在 linux 下的情况,通过遍历 $_workers 所有的角色对象,判断 $_pidMap 中当前角色已经有的 pid 数量和角色对象 count 属性需要的工作进程数量,直到每个角色都创建出 count 需要的数量。

看一下 linux 下创建工作进程的逻辑:

protected static function forkOneWorkerForLinux($worker)
{
// Get available worker id.
$id = static::getId($worker->workerId, 0);
if ($id === false) {
return;
}
$pid = pcntl_fork();
// For master process.
if ($pid > 0) {
static::$_pidMap[$worker->workerId][$pid] = $pid;
static::$_idMap[$worker->workerId][$id] = $pid;
} // For child processes.
elseif (0 === $pid) {
if ($worker->reusePort) {
$worker->listen();
}
if (static::$_status === static::STATUS_STARTING) {
static::resetStd();
}
static::$_pidMap = array();
static::$_workers = array($worker->workerId => $worker);
Timer::delAll();
static::setProcessTitle('WorkerMan: worker process ' . $worker->name . ' ' . $worker->getSocketName());
$worker->setUserAndGroup();
$worker->id = $id;
$worker->run();
$err = new Exception('event-loop exited');
static::log($err);
exit(250);
} else {
throw new Exception("forkOneWorker fail");
}
}

首先会调用 getId() 判断:

protected static function getId($worker_id, $pid)
{
return array_search($pid, static::$_idMap[$worker_id]);
}

在之前的静态方法 init() 中已经为每个角色对象开辟了相应 count 数量的位置,这里判断 $_idMap就是占座的过程;并且之后如果子进程退出,$_idMap 对应 id 的位置就会空缺,重新创建子进程就是填坑的过程。

回到 forkOneWorkerForLinux() 的逻辑,fork 之后,主进程记录下 pid 就结束了,子进程首先会判断端口是否可复用,可复用则调用 listen() 开启监听,不过需要注意的是,之前的 initWorkers() 中非复用情况下创建的是属于角色对象的 socket(3 个),而这里如果开启复用,是每个子进程中的当前角色对象自己创建出来的 socket,而不是像非复用那样从主进程的相应对象继承;也就是说有几个子进程就创建几个 socket(前提是子进程对应角色对象创建时有 $socket_name),请注意和非复用情况进行区分。

接着重定向 std 输出和错误,由于 $_pidMap 是提供给主进程监控用的,对子进程来说没什么意义,所以直接清空;紧接着调用 Timer::delAll() 结束可能的闹钟信号,并清除所有可能存在的 event loop。

设置完当前进程的一些属性之后,调用 $worker->run() 子进程就正式执行它自己的逻辑了,run() 里面的流程我们放到后面来解析,先接着原有的方法往下看。

resetStd()

重定向主进程的 STDOUT 和 STDERR,因为进程已经脱离了终端,输出可能导致一些未知的异常,所以需要重定向到 /dev/null 这台黑洞设备上;从上面的 forkWorkers() 可以看到,这个方法也会在子进程中使用。

monitorWorkers()

经过 forkWorkers() 之后,子进程已经使用定时器或者进入 event loop 处理自己的业务去了,因此这个方法只会在主进程内执行。

该方法的作用就是通过 wait 挂起,直到收到子进程退出信号之后,来决定是补上还是直接退出,看一下监控逻辑:

protected static function monitorWorkersForLinux()
{
static::$_status = static::STATUS_RUNNING;
while (1) {
// Calls signal handlers for pending signals.
pcntl_signal_dispatch();
// Suspends execution of the current process until a child has exited, or until a signal is delivered
$status = 0;
$pid = pcntl_wait($status, WUNTRACED);
// Calls signal handlers for pending signals again.
pcntl_signal_dispatch();
// If a child has already exited.
if ($pid > 0) {
// Find out witch worker process exited.
foreach (static::$_pidMap as $worker_id => $worker_pid_array) {
if (isset($worker_pid_array[$pid])) {
$worker = static::$_workers[$worker_id];
// Exit status.
if ($status !== 0) {
static::log("worker[" . $worker->name . ":$pid] exit with status $status");
}
// For Statistics.
if (!isset(static::$_globalStatistics['worker_exit_info'][$worker_id][$status])) {
static::$_globalStatistics['worker_exit_info'][$worker_id][$status] = 0;
}
static::$_globalStatistics['worker_exit_info'][$worker_id][$status]++;
// Clear process data.
unset(static::$_pidMap[$worker_id][$pid]);
// Mark id is available.
$id = static::getId($worker_id, $pid);
static::$_idMap[$worker_id][$id] = 0;
break;
}
}
// Is still running state then fork a new worker process.
if (static::$_status !== static::STATUS_SHUTDOWN) {
static::forkWorkers();
// If reloading continue.
if (isset(static::$_pidsToRestart[$pid])) {
unset(static::$_pidsToRestart[$pid]);
static::reload();
}
} else {
// If shutdown state and all child processes exited then master process exit.
if (!static::getAllWorkerPids()) {
static::exitAndClearAll();
}
}
} else {
// If shutdown state and all child processes exited then master process exit.
if (static::$_status === static::STATUS_SHUTDOWN && !static::getAllWorkerPids()) {
static::exitAndClearAll();
}
}
}
}

这里先提一下 pcntl_signal_dispatch(),该方法是为了去信号队列看一下有没有发送给当前进程的信号,并触发主进程的 signalHandler(),当然我们也可以使用 declare(ticks) 来决定执行 ticks 行基础代码后自动调用 handler,但毕竟不是每次自动调用时都会有信号,所以我们需要主动调用 pcntl_signal_dispatch() 来触发以减少性能损失。

回到 monitorWorkersForLinux() 的逻辑,该方法为一个死循环,核心方法为 pcntl_wait($status, WUNTRACED),通过该方法将主进程挂起,直到收到子进程退出的信号;引用类型的参数 $status 可以拿到 exit 值,比如代码里经常看到的 exit(250)

拿到退出的子进程号之后,将 $_pidMap 中相应的记录删除,并通过 pid 和 getId() 找到 id 之后把 $_idMap 中的占位腾出来(置为 0)。

紧接着判断当前进程状态,由于 shutdown 会标记主进程状态为 STATUS_SHUTDOWN,所以如果是非结束状态,会继续 fork 工作进程来填补 $_idMap 之前腾出来的空缺,并且如果 $_pidsToRestart 中有此 pid,即代表该工作进程需要重启。

如果此时主进程状态被标记为 STATUS_SHUTDOWN 并且 getAllWorkerPids() 存在工作进程,则调用 exitAndClearAll() 来结束掉所有进程。

至此,主进程逻辑分析完毕。

listen()

该方法既有可能在主进程,也可能在子进程中调用,但不管调用时机如何,一定是在 run() 方法开始前调用的,所以还是放到本文来分析,看一下代码:

public function listen()
{
if (!$this->_socketName) {
return;
}
// Autoload.
Autoloader::setRootPath($this->_autoloadRootPath);
if (!$this->_mainSocket) {
// Get the application layer communication protocol and listening address.
list($scheme, $address) = explode(':', $this->_socketName, 2);
// Check application layer protocol class.
if (!isset(static::$_builtinTransports[$scheme])) {
$scheme = ucfirst($scheme);
$this->protocol = '\\Protocols\\' . $scheme;
if (!class_exists($this->protocol)) {
$this->protocol = "\\Workerman\\Protocols\\$scheme";
if (!class_exists($this->protocol)) {
throw new Exception("class \\Protocols\\$scheme not exist");
}
}
if (!isset(static::$_builtinTransports[$this->transport])) {
throw new \Exception('Bad worker->transport ' . var_export($this->transport, true));
}
} else {
$this->transport = $scheme;
}
$local_socket = static::$_builtinTransports[$this->transport] . ":" . $address;
// Flag.
$flags = $this->transport === 'udp' ? STREAM_SERVER_BIND : STREAM_SERVER_BIND | STREAM_SERVER_LISTEN;
$errno = 0;
$errmsg = '';
// SO_REUSEPORT.
if ($this->reusePort) {
stream_context_set_option($this->_context, 'socket', 'so_reuseport', 1);
}
// Create an Internet or Unix domain server socket.
$this->_mainSocket = stream_socket_server($local_socket, $errno, $errmsg, $flags, $this->_context);
if (!$this->_mainSocket) {
throw new Exception($errmsg);
}
if ($this->transport === 'ssl') {
stream_socket_enable_crypto($this->_mainSocket, false);
} elseif ($this->transport === 'unix') {
$socketFile = substr($address, 2);
if ($this->user) {
chown($socketFile, $this->user);
}
if ($this->group) {
chgrp($socketFile, $this->group);
}
}
// Try to open keepalive for tcp and disable Nagle algorithm.
if (function_exists('socket_import_stream') && static::$_builtinTransports[$this->transport] === 'tcp') {
$socket = socket_import_stream($this->_mainSocket);
@socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, 1);
@socket_set_option($socket, SOL_TCP, TCP_NODELAY, 1);
}
// Non blocking.
stream_set_blocking($this->_mainSocket, 0);
}
$this->resumeAccept();
}

可以看到主要工作是根据创建对象时的 $socket_name 参数值,创建对应的 socket-server 放到当前对象的 $this->_mainSocket 属性中,再调用 resumeAccept() 方法:

public function resumeAccept()
{
// Register a listener to be notified when server socket is ready to read.
if (static::$globalEvent && true === $this->_pauseAccept && $this->_mainSocket) {
if ($this->transport !== 'udp') {
static::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptConnection'));
} else {
static::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ,
array($this, 'acceptUdpConnection'));
}
$this->_pauseAccept = false;
}
}

可以看到该方法是向 event-loop 注册当前 $this->_mainSocket 的可读事件,即当有客户端连接上时,根据连接类型调用 acceptConnection() 或 acceptUdpConnection(),由于本系列文章分析的是 todpole游戏,所以这里暂且只分析 tcp 长链接:

public function acceptConnection($socket)
{
// Accept a connection on server socket.
$new_socket = @stream_socket_accept($socket, 0, $remote_address);
// Thundering herd.
if (!$new_socket) {
return;
}
// TcpConnection.
$connection = new TcpConnection($new_socket, $remote_address);
$this->connections[$connection->id] = $connection;
$connection->worker = $this;
$connection->protocol = $this->protocol;
$connection->transport = $this->transport;
$connection->onMessage = $this->onMessage;
$connection->onClose = $this->onClose;
$connection->onError = $this->onError;
$connection->onBufferDrain = $this->onBufferDrain;
$connection->onBufferFull = $this->onBufferFull;
// Try to emit onConnect callback.
if ($this->onConnect) {
try {
call_user_func($this->onConnect, $connection);
} catch (\Exception $e) {
static::log($e);
exit(250);
} catch (\Error $e) {
static::log($e);
exit(250);
}
}
}

该方法主要工作是通过 stream_socket_accept() 包装一下之前的 socket-server,并针对每个 socket-accept 创建一个 Connection 对象放到 $this->connections 属性中,并且这里遇到了第二个重要的回调 onConnect(),该回调每个连接事件只会触发一次。

ConnectionInterface

到了这里,我们已经可以监听客户端的连接,但是当客户端上传数据时,应该如何处理呢?这时我们需要沿着上面创建的 Connection 对象继续分析下去。

上面方法中的 TcpConnection 继承自 ConnectionInterface(看名字以为是个接口,其实是个抽象类),看一下 TcpConnection 的构造方法:

public function __construct($socket, $remote_address = '')
{
self::$statistics['connection_count']++;
$this->id = $this->_id = self::$_idRecorder++;
if(self::$_idRecorder === PHP_INT_MAX){
self::$_idRecorder = 0;
}
$this->_socket = $socket;
stream_set_blocking($this->_socket, 0);
// Compatible with hhvm
if (function_exists('stream_set_read_buffer')) {
stream_set_read_buffer($this->_socket, 0);
}
Worker::$globalEvent->add($this->_socket, EventInterface::EV_READ, array($this, 'baseRead'));
$this->maxSendBufferSize = self::$defaultMaxSendBufferSize;
$this->_remoteAddress = $remote_address;
static::$connections[$this->id] = $this;
}

该方法中我们找到了 socket-accept 的读取回调 baseRead(),现在我们知道数据的读取是在创建 Connection 对象时通过注册 event-loop 可读事件来触发的。

baseRead() 逻辑很长,这里就不贴出来了,不过在代码中找到了第三个重要的回调 onMessage(),回顾一下 listen() 的主要工作,简单的概括为:

  1. 向 event-loop 注册 socket-server 的可读事件,回调 acceptConnection() 通过 stream_socket_accept() 得到 socket-accept 并创建 Connection 对象;
  2. 创建 Connection 对象的构造方法中向 event-loop 注册 socket-accept 的可读事件,回调 baseRead()中解析数据,并触发回调 onMessage()

需要注意的是,event-loop 在子进程调用 run() 之前还没有开始循环,所以上面分析的回调逻辑只是注册,还不会触发。

本文结束,下一篇文章开始分析子进程的工作流程。