C++线程池的简单实现方法

时间:2022-11-04 00:15:09

本文以实例形式较为详细的讲述了C++线程池的简单实现方法。分享给大家供大家参考之用。具体方法如下:

一、几个基本的线程函数:

1.线程操纵函数:

?
1
2
3
4
int pthread_create(pthread_t *tidp, const pthread_attr_t *attr, (void*)(*start_rtn)(void *), void *arg); //创建
void pthread_exit(void *retval);            //终止自身
int pthread_cancel(pthread_t tid);            //终止其他.发送终止信号后目标线程不一定终止,要调用join函数等待
int pthread_join(pthread_t tid, void **retval);   //阻塞并等待其他线程

2.属性:

?
1
2
3
int pthread_attr_init(pthread_attr_t *attr);           //初始化属性
int pthread_attr_setdetachstate(pthread_attr_t *attr, int detachstate); //设置分离状态
int pthread_attr_destroy(pthread_attr_t *attr);           //销毁属性

 

3.同步函数
互斥锁

?
1
2
3
4
5
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr); //初始化锁
int pthread_mutex_destroy(pthread_mutex_t *mutex); //销毁锁
int pthread_mutex_lock(pthread_mutex_t *mutex); //加锁
int pthread_mutex_trylock(pthread_mutex_t *mutex); //尝试加锁,上面lock的非阻塞版本
int pthread_mutex_unlock(pthread_mutex_t *mutex); //解锁

4.条件变量

?
1
2
3
4
int pthread_cond_init(pthread_cond_t *cv, const pthread_condattr_t *cattr); //初始化
int pthread_cond_destroy(pthread_cond_t *cond);                 //销毁
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);     //等待条件
int pthread_cond_signal(pthread_cond_t *cond);                 //通知,唤醒第一个调用pthread_cond_wait()而进入睡眠的线程

5.工具函数

?
1
2
3
int pthread_equal(pthread_t t1, pthread_t t2); //比较线程ID
int pthread_detach(pthread_t tid);       //分离线程
pthread_t pthread_self(void);            //自身ID

上述代码中,线程的cancel和join,以及最后的工具函数,这些函数的参数都为结构体变量,其他的函数参数都是结构体变量指针;品味一下,参数为指针的,因为都需要改变结构体的内容,而参数为普通变量的,则只需要读内容即可。

二、线程池代码:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>  //linux环境中多线程的头文件,非C语言标准库,编译时最后要加 -lpthread 调用动态链接库
 
//工作链表的结构
typedef struct worker {
  void *(*process)(void *arg);  //工作函数
  void *arg;           //函数的参数
  struct worker *next;
}CThread_worker;
 
//线程池的结构
typedef struct {
  pthread_mutex_t queue_lock;   //互斥锁
  pthread_cond_t queue_ready;  //条件变量/信号量
 
  CThread_worker *queue_head;   //指向工作链表的头结点,临界区
  int cur_queue_size;       //记录链表中工作的数量,临界区
 
  int max_thread_num;       //最大线程数
  pthread_t *threadid;      //线程ID
 
  int shutdown;          //开关
}CThread_pool;
 
static CThread_pool *pool = NULL;  //一个线程池变量
int pool_add_worker(void *(*process)(void *arg), void *arg);  //负责向工作链表中添加工作
void *thread_routine(void *arg);  //线程例程
 
//线程池初始化
void pool_init(int max_thread_num)
{
  int i = 0;
 
  pool = (CThread_pool *) malloc (sizeof(CThread_pool));  //创建线程池
 
  pthread_mutex_init(&(pool->queue_lock), NULL);   //互斥锁初始化,参数为锁的地址
  pthread_cond_init( &(pool->queue_ready), NULL);   //条件变量初始化,参数为变量地址
 
  pool->queue_head = NULL;
  pool->cur_queue_size = 0;
 
  pool->max_thread_num = max_thread_num;
  pool->threadid = (pthread_t *) malloc(max_thread_num * sizeof(pthread_t));
  for (i = 0; i < max_thread_num; i++) {
    pthread_create(&(pool->threadid[i]), NULL, thread_routine, NULL); //创建线程, 参数为线程ID变量地址、属性、例程、参数
  }
 
  pool->shutdown = 0;
}
 
//例程,调用具体的工作函数
void *thread_routine(void *arg)
{
  printf("starting thread 0x%x\n", (int)pthread_self());
  while(1) {
    pthread_mutex_lock(&(pool->queue_lock));  //从工作链表中取工作,要先加互斥锁,参数为锁地址
 
    while(pool->cur_queue_size == 0 && !pool->shutdown) {    //链表为空
      printf("thread 0x%x is waiting\n", (int)pthread_self());
      pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock));  //等待资源,信号量用于通知。会释放第二个参数的锁,以供添加;函数返回时重新加锁。
    }
 
    if(pool->shutdown) {
      pthread_mutex_unlock(&(pool->queue_lock));     //结束开关开启,释放锁并退出线程
      printf("thread 0x%x will exit\n", (int)pthread_self());
      pthread_exit(NULL);   //参数为void *
    }
 
    printf("thread 0x%x is starting to work\n", (int)pthread_self());
 
    --pool->cur_queue_size;
    CThread_worker *worker = pool->queue_head;
    pool->queue_head = worker->next;
 
    pthread_mutex_unlock (&(pool->queue_lock));   //获取一个工作后释放锁
    (*(worker->process))(worker->arg);   //做工作
    free(worker);
    worker = NULL;
  }
  pthread_exit(NULL);
}
 
//销毁线程池
int pool_destroy()
{
  if(pool->shutdown)   //检测结束开关是否开启,若开启,则所有线程会自动退出
    return -1;
  pool->shutdown = 1;
 
  pthread_cond_broadcast( &(pool->queue_ready) );   //广播,唤醒所有线程,准备退出
 
  int i;
  for(i = 0; i < pool->max_thread_num; ++i)
    pthread_join(pool->threadid[i], NULL);   //主线程等待所有线程退出,只有join第一个参数不是指针,第二个参数类型是void **,接收exit的返回值,需要强制转换
  free(pool->threadid);
  CThread_worker *head = NULL;
  while(pool->queue_head != NULL) {      //释放未执行的工作链表剩余结点
    head = pool->queue_head;
    pool->queue_head = pool->queue_head->next;
    free(head);
  }
 
  pthread_mutex_destroy(&(pool->queue_lock));   //销毁锁和条件变量
  pthread_cond_destroy(&(pool->queue_ready));
 
  free(pool);
  pool=NULL;
  return 0;
}
 
void *myprocess(void *arg)
{
  printf("threadid is 0x%x, working on task %d\n", (int)pthread_self(), *(int*)arg);
  sleep (1);
  return NULL;
}
 
//添加工作
int pool_add_worker(void *(*process)(void *arg), void *arg)
{
  CThread_worker *newworker = (CThread_worker *) malloc(sizeof(CThread_worker));
  newworker->process = process;  //具体的工作函数
  newworker->arg = arg;
  newworker->next = NULL;
 
  pthread_mutex_lock( &(pool->queue_lock) );   //加锁
 
  CThread_worker *member = pool->queue_head;   //插入链表尾部
  if( member != NULL ) {
    while( member->next != NULL )
      member = member->next;
    member->next = newworker;
  }
  else {
    pool->queue_head = newworker;
  }
  ++pool->cur_queue_size;
 
  pthread_mutex_unlock( &(pool->queue_lock) );  //解锁
 
  pthread_cond_signal( &(pool->queue_ready) );  //通知一个等待的线程
  return 0;
}
 
int main(int argc, char **argv)
{
  pool_init(3);  //主线程创建线程池,3个线程
 
  int *workingnum = (int *) malloc(sizeof(int) * 10);
  int i;
  for(i = 0; i < 10; ++i) {
    workingnum[i] = i;
    pool_add_worker(myprocess, &workingnum[i]);   //主线程负责添加工作,10个工作
  }
 
  sleep (5);
  pool_destroy();   //销毁线程池
  free (workingnum);
 
  return 0;
}

希望本文所述对大家的C++程序设计有所帮助。