关于协程coroutine前面的文章已经介绍过了,本文总结对qemu中coroutine机制的分析,qemu 协程coroutine基于:setcontext函数族以及函数间跳转函数siglongjmp和sigsetjmp实现。使用setcontext函数族来实现用户态进程栈的切换,使用函数间跳转siglongjmp和sigsetjmp实现协程coroutine不退出以及多次进入,即使coroutine执行的任务已经完成,这实现了协程池的功能,避免大量协程创建和销毁带来的系统开销。
qemu coroutine主要提供了5个接口,用于协程创建、协程进入、协程让出,下面首次介绍qemu 实现协程使用的主要数据结构,然后将依次介绍qemu coroutine 这5个接口的实现。
1.qemu协程实现使用的主要数据结构 coroutine和CoroutineUContext:
/* 协程coroutine */
struct Coroutine {
CoroutineEntry *entry; /* 协程入口函数 */
void *entry_arg; /* 协程入口函数的参数 */
Coroutine *caller;
QSLIST_ENTRY(Coroutine) pool_next; /* 协程池挂链 */ /* Coroutines that should be woken up when we yield or terminate */
QTAILQ_HEAD(, Coroutine) co_queue_wakeup;
QTAILQ_ENTRY(Coroutine) co_queue_next; /* co_queue_wakeup挂链 */
};
typedef struct {
Coroutine base; /* 协程coroutine */
void *stack; /* 当前上下文的进程栈 */
sigjmp_buf env; #ifdef CONFIG_VALGRIND_H
unsigned int valgrind_stack_id;
#endif } CoroutineUContext; /* coroutine上下文 */
coroutine数据结构主要封装协程,coroutineUContext封装协程上下文,是对coroutine的进一步包装。
2. qemu协程创建函数 qemu_coroutine_create,其实现如下:
Coroutine *qemu_coroutine_create(CoroutineEntry *entry)
{
Coroutine *co = NULL; if (CONFIG_COROUTINE_POOL) { /* 判断是否使用了coroutine池 */
qemu_mutex_lock(&pool_lock);
co = QSLIST_FIRST(&pool); /* 从池子里取出第一个协程 */
if (co) {
QSLIST_REMOVE_HEAD(&pool, pool_next);
pool_size--;
}
qemu_mutex_unlock(&pool_lock);
} if (!co) { /* co为NULL,表示没有使用coroutine池或者池子已空 */
co = qemu_coroutine_new(); /* 创建一个新的coroutine,这里只是一个空的协程 */
} co->entry = entry; /* 设置协程的入口函数 */
QTAILQ_INIT(&co->co_queue_wakeup); /* 初始化协程线性队列 */
return co;
}
qemu_coroutine_create首先尝试从coroutine池中取出一个coroutine,如果没有获取到,则通过qemu_coroutine_new函数创建一个新的coroutine,qemu_coroutine_new的实现如下:
Coroutine *qemu_coroutine_new(void)
{
const size_t stack_size = << ; /* ucontext_t使用的栈大小 */
CoroutineUContext *co; /* 协程上下文 */
ucontext_t old_uc, uc; /* 进程执行上下文 */
sigjmp_buf old_env; /* 函数间跳转-环境 */
union cc_arg arg = {}; /* The ucontext functions preserve signal masks which incurs a
* system call overhead. sigsetjmp(buf, 0)/siglongjmp() does not
* preserve signal masks but only works on the current stack.
* Since we need a way to create and switch to a new stack, use
* the ucontext functions for that but sigsetjmp()/siglongjmp() for
* everything else.
*/ if (getcontext(&uc) == -) {
abort();
}
/* 协程上下文CoroutineUContext初始化 */
co = g_malloc0(sizeof(*co));
co->stack = g_malloc(stack_size);
co->base.entry_arg = &old_env; /* stash away our jmp_buf */ /* 进程执行上下文ucontext_t初始化 */
uc.uc_link = &old_uc;
uc.uc_stack.ss_sp = co->stack;
uc.uc_stack.ss_size = stack_size;
uc.uc_stack.ss_flags = ; #ifdef CONFIG_VALGRIND_H
co->valgrind_stack_id =
VALGRIND_STACK_REGISTER(co->stack, co->stack + stack_size);
#endif
/* co的传递为什么要以arg的方式?????? */
arg.p = co;
/* 创建一个进程执行上下文uc,进程执行上下文的入口函数为coroutine_trampoline */
makecontext(&uc, (void (*)(void))coroutine_trampoline,
, arg.i[], arg.i[]); /* swapcontext() in, siglongjmp() back out */
if (!sigsetjmp(old_env, )) { /* 保存当前堆栈环境,sigsetjmp为一次调用多次返回的函数 */
swapcontext(&old_uc, &uc);/* 进入uc进程执行上下文,并保存当前上下文到old_uc */
}
return &co->base; /* 返回coroutine */
}
qemu_coroutine_new的主要动作:
- 3-7行定义堆栈大小、进程上下文、协程上下文、函数间跳转变量等。
- 20-23行初始化coroutine上下文。
- 25-29行初始化进程上下文。
- 38行创建一个新的进程上下文uc。
- 42-43行首先通过sigsetjmp保存当前栈环境,sigsetjmp是一种一次调用可以多次返回的函数,第一次返回值为0,之后的返回值取决于导致其返回的siglongjmp的参数,因此第一sigsetjmp返回时将执行43行,进入uc进程执行上下文,38行将uc的入口函数设置为coroutine_trampoline,因此43行将进入coroutine_trampoline函数的执行。
- 45行返回协程上下文中的coroutine。
上面的注释提到了一个疑问:38行将协程上下文co作为参数传递给了新创建的协程uc,但是co的传递为什么要转换成arg,并以两个int变量的形式传递?cc_arg联合体的定义给出了说明:
/*
* va_args to makecontext() must be type 'int', so passing
* the pointer we need may require several int args. This
* union is a quick hack to let us do that
*/
union cc_arg {
void *p;
int i[];
};
主要原因是makecontext的va_args参数只接受int类型,因此作为指针传递的协程上下文co等价于两个int类型的变量,64位系统上int类型占用4个字节,指针类型占用8个字节。
上面qemu_coroutine_new函数43行的执行将导致进入coroutine_trampoline函数,下面分析coroutine_trampoline函数的实现:
/*
* qemu coroutine入口函数,
* 函数参数i0为协程上下文指针的低8位,
* i1为协程上下文指针的高八位。
*/
static void coroutine_trampoline(int i0, int i1)
{
union cc_arg arg;
CoroutineUContext *self;
Coroutine *co; arg.i[] = i0;
arg.i[] = i1;
self = arg.p;/* 获取协程上下文对象指针 */
co = &self->base;/* 获取协程上下文的协程对象指针 */ /* Initialize longjmp environment and switch back the caller */
if (!sigsetjmp(self->env, )) { /* 保存当前堆栈信息,为了再一次进入该协程上下文 */
/* 函数间跳转,跳转到qemu_coroutine_new函数的42行 */
siglongjmp(*(sigjmp_buf *)co->entry_arg, );
} while (true) {
/* 执行协程的入口函数 */
co->entry(co->entry_arg);
/* 协程入口函数退出,协程退出到调用者 */
qemu_coroutine_switch(co, co->caller, COROUTINE_TERMINATE);
}
}
coroutine_trampoline的主要动作:
- 取得协程上下文对象self,并通过协程上下文对象获取相应的协程对象co,12-15行。
- 通过sigsetjmp保存当前堆栈到协程上下文的env中成员变量中,env作为协程再一次进入的点,18行。
- 第一次执行sigsetjmp时,sigsetjmp返回0,因此通过siglongjmp,跳出到qemu_coroutine_new的42行,进而导致qemu_coroutine_new返回,然后qemu_coroutine_create的返回,19行。
- 当再一次通过siglongjmp进入coroutine_trampoline函数,也即18行时,将进入while循环,在循环中调用协程入口函数开始执行,执行完成后通过qemu_coroutine_switch进行协程上下文切换,切换到协程调用的上下文中,23-28行。
注意这里的co->caller将在进入该协程时被赋值,上面即是qemu中创建一个协程对象的过程,从上面的分析可以看出qemu中每一协程coroutine对象对应一个协程上下文对象,通过makecontext创建一个新的进程执行上下文,可以看做协程的主体,协程上下文对象的env成员保存了进入执行上下文的点,通过siglongjmp跳出该执行上下文,qemu协程的创建也即创建了一个新的进程执行上下文,并且保存了再次进入该执行上下文的堆栈信息,下面将分析协程进入函数qemu_coroutine_enter。
3. qemu协程进入函数 qemu_coroutine_enter,其实现如下:
/* 功能:切换到co执行上下文,也即开始执行co的入口函数,opaque为入口函数的参数 */
void qemu_coroutine_enter(Coroutine *co, void *opaque)
{
Coroutine *self = qemu_coroutine_self(); /* 获取当前的进程执行上下文-当前协程 */ trace_qemu_coroutine_enter(self, co, opaque); if (co->caller) { /* qemu 协程不允许递归,也即协程内创建协程 */
fprintf(stderr, "Co-routine re-entered recursively\n");
abort();
}
/* 调用co协程的协程,也即进入co上下文之前的进程上下文 */
co->caller = self;
/* co协程入口函数的参数 */
co->entry_arg = opaque;
/* 将进程上下文从self切换到co */
coroutine_swap(self, co);
}
qemu_coroutine_enter函数的实现主要为:获取当前进程执行上下文并保存到co->caller中,然后设置co入口函数的参数,之后做上下文切换coroutine_swap()。coroutine_swap的实现如下:
/* 协程切换:从from切换到to */
static void coroutine_swap(Coroutine *from, Coroutine *to)
{
CoroutineAction ret;
/* 协程切换,切换到to */
ret = qemu_coroutine_switch(from, to, COROUTINE_YIELD);
/* to协程让出,依次唤醒co->co_queue_wakeup列表中排队的协程 */
qemu_co_queue_run_restart(to);
/* 根据返回值,决定是否删除协程co还是仅仅退出 */
switch (ret) {
case COROUTINE_YIELD:
return;
case COROUTINE_TERMINATE:
trace_qemu_coroutine_terminate(to);
coroutine_delete(to);
return;
default:
abort();
}
}
coroutine_swap的实现主要:首先切换到to协程上下文执行,当to协程让出后依次唤醒排队的协程,之后根据to协程退出的返回值来决定是否删除to,下面是qemu_coroutine_switch函数的实现:
CoroutineAction qemu_coroutine_switch(Coroutine *from_, Coroutine *to_,
CoroutineAction action)
{
CoroutineUContext *from = DO_UPCAST(CoroutineUContext, base, from_);
CoroutineUContext *to = DO_UPCAST(CoroutineUContext, base, to_);
CoroutineThreadState *s = coroutine_get_thread_state();
int ret; s->current = to_; /* s在这里起什么作用呢? */ ret = sigsetjmp(from->env, ); /* 保存当前堆栈到from->env,用于协程的让出 */
if (ret == ) {
siglongjmp(to->env, action);/* 跳转到coroutine_trampoline中第18行 */
}
return ret;
}
qemu_coroutine_switch值得注意的两个地方:
- 首先11行保存了当前堆栈到from->env, to协程的让出时的返回点,前面的coroutine_trampoline函数25行-当前协程执行完成时,执行27行将导致执行上下文切换到此处。
- 其次是13行执行函数间跳转,在创建协程时在coroutine_trampoline函数的18行我们保存了堆栈信息到所创建协程的env成员中,因此13行的跳转导致直接切换到coroutine_trampoline的18行执行,在coroutine_trampoline中执行co->entry开始执行协程的入口函数,也即开始了协程上下文的执行。
有两种方式可以退出当前协程:协程入口函数返回、协程上下文主动执行qemu_coroutine_yield函数,前面已经说明了在coroutine_trampoline函数中协程入口函数返回时,将通过siglongjmp的方式来退出当前协程的执行上下文,下面介绍qemu_coroutine_yield的实现。
4. qemu协程让出函数 qemu_coroutine_yield,其实现如下