PHP 使用协同程序实现合作多任务

如果阅读了上面的logger()例子,那么你认为“为了双向通信我为什么要使用协程呢? 为什么我不能只用常见的类呢?”,你这么问完全正确。上面的例子演示了基本用法,然而上下文中没有真正的展示出使用协程的优点。这就是列举许多协程例子的理由。正如上面介绍里提到的,协程是非常强大的概念,不过这样的应用很稀少而且常常十分复杂。给出一些简单而真实的例子很难。

在这篇文章里,我决定去做的是使用协程实现多任务协作。我们尽力解决的问题是你想并发地运行多任务(或者“程序”)。不过处理器在一个时刻只能运行一个任务(这篇文章的目标是不考虑多核的)。因此处理器需要在不同的任务之间进行切换,而且总是让每个任务运行 “一小会儿”。

多任务协作这个术语中的“协作”说明了如何进行这种切换的:它要求当前正在运行的任务自动把控制传回给调度器,这样它就可以运行其他任务了。这与“抢占”多任务相反,抢占多任务是这样的:调度器可以中断运行了一段时间的任务,不管它喜欢还是不喜欢。协作多任务在Windows的早期版本(windows95)和Mac OS中有使用,不过它们后来都切换到使用抢先多任务了。理由相当明确:如果你依靠程序自动传回 控制的话,那么坏行为的软件将很容易为自身占用整个CPU,不与其他任务共享。 



我们的目的是 对 “任务”用更轻量级的包装的协程函数:


class Task {
    protected $taskId;
    protected $coroutine;
    protected $sendValue = null;
    protected $beforeFirstYield = true;

    public function __construct($taskId, Generator $coroutine) {
        $this->taskId = $taskId;
        $this->coroutine = $coroutine;

    public function getTaskId() {
        return $this->taskId;

    public function setSendValue($sendValue) {
        $this->sendValue = $sendValue;

    public function run() {
        if ($this->beforeFirstYield) {
            $this->beforeFirstYield = false;
            return $this->coroutine->current();
        } else {
            $retval = $this->coroutine->send($this->sendValue);
            $this->sendValue = null;
            return $retval;

    public function isFinished() {
        return !$this->coroutine->valid();
一个任务是用 任务ID标记一个协程。使用setSendValue()方法,你可以指定哪些值将被发送到下次的恢复(在之后你会了解到我们需要这个)。 run()函数确实没有做什么,除了调用send()方法的协同程序。要理解为什么添加beforeFirstYieldflag,需要考虑下面的代码片段:

function gen() {
    yield \'foo\';
    yield \'bar\';

$gen = gen();

// As the send() happens before the first yield there is an implicit rewind() call,
// so what really happens is this:

// The rewind() will advance to the first yield (and ignore its value), the send() will
// advance to the second yield (and dump its value). Thus we loose the first yielded value!
通过添加 beforeFirstYieldcondition 我们可以确定 first yield 的值 被返回。



class Scheduler {
    protected $maxTaskId = 0;
    protected $taskMap = []; // taskId => task
    protected $taskQueue;

    public function __construct() {
        $this->taskQueue = new SplQueue();

    public function newTask(Generator $coroutine) {
        $tid = ++$this->maxTaskId;
        $task = new Task($tid, $coroutine);
        $this->taskMap[$tid] = $task;
        return $tid;

    public function schedule(Task $task) {

    public function run() {
        while (!$this->taskQueue->isEmpty()) {
            $task = $this->taskQueue->dequeue();

            if ($task->isFinished()) {
            } else {

function task1() {
    for ($i = 1; $i <= 10; ++$i) {
        echo "This is task 1 iteration $i.\n";

function task2() {
    for ($i = 1; $i <= 5; ++$i) {
        echo "This is task 2 iteration $i.\n";

$scheduler = new Scheduler;


This is task 1 iteration 1.
This is task 2 iteration 1.
This is task 1 iteration 2.
This is task 2 iteration 2.
This is task 1 iteration 3.
This is task 2 iteration 3.
This is task 1 iteration 4.
This is task 2 iteration 4.
This is task 1 iteration 5.
This is task 2 iteration 5.
This is task 1 iteration 6.
This is task 1 iteration 7.
This is task 1 iteration 8.
This is task 1 iteration 9.
This is task 1 iteration 10.







class SystemCall {
    protected $callback;

    public function __construct(callable $callback) {
        $this->callback = $callback;

    public function __invoke(Task $task, Scheduler $scheduler) {
        $callback = $this->callback; // Can\'t call it directly in PHP :/
        return $callback($task, $scheduler);
 它将像其他任何可调用那样(使用_invoke)运行,不过它要求调度器把正在调用的任务和自身传递给这个函数。为了解决这个问题 我们不得不微微的修改调度器的run方法:
public function run() {
    while (!$this->taskQueue->isEmpty()) {
        $task = $this->taskQueue->dequeue();
        $retval = $task->run();

        if ($retval instanceof SystemCall) {
            $retval($task, $this);

        if ($task->isFinished()) {
        } else {
function getTaskId() {
    return new SystemCall(function(Task $task, Scheduler $scheduler) {

function task($max) {
    $tid = (yield getTaskId()); // <-- here\'s the syscall!
    for ($i = 1; $i <= $max; ++$i) {
        echo "This is task $tid iteration $i.\n";

$scheduler = new Scheduler;



function newTask(Generator $coroutine) {
    return new SystemCall(
        function(Task $task, Scheduler $scheduler) use ($coroutine) {

function killTask($tid) {
    return new SystemCall(
        function(Task $task, Scheduler $scheduler) use ($tid) {



public function killTask($tid) {
    if (!isset($this->taskMap[$tid])) {
        return false;


    // This is a bit ugly and could be optimized so it does not have to walk the queue,
    // but assuming that killing tasks is rather rare I won\'t bother with it now
    foreach ($this->taskQueue as $i => $task) {
        if ($task->getTaskId() === $tid) {

    return true;

function childTask() {
    $tid = (yield getTaskId());
    while (true) {
        echo "Child task $tid still alive!\n";

function task() {
    $tid = (yield getTaskId());
    $childTid = (yield newTask(childTask()));

    for ($i = 1; $i <= 6; ++$i) {
        echo "Parent task $tid iteration $i.\n";

        if ($i == 3) yield killTask($childTid);

$scheduler = new Scheduler;


Parent task 1 iteration 1.
Child task 2 still alive!
Parent task 1 iteration 2.
Child task 2 still alive!
Parent task 1 iteration 3.
Child task 2 still alive!
Parent task 1 iteration 4.
Parent task 1 iteration 5.
Parent task 1 iteration 6.