workerman-todpole 执行流程(2)

时间:2022-06-04 05:20:14

上一篇文章 workerman-todpole 执行流程(1),我们已经分析完了主进程的执行流程,这篇文章主要分析一下子进程的 run() 流程。

有必要提一下,在 run() 开始之前,其实针对角色对象的构造属性 $socket_name 已经开始了连接监听;下面开始分析 run() 方法:

run()

此时的 run() 方法虽然还属于 Worker 类的子类对象,但执行已经在子进程中,看一下代码:

public function run()
{
//Update process state.
static::$_status = static::STATUS_RUNNING;
 
// Register shutdown function for checking errors.
register_shutdown_function(array("\\Workerman\\Worker", 'checkErrors'));
 
// Set autoload root path.
Autoloader::setRootPath($this->_autoloadRootPath);
 
// Create a global event loop.
if (!static::$globalEvent) {
$event_loop_class = static::getEventLoopName();
static::$globalEvent = new $event_loop_class;
$this->resumeAccept();
}
 
// Reinstall signal.
static::reinstallSignal();
 
// Init Timer.
Timer::init(static::$globalEvent);
 
// Set an empty onMessage callback.
if (empty($this->onMessage)) {
$this->onMessage = function () {};
}
 
// Try to emit onWorkerStart callback.
if ($this->onWorkerStart) {
try {
call_user_func($this->onWorkerStart, $this);
} catch (\Exception $e) {
static::log($e);
// Avoid rapid infinite loop exit.
sleep(1);
exit(250);
} catch (\Error $e) {
static::log($e);
// Avoid rapid infinite loop exit.
sleep(1);
exit(250);
}
}
 
// Main loop.
static::$globalEvent->loop();
}

首先将 Worker::$_status 标记为 STATUS_RUNNING,这段代码之前执行过一次,但那时还位于主进程空间内,所以这里我们可以理解为标记的是子进程的执行状态;紧接着注册错误处理方法来兜底,捕捉子进程出错的原因并记录日志。

getEventLoopName()

关于 Event Loop 这里不展开讲了,将来会单独发几篇文章,接着上面看一下如何创建 $globalEvent 对象,首先是 getEventLoopName() 代码:

protected static function getEventLoopName()
{
if (static::$eventLoopClass) {
return static::$eventLoopClass;
}
 
$loop_name = '';
foreach (static::$_availableEventLoops as $name=>$class) {
if (extension_loaded($name)) {
$loop_name = $name;
break;
}
}
 
if ($loop_name) {
if (interface_exists('\React\EventLoop\LoopInterface')) {
switch ($loop_name) {
case 'libevent':
static::$eventLoopClass = '\Workerman\Events\React\LibEventLoop';
break;
case 'event':
static::$eventLoopClass = '\Workerman\Events\React\ExtEventLoop';
break;
default :
static::$eventLoopClass = '\Workerman\Events\React\StreamSelectLoop';
break;
}
} else {
static::$eventLoopClass = static::$_availableEventLoops[$loop_name];
}
} else {
static::$eventLoopClass = interface_exists('\React\EventLoop\LoopInterface')? '\Workerman\Events\React\StreamSelectLoop':'\Workerman\Events\Select';
}
return static::$eventLoopClass;
}

整个方法就是为了找到一个合适的 $eventLoopClass(已经有的话直接返回),没有的话则找一个合适的,首先看一下 $_availableEventLoops 有哪些:

protected static $_availableEventLoops = array(
'libevent' => '\Workerman\Events\Libevent',
'event' => '\Workerman\Events\Event'
);

这两个类包括 Workerman\Events\Ev 和 Workerman\Events\Select 都实现自同一个接口:

workerman-todpole 执行流程(2)

除了 Select 之外,每种实现都基于已有的扩展:

libevent 属于老牌实现,libev 是在此基础上的精简和优化版本,由于 libev 不支持 windows,所以在此基础上又有了 libuv(nodejs 的 event-loop 即基于此库)。

回到 getEventLoopName(),首先判断 $_availableEventLoops 对应类的扩展是否安装,优先使用 libevent,如果都没有则使用本地实现 Workerman\Events\Select(死循环)。

除此之外还会判断是否引入了 ReactPHP(接口 \React\EventLoop\LoopInterface 是否存在),存在的话则使用 ReactPHP 相应的封装过的类。

拿到 $eventLoopClass 类之后,创建对象放入 $globalEvent 中,接下来的分析中,我们都以 Ev 实现为例。

reinstallSignal()

该方法将之前安装好的 signal handler 全部 ignore 掉,再通过刚刚创建的 $globalEvent 对象的 add()方法进行注册,和之前的主进程自己 dispatch 不同的是,子进程是通过 signal watcher 来监听信号事件的:

public function add($fd, $flag, $func, $args = null)
{
$callback = function ($event, $socket) use ($fd, $func) {
try {
call_user_func($func, $fd);
} catch (\Exception $e) {
Worker::log($e);
exit(250);
} catch (\Error $e) {
Worker::log($e);
exit(250);
}
};
switch ($flag) {
case self::EV_SIGNAL:
$event = new \EvSignal($fd, $callback);
$this->_eventSignal[$fd] = $event;
return true;
case self::EV_TIMER:
case self::EV_TIMER_ONCE:
$repeat = $flag == self::EV_TIMER_ONCE ? 0 : $fd;
$param = array($func, (array)$args, $flag, $fd, self::$_timerId);
$event = new \EvTimer($fd, $repeat, array($this, 'timerCallback'), $param);
$this->_eventTimer[self::$_timerId] = $event;
return self::$_timerId++;
default :
$fd_key = (int)$fd;
$real_flag = $flag === self::EV_READ ? \Ev::READ : \Ev::WRITE;
$event = new \EvIo($fd, $real_flag, $callback);
$this->_allEvents[$fd_key][$flag] = $event;
return true;
}
 
}

这里注册到 EvSignal 的 $callback 还是当初主进程使用的那个 signalHandler(),其实完全可以创建一个新的方法给子进程用的,我们翻代码发现信号处理调用的方法比如 stopAll()/reload() 会判断当前是处于主进程还是子进程,从而执行不同的操作,这也是为什么当初 fork 工作进程时要执行下面的操作:

static::$_workers = array($worker->workerId => $worker)

这样,主进程存放的是所有角色,而子进程只会存放自己所属的角色。

Timer::init()

这一步是将之前创建的 $globalEvent 对象放到定时器中(定时器每个子进程都有)。

回调

判断当前 onMessage() 回调是否设置,没有设置则默认给一个空的匿名闭包;另外 onWorkerStart() 回调正是在这一步被执行的。

loop()

终于到了最后一步,Ev 实现的 loop() 方法:

public function loop()
{
\Ev::run();
}

执行后子进程启动完成,默默的在后台等待 event-loop 给它触发。

至此,包括前一篇文章,主进程和子进程的启动流程已经分析完毕;但有个问题,我们仅仅是分析了 Worker 中的默认实现,并没有考虑到其实 BusinessWorker/Gateway/Register 已经重写了部分方法,因此,在接下来的文章中将依次介绍各个实现的执行流程。