Build a thread pool in C

时间:2022-06-19 21:02:10
想找个轻便的 thread pool 实现,结果发现网上能找到的都是一些很重量级的,如 boost,ACE 里面的。唯有自己照着下面的需求实现了一个
http://paul.rutgers.edu/~csgates/CS416/proj2/index.html

源代码下载:
http://code.google.com/p/spserver/downloads/list
http://spserver.googlecode.com/files/threadpool-0.1.src.tar.gz

从 0.2 版开始,移植到了 windows 平台,直接使用 windows 的 thread api
http://spserver.googlecode.com/files/threadpool-0.2.src.tar.gz

这个 threadpool 是基于 pthread api 写的。除了 linux/unix 上有 pthread 之外,windows 下也有 pthread 的库。

尝试把设计的过程写一下。由于设计过程中,思路是很跳跃的,有很多的思考细节可能都漏写了。

线程池也是一种对象池,和其他的对象池(如连接池)有相同的地方,
目的是在使用相关的对象时能够避免创建和销毁对象带来的资源消耗,使得程序响应更快。
但线程和其他普通的对象也有不同,线程包含一个指令指针,还包括其他资源,如运行时的函数激活记录的堆栈、一组寄存器和线程特有的数据。所以线程池的管理和其他对象池的管理就有很多的不同。

1.在开始设计线程池的时候,首先可以先考虑一下连接池的使用场景。
一般是由主线程主动地从池里面获取一个连接,主线程用完再把连接返回给池。
主线程对连接对象的使用,就是调用连接对象的方法(按 OO 的说法,就是给对象发送消息 )。

2.按连接池的使用场景,要实现线程池,首先要有一个线程对象。
当主线程从池里面获得一个线程之后,主线程要能够向这个线程发送消息,以达到主线程使用线程的目的。
那么这个线程对象需要能够支持主线程向线程池的线程发送消息。
通过阅读 pthread 的规范,可以了解到 pthread_cond_wait 是 POSIX 线程信号发送系统的核心。
主线程要发送给线程池线程的信息还包括这次任务的参数:一个函数指针和提供给这个函数的参数。

关于 pthread_cond_wait 的有关资料
http://www-128.ibm.com/developerworks/cn/linux/thread/posix_thread3/index.html

3.在上面的第二点中,只考虑了如何从线程池获得线程,并使用线程,而没有考虑如何在使用完线程之后如何把线程归还给池。
连接池的使用场景中,通常是有主线程来完成这个操作。但是在线程池的使用场景中,主线程在发送消息之后,
通常就不再等待这个线程完成任务。即主线程通常只负责取,但不负责还。
考虑到这一点,就需要在每个线程对象中记录自己所属的池,当完成任务之后,线程主动把自己归还到池里面。

至此我们可以设计出线程对象的数据结构

typedef struct _thread_st {   
        pthread_t id;   
        pthread_mutex_t mutex;   
        pthread_cond_t cond;   
        dispatch_fn fn;   
        void *arg;   
        threadpool parent;   
} _thread;   

4.有了这个数据结构,对于 dispatch_threadpool 和 wrapper_fn 的设计也就比较容易想到了。

在 dispatch_threadpool 中,从池里面获得一个线程,然后设置要发送给线程的信息。
包括:thread->fn,thread->arg,thread->parent;接着正式发送
pthread_cond_signal( &thread->cond ) ;

int dispatch_threadpool(threadpool from_me, dispatch_fn dispatch_to_here, void *arg)     
{    
......   
       } else {     
                 pool->tp_index--;     
                 thread = pool->tp_list[ pool->tp_index ];     
                 pool->tp_list[ pool->tp_index ] = NULL;     
      
                 thread->fn = dispatch_to_here;     
                 thread->arg = arg;     
                 thread->parent = pool;     
      
                 pthread_mutex_lock( &thread->mutex );     
                 pthread_cond_signal( &thread->cond ) ;     
                 pthread_mutex_unlock ( &thread->mutex );     
        }    
......   
}   

在 wrapper_fn 中,每次执行 thread->fn( thread->arg ) 之后,主动把自己归还到池里面,
然后等待主线程发送下一次消息

void * wrapper_fn( void * arg )     
{     
......   
        for( ; 0 == ((_threadpool*)thread->parent)->tp_stop; ) {    
                         thread->fn( thread->arg );    
......   
                         save_thread( thread->parent, thread );   
                         pthread_cond_wait( &thread->cond, &thread->mutex );     
......   
        }   
......   
}     

更多细节请参考完整的 实现代码。