C语言实现支持动态拓展和销毁的线程池

时间:2022-10-29 22:02:28

本文实例介绍了C 语言实现线程池,支持动态拓展和销毁,分享给大家供大家参考,具体内容如下

实现功能

  • 1.初始化指定个数的线程
  • 2.使用链表来管理任务队列
  • 3.支持拓展动态线程
  • 4.如果闲置线程过多,动态销毁部分线程
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <signal.h>
 
 
/*线程的任务队列由,函数和参数组成,任务由链表来进行管理*/
typedef struct thread_worker_s{
  void *(*process)(void *arg); //处理函数
  void *arg;          //参数
  struct thread_worker_s *next;
}thread_worker_t;
 
#define bool int
#define true 1
#define false 0
 
/*线程池中各线程状态描述*/
#define THREAD_STATE_RUN        0
#define THREAD_STATE_TASK_WAITING   1
#define THREAD_STATE_TASK_PROCESSING  2
#define THREAD_STATE_TASK_FINISHED   3
#define THREAD_STATE_EXIT       4  
 
 
typedef struct thread_info_s{
  pthread_t id;
  int    state;
  struct thread_info_s *next;
}thread_info_t;
 
static char* thread_state_map[] ={"创建","等待任务","处理中","处理完成","已退出"};
/*线程压缩的时候只有 0,1,2,4 状态的线程可以销毁*/
 
 
/*线程池管理器*/
#define THREAD_BUSY_PERCENT 0.5  /*线程:任务 = 1:2 值越小,说明任务多,增加线程*/
#define THREAD_IDLE_PERCENT 2   /*线程:任务 = 2:1 值大于1,线程多于任务,销毁部分线程*/
 
typedef struct thread_pool_s{
  pthread_mutex_t queue_lock ; //队列互斥锁,即涉及到队列修改时需要加锁
  pthread_cond_t queue_ready; //队列条件锁,队列满足某个条件,触发等待这个条件的线程继续执行,比如说队列满了,队列空了
 
  thread_worker_t *head   ;    //任务队列头指针
  bool    is_destroy   ;    //线程池是否已经销毁
  int num;              //线程的个数
  int rnum;         ;    //正在跑的线程
  int knum;         ;    //已杀死的线程
  int queue_size       ;    //工作队列的大小
  thread_info_t *threads   ;    //线程组id,通过pthread_join(thread_ids[0],NULL) 来执行线程
  pthread_t   display   ;    //打印线程
  pthread_t   destroy   ;    //定期销毁线程的线程id
  pthread_t   extend    ;
  float percent       ;    //线程个数于任务的比例 rnum/queue_size
  int  init_num      ;
  pthread_cond_t  extend_ready     ;    //如果要增加线程
}thread_pool_t;
 
/*-------------------------函数声明----------------------*/
/**
 * 1.初始化互斥变量
 * 2.初始化等待变量
 * 3.创建指定个数的线程线程
 */
thread_pool_t* thread_pool_create(int num);
void *thread_excute_route(void *arg);
 
 
/*调试函数*/
void debug(char *message,int flag){
  if(flag)
    printf("%s\n",message);
}
 
void *display_thread(void *arg);
/**
 * 添加任务包括以下几个操作
 * 1.将任务添加到队列末尾
 * 2.通知等待进程来处理这个任务 pthread_cond_singal();
*/
int thread_pool_add_worker(thread_pool_t *pool,void*(*process)(void *arg),void *arg); //网线程池的队列中增加一个需要执行的函数,也就是任务
 
/**
 * 销毁线程池,包括以下几个部分
 * 1.通知所有等待的进程 pthread_cond_broadcase
 * 2.等待所有的线程执行完
 * 3.销毁任务列表
 * 4.释放锁,释放条件
 * 4.销毁线程池对象
 */
 
 
 
void *thread_pool_is_need_recovery(void *arg);
void *thread_pool_is_need_extend(void *arg);
void thread_pool_destory(thread_pool_t *pool);
 
 
thread_pool_t *thread_pool_create(int num){
  if(num<1){
    return NULL;
  }
  thread_pool_t *p;
  p = (thread_pool_t*)malloc(sizeof(struct thread_pool_s));
  if(p==NULL)
    return NULL;
  p->init_num = num;
  /*初始化互斥变量与条件变量*/
  pthread_mutex_init(&(p->queue_lock),NULL);
  pthread_cond_init(&(p->queue_ready),NULL);
 
  /*设置线程个数*/
  p->num  = num;
  p->rnum = num;
  p->knum = 0;
 
  p->head = NULL;
  p->queue_size =0;
  p->is_destroy = false;
 
   
  int i=0;
  thread_info_t *tmp=NULL;
  for(i=0;i<num;i++){
    /*创建线程*/
    tmp= (struct thread_info_s*)malloc(sizeof(struct thread_info_s));
    if(tmp==NULL){
      free(p);
      return NULL;
    }else{
      tmp->next = p->threads;
      p->threads = tmp;
    }
    pthread_create(&(tmp->id),NULL,thread_excute_route,p);
    tmp->state = THREAD_STATE_RUN;
  }
 
  /*显示*/
  pthread_create(&(p->display),NULL,display_thread,p);
  /*检测是否需要动态线程*/
  //pthread_create(&(p->extend),NULL,thread_pool_is_need_extend,p);
  /*动态销毁*/
  pthread_create(&(p->destroy),NULL,thread_pool_is_need_recovery,p);
  return p;
}
 
int thread_pool_add_worker(thread_pool_t *pool,void*(*process)(void*arg),void*arg){
  thread_pool_t *p= pool;
  thread_worker_t *worker=NULL,*member=NULL;
  worker = (thread_worker_t*)malloc(sizeof(struct thread_worker_s));
  int incr=0;
  if(worker==NULL){
    return -1;
  }
  worker->process = process;
  worker->arg   = arg;
  worker->next  = NULL;
  thread_pool_is_need_extend(pool);
  pthread_mutex_lock(&(p->queue_lock));
  member = p->head;
  if(member!=NULL){
    while(member->next!=NULL)
      member = member->next;
    member->next = worker;
  }else{
    p->head = worker;
  }
  p->queue_size ++;
  pthread_mutex_unlock(&(p->queue_lock));
  pthread_cond_signal(&(p->queue_ready));
  return 1;
}
 
 
void thread_pool_wait(thread_pool_t *pool){
  thread_info_t *thread;
  int i=0;
  for(i=0;i<pool->num;i++){
    thread = (thread_info_t*)(pool->threads+i);
    thread->state = THREAD_STATE_EXIT;
    pthread_join(thread->id,NULL);
  }
}
void thread_pool_destory(thread_pool_t *pool){
  thread_pool_t  *p   = pool;
  thread_worker_t *member = NULL;
 
  if(p->is_destroy)
    return ;
  p->is_destroy = true;
  pthread_cond_broadcast(&(p->queue_ready));
  thread_pool_wait(pool);
  free(p->threads);
  p->threads = NULL;
  /*销毁任务列表*/
  while(p->head){
    member = p->head;
    p->head = member->next;
    free(member);
  }
  /*销毁线程列表*/
  thread_info_t *tmp=NULL;
  while(p->threads){
    tmp = p->threads;
    p->threads = tmp->next;
    free(tmp);
  }
 
  pthread_mutex_destroy(&(p->queue_lock));
  pthread_cond_destroy(&(p->queue_ready));
  return ;
}
/*通过线程id,找到对应的线程*/
thread_info_t *get_thread_by_id(thread_pool_t *pool,pthread_t id){
  thread_info_t *thread=NULL;
  thread_info_t *p=pool->threads;
  while(p!=NULL){
    if(p->id==id)
      return p;
    p = p->next;
  }
  return NULL;
}
 
 
/*每个线程入口函数*/
void *thread_excute_route(void *arg){
  thread_worker_t *worker = NULL;
  thread_info_t  *thread = NULL;
  thread_pool_t*  p = (thread_pool_t*)arg;
  //printf("thread %lld create success\n",pthread_self());
  while(1){
    pthread_mutex_lock(&(p->queue_lock));
 
    /*获取当前线程的id*/
    pthread_t pthread_id = pthread_self();
    /*设置当前状态*/
    thread = get_thread_by_id(p,pthread_id);
 
    /*线程池被销毁,并且没有任务了*/
    if(p->is_destroy==true && p->queue_size ==0){
      pthread_mutex_unlock(&(p->queue_lock));
      thread->state = THREAD_STATE_EXIT;
      p->knum ++;
      p->rnum --;
      pthread_exit(NULL);
    }
    if(thread){
      thread->state = THREAD_STATE_TASK_WAITING; /*线程正在等待任务*/
    }
    /*线程池没有被销毁,没有任务到来就一直等待*/
    while(p->queue_size==0 && !p->is_destroy){
      pthread_cond_wait(&(p->queue_ready),&(p->queue_lock));
    }
    p->queue_size--;
    worker = p->head;
    p->head = worker->next;
    pthread_mutex_unlock(&(p->queue_lock));
     
 
    if(thread)
      thread->state = THREAD_STATE_TASK_PROCESSING; /*线程正在执行任务*/
    (*(worker->process))(worker->arg);
    if(thread)
      thread->state = THREAD_STATE_TASK_FINISHED;  /*任务执行完成*/
    free(worker);
    worker = NULL;
  }
}
 
 
 
/*拓展线程*/
void *thread_pool_is_need_extend(void *arg){
  thread_pool_t *p = (thread_pool_t *)arg;
  thread_pool_t *pool = p;
  /*判断是否需要增加线程,最终目的 线程:任务=1:2*/
  if(p->queue_size>100){
    int incr =0;
    if(((float)p->rnum/p->queue_size) < THREAD_BUSY_PERCENT ){
      incr = (p->queue_size*THREAD_BUSY_PERCENT) - p->rnum; /*计算需要增加线程个数*/
      int i=0;
      thread_info_t *tmp=NULL;
      thread_pool_t *p = pool;
      pthread_mutex_lock(&pool->queue_lock);
      if(p->queue_size<100){
        pthread_mutex_unlock(&pool->queue_lock);
        return ;
      }
      for(i=0;i<incr;i++){
        /*创建线程*/
        tmp= (struct thread_info_s*)malloc(sizeof(struct thread_info_s));
        if(tmp==NULL){
          continue;
        }else{
          tmp->next = p->threads;
          p->threads = tmp;
        }
        p->num ++;
        p->rnum ++;
        pthread_create(&(tmp->id),NULL,thread_excute_route,p);
        tmp->state = THREAD_STATE_RUN;
      }
      pthread_mutex_unlock(&pool->queue_lock);
    }
  }
  //pthread_cond_signal(&pool->extend_ready);
}
pthread_cond_t sum_ready;
/*恢复初始线程个数*/
void *thread_pool_is_need_recovery(void *arg){
  thread_pool_t *pool = (thread_pool_t *)arg;
  int i=0;
  thread_info_t *tmp = NULL,*prev=NULL,*p1=NULL;
  /*如果没有任务了,当前线程大于初始化的线程个数*/
  while(1){
    i=0;
    if(pool->queue_size==0 && pool->rnum > pool->init_num ){
      sleep(5);
      /*5s秒内还是这个状态的话就,销毁部分线程*/
      if(pool->queue_size==0 && pool->rnum > pool->init_num ){
        pthread_mutex_lock(&pool->queue_lock);
        tmp = pool->threads;
        while((pool->rnum != pool->init_num) && tmp){
          /*找到空闲的线程*/
          if(tmp->state != THREAD_STATE_TASK_PROCESSING){
            i++;
            if(prev)
              prev->next  = tmp->next;
            else
              pool->threads = tmp->next;
            pool->rnum --; /*正在运行的线程减一*/
            pool->knum ++; /*销毁的线程加一*/
            kill(tmp->id,SIGKILL); /*销毁线程*/
            p1 = tmp;
            tmp = tmp->next;
            free(p1);
            continue;
          }
          prev = tmp;
          tmp = tmp->next;
        }
        pthread_mutex_unlock(&pool->queue_lock);
        printf("5s内没有新任务销毁部分线程,销毁了 %d 个线程\n",i);
      }
    }
    sleep(5);
  }
}
 
 
 
/*打印一些信息的*/
void *display_thread(void *arg){
  thread_pool_t *p =(thread_pool_t *)arg;
  thread_info_t *thread = NULL;
  int i=0;
  while(1){
    printf("threads %d,running %d,killed %d\n",p->num,p->rnum,p->knum);  /*线程总数,正在跑的,已销毁的*/
    thread = p->threads;
    while(thread){
      printf("id=%ld,state=%s\n",thread->id,thread_state_map[thread->state]);
      thread = thread->next;
    }
    sleep(5);
  }
}

希望本文所述对大家学习C语言程序设计有所帮助。