有时我们希望把一部分工作通过创建线程的方式异步执行,这样我们可以在执行任务的同时,继续执行其他任务。但是如果这种需求比较多的话,频繁的创建和销毁线程带来很大的性能损耗。如果我们能创建一个或一些线程,然后重复使用它们,就可以避免这个问题。
Qemu的实现
qemu模仿glib实现了线程池的功能,目前qemu中线程池主要应用在raw文件的支持上,当linux-aio不可用时,就像glibc,通过线程实现aio机制。我们也可看到,代表线程池中的线程成员的数据结构 ThreadPoolElement 就包含了用来描述aio的BlockAIOCB结构。相关数据结构如下:
首先看一下线程池相关的数据结构:
typedef struct AIOCBInfo {
void (*cancel_async)(BlockAIOCB *acb);
AioContext *(*get_aio_context)(BlockAIOCB *acb);
size_t aiocb_size;
} AIOCBInfo;
struct BlockAIOCB {
const AIOCBInfo *aiocb_info;
BlockDriverState *bs;
BlockCompletionFunc *cb;
void *opaque;
int refcnt;
};
struct ThreadPoolElement {
BlockAIOCB common;
ThreadPool *pool;
ThreadPoolFunc *func;
void *arg;
/* Moving state out of THREAD_QUEUED is protected by lock. After
* that, only the worker thread can write to it. Reads and writes
* of state and ret are ordered with memory barriers.
*/
enum ThreadState state;
int ret;
/* Access to this list is protected by lock. */
QTAILQ_ENTRY(ThreadPoolElement) reqs;
/* Access to this list is protected by the global mutex. */
QLIST_ENTRY(ThreadPoolElement) all;
};
struct ThreadPool {
AioContext *ctx;
QEMUBH *completion_bh;
QemuMutex lock;
QemuCond worker_stopped;
QemuSemaphore sem;
int max_threads;
QEMUBH *new_thread_bh;
/* The following variables are only accessed from one AioContext. */
QLIST_HEAD(, ThreadPoolElement) head;
/* The following variables are protected by lock. */
QTAILQ_HEAD(, ThreadPoolElement) request_list;
int cur_threads;
int idle_threads;
int new_threads; /* backlog of threads we need to create */
int pending_threads; /* threads created but not running yet */
bool stopping;
};
ThreadPool 数据结构负责维护线程池里面的线程成员,线程的创建是通过下半部来实现的;ThreadPool 中有5个负责维护不同状态下的线程成员的计数器,max_threads负责统计线程池中允许创建的线程的最大值; new_threads负责统计需要创建的线程数;pending_threads负责统计已创建但还没有运行的线程数;idle_threads负责统计空闲的线程数;cur_threads负责统计当前线程池中线程的个数;注意cur_threads包含new_threads中尚未创建的线程。
线程池创建
首先通过thread_pool_new函数为特定的AioContext实例创建一个新的线程池。在这个函数中初始化ThreadPool数据结构的各个成员,包括负责创建新线程的new_thread_bh和线程执行完毕后用来调度执行任务完成回调函数的completion_bh。
任务提交
我们通过调用thread_pool_submit_aio函数来提交任务,这个函数的原型是:
BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool,
ThreadPoolFunc *func, void *arg,
BlockCompletionFunc *cb, void *opaque)
这个函数为提交的任务创建一个ThreadPoolElement实例添加到ThreadPool中,同时调用spawn_thread函数来创建一个qemu线程。spawn_thread函数是通过调度pool->new_thread_bh来创建qemu线程的。
线程执行
创建的线程执行worker_thread函数,这个函数从pool->request_list链表中取下第一个ThreadPoolElement节点,执行其任务函数,然后调度执行pool->completion_bh;这个bh遍历pool->head链表,根据其ThreadPoolElement成员的状态来决定是否执行实例上注册的完成回调函数。
线程执行完一个任务后,也就是一个ThreadPoolElement实例被执行后,线程就出在idle状态,等待下一个任务提交动作。任务提交与线程执行之间的同步是通过pool->sem来实现的。thread_pool_submit_aio中任务提交后会调用qemu_sem_post(&pool->sem)来增加pool->sem的计数,worker_thread在pool->sem上醒来后从pool->request_list链表上获取下一个要执行的ThreadPoolElement节点。
总结
线程池计数提供了线程重复使用的功能,这在qemu有大量io操作的时候提高了性能;同时,也提供了除了linux-aio之外的aio实现。
参考:
https://developer.gnome.org/glib/2.46/glib-Thread-Pools.html