前言
任何一种设计方式的引入都会带来额外的开支,是否使用,取决于能带来多大的好处和能带来多大的坏处,好处与坏处包括程序的性能、代码的可读性、代码的可维护性、程序的开发效率等。
线程池适用场合:任务比较多,需要拉起大量线程来处理;任务的处理时间相对比较短,按照线程的周期T1(创建阶段)、T2(执行阶段)、T3(销毁阶段)来算,执行阶段仅占用较少时间。
简单的线程池通常有以下功能:预创建一定数量的线程;管理线程任务,当工作线程没有事情可做时休眠自己;销毁线程池。
复杂一些的线程池有额外的调节功能:管理线程池的上限;动态调节工作线程数量,当大量工作请求到来时增加工作线程,工作请求较少时销毁部分线程。
内容部分
这次实现的是一个简单的线程池模型。
首先是线程池的头文件定义:
1 #include<unistd.h>
2 #include<stdlib.h>
3 #include<iostream>
4 #include<string>
5 #include<string.h>
6 #include<queue>
7 #include<errno.h>
8 #include<pthread.h>
9 using namespace std;
10
11 struct thread_work
12 {
13 void* (*routine)(void*);
14 void* arg;
15 };
16
17 struct thread_pool
18 {
19 bool ShutDown;
20 unsigned int iMaxThreadNum;
21 pthread_mutex_t pool_mutex;
22 pthread_cond_t pool_cond;
23 queue<pthread_t> Pth_IdQueue;
24 queue<thread_work> Pth_workQueue;
25 };
26
27 thread_pool* thread_pool_create(int iThreadNum);
28 void thread_pool_destroy(thread_pool* thpool);
29 int thread_pool_add_task(thread_pool* thpool, void*(*routine)(void*), void* arg);
30 void* thread_routine(void* arg);
下面是线程池的实现:
1 #include "ThreadPool.h"
2
3 thread_pool* thread_pool_create(int iThreadMaxNum)
4 {
5 int iRet = 0;
6 thread_pool* pool = new thread_pool;
7 if(NULL == pool)
8 {
9 cout << "new thread_pool failed! procedure exit" << endl;
10 return NULL;
11 }
12 pool->iMaxThreadNum = iThreadMaxNum;
13 pool->ShutDown = false;
14 if((iRet=pthread_mutex_init(&pool->pool_mutex, NULL)) != 0)
15 {
16 cout << __FUNCTION__ << "thread_pool init failed! error: " << strerror(iRet) << endl;
17 delete pool;
18 return NULL;
19 }
20 if((iRet=pthread_cond_init(&pool->pool_cond, NULL)) != 0)
21 {
22 cout << __FUNCTION__ << "thread_cond init failed! error: " << strerror(iRet) << endl;
23 delete pool;
24 return NULL;
25 }
26
27 for(int i = 0; i < iThreadMaxNum; i++)
28 {
29 pthread_t thid = 0;
30 if((iRet=pthread_create(&thid, NULL, thread_routine, (void*)pool)) != 0)
31 {
32 cout << __FUNCTION__ << " pthread_create failed, error: " << strerror(iRet) << endl;
33 delete pool;
34 return NULL;
35 }
36 else
37 {
38 pool->Pth_IdQueue.push(thid);
39 }
40 }
41 return pool;
42 }
43
44 int thread_pool_add_task(thread_pool* thpool, void*(*routine)(void*), void* arg)
45 {
46 if(routine == NULL || thpool == NULL)
47 return -1;
48 thread_work temp;
49 temp.routine = routine;
50 temp.arg = arg;
51 pthread_mutex_lock(&(thpool->pool_mutex));
52 thpool->Pth_workQueue.push(temp);
53 pthread_mutex_unlock(&(thpool->pool_mutex));
54 pthread_cond_signal(&(thpool->pool_cond));
55 return 0;
56 }
57
58 void thread_pool_destroy(thread_pool* thpool)
59 {
60 if(thpool == NULL)
61 return;
62 pthread_mutex_lock(&(thpool->pool_mutex));
63 thpool->ShutDown = true;
64 while(!thpool->Pth_workQueue.empty())
65 {
66 thpool->Pth_workQueue.pop();
67 }
68 pthread_cond_broadcast(&(thpool->pool_cond));
69 pthread_mutex_unlock(&(thpool->pool_mutex));
70 while(!thpool->Pth_IdQueue.empty())
71 {
72 pthread_join(thpool->Pth_IdQueue.front(), NULL);
73 thpool->Pth_IdQueue.pop();
74 }
75 pthread_mutex_destroy(&(thpool->pool_mutex));
76 pthread_cond_destroy(&(thpool->pool_cond));
77 delete thpool;
78 }
79
80 void* thread_routine(void* pool)
81 {
82 if(pool == NULL)
83 {
84 cout << "thread_routine params is empty, thread_exit" << endl;
85 return NULL;
86 }
87 while(1)
88 {
89 thread_pool* p = (thread_pool*)pool;
90 pthread_mutex_lock(&(p->pool_mutex));
91 while(p->Pth_workQueue.empty() && !p->ShutDown)
92 {
93 pthread_cond_wait(&(p->pool_cond), &(p->pool_mutex));
94 }
95 if(p->ShutDown)
96 {
97 pthread_mutex_unlock(&(p->pool_mutex));
98 return NULL;
99 }
100 thread_work work_f;
101 work_f = p->Pth_workQueue.front();
102 p->Pth_workQueue.pop();
103 pthread_mutex_unlock(&(p->pool_mutex));
104 work_f.routine(work_f.arg);
105 }
106 }
2
3 thread_pool* thread_pool_create(int iThreadMaxNum)
4 {
5 int iRet = 0;
6 thread_pool* pool = new thread_pool;
7 if(NULL == pool)
8 {
9 cout << "new thread_pool failed! procedure exit" << endl;
10 return NULL;
11 }
12 pool->iMaxThreadNum = iThreadMaxNum;
13 pool->ShutDown = false;
14 if((iRet=pthread_mutex_init(&pool->pool_mutex, NULL)) != 0)
15 {
16 cout << __FUNCTION__ << "thread_pool init failed! error: " << strerror(iRet) << endl;
17 delete pool;
18 return NULL;
19 }
20 if((iRet=pthread_cond_init(&pool->pool_cond, NULL)) != 0)
21 {
22 cout << __FUNCTION__ << "thread_cond init failed! error: " << strerror(iRet) << endl;
23 delete pool;
24 return NULL;
25 }
26
27 for(int i = 0; i < iThreadMaxNum; i++)
28 {
29 pthread_t thid = 0;
30 if((iRet=pthread_create(&thid, NULL, thread_routine, (void*)pool)) != 0)
31 {
32 cout << __FUNCTION__ << " pthread_create failed, error: " << strerror(iRet) << endl;
33 delete pool;
34 return NULL;
35 }
36 else
37 {
38 pool->Pth_IdQueue.push(thid);
39 }
40 }
41 return pool;
42 }
43
44 int thread_pool_add_task(thread_pool* thpool, void*(*routine)(void*), void* arg)
45 {
46 if(routine == NULL || thpool == NULL)
47 return -1;
48 thread_work temp;
49 temp.routine = routine;
50 temp.arg = arg;
51 pthread_mutex_lock(&(thpool->pool_mutex));
52 thpool->Pth_workQueue.push(temp);
53 pthread_mutex_unlock(&(thpool->pool_mutex));
54 pthread_cond_signal(&(thpool->pool_cond));
55 return 0;
56 }
57
58 void thread_pool_destroy(thread_pool* thpool)
59 {
60 if(thpool == NULL)
61 return;
62 pthread_mutex_lock(&(thpool->pool_mutex));
63 thpool->ShutDown = true;
64 while(!thpool->Pth_workQueue.empty())
65 {
66 thpool->Pth_workQueue.pop();
67 }
68 pthread_cond_broadcast(&(thpool->pool_cond));
69 pthread_mutex_unlock(&(thpool->pool_mutex));
70 while(!thpool->Pth_IdQueue.empty())
71 {
72 pthread_join(thpool->Pth_IdQueue.front(), NULL);
73 thpool->Pth_IdQueue.pop();
74 }
75 pthread_mutex_destroy(&(thpool->pool_mutex));
76 pthread_cond_destroy(&(thpool->pool_cond));
77 delete thpool;
78 }
79
80 void* thread_routine(void* pool)
81 {
82 if(pool == NULL)
83 {
84 cout << "thread_routine params is empty, thread_exit" << endl;
85 return NULL;
86 }
87 while(1)
88 {
89 thread_pool* p = (thread_pool*)pool;
90 pthread_mutex_lock(&(p->pool_mutex));
91 while(p->Pth_workQueue.empty() && !p->ShutDown)
92 {
93 pthread_cond_wait(&(p->pool_cond), &(p->pool_mutex));
94 }
95 if(p->ShutDown)
96 {
97 pthread_mutex_unlock(&(p->pool_mutex));
98 return NULL;
99 }
100 thread_work work_f;
101 work_f = p->Pth_workQueue.front();
102 p->Pth_workQueue.pop();
103 pthread_mutex_unlock(&(p->pool_mutex));
104 work_f.routine(work_f.arg);
105 }
106 }
代码都是基础知识,大家应该都能理解。
下面是一个测试的demo:
1 #include "ThreadPool.h"
2
3 void* task(void* p)
4 {
5 int ptI = *(int*)p;
6 //cout << "ptr address : " << p << "ptr value : " << ptI << endl;
7 cout << ptI << endl;
8 return NULL;
9 }
10
11 int main()
12 {
13 struct thread_pool* pool = thread_pool_create(100);
14 int iArg[1000];
15 for(int i = 0; i < 1000; i++)
16 {
17 iArg[i] = i;
18 thread_pool_add_task(pool, task, &iArg[i]);
19 //cout << "thread_pool_add_task id: " << iArg[i] << endl;
20 //cout << __FUNCTION__ << " success, current task id is: " << iArg[i] << "iArg[" << i << "] address : " << &iArg[i] << endl;
21 }
22 thread_pool_destroy(pool);
23 }
小结
很简单很好理解的一段代码,我却写了大半天时间,手残党鉴定完毕。