构建C协程之setjmp/long_jmp篇

时间:2022-02-14 17:58:52

原理简介

在标准C中的头文件<setjmp.h>中定义了一组函数 setjmp / long_jmp 用来实现“非本地跳转”的功能,利用 setjmp 可以保存当前执行线索状态,稍后通过 long_jmp 函数可以实现状态的恢复,并且可以跨多层函数调用栈进行跳转。具体接口定义如下:

  • int setjmp(jmp_buf env) 该函数主要用来保存当前执行状态,作为后续跳转的目标。调用时,当前状态会被存放在env指向的结构中,env将被 long_jmp 操作作为参数,以返回调用点 — 跳转的结果看起来就好像刚从setjmp返回一样。 直接调用setjmp保存状态后,返回值是0;而从long_jmp操作返回时,返回值是非0的 — 通过判断setjmp的返回值,就可以判断当前执行状态。

  • void long_jmp(jmp_buf env, int value) 该函数用来恢复env中保存的执行状态,另一参数value用来传递返回值给跳转目标 — 如果value值为0,则跳转后返回setjmp处的值为1;否则,返回setjmp处的值为value

setjmp / long_jmp 这一机制的设计初衷是为了方便程序从较深的调用栈中直接返回到之前调用点 — 这非常有利于实现高效的错误处理机制,比如 C++ 中的异常机制就是如此。假设我们有这样一条调用路径:

fun0() -> fun1() -> fun2() -> ...
-> funN()

假设在 funN 函数中发生了一个错误,需要返回 fun0 函数对错误进行处理,按照惯常的方法需要层层返回错误,效率较低。利用 setjmp / long_jmp 机制,就可以在 fun0 函数调用 fun1 前用 setjmp 保存一个状态,然后一旦调用路径中的某个环节出现错误,就使用 long_jmp 跳回 fun0 函数,通过 setjmp 的返回值就可以判断错误类型并做后续处理,非常简便,与 C++ / Java 等语言中的 try
{...} catch (...) {...}
 结构很类似。

这里需要注意的是,long_jmp 返回后的执行依赖于之前 setjmp 执行时的栈环境,在上面的例子中,由于 funN 执行跳转时,fun0 的执行栈没有释放,因此返回后继续执行没有任何问题。 但假如在 fun0 返回后(更精确的说是执行 setjmp 的作用域退出后),再通过 long_jmp 跳转回 fun0, 由于原先栈帧(stack frame)已被释放,其对应内存空间可能别做他用,因而这时程序的执行就进入了不可知状态,很可能因起错误,这点需要特别注意!

setjmp 会将状态信息保存到一个平台相关的结构 jmp_buf 中,这个结构对于程序员来说一般是透明的,也就是说我们并不知道 jmp_buf 的具体字段及其含义,也就不能做诸如栈空间切换的操作 — 这对于实现”协程”系统来说,就比较麻烦了。因为”协程”间并发执行的性质要求系统对不同”协程”的栈空间进行隔离。

尽管如此,我还是在网上找到了很多利用 setjmp / long_jmp 实现的”协程”系统,现挑其中几个比较有代表性的例子介绍一下。

案例一: setjmp-longjmp-ucontext-snippets

这是一个小型的 “N:1” 的协程系统,代码托管在github。利用 setjmp / long_jmp 实现协程,同时还提供了一个简单的 Channel 实现,以供协程间通信。

实现分析

我们把精力主要放在协程的实现方式上,看看它如何解决“栈切换”的问题。

该库提供了以下几个协程操作的API:

  • void coro_allocate (int num_cores) 在程序开始时调用,静态预分配 num_cores 个协程空间,程序中最大运行的协程数不能超过 num_cores 个。

  • int coro_spawn(coro_callback f, void
    *user_state)
     启动一个协程,入口函数由第一个参数 f 指定, user_state 是 f 的参数。

  • int coro_runnable(int pid) 将编号为 pid 的协程设为可执行态。

  • void coro_yield(int pid) 让出处理器,并切换到以 pid 为编号的其他协程继续执行。

下面来具体看看其中的奥秘,我们将重点集中在 coro_allocate 这个函数,它为每个协程分配一个 jmp_buf 结构和一个指示该协程状态的int型数据。其中0号协程对应于“调度器”或者说“运行时环境”。 最后调用 grow_stack(0,
num_cores)
 完成后续工作,应该说所有的玄机都藏在grow_stack这个函数中,我们看一下它的代码:

// have never exit so we get a pristine stack for our coroutines
static void grow_stack(int n, int num_coros) {
if (n == num_coros + 1) {
longjmp(bufs[0],1);
assert(0);
return;
}

if (!setjmp(bufs[n])) {
char *big_array;
big_array = alloca(STACK_SIZE);
asm volatile("" :: "m" (big_array));

grow_stack(n + 1, num_coros);
} else {
if (n == 0) {
return; //came from coro_allocate; return back there
}
while(1) {
assert(spawned_fun);
coro_callback f = spawned_fun;
spawned_fun = NULL;

assert(n == coro_pid);
f(spawned_user_state);
used_pids[n] = 0;
coro_yield(0);
}
}
}

这是一个递归函数,深度是 num_cores + 1。调用顺序从 0 到 num_cores, 每一层调用都利用 setjmp 将状态保存到相应编号的 jmp_buf 结构中,然后通过alloca在当前栈帧上分配一个较大的空间,作为该编号协程的运行栈。当参数n超过num_coros时,程序逐层返回到调用者。

之后调用 coro_spawn 启动一个协程时,先将参数 f 和 user_state 分别保存到全局变量 spawned_fun 和 spawned_user_state 中,然后找到一个空闲的协程编号,恢复该编号在 grow_stack 时保存的状态 —— 也就是说回到了 grow_stack 的 else 分支,调用全局的函数指针 spawned_fun 指向的入口函数 (由于系统是运行在单线程环境的,因此使用全局变量不会出现问题,但笔者个人认为为每个协程设计一个结构体来保存这些信息更好一些)。

刚才介绍时我们特别强调了跳转到已经释放的“栈帧”可能引发错误,但这里却偏偏这样做,道理是什么呢? 问题的关键就在于grow_stack时用alloca预留栈空间的操作,这个操作本质上将原来主程序的栈空间划分成N份,然后假设每个协程运行时使用的栈都不会超过为它们预留的那段空间。而编号为0 的协程恰好对应于运行时环境,因此运行时环境的栈就位于最低端,如果后面的操作使用的栈空间不“越界”,那就不会影响1号协程的执行;其他协程之间也是同理。

小结

案例一利用了程序栈帧顺序增长的特点,实现非常巧妙。但缺点在于不但需要提前指定系统支持的最大协程数,而且所有协程的栈都必须在原始程序栈空间的基础上分配,栈的大小及支持的最大协程数量(也就是可划分的最大栈数量)都因此受到了限制。

案例二:libconcurrency

这个项目托管在google code,也是采用 setjmp / long_jmp 实现的轻量协程系统,但与案例一的不同之处在于,libconcurrency 使用了“栈拷贝”技术 — 每个协程的运行栈是通过malloc在堆空间动态分配的,然后再将原始的栈帧数据复制到新的栈上。正因如此,其系统的可扩展性比较好,协程可以动态创建,且理论上没有上限。

实现分析

还是照例先来分析一下协程操作的API,我们最关心以下几个:

  • coro coro_init() 用来在系统启动时对协程环境进行初始化,这里隐藏着系统最关键的部分,稍后详细分析。

  • coro coro_new(_entry fn) 新分配一个协程并指定其入口函数 fn,这个函数非常重要,我们将首先分析。

  • cvalue coro_call(coro target, cvalue
    value)
     启动协程 target,并传入指定参数 value,同时返回协程执行后的结果。

先分析 coro_new 的实现。 该函数本身不难理解:先用malloc分配 struct _coro 对象 c 及其栈空间,然后初始化该对象,指定入口函数、栈指针和栈大小,最后调用 _coro_enter (c) 完成后续操作 —— _coro_enter函数才是我们重点分析的对象,其代码如下:

/*
* This function invokes the start function of the coroutine when the
* coroutine is first called. If it was called from coro_new, then it sets
* up the stack and initializes the saved context.
*/
void _coro_enter(coro c)
{
if (_save_and_resumed(c->ctxt))
{ /* start the coroutine; stack is empty at this point. */
cvalue _return;
_return.p = _cur;
_cur->start(_value);
/* return the exited coroutine to the exit handler */
coro_call(&_on_exit, _return);
}
/* this code executes when _coro_enter is called from coro_new */
INIT_CTXT:
{
/* local and new stack pointers at identical relative positions on the stack */
intptr_t local_sp = (intptr_t)&local_sp;
/* I don't know what the addition "- sizeof(void *)" is for when
the stack grows downards */
intptr_t new_sp = c->stack_base +
(_stack_grows_up
? _frame_offset
: c->stack_size - _frame_offset - sizeof(void *));

/* copy local stack frame to the new stack */
_coro_cpframe(local_sp, new_sp);

/* reset any locals in the saved state to point to the new stack */
_coro_rebase(c, local_sp, new_sp);
}
}

说明一下,代码中的_save_and_resume是一个宏,直接对应于 setjmp。

不难理解,其中的 if 分支是后续经由 long_jmp 恢复后的执行路径,该分支就是调用之前指定的入口函数,调用结束后马上切换回主协程处理返回值。

我们重点来看 INIT_CTXT 标号后的语句块,这部分用来做协程执行状态的初始化,比较难理解,该过程大体分3步:

  1. 利用栈变量 local_sp 的地址值来确定当前栈帧的位置,然后通过之前 malloc 的指针及大小确定 new_sp 的值,计算时需要注意不同的体系结构下,栈的生长方向可能不同,需要区别对待。

  2. 调用 _coro_cpframe 函数将当前栈帧内容复制到新的栈空间上,它本质上就是一个 memcpy,不过和前面一样,需要注意栈的方向。另外,当前栈使用大小,也就是需要复制的栈长度由全局变量 _frame_offset 指定,该变量在 coro_init 时确定,后面再介绍。

  3. 最后,根据新旧SP值,调用 _coro_rebase 函数对之前保存在 jmp_buf 中的状态信息进行修正,使得下次跳回时能落到新的栈帧上执行。 深入到 _coro_rebase 实现中,会发现这个函数首先计算新旧SP的差值,然后将这个差值加回到 jmp_buf (被看作是一个intptr_t类型的数组)的部分元素上,视为修正——具体哪个位置的值需要被修正保存在全局的数组 _offset[] 中,它的值同样是在 coro_init 函数执行阶段被确定的。

先来解释一下这种“线性修正”之所以可行的原因:

一般情况下,我们可以认为 jmp_buf 是一个数组,数组元素的位宽与具体硬件平台相关,比如IA32下是32bit (int),X64下是64bit (long long)。 其中保存的主要信息就是运行 setjmp 点的 PC、SP(栈指针)以及BP(栈底位置)等。 这些信息大致上可以分成两部分:一类与当前的运行栈地址相关,比如SP、BP;另一类与之无关,比如PC。 基于C语言栈“线性生长”的特点,通过弥补新旧栈地址的线性差值,就可以达到切换栈的效果。

然而,具体实现中 jmp_buf 的每个元素位置对应什么信息则是平台相关的,作为一个以可移植性为目的的系统不应该对其实现做任何假设,因此只能在程序启动阶段以某种方式动态计算获取。

这个计算的过程就隐藏在 coro_init 中,具体通过 _probe_arch 函数实现,相关代码如下:

/* This probing code is derived from Douglas Jones' user thread library */
struct _probe_data {
intptr_t low_bound; /* below probe on stack */
intptr_t probe_local; /* local to probe on stack */
intptr_t high_bound; /* above probe on stack */
intptr_t prior_local; /* value of probe_local from earlier call */

jmp_buf probe_env; /* saved environment of probe */
jmp_buf probe_sameAR; /* second environment saved by same call */
jmp_buf probe_samePC; /* environment saved on previous call */

jmp_buf * ref_probe; /* switches between probes */
};

void boundhigh(struct _probe_data *p)
{
int c;
p->high_bound = (intptr_t)&c;
}

void probe(struct _probe_data *p)
{
int c;
p->prior_local = p->probe_local;
p->probe_local = (intptr_t)&c;
__LABEL_0:
_setjmp( *(p->ref_probe) );
p->ref_probe = &p->probe_env;
__LABEL_1:
_setjmp( p->probe_sameAR );
boundhigh(p);
}

void boundlow(struct _probe_data *p)
{
int c;
p->low_bound = (intptr_t)&c;
probe(p);
}

void fill(struct _probe_data *p)
{
boundlow(p);
}

static void _infer_jmpbuf_offsets(struct _probe_data *pb)
{
/* following line views jump buffer as array of long intptr_t */
unsigned i;
intptr_t * p = (intptr_t *)pb->probe_env;
intptr_t * sameAR = (intptr_t *)pb->probe_sameAR;
intptr_t * samePC = (intptr_t *)pb->probe_samePC;
intptr_t prior_diff = pb->probe_local - pb->prior_local;
intptr_t min_frame = pb->probe_local;

for (i = 0; i < sizeof(jmp_buf) / sizeof(intptr_t); ++i) {
intptr_t pi = p[i], samePCi = samePC[i];
if (pi != samePCi) {
if (pi != sameAR[i]) {
perror("No Thread Launch\n" );
exit(-1);
}
if ((pi - samePCi) == prior_diff) {
/* the i'th pointer field in jmp_buf needs to be save/restored */
_offsets[_offsets_len++] = i;
if ((_stack_grows_up && min_frame > pi) || (!_stack_grows_up && min_frame < pi)) {
min_frame = pi;
}
}
}
}

_frame_offset = (_stack_grows_up
? pb->probe_local - min_frame
: min_frame - pb->probe_local);
}

static void _infer_direction_from(int *first_addr)
{
int second;
_stack_grows_up = (first_addr < &second);
}

static void _infer_stack_direction()
{
int first;
_infer_direction_from(&first);
}

static void _probe_arch()
{
struct _probe_data p;
p.ref_probe = &p.probe_samePC;

_infer_stack_direction();

/* do a probe with filler on stack */
fill(&p);
/* do a probe without filler */
boundlow(&p);
_infer_jmpbuf_offsets(&p);
}
  • 首先需要分析栈的生长方向,这个实现很简单,只要比较调用者和被调用者栈变量的地址大小就可以了;

  • 下一步通过调用fill(&p) -> boundlow(p) ->
    probe(p)
    ,将__LABLE_0的状态记录到 _probe_data 的 probe_samePC字段,而将__LABEL_1的状态记录到probe_sameAR字段;probe_local字段则记录执行 probe 函数时的栈顶 —— SP值。

  • 然后,通过调用boundlow(&p) -> probe(p),将__LABLE_0的状态记录到 _probe_data 的probe_env字段,而__LABEL_1的状态仍旧记录到probe_sameAR字段;将上一次记录的执行 probe 函数时记录的SP值保存到prior_local字段,同时更新probe_local字段记录本次执行probe函数的SP值。

  • 最后,调用 _infer_jmpbuf_offsets 函数进行最终计算。 这时,probe_samePC保存了第一次__LABEL_0处的状态,probe_env保存了第二次__LABEL_0处的状态,二者的PC属性相同,栈属性存在线性偏差(由于前次多了一层调用);而 probe_envprobe_sameAR的栈属性相同,PC属性不同(调用setjmp的位置不同)。 通过这三组 jmp_buf 数据的关系,以及之前记录的两次调用过程的SP值之间的偏差,就能求得 jmp_buf 各项的属性是栈相关的还是栈无关的,将所有栈相关量在 jmp_buf 中的索引位置记录在 _offset 数组中即可。

  • 同时,在遍历 jmp_buf 量的时候,还要找到其中与SP相差最大的值记录到 min_frame 变量中,这可能就是BP的值,用SP和BP相减,就得到了当前调用栈帧的大小,这个值最终被保存在全局变量 _frame_offset中,作为后来进行“栈拷贝”时的重要参数。

构建C协程之setjmp/long_jmp篇

小结

本例利用了 setjmp 操作的底层实现原理,特别是 jmp_buf 结构的实现方式,设计了一个“可移植”的方案。 这个方案虽然可行,但是仍在一定程度上对 setjmp / long_jmp 的实现做了一些假设。 项目开发者并没有给出业已经过测试、可正确运行的平台; 笔者也没有在X86之外的系统上做过实验,因此对这种实现的普适性无法给出保证。 但总的来说,案例二的实现的技巧还是颇值得玩味的。

同样,libconcurrency库也仅支持 “N:1” 的映射方式,底层实现中没有用到多线程 —— 尽管从代码实现来看,似乎作者希望提供某种“线程安全”支持,但究竟是否如此,作者也没有提供任何用例及说明。

案例三:Cilkplus 协程实现

综合上面两个案例,我们不难发现,利用 setjmp / long_jmp 机制实现协程系统虽然在理论上具有可移植性好、性能好的优点,但用于实践中,由于 setjmp / long_jmp 实现的不透明性,导致很难构建出一个符合产品级需求的协程框架 — 即使上面介绍的那些具有一定想象力的实现,也几乎难以直接应用于实际产品。

产生上述问题的原因,主要在于不同体系结构的实现存在很大差异。 那么能否退而求其次,放弃“可移植性”这个优势,集中于几种常见的架构,专注于性能方面的提高呢? 答案是肯定的 — 接下来介绍的 Cilkplus 语言 Runtime 库,就是 Intel 基于自家 X86 / X86_64 平台的特点,实现的一个高效的“协程”框架。

定制的 setjmp / long_jmp

Cilkplus 运行时环境所使用的 setjmp / long_jmp 并非 C 库中提供的版本,而是编译器内嵌版本_builtin_setjmp / _builtin_longjmp,对应的 jmp_buf 结构对于开发者 —— 至少是系统开发者是可见的 —— 它本身是一个 void* 型数组,其中存放着运行状态的PC、SP及BP值,而且明确知道每个值在数组中的位置 —— 这样就可以在 setjmp 后,直接对保存的状态值进行“修正”。

实现”协程”栈切换的原理

前面介绍了为每个“协程”分配独立运行栈对于一个“协程”运行时系统的重要性,也了解了 setjmp / long_jmp 机制下实现栈分配,特别是高可用性的栈分配机制存在一定困难。现在来看看 Cilkplus 如何解决这个问题。

由于 Cilkplus 仅针对 Intel X86 / X64 平台,因此在介绍其“协程”栈切换原理之前,有必要先回顾一下 X86 / X64 平台的调用栈规则。以32位的 X86 为例介绍:

  • 调用者规则: 调用者(Caller)首先需要将参数按照从右到左的顺序依次压栈,然后调用相应的函数,返回后再将参数栈释放。如下代码所示:
push [var] ; Push last parameter first
push 216 ; Push the second parameter
push eax ; Push first parameter last

call _myFunc ; Call the function (assume C naming)

add esp, 12
  • 被调用者规则: 被调用者(Callee)首先将当前BP(也就是Caller的BP)压栈,然后将当前SP(Caller的SP) 赋值给BP。之后如果遇到分配新的栈变量、创建调用参数或者用 alloca 动态分配空间时,则将 SP 减去新分配空间大小,并用“BP +/– 偏移量”的方式访问这些局部变量。访问当前栈帧的局部变量只需要BP即可,这点非常重要!
 push ebp
mov ebp, esp
  • 栈帧结构图如下所示:

构建C协程之setjmp/long_jmp篇

了解了这些基本知识后,就可以分析 Cilkplus 的代码了,其中最关键的部分如下所示:

NORETURN cilk_fiber_sysdep::run()
{
// Only fibers created from a pool have a proc method to run and execute.
CILK_ASSERT(m_start_proc);
CILK_ASSERT(!this->is_allocated_from_thread());
CILK_ASSERT(!this->is_resumable());

// TBD: This setjmp/longjmp pair simply changes the stack pointer.
// We could probably replace this code with some assembly.
if (! CILK_SETJMP(m_resume_jmpbuf))
{
// Change stack pointer to fiber stack
JMPBUF_SP(m_resume_jmpbuf) = m_stack_base;
CILK_LONGJMP(m_resume_jmpbuf);
}

// Verify that 1) 'this' is still valid and 2) '*this' has not been
// corrupted.
CILK_ASSERT(magic_number == m_magic);

// If the fiber that switched to me wants to be deallocated, do it now.
do_post_switch_actions();

// Now call the user proc on the new stack
m_start_proc(this);

// alloca() to force generation of frame pointer. The argument to alloca
// is contrived to prevent the compiler from optimizing it away. This
// code should never actually be executed.
int* dummy = (int*) alloca(sizeof(int) + (std::size_t) m_start_proc & 0x1);
*dummy = 0xface;

// User proc should never return.
__cilkrts_bug("Should not get here");
}

上面是启动一个新“协程”的过程,在该调用之前,新的栈空间已经通过malloc分配好了,指针保存在变量m_stack_base中。下面的任务就是如何将运行栈切换到这个新的空间。

  • 首先来看 if 分支,也就是直接从 CILK_SETJMP 返回的情形:通过 JMPBUF_SP 这个宏,可以访问到刚刚保存的SP值,将其修改并指向新的地址 —— 注意这里只对SP进行了修改,没有修改BP值,也没有进行”栈拷贝”。

  • 然后,通过 CILK_LONGJMP 跳转到修改后的状态 —— 执行线索切换到 if 分支后的代码 —— 注意,这时候 SP 已经切换成新值了,而 BP 还是原来的值 —— 根据之前的介绍,当前栈上的局部变量,包括函数的调用参数都通过“BP +/– 偏移量”进行访问,因此新的协程依然能访问到原来的栈变量、参数(注意:虽然run()方法本身没有参数,但由于是C++类方法,所以隐含了参数 this 指针,同时该对象的所有成员变量也依赖this指针才能访问到)。

  • 新协程继续执行,调用 m_start_proc(this) 进入新的入口函数 —— 这时由于使用新的 SP,调用参数及新函数的执行栈就都在新的栈空间上分配了,也就完成了“栈切换”。

Fast Clone / Slow Clone

如果说上面介绍的是 Cilkplus Runtime 协程机制的“普通”玩法,那么所谓的 “Fast Clone” / “Slow Clone” 就堪称 Cilkplus 协程机制的“文艺”玩法了。 该机制与Cilkplus的“Work Stealing”调度器一起,作为系统实现的精华,被各种介绍Cilkplus的文章反复提及,可以说是这门新语言的核心技术创新之一。

要分析 “Fast Clone” / “Slow Clone”,首先要了解一下 Cilkplus 的并行开发模型。 在Cilkplus中,主要提供了 cilk_spawn 和 cilk_sync 这两个关键字来处理并行任务的创建、同步执行线索的功能。

cilk_spawn 关键字的语法主要有两种形式:

int a = cilk_spawn foo(123); // 变量a记录foo(123)的返回值
cilk_spawn bar(); // bar() 没有返回值或忽略其返回值

cilk_spawn 关键字后面的函数调用会以单独的线索(“协程”)与原先的主线程并行执行,主线程需要在后面必要的位置插入 cilk_sync 语句进行显式同步操作: 比如上面例子中,主线程需要访问变量 a 的值时。 如果程序员没有显式提供cilk_sync 语句,编译器会在适当的位置插入同步语句。

对于一个程序,在多个线程上并行执行不同的部分是否能带来真正的性能提升,需要综合考量并行任务的粒度及其创建、同步等操作引入的额外开销,同时也要考量当前系统的负载情况 —— 这需要在运行过程中动态的判断。 如果一味将所有 spawn 的任务都分配新的线程执行,可能带来较大的开销而得不偿失。 为此,Cilkplus提出了一种动态优化的方案,即所谓的 “Fast Clone” / “Slow Clone”。

该方案的核心思想是,在 Spawn 一个新的执行线索后,并不马上为其创建新的执行线程,而是仅仅创建一个任务的执行状态(也就是“协程”); 系统后台有若干个执行线程,会根据负载情况获取任务并执行。

具体来说,Cilkplus采用了一种“Work First”的执行策略,在Spawn时刻,首先将当前状态(通过setjmp机制)保存起来,然后当前线程直接去执行Spawn的任务,而原先“主线程”的执行状态就挂到当前工作线程的任务队列中:

  • 若此时有其他空闲的工作线程,则“窃取”挂起的“主线程”任务,(通过long_jmp机制)恢复其执行,这样就实现了真正的并行 —— 这就是所谓的“Slow Clone”(“慢版本”);并行的任务通过“同步”操作合并,类似传统的“fork-join”模型,新任务(即原来的“主线程”)也就在同步点被释放了。

  • 相反,如果挂起的任务没有被其他工作线程“窃取”,则当前线程执行完 Spawn 的任务后,会恢复之前挂起的任务,直接返回了“主线程”继续后续操作。 由于 Spawn 任务本身就是在原来执行线程上运行的,因此可以跳过同步操作,看起来好像与不使用 cilk_spawn 关键字时的效果一样,是一个串行的版本。这就是所谓的 “Fast Clone”(即“快版本”)。

下面我们通过一个小例子来具体看一下 Cilkplus 编译器所作的“翻译”工作。

以下是原始的 cilkplus 程序:

int fib(int n) {
if (n < 2)
return n;
int a = cilk_spawn fib(n-1);
int b = fib(n-2);
cilk_sync;
return a + b;
}

下面是经编译器处理生成的程序,为了方便起见,这里用了伪代码表示:

struct struct_anon {
int n;
};
static void __cilk_spawn_helper_fib(struct_anon *agg, int *ret) {
... ...
__cilk_helper_prologue();
*ret = fib(agg->n);
__cilk_helper_epilogue();
... ...
}
int fib (int n) {
... ...
__cilkrts_stack_frame sf;
__cilk_parent_prologue(&sf);
... ...
cilk.spawn.savestate:
int a;
if (! SETJMP(&sf.ctx))
goto cilk.spawn.helpercall;
else
goto cilk.spawn.continuation;
cilk.spawn.helpercall:
struct struct_anon agg = { n - 1 };
__cilk_spawn_helper_fib(&agg, &a);
cilk.spawn.continuation:
int b = fib(n-2);
cilk.sync.savestate:
if (sf.flags & CILK_FRAME_UNSYNCHED) {
if (! SETJMP(sf.ctx))
goto cilk.sync.call;
}
cilk.sync.exit:
return a + b;
cilk.sync.call:
__cilkrts_sync(&sf);
goto cilk.sync.exit;
}
  • 首先,利用 setjmp 将当前“主线程”执行状态保存,然后 if 分支直接进入 _cilk_spawn_helper_fib 函数执行。

  • 在_cilk_spawn_helper_fib 函数中,先调用 _cilk_helper_prologue,执行工作包括将刚才保存的任务放到任务队列中;然后执行真正的计算任务 fib;结束计算后,执行 _cilk_helper_epilogue 函数检查“主线程”任务是否已被“窃取”,决定后续执行的路径。

  • 若任务没有被”窃取”,即执行“Fast Clone”,那么函数直接返回,继续计算 fib(n-2) …

  • 否则,通过 long_jmp 机制跳转到 cilk.sync.savestate 处,由新任务保存的状态,完成最后的返回操作;另一方面,被窃取执行的新任务则执行 fib(n-2),然后在 cilk.sync.savestate 处用 setjmp 保存执行状态,再执行 sync 操作 —— 之后,这个新任务就结束了,等待与之并行的那个 Spawn 的任务返回完成后续操作。

我把上面这个流程花了一个草图,供大家参考:

构建C协程之setjmp/long_jmp篇

总结

Cilkplus的运行时是我目前所知利用 setjmp / long_jmp 机制实现 “N:M” 协程系统的唯一实现,并且经过多年发展已经非常成熟。 目前,Cilkplus不仅为Intel自家的ICC编译器所支持,同时已合并到GCC主干,成为了GCC支持的语言。另外,基于Clang/LLVM的编译器也已经开源并已初具规模。

由于Cilkplus主要面向的高性能计算领域目前还是被Intel架构服务器所主宰,所以仅支持Intel X86 / X64 架构的策略暂时还无伤大雅。 但如果要实现一套更加通用的协程系统,那么依靠“setjmp / long_jmp”机制本身可能就比较困难了。

好了,今天的内容就是这些。下次将介绍基于“ucontext”的协程实现机制,敬请关注!