#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h> typedef void* (*fun)(void*); fun fun1, fun2; pthread_mutex_t pmu = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond;
pthread_t pid1, pid2;
int flag = ;
int gnum = ;
int gsub = ; void * func1(void * para)
{
int k = (int)para;
printf("func1, ******\n");
while(gnum<=)
{
pthread_mutex_lock(&pmu);
printf("gnum == %d", gnum);
while(gnum==)
{
printf("suspend thread1 at gnum==50 !!! \n");
pthread_cond_wait(&cond, &pmu);
gnum++;
}
++gnum;
++flag;
++k;
//printf("flag = %d, k = %d\n", flag, k);
pthread_mutex_unlock(&pmu);
printf("I am func1\n");
}
pthread_exit((void*)); } void * func2(void * para)
{
int f = (int)para;
printf("f == %d\n", f);
printf("pthread2 start running !\n");
void * ret = NULL;
while(gsub>=)
{
pthread_mutex_lock(&pmu);
gsub--;
printf("gsub= %d ", gsub);
if(gsub == )
{
printf("now gsnb ==20, and send signal\n");
pthread_cond_signal(&cond);
}
++flag;
++f;
printf("flag = %d, f = %d\n", flag, f);
pthread_mutex_unlock(&pmu);
printf("I am func2 \n");
}
//pthread_join(pid1, &ret);
pthread_exit((void*));
} int main()
{
int id = ;
void * ret = NULL;
int key = ; pthread_cond_init(&cond, NULL); //属性设置NULL默认属性
id = pthread_create(&pid1, NULL, func1, (void*)key);
if(id != )
{
printf("pthread_create error !\n");
exit();
} if(pthread_create(&pid2, NULL, func2, (void*)key))
{
printf("pthread_create error ! \n");
exit();
}
87 pthread_join(pid2, &ret); //等待pid2线程退出
88 pthread_join(pid1, &ret); //等待pid1线程退出 //pthread_detach(pid1); //主线程与pid1线程进行分离,一般用来实现异步返回
//pthread_detach(pid2); //同上
pthread_exit((void*));
}
gcc test_thread.c -lpthread
./a.out
线程池实例代码:
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <pthread.h>
#include <assert.h> typedef struct worker
{
//回调函数,任务运行时会调用此函数,也可以声明为其他形式;
void * (*process)(void *arg); //该函数返回值是任意类型的;参数也是任意类型的;`
void *arg; //回调函数的参数;
struct worker *next;
}CThread_worker; //线程池结构
typedef struct
{
pthread_mutex_t queue_lock; //互斥量
pthread_cond_t queue_ready; //条件变量 //链表结构, 线程池中所有等待任务
CThread_worker *queue_head; //是否销毁线程池
int shutdown;
pthread_t *threadid; //线程池中允许的活动线程数目;
//线程池中允许的活动线程数目;
int max_thread_num;
//当前等待队列的任务数目;
int cur_queue_size; }CThread_pool; int pool_add_worker(void * (*process)(void *arg), void *arg);
void * thread_routine(void *arg); static CThread_pool *pool = NULL;
void pool_init(int max_thread_num)
{
pool = (CThread_pool*)malloc(sizeof(CThread_pool)); //初始化互斥量;
pthread_mutex_init(&(pool->queue_lock), NULL);
//初始化条件变量
pthread_cond_init(&(pool->queue_ready), NULL); pool->queue_head = NULL; //最大线程数目
pool->max_thread_num = max_thread_num;
//当前线程数目
pool->cur_queue_size = ; pool->shutdown = ;
pool->threadid = (pthread_t*)malloc(max_thread_num * sizeof(pthread_t));
int i = ;
for(i=; i<max_thread_num;i++)
{
pthread_create(&(pool->threadid[i]), NULL, thread_routine, NULL);
}
} //向线程池中加入任务
int pool_add_worker(void*(*process)(void *arg), void *arg)
{
//构建一个新任务
CThread_worker *newworker = (CThread_worker *)malloc(sizeof(CThread_worker));
newworker->process = process;
newworker->arg = arg;
//别忘了置空
newworker->next = NULL; //加锁互斥量
pthread_mutex_lock(&(pool->queue_lock));
//将任务加入到等待队列中
CThread_worker *member = pool->queue_head;
if(member !=NULL)
{
while(member->next != NULL)
member = member->next;
member->next = newworker;
}
else
{
pool->queue_head = newworker;
} assert(pool->queue_head != NULL);
pool->cur_queue_size++;
pthread_mutex_unlock(&(pool->queue_lock)); //好了,等待队列中有任务了,唤醒一个等待线程;
// 注意如果所有线程都在忙碌,这句没有任何作用
pthread_cond_signal(&(pool->queue_ready));
return ;
} /*销毁线程池,等待队列中的任务不会再被执行,
*但是正在运行的线程会一直 把任务运行完后 再退出;
*/ int pool_destroy()
{
if(pool->shutdown)
return -; //防止两次调用
pool->shutdown = ; //唤醒所有等待线程,线程池要销毁了
pthread_cond_broadcast(&(pool->queue_ready)); //阻塞等待线程退出, 否则就成僵尸了
int i;
for(i=; i<pool->max_thread_num; i++)
{
pthread_join(pool->threadid[i], NULL);
} free(pool->threadid); //销毁等待队列
CThread_worker *head = NULL;
while(pool->queue_head != NULL)
{
head=pool->queue_head;
pool->queue_head = pool->queue_head->next;
free(head);
} //条件变量和互斥量也别忘了销毁
pthread_mutex_destroy(&(pool->queue_lock));
pthread_cond_destroy(&(pool->queue_ready)); free(pool);
/*销毁后指针置空是个好习惯*/
pool = NULL;
return ;
} void* thread_routine(void *arg)
{
printf("start thread 0x%x\n", pthread_self());
while()
{
pthread_mutex_lock(&(pool->queue_lock));
/*如果等待队列为0并且不销毁线程池,则处于阻塞状态; 注意
*pthread_cond_wait是一个原子操作,等待前会解锁,唤醒后会加锁*/
while(pool->cur_queue_size == && !pool->shutdown)
{
printf("thread 0x%x is waiting \n", pthread_self());
pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock));
} //线程池要销毁了;
if(pool->shutdown)
{
//遇到break,continue,return等跳转语句,千万不要忘记先解锁*/
pthread_mutex_unlock(&(pool->queue_lock));
printf("thread 0x %x will exit \n", pthread_self());
pthread_exit(NULL);
} printf("thread 0x %x is starting to work \n", pthread_self()); //使用断言
assert(pool->cur_queue_size!= );
assert(pool->queue_head!= NULL); //等待队列长度减去1,并取出链表中的头元素
pool->cur_queue_size--;
CThread_worker *worker = pool->queue_head;
pool->queue_head = worker->next;
pthread_mutex_unlock(&(pool->queue_lock)); //调用回调函数,执行任务
(*(worker->process))(worker->arg);
free(worker);
worker = NULL;
}
//这一句正常情况下是不可达的
pthread_exit(NULL);
} //test code
void *myprocess(void *arg)
{
printf("threadid is 0x%x, working on task %d\n", pthread_self(), *(int*)arg);
sleep(); //休息一秒,延长任务的执行时间
return NULL;
} int main(int argc, char** argv)
{
pool_init(); /*线程池中最多三个活动线程*/ //连续向线程池中放入10个任务;
int *workingnum = (int*)malloc(sizeof(int)*);
int i;
for(i=; i< ;i++)
{
workingnum[i] = i;
pool_add_worker(myprocess, &workingnum[i]);
} sleep();
//销毁线程池;
pool_destroy();
free(workingnum); return ;
}