一、Linux线程
进程与线程之间是有区别的,不过Linux内核只提供了轻量进程的支持,未实现线程模型。Linux是一种“多进程单线程”的操作系统。Linux本身只有进程的概念,而其所谓的“线程”本质上在内核里仍然是进程。进程是资源分配的单位,同一进程中的多个线程共享该进程的资源(如作为共享内存的全局变量)。Linux的“线程”只是在被创建时clone了父进程的资源,因此clone出来的进程表现为“线程”。目前Linux中最流行的线程机制为LinuxThreads,所采用的就是线程-进程“一对一”模型,调度交给核心,而在用户级实现一个包括信号处理在内的线程管理机制。
编写的程序与Linuxthread 库相链接即可支持Linux平台上的多线程,在程序中需包含头文件pthread. h,在编译链接时使用命令:
gcc -D -REENTRANT -lpthread xxx. c
线程创建
进程被创建时,系统会为其创建一个主线程,而要在进程中创建新的线程,则可以调用pthread_create:
pthread_create(pthread_t *thread, const pthread_attr_t *attr, void * (start_routine)(void*), void *arg);
start_routine为新线程的入口函数,arg为传递给start_routine的参数。
每个线程都有自己的线程ID,以便在进程内区分。线程ID在pthread_create调用时回返给创建线程的调用者;一个线程也可以在创建后使用pthread_self()调用获取自己的线程ID:
pthread_self (void) ;
线程退出
线程的退出方式有三:
(1)执行完成后隐式退出;
(2)由线程本身显示调用pthread_exit 函数退出;
pthread_exit (void * retval) ;
(3)被其他线程用pthread_cance函数终止:
pthread_cance (pthread_t thread) ;
在某线程中调用此函数,可以终止由参数thread 指定的线程。
如果一个线程要等待另一个线程的终止,可以使用pthread_join函数,该函数的作用是调用pthread_join的线程将被挂起直到线程ID为参数thread的线程终止:
pthread_join (pthread_t thread, void** threadreturn);
二、线程通信
线程互斥
互斥意味着“排它”,即两个线程不能同时进入被互斥保护的代码。Linux下可以通过pthread_mutex_t 定义互斥体机制完成多线程的互斥操作,该机制的作用是对某个需要互斥的部分,在进入时先得到互斥体,如果没有得到互斥体,表明互斥部分被其它线程拥有,此时欲获取互斥体的线程阻塞,直到拥有该互斥体的线程完成互斥部分的操作为止。
下面的代码实现了对共享全局变量x 用互斥体mutex 进行保护的目的:
int x; // 进程中的全局变量 pthread_mutex_t mutex; pthread_mutex_init(&mutex, NULL); //按缺省的属性初始化互斥体变量mutex pthread_mutex_lock(&mutex); // 给互斥体变量加锁 … //对变量x 的操作 phtread_mutex_unlock(&mutex); // 给互斥体变量解除锁
线程同步
同步就是线程等待某个事件的发生。只有当等待的事件发生线程才继续执行,否则线程挂起并放弃处理器。当多个线程协作时,相互作用的任务必须在一定的条件下同步。
Linux下的C语言编程有多种线程同步机制,最典型的是条件变量(condition variable)。pthread_cond_init用来创建一个条件变量,其函数原型为:
pthread_cond_init (pthread_cond_t *cond, const pthread_condattr_t *attr);
pthread_cond_wait和pthread_cond_timedwait用来等待条件变量被设置,值得注意的是这两个等待调用需要一个已经上锁的互斥体mutex,这是为了防止在真正进入等待状态之前别的线程有可能设置该条件变量而产生竞争。pthread_cond_wait的函数原型为:
pthread_cond_wait (pthread_cond_t *cond, pthread_mutex_t *mutex);
pthread_cond_broadcast用于设置条件变量,即使得事件发生,这样等待该事件的线程将不再阻塞:
pthread_cond_broadcast (pthread_cond_t *cond) ;
pthread_cond_signal则用于解除某一个等待线程的阻塞状态:
pthread_cond_signal (pthread_cond_t *cond) ;
pthread_cond_destroy 则用于释放一个条件变量的资源。
pthread_cond_destroy(pthread_cond_t *cond);
在头文件semaphore.h 中定义的信号量则完成了互斥体和条件变量的封装,按照多线程程序设计中访问控制机制,控制对资源的同步访问,提供程序设计人员更方便的调用接口。
sem_init(sem_t *sem, int pshared, unsigned int val);
这个函数初始化一个信号量sem 的值为val,参数pshared 是共享属性控制,表明是否在进程间共享。
sem_wait(sem_t *sem);
调用该函数时,若sem为无状态,调用线程阻塞,等待信号量sem值增加(post )成为有信号状态;若sem为有状态,调用线程顺序执行,但信号量的值减一。
sem_post(sem_t *sem);
调用该函数,信号量sem的值增加,可以从无信号状态变为有信号状态。
生产者线程与消费者线程通过缓冲区发生联系。生产者线程将生产的数据送入缓冲区,消费者线程则从中取出数据。缓冲区大小为BUFFER_SIZE,是一个环形的缓冲池。
//testThreadRingBuffer #include <stdio.h> #include <pthread.h> #define BUFFER_SIZE 16 // 缓冲区大小 #define TEST_DATA_NUM 200 //测试数据数量 #define TEST_OVER -1 //测试结束标志 #define MY_SUCCESS 0 //成功 #define MY_ERROR 1 //失败 typedef struct testThreadRingBuffer_t { // 缓冲区相关数据结构 int nRingBuffer[BUFFER_SIZE]; /* 实际数据存放的数组*/ pthread_mutex_t mutex_lock; /* 互斥体lock 用于对缓冲区的互斥操作 */ int nReadPos; /* 读指针位置 */ int nWritePos; /* 写指针位置 */ pthread_cond_t notEmptyFlag; /* 缓冲区非空的条件变量 */ pthread_cond_t notFullFlag; /* 缓冲区未满的条件变量 */ }testThreadRingBuffer_t; testThreadRingBuffer_t g_ThreadRingBuffer; /* 初始化缓冲区结构 */ void test_thread_ring_buffer_init(testThreadRingBuffer_t *args) { pthread_mutex_init(&args->mutex_lock, NULL); pthread_cond_init(&args->notEmptyFlag, NULL); pthread_cond_init(&args->notFullFlag, NULL); args->nReadPos = 0; args->nWritePos = 0; } /* 缓冲区已满 */ int ring_buffer_is_full(testThreadRingBuffer_t *args) { return ((args->nWritePos + 1) % BUFFER_SIZE == args->nReadPos); } /* 缓冲区已空 */ int ring_buffer_is_empty(testThreadRingBuffer_t *args) { return (args->nWritePos == args->nReadPos); } /* 将DATA放入缓冲区,这里是存入一个整数 */ void put_data_to_thread_ring_buffer(testThreadRingBuffer_t *args, int nData) { pthread_mutex_lock(&args->mutex_lock); /* 等待缓冲区未满 */ if (ring_buffer_is_full(args)) //判定缓冲区已满 { pthread_cond_wait(&args->notFullFlag, &args->mutex_lock); } /* 写数据,并移动指针 */ args->nRingBuffer[args->nWritePos] = nData; args->nWritePos++; /* 如果写到尽头 */ if (args->nWritePos >= BUFFER_SIZE) args->nWritePos = 0; /* 设置缓冲区非空的条件变量 */ pthread_cond_signal(&args->notEmptyFlag); //给一个已有数据信号 可以继续读取 pthread_mutex_unlock(&args->mutex_lock); } /* 从缓冲区中取出DATA */ int get_data_from_thread_ring_buffer(testThreadRingBuffer_t *args) { int nTempData; pthread_mutex_lock(&args->mutex_lock); /* 等待缓冲区非空 */ if (ring_buffer_is_empty(args)) { pthread_cond_wait(&args->notEmptyFlag, &args->mutex_lock); } /* 读数据,移动读指针 */ nTempData = args->nRingBuffer[args->nReadPos]; args->nReadPos++; /* 如果读到尽头 */ if (args->nReadPos >= BUFFER_SIZE) args->nReadPos = 0; /* 设置缓冲区未满的条件变量 */ pthread_cond_signal(&args->notFullFlag); //给一个数据信号 数据已经被消耗 可以继续传数据 pthread_mutex_unlock(&args->mutex_lock); return nTempData; } /* 测试:生产者线程将0 到TEST_DATA_NUM 的整数送入缓冲区,消费者线程从缓冲区中获取整数,两者都打印信息 */ void* producer_thread(void *data) { int i; printf("producer_thread start\n"); for (i = 0; i <= TEST_DATA_NUM; i++) { printf("PUT %d to ringBuffer\n", i); put_data_to_thread_ring_buffer(&g_ThreadRingBuffer, i); } put_data_to_thread_ring_buffer(&g_ThreadRingBuffer, TEST_OVER); printf("producer_thread leave\n"); pthread_exit(0); return MY_SUCCESS; } void* consumer_thread(void *data) { int nConsTempData; printf("consumer_thread start\n"); while (1) { nConsTempData = get_data_from_thread_ring_buffer(&g_ThreadRingBuffer); if (nConsTempData == TEST_OVER) break; printf("GET %d from ringBuffer \n", nConsTempData); } printf("consumer_thread leave\n"); pthread_exit(0); return MY_SUCCESS; } int main(void) { int ret; pthread_t threadId_A, threadId_B; test_thread_ring_buffer_init(&g_ThreadRingBuffer); /* 创建生产者和消费者线程 */ ret = pthread_create(&threadId_A, NULL, producer_thread, NULL); if (ret) { printf("create producer_thread ERROR\n"); return MY_ERROR; } ret = pthread_create(&threadId_B, NULL, consumer_thread, NULL); if (ret) { printf("create consumer_thread ERROR\n"); return MY_ERROR; } /* 等待两个线程结束 */ pthread_join(threadId_A, NULL); pthread_join(threadId_B, NULL); printf("\ninput any character to exit\n"); getchar(); return MY_SUCCESS; }