讨论QQ群:135202158
本文技术参考了sourceforge项目c thread pool,链接:http://sourceforge.net/projects/cthpool/
线程池如上一篇随笔(http://www.cnblogs.com/zzqcn/p/3585003.html)提到的内存池一样,也是一种池化策略,在启动时(或者更高级的,运行时按一定策略分配)预先开启N个线程,当没有工作要做时,这些线程处于睡眠中;一旦有工作加入工作队列,其中的某些线程就会醒来,处理这些工作,完成后继续睡眠 。
要实现线程池(只针对本文的简单实现而言),应设计和构建3样东西:
- 含N个线程的线程组
- 工作队列
- 工作线程例程
线程组和工作队列表示如下:
/*
* Threads:
*
* +----------+----------+------+------------+
* | thread 0 | thread 1 | .... | thread n-1 |
* +----------+----------+------+------------+
*
* Job Queue:
*
* back front
* | |
* v v
* +-------+ +-------+ +-------+
* | job 0 | -> | job 1 | -> ... -> | job x |
* +-------+ +-------+ +-------+
*
*/
线程组可以用普通数组或者动态分配的数组实现,维数就是池中线程数量,存放的其实是线程ID。工作队列可以直接用C++ queue容器实现。
工作线程例程(线程函数)的大致执行流程如下图所示:
/*
*
* Each Thread Routine:
* Job-Queue
* | ...
* v |
* +-------+ +---------+ EnQueue
* +---> | sleep | (No job) | new job | <--------- Client
* | +-------+ +---------+
* | | |
* | | DeQueue +---------+
* | + <----------- | new job |
* | | +---------+
* | v
* | +---------+
* | | do work |
* | +---------+
* | |
* | |
* +----<----+
*
*/
工作队列中没有工作时它就睡眠 ,有工作时苏醒,从队列首部取出(&删除)一个工作,然后开始执行。
另外,我们还需要一个互斥锁L和一个计数信号量S,互斥锁用来同步工作队列的增删操作,计数信号量用来对工作队列中的工作数量进行记录。工作线程会一直等待S,直到它大于0。
下面给出完整代码。
1. threadpool.h
/*
* Linux线程池的简单实现.
* Author: 赵子清
* Blog: http://www.cnblogs.com/zzqcn
*
**/ #ifndef __THREADPOOL_H__
#define __THREADPOOL_H__ #include <semaphore.h>
#include <pthread.h>
#include <queue> #define DLPTP_MAX_THREADS 1024 struct tp_job_t
{
void (*work) (void*);
void* arg;
}; struct tp_threadpool_t
{
pthread_t* threads;
size_t nthreads;
std::queue<tp_job_t> jobs;
sem_t njobs;
pthread_mutex_t lock;
bool running;
}; tp_threadpool_t* tp_init(size_t _nthreads);
int tp_deinit(tp_threadpool_t* _ptp);
void* tp_worker(void* _ptp);
int tp_add_job(tp_threadpool_t* _ptp, void (*_work)(void*), void* _arg); #endif
2. threadpool.cpp
/*
* Linux线程池的简单实现.
* Author: 赵子清
* Blog: http://www.cnblogs.com/zzqcn
*
**/ #include "threadpool.h" tp_threadpool_t* tp_init(size_t _nthreads)
{
if(_nthreads < || _nthreads > DLPTP_MAX_THREADS)
return NULL; int err = ;
tp_threadpool_t* ret = NULL;
size_t i, j; ret = new tp_threadpool_t;
if(NULL == ret)
return NULL;
ret->nthreads = _nthreads;
ret->threads = new pthread_t[_nthreads];
if(NULL == ret->threads)
{
delete ret;
return NULL;
}
ret->running = true; err = sem_init(&ret->njobs, , );
if(- == err)
{
delete[] ret->threads;
delete ret;
return NULL;
} err = pthread_mutex_init(&ret->lock, NULL);
if(err)
{
sem_destroy(&ret->njobs);
delete[] ret->threads;
delete ret;
return NULL;
} for(i=; i<_nthreads; ++i)
{
err = pthread_create(&ret->threads[i], NULL, tp_worker, (void*)ret);
if(err)
{
ret->running = false;
for(j=; j<i; ++j)
{
pthread_cancel(ret->threads[j]);
pthread_join(ret->threads[j], NULL);
}
pthread_mutex_destroy(&ret->lock);
sem_destroy(&ret->njobs);
delete[] ret->threads;
delete ret;
return NULL;
}
} return ret;
} int tp_deinit(tp_threadpool_t* _ptp)
{
if(NULL == _ptp)
return -; int err = ;
size_t i, j; // TODO: if now worker has job to handle, do something then exit
while(!_ptp->jobs.empty()); _ptp->running = false; for(i=; i<_ptp->nthreads; ++i)
{
err = sem_post(&_ptp->njobs); /* V, ++ */
if(err)
{
for(j=i; j<_ptp->nthreads; ++j)
pthread_cancel(_ptp->threads[j]);
break;
}
} for(i=; i<_ptp->nthreads; ++i)
pthread_join(_ptp->threads[i], NULL); pthread_mutex_destroy(&_ptp->lock);
sem_destroy(&_ptp->njobs); delete[] _ptp->threads; _ptp->threads = NULL;
delete _ptp; _ptp = NULL; return ;
} void* tp_worker(void* _ptp)
{
if(NULL == _ptp)
return NULL; tp_threadpool_t* p = (tp_threadpool_t*)_ptp; while(p->running)
{
sem_wait(&p->njobs); /* P, -- */ if(!p->running)
return NULL; void (*work) (void*);
void* arg;
tp_job_t job; pthread_mutex_lock(&p->lock); /* LOCK */ job = p->jobs.front();
work = job.work;
arg = job.arg;
p->jobs.pop(); pthread_mutex_unlock(&p->lock); /* UNLOCK */ work(arg);
} return NULL;
} int tp_add_job(tp_threadpool_t* _ptp, void (*_work)(void*), void* _arg)
{
if(NULL == _ptp || NULL == _work)
return -; tp_job_t job;
job.work = _work;
job.arg = _arg; pthread_mutex_lock(&_ptp->lock); /* LOCK */
_ptp->jobs.push(job);
sem_post(&_ptp->njobs); /* V, ++ */
pthread_mutex_unlock(&_ptp->lock); /* UNLOCK */ return ;
}
3. 测试程序main.cpp
/*
* Linux线程池测试.
* Author: 赵子清
* Blog: http://www.cnblogs.com/zzqcn
*
**/ #include <unistd.h>
#include <stdio.h>
#include "threadpool.h" /* task 1 */
void task1(void* _arg)
{
printf("# Thread working: %u\n", (int)pthread_self());
printf(" Task 1 running..\n");
usleep();
} /* task 2 */
void task2(void* _arg)
{
printf("# Thread working: %u\n", (int)pthread_self());
printf(" Task 2 running.. ");
printf("%d\n", *((int*)_arg));
usleep();
} #define N_THREADS 4 int main(int argc, char** argv)
{
tp_threadpool_t* ptp = NULL;
int i; ptp = tp_init(N_THREADS);
if(NULL == ptp)
{
fprintf(stderr, "tp_init fail\n");
return -;
} int a = ;
for(i=; i<; ++i)
{
tp_add_job(ptp, task1, NULL);
tp_add_job(ptp, task2, (void*)&a);
} tp_deinit(ptp); return ;
}