基本功能
1. 实现一个线程的队列,队列中的线程启动后不再释放;
2. 没有任务执行时,线程处于pending状态,等待唤醒,不占cpu;
3. 当有任务需要执行时,从线程队列中取出一个线程执行任务;
4. 任务执行完成后线程再次进入pending状态,等待唤醒;
扩展功能
1. 线程的队列大小可设置;
2. 最大可创建的线程数可设置;
3. 根据运行需求,按需步进启动线程,避免大量线程一直处于pending状态,占用资源;
关键代码分析
数据结构
1 /* 线程执行的任务参数 */
2 typedef struct
3 {
4 void (*func)(void*, void*); /* 任务函数指针 */
5 void *arg1; /* 任务函数第一个参数 */
6 void *arg2; /* 任务函数第二个参数 */
7 }tThreadTaskInfo;
8
9 /* 线程池参数 */
10 typedef struct
11 {
12 pthread_mutex_t lock; /* 线程池互斥锁 */
13 pthread_cond_t cond; /* 线程池同步信号 */
14
15 pthread_t *threads; /* 保存线程池创建的所有线程 */
16 int32_t threadMaxNum; /* 最大可创建线程数 */
17 int32_t threadStartStep; /* 一次启动线程的个数 */
18 int32_t threadStartCnt; /* 已启动线程个数 */
19 int32_t threadPendCnt; /* 已启动但是处于Pending状态的线程 */
20
21 tThreadTaskInfo *taskQueue; /* 等待执行的任务队列 */
22 int32_t taskQueueSize; /* 任务队列的大小 */
23 int32_t taskQueueHead; /* 当前任务队列头索引 */
24 int32_t taskQueueTail; /* 当前任务队列尾索引 */
25 int32_t taskPendCnt; /* 等待执行的任务个数 */
26
27 int32_t isShutdown; /* 线程池正在关闭 */
28 }tThreadpoolInfo;
创建线程池
- 创建线程池时只分配了存储pthread_t的空间,但是不启动线程,后面根据需求步进启动;
1 /************************************
2 * 创建线程池
3 *
4 * @threadMaxNum -- 最大可创建线程个数
5 * @threadStartStep -- 一次启动线程的个数
6 * @taskQueueSize -- 任务队列的大小
7 *
8 * @Retuen -- 成功:线程池的引用
9 * 失败:NULL
10 * **********************************/
11 tThreadpoolInfo* threadpool_create(
12 int32_t threadMaxNum,
13 int32_t threadStartStep,
14 int32_t taskQueueSize)
15 {
16 tThreadpoolInfo *threadpool = NULL;
17
18 if ((0 >= threadMaxNum)
19 || (0 >= threadStartStep)
20 || (0 >= taskQueueSize))
21 {
22 THREADPOOL_ERR("invalid param.\r\n");
23 goto error_exit;
24 }
25
26 threadpool = (tThreadpoolInfo *)malloc(sizeof(tThreadpoolInfo));
27 if (NULL == threadpool)
28 {
29 THREADPOOL_ERR("malloc threadpool failed.\r\n");
30 goto error_exit;
31 }
32
33 memset(threadpool, 0, sizeof(tThreadpoolInfo));
34 threadpool->threadMaxNum = threadMaxNum;
35 threadpool->threadStartStep = threadStartStep;
36 threadpool->taskQueueSize = taskQueueSize;
37
38 /* 分配线程存储资源 */
39 threadpool->threads = (pthread_t *)calloc(threadMaxNum, sizeof(pthread_t));
40 if (NULL == threadpool->threads)
41 {
42 THREADPOOL_ERR("malloc threads failed.\r\n");
43 goto error_exit;
44 }
45
46 /* 分配任务队列 */
47 threadpool->taskQueue = (tThreadTaskInfo *)calloc(taskQueueSize, sizeof(tThreadTaskInfo));
48 if (NULL == threadpool->taskQueue)
49 {
50 THREADPOOL_ERR("malloc task queue failed.\r\n");
51 goto error_exit;
52 }
53
54 /* 初始化互斥信号量和同步信号 */
55 if (0 != THREADPOOL_LOCK_INIT(threadpool))
56 {
57 THREADPOOL_ERR("mutex init failed.\r\n");
58 goto error_exit;
59 }
60
61 if (0 != THREADPOOL_COND_INIT(threadpool))
62 {
63 THREADPOOL_ERR("cond init failed.\r\n");
64 goto error_exit;
65 }
66
67 return threadpool;
68
69 error_exit:
70
71 if (threadpool != NULL)
72 {
73 threadpool_free(threadpool);
74 }
75
76 return NULL;
77 }
向线程池添加任务
- 查看等待队列是否有空闲,如果没有空闲则返回错误;
- 查看当前有没有处于pending的线程,如果没有则按照步进启动新的线程,如果已达到最大线程数则返回错误;
- 将任务添加到队列中,并唤醒一个线程执行任务;
1 /************************************
2 * 向线程池添加任务
3 *
4 * @threadpool -- 线程池引用
5 * @taskfunc -- 任务回调函数
6 * @arg1 -- 任务第一个参数
7 * @arg1 -- 任务第二个参数
8 *
9 * @Return -- 成功: 0
10 * 失败: -1
11 * **********************************/
12 int32_t threadpool_addtask(
13 tThreadpoolInfo *threadpool,
14 THREADPOOLTASKFUNC taskfunc,
15 void *arg1,
16 void *arg2)
17 {
18 int32_t ret = 0;
19
20 if ((NULL == threadpool) || (NULL == taskfunc))
21 {
22 THREADPOOL_ERR("invalid param.\r\n");
23 return -1;
24 }
25
26 THREADPOOL_LOCK(threadpool);
27
28 do
29 {
30 if (threadpool->isShutdown)
31 {
32 THREADPOOL_ERR("threadpool is shutdown.\r\n");
33 ret = -1;
34 break;
35 }
36
37 /* 判断等待执行的任务队列是否满 */
38 if (threadpool->taskPendCnt == threadpool->taskQueueSize)
39 {
40 THREADPOOL_ERR("task queue is full.\r\n");
41 ret = -1;
42 break;
43 }
44
45 /* 如果pending状态的线程已用完,则启动新的线程 */
46 if (threadpool->threadPendCnt <= 0)
47 {
48 if (0 != threadpool_start(threadpool))
49 {
50 ret = -1;
51 break;
52 }
53 }
54
55 /* 将任务放入对尾 */
56 threadpool->taskQueue[threadpool->taskQueueTail].func = taskfunc;
57 threadpool->taskQueue[threadpool->taskQueueTail].arg1 = arg1;
58 threadpool->taskQueue[threadpool->taskQueueTail].arg2 = arg2;
59
60 threadpool->taskQueueTail = (threadpool->taskQueueTail + 1) % threadpool->taskQueueSize;
61 threadpool->taskPendCnt++;
62
63 /* 唤醒一个线程执行任务 */
64 THREADPOOL_COND_SIGNAL(threadpool);
65
66 } while(0);
67
68 THREADPOOL_UNLOCK(threadpool);
69 return ret;
70 }
线程的回调函数
- 线程第一次启动和被唤醒后检查队列中是否有需要执行的任务,如果没有则继续等待唤醒;
- 如果有需要执行的任务,则从队列中取一个任务并执行;
- 如果线程池已销毁,则退出线程;
1 /************************************
2 * 线程回调函数
3 * 等待线程池分配任务并执行分配的任务
4 *
5 * @arg -- 线程池引用
6 * **********************************/
7 void* thread_callback(void *arg)
8 {
9 tThreadpoolInfo *threadpool = (tThreadpoolInfo *)arg;
10 tThreadTaskInfo task;
11
12 while (1)
13 {
14 THREADPOOL_LOCK(threadpool);
15
16 /* 等待任务分配的信号
17 * 如果当前没有等待执行的任务,并且线程池没有关闭则继续等待信号 */
18 while ((0 == threadpool->taskPendCnt)
19 && (0 == threadpool->isShutdown))
20 {
21 THREADPOOL_COND_WAIT(threadpool);
22 }
23
24 /* 如果线程池已关闭,则退出线程 */
25 if (threadpool->isShutdown)
26 break;
27
28 /* 取任务队列中当前第一个任务 */
29 task.func = threadpool->taskQueue[threadpool->taskQueueHead].func;
30 task.arg1 = threadpool->taskQueue[threadpool->taskQueueHead].arg1;
31 task.arg2 = threadpool->taskQueue[threadpool->taskQueueHead].arg2;
32
33 threadpool->taskQueueHead = (threadpool->taskQueueHead + 1) % threadpool->taskQueueSize;
34 threadpool->taskPendCnt--;
35 threadpool->threadPendCnt--;
36
37 THREADPOOL_UNLOCK(threadpool);
38
39 /* 执行任务 */
40 (*(task.func))(task.arg1, task.arg2);
41
42 /* 任务执行完成后,线程进入pending状态 */
43 THREADPOOL_LOCK(threadpool);
44 threadpool->threadPendCnt++;
45 THREADPOOL_UNLOCK(threadpool);
46 }
47
48 threadpool->threadStartCnt--;
49 THREADPOOL_UNLOCK(threadpool);
50
51 pthread_exit(NULL);
52 }
线程池销毁
- 销毁为确保资源释放,需要唤醒所有线程,并等待所有线程退出;
1 /************************************
2 * 删除线程池
3 *
4 * @threadpool -- 线程池引用
5 * **********************************/
6 int32_t threadpool_destroy(tThreadpoolInfo *threadpool)
7 {
8 int32_t ret = 0;
9 int32_t i = 0;
10
11 if (NULL == threadpool)
12 {
13 THREADPOOL_ERR("invalid param.\r\n");
14 return -1;
15 }
16
17 THREADPOOL_LOCK(threadpool);
18
19 do
20 {
21 if (threadpool->isShutdown)
22 {
23 THREADPOOL_UNLOCK(threadpool);
24 break;
25 }
26
27 threadpool->isShutdown = 1;
28
29 /* 唤醒所有线程 */
30 if (0 != THREADPOOL_COND_BROADCAST(threadpool))
31 {
32 THREADPOOL_ERR("cond broadcast failed.\r\n");
33 threadpool->isShutdown = 0;
34 continue;
35 }
36
37 THREADPOOL_UNLOCK(threadpool);
38
39 /* 等待所有进程退出 */
40 for (i = 0; i < threadpool->threadStartCnt; i++)
41 {
42 pthread_cancel(threadpool->threads[i]);
43 pthread_join(threadpool->threads[i], NULL);
44 }
45
46 }while(0);
47
48 if (0 != ret)
49 {
50 threadpool->isShutdown = 0;
51 return ret;
52 }
53
54 threadpool_free(threadpool);
55 return ret;
56 }
线程池测试
- 创建最大线程数=256,队列大小=64,启动步进=8 的线程池;
- 向线程池添加1024个任务,如果添加失败则等待1秒再添加;
- 验证1024个任务是否均能执行;
1 /***********************************
2 * Filename : test_main.c
3 * Author : taopeng
4 * *********************************/
5
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <string.h>
9 #include <unistd.h>
10
11 #include "threadpool.h"
12
13 void test_task(void *arg)
14 {
15 long id = (long)arg;
16
17 printf("task[%ld] enter\r\n", id);
18 sleep(3);
19
20 return;
21 }
22
23 int32_t main(int32_t argc, char *argv[])
24 {
25 tThreadpoolInfo *threadpool;
26 long id;
27
28 threadpool = threadpool_create(128, 8, 64);
29 if (NULL == threadpool)
30 return -1;
31
32 for (id = 1; id <= 1024;)
33 {
34 if (0 != threadpool_addtask(threadpool, (THREADPOOLTASKFUNC)test_task, (void *)id, NULL))
35 {
36 sleep(1);
37 continue;
38 }
39
40 id++;
41 }
42
43 sleep(30);
44
45 threadpool_destroy(threadpool);
46 return 0;
47 }
代码实例链接
https://gitee.com/github-18274965/threadpool.git