一、Linux进程与线程概述
进程与线程
为什么对于大多数合作性任务,多线程比多个独立的进程更优越呢?这是因为,线程共享相同的内存空间。不同的线程可以存取内存中的同一个变量。所以,程序中的所有线程都可以读或写声明过的全局变量。如果曾用fork() 编写过重要代码,就会认识到这个工具的重要性。为什么呢?虽然fork() 允许创建多个进程,但它还会带来以下通信问题:如何让多个进程相互通信,这里每个进程都有各自独立的内存空间。对这个问题没有一个简单的答案。虽然有许多不同种类的本地IPC (进程间通信),但它们都遇到两个重要障碍:
- 强加了某种形式的额外内核开销,从而降低性能。
- 对于大多数情形,IPC不是对于代码的“自然”扩展。通常极大地增加了程序的复杂性。
双重坏事: 开销和复杂性都非好事。如果曾经为了支持 IPC而对程序大动干戈过,那么您就会真正欣赏线程提供的简单共享内存机制。由于所有的线程都驻留在同一内存空间,POSIX线程无需进行开销大而复杂的长距离调用。只要利用简单的同步机制,程序中所有的线程都可以读取和修改已有的数据结构。而无需将数据经由文件描述符转储或挤入紧窄的共享内存空间。仅此一个原因,就足以让您考虑应该采用单进程/多线程模式而非多进程/单线程模式。
为什么要用线程?
与标准 fork()相比,线程带来的开销很小。内核无需单独复制进程的内存空间或文件描述符等等。这就节省了大量的CPU时间,使得线程创建比新进程创建快上十到一百倍。因为这一点,可以大量使用线程而无需太过于担心带来的CPU 或内存不足。使用 fork() 时导致的大量 CPU占用也不复存在。这表示只要在程序中有意义,通常就可以创建线程。
当然,和进程一样,线程将利用多CPU。如果软件是针对多处理器系统设计的,这就真的是一大特性(如果软件是开放源码,则最终可能在不少平台上运行)。特定类型线程程序(尤其是CPU密集型程序)的性能将随系统中处理器的数目几乎线性地提高。如果正在编写CPU非常密集型的程序,则绝对想设法在代码中使用多线程。一旦掌握了线程编码,无需使用繁琐的IPC和其它复杂的通信机制,就能够以全新和创造性的方法解决编码难题。所有这些特性配合在一起使得多线程编程更有趣、快速和灵活。
什么是线程?
- 专业点的说法,线程被定义为一个独立的指令流,它本身的运转由操作系统来安排,但是,这意味着什么呢?
- 对软件开发者来说,解释线程最好的描述就是“procedure”可以独立于主程序运行。
- 再进一步,设想一个包含了大量procedure的主程序,然后想象所有这些procedure在操作系统的安排下一起或者独立的运行,这就是对于多线程程序的一个简单描述。
- 问题是,它是如何实现的呢?
- 在弄懂线程之前,第一步要搞清楚Unix进程。进程被操作系统创建,并需要相当多的“开支”,进程包含如下程序资源和程序执行状态信息:
- 进程ID,进程群组ID,用户ID,群组ID
- 环境
- 工作目录
- 程序指令
- 寄存器
- 栈
- 堆
- 文件描述符
- 信号动作
- 共享库
- 进程间通信工具(例如消息队列,管道,信号量,共享内存)
Unix进程 Unix进程内部的线程
- 线程使用和在进程内的生存,仍由操作系统来安排并且独立的实体来运行,很大程度上是因为它们为可执行代码的存在复制了刚刚好的基本资源。
- 这个独立的控制流之所以可以实现,是因为线程维护着如下的东西:
- 栈指针
- 寄存器
- 调度属性(例如规则和优先级)
- 等待序列和阻塞信号
- 线程拥有的数据
- 所以,总的来说,Unix环境里的线程有如下特点:
- 它生存在进程中,并使用进程资源;
- 拥有它自己独立的控制流,前提是只要它的父进程还存在,并且OS支持它;
- 它仅仅复制可以使它自己调度的必要的资源;
- 它可能会同其它与之同等独立的线程分享进程资源;
- 如果父进程死掉那么它也会死掉——或者类似的事情;
- 它是轻量级的,因为大部分的开支已经在它的进程创建时完成了。
- 因为在同一进程内的线程分享资源,所以:
- 一个线程对共享的系统资源做出的改变(例如关闭一个文件)会被所有的其它线程看到;
- 指向同一地址的两个指针的数据是相同的;
- 对同一块内存进行读写操作是可行的,但需要程序员作明确的同步处理操作。
二、Pthreads概述
什么是Pthreads?
- 在过去,硬件提供商会去实现线程的硬件专用版本。这些线程的实现彼此会有很大的差异,所以会使得程序员开发可移植的线程应用程序非常困难。
- 为了充分利用线程的特性,我们需要一个标准的线程编程接口:
- 对于UNIX系统,这个接口已经被IEEE POSIX 1003.1c标准(1995)所指定;
- 这套标准的实现被称为POSIX threads,或者Pthreads;
- 现在大部分的硬件提供商都提供他们专有的API之外,还会有Pthreads库。
- POSIX标准在不停的进化和修改,包括Pthreads的规范。它的最新版本是IEEE Std 1003.1,2004版本。
- 一些有用的链接:
- POSIX FAQs: www.opengroup.org/austin/papers/posix_faq.html
- Download the Standard: www.unix.org/version3/ieee_std.html
- Pthreads库被定义为一系列的C语言程序类型和过程调用,是用一个pthreads.h的include头文件和一个线程库(尽管这个库是另一个库的一部分,就像libc一样)来实现的。
为什么要用Pthreads?
- 使用Pthreads的主要的目的是,它使获得潜在的程序执行性能变成现实;
- 当与创建和管理进程的代价相比较时,线程被创建时只需要更少的系统开支,管理线程比管理进程需要更少的系统资源;
- 例如,下面的表格比较的是fork()与pthreads_create():
Platform | fork() | pthread_create() | ||||
---|---|---|---|---|---|---|
real | user | sys | real | user | sys | |
AMD 2.3 GHz Opteron (16cpus/node) | 12.5 | 1.0 | 12.5 | 1.2 | 0.2 | 1.3 |
AMD 2.4 GHz Opteron (8cpus/node) | 17.6 | 2.2 | 15.7 | 1.4 | 0.3 | 1.3 |
IBM 4.0 GHz POWER6 (8cpus/node) | 9.5 | 0.6 | 8.8 | 1.6 | 0.1 | 0.4 |
IBM 1.9 GHz POWER5 p5-575 (8cpus/node) | 64.2 | 30.7 | 27.6 | 1.7 | 0.6 | 1.1 |
IBM 1.5 GHz POWER4 (8cpus/node) | 104.5 | 48.6 | 47.2 | 2.1 | 1.0 | 1.5 |
INTEL 2.4 GHz Xeon (2 cpus/node) | 54.9 | 1.5 | 20.8 | 1.6 | 0.7 | 0.9 |
INTEL 1.4 GHz Itanium2 (4 cpus/node) | 54.5 | 1.1 | 22.2 | 2.0 | 1.2 | 0.6 |
测试代码: ==============================================================================
C Code for fork() creation test
==============================================================================
#include <stdio.h>
#include <stdlib.h>
#define NFORKS 50000 void do_nothing() {
int i;
i= ;
} int main(int argc, char *argv[]) {
int pid, j, status; for (j=; j<NFORKS; j++) { /*** error handling ***/
if ((pid = fork()) < ) {
printf ("fork failed with error code= %d\n", pid);
exit();
} /*** this is the child of the fork ***/
else if (pid ==) {
do_nothing();
exit();
} /*** this is the parent of the fork ***/
else {
waitpid(pid, status, );
}
}
} ==============================================================================
C Code for pthread_create() test
==============================================================================
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h> #define NTHREADS 50000 void *do_nothing(void *null) {
int i;
i=;
pthread_exit(NULL);
} int main(int argc, char *argv[]) {
int rc, i, j, detachstate;
pthread_t tid;
pthread_attr_t attr; pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); for (j=; j<NTHREADS; j++) {
rc = pthread_create(&tid, &attr, do_nothing, NULL);
if (rc) {
printf("ERROR; return code from pthread_create() is %d\n", rc);
exit(-);
} /* Wait for the thread */
rc = pthread_join(tid, NULL);
if (rc) {
printf("ERROR; return code from pthread_join() is %d\n", rc);
exit(-);
}
} pthread_attr_destroy(&attr);
pthread_exit(NULL); }
- 进程内的所有线程共享相同的地址空间,在许多案例中跨线程通讯比跨进程更为有效率,应用也更为简单;
- 在许多方面,线程化的应用程序比未使用线程的程序提供了有更高的性能和实用性:
- CPU与I/O的重叠协作:例如,一个程序可能分多段对I/O进行长操作,当一个线程正在等待一个I/O系统调用完成时,CPU可以用其它线程进行现有的密集工作;
- 优先级/实时调度:可预定更重要的任务取代或者中断低优先级的任务;
- 异步事件处理:一些不确定次数和持续时间的服务事件是交叉执行的任务。例如,Web服务器可以在应答前一个传输数据的请求时候,处理新的数据请求。
- 在SMP架构上使用Pthreads最主要的目的就是获得最佳的执行性能。特别是,如果程序使用MPI做on-node通讯,那么使用Pthreads代替on-node数据传输,会获得很大的执行效率的提升。
- MPI库经常通过共享内存来实现on-node任务通讯,这样,就必须至少调用一次内存拷贝操作(进程对进程的);
- 对于Pthreads,不存在中间(intermediate)的内存拷贝,因为在一个进程中,线程共享相同的地址空间。本身也没有数据传输,它成为了一个从高速缓存到CPU或是内存到CPU带宽式的传输情况,它们的速度更快;
- 以下是比较列表:
Platform MPI Shared Memory Bandwidth
(GB/sec)Pthreads Worst Case
Memory-to-CPU Bandwidth
(GB/sec)AMD 2.3 GHz Opteron 1.8 5.3 AMD 2.4 GHz Opteron 1.2 5.3 IBM 1.9 GHz POWER5 p5-575 4.1 16 IBM 1.5 GHz POWER4 2.1 4 Intel 2.4 GHz Xeon 0.3 4.3 Intel 1.4 GHz Itanium 2 1.8 6.4
附注:
SMP——Symmetrical Multi-Processing,对称多处理系统;
MPI——Message Passing Interface,参照http://www-unix.mcs.anl.gov/mpi/mpich/。
三、POXI线程的创建与取消
创建线程函数——pthread_create()
#include <pthread.h>
int pthread_create( pthread_t* thread, const pthread_attr_t* attr, void* (*start_routine)(void* ), void* arg ); 描述:
pthread_create()函数创建一个新的线程,通过线程属性对象attr指定属性。
被创建的线程继承了父线程的信号掩码,且它的等待信号是空的。 参数:
thread:NULL,或者指向一个pthread_t对象的指针,指向此函数执行后存储的一个新线程的线程ID。 attr:
指向 pthread_attr_t结构体的指针,此结构体指定了线程的执行属性。
如果attr为NULL,则被设置为默认的属性,pthread_attr_init()可以设置attr属性。
注意:如果在创建线程后编辑attr属性,线程的属性不受影响。 start_routine:
线程的执行函数,arg为函数的参数。
如果start_routine()有返回值,调用pthread_exit(),start_routine()的返回值作为线程的退出状态。
在main()中的线程被调用有所不同。当它从main()返回,调用exit(),用main()的返回值作为退出状态。
arg:
传给start_routine的参数。
Linux线程在核内是以轻量级进程的形式存在的,拥有独立的进程表项,而所有的创建、同步、删除等操作都在核外pthread库中进行。pthread库使用一个管理线程(__pthread_manager(),每个进程独立且唯一)来管理线程的创建和终止,为线程分配线程ID,发送线程相关的信号(比如Cancel),而主线程(pthread_create())的调用者则通过管道将请求信息传给管理线程。
取消线程pthread_cancel()
#include <pthread.h>
int pthread_cancel( pthread_t thread ); 描述:
pthread_cancel()函数请求目标线程被取消(终止)。
取消类型(type)和状态(state)决定了取消函数是否会生效。
当取消动作被执行,目标线程的取消清除操作会被调用。
当最后的取消清除操作返回,目标线程的线程的析构函数会被调用。
当最后的析构函数返回,目标线程会被终止 (terminated)。
线程的取消过程同调用线程会异步执行。 参数:
thread: 欲取消的线程ID,可以通过pthread_create()或者pthread_self()函数取得。
pthread_testcancel()
#include <pthread.h>
void pthread_testcancel( void ); 描述:函数在运行的线程中创建一个取消点,如果cancellation无效则此函数不起作用。
pthread_setcancelstate()
#include <pthread.h>
int pthread_setcancelstate( int state, int* oldstate ); 描述:
pthread_setcancelstate() f函数设置线程取消状态为state,并且返回前一个取消点状态oldstate。 取消点有如下状态值:
PTHREAD_CANCEL_DISABLE:取消请求保持等待,默认值。
PTHREAD_CANCEL_ENABLE:取消请求依据取消类型执行;参考pthread_setcanceltype()。 参数:
state: 新取消状态。
oldstate: 指向本函数所存储的原取消状态的指针。
pthread_setcanceltype()
#include <pthread.h>
int pthread_setcanceltype( int type, int* oldtype ); 描述:
pthread_setcanceltype()函数设置运行线程的取消类型为type,并且返回原取消类型与oldtype。 取消类型值:
PTHREAD_CANCEL_ASYNCHRONOUS:如果取消有效,新的或者是等待的取消请求会立即执行。
PTHREAD_CANCEL_DEFERRED:如果取消有效,在遇到下一个取消点之前,取消请求会保持等待,默认值。
注意:标注POSIX和C库的调用不是异步取消安全地。 参数:
type: 新取消类型
oldtype: 指向该函数所存储的原取消类型的指针。
什么是取消点(cancelation point)?
资料中说,根据POSIX标准,pthread_join()、pthread_testcancel()、 pthread_cond_wait()、pthread_cond_timedwait()、sem_wait()、sigwait()等函数以及 read()、write()等会引起阻塞的系统调用都是Cancelation-point。而其他pthread函数都不会引起 Cancelation动作。但是pthread_cancel的手册页声称,由于LinuxThread库与C库结合得不好,因而目前C库函数都不是 Cancelation-point;但CANCEL信号会使线程从阻塞的系统调用中退出,并置EINTR错误码,因此可以在需要作为 Cancelation-point的系统调用前后调用pthread_testcancel(),从而达到POSIX标准所要求的目标,即如下代码段:
pthread_testcancel();
retcode = read(fd, buffer, length);
pthread_testcancel();
我发现,对于C库函数来说,几乎可以使线程挂起的函数都会响应CANCEL信号,终止线程,包括sleep、delay等延时函数,下面的例子对此会进行详细分析。
实例探讨
1 #include <stdlib.h>
2 #include <stdio.h>
3 #include <pthread.h>
4
5 void cleanup(void *parm) {
6 printf("Inside cancellation cleanup handler\n");
7 }
8 void * child1(void *arg) {
9 int oldstate, oldtype;
10 int i = 0;
11 pthread_cleanup_push(cleanup,NULL);
12 printf("\nPTHREAD_CANCEL_DISABLE before\n");
13 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldstate);
14 // pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS,&oldtype);//comment 1
15 while (1) {
16 i++;
17 printf("child1: I am running. \n");
18 sleep(2);
19 if (i == 5) {
20 printf("\nPTHREAD_CANCEL_ENABLE\n");
21 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
22 //pthread_testcancel();//comment 2
23 }
24
25 }
26 pthread_cleanup_pop(0);
27 }
28
29 int main(int argc, char *argv[]) {
30 int tid1, rc;
31 void * status = NULL;
32 printf("hello, condition variable test\n");
33 pthread_create(&tid1, NULL, child1, NULL);
34 sleep(2);
35 printf("\npthread_cancel \n");
36 pthread_cancel(tid1);
37
38 rc = pthread_join(tid1, &status);
39 if (status != PTHREAD_CANCELED) {
40 printf("\npthread_join failed,status=%d,rc=%d\n", status, rc);
41 return -1;
42 } else
43 printf("\npthread_join succ,status=%d,rc=%d\n", status, rc);
44 printf("\nMain completed!~~\n");
45
46 return EXIT_SUCCESS;
47 }
运行结果:
hello, condition variable test PTHREAD_CANCEL_DISABLE before
child1: I am running. pthread_cancel
child1: I am running.
child1: I am running.
child1: I am running.
child1: I am running. PTHREAD_CANCEL_ENABLE
child1: I am running.
Inside cancellation cleanup handler pthread_join succ,status=-1,rc=0 Main completed!~~
代码分析:
- pthread_create(&tid1, NULL, child1, NULL)创建线程ID为tid1,调用child()过程;
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldstate),将线程tid1线程的取消置为无效;
- 主进程里 pthread_cancel(tid1)意为发送CANCEL信号取消线程tid1,但此时线程的而取消为无效,线程并不接受取消信号;
- pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate)将线程取消,但线程并不能立即终止,因为线程类型值为默认的PTHREAD_CANCEL_DEFERRED,即遇到下一个取消点才会终止线程;
那么,取消点在哪儿呢,也就是哪一句终止的线程呢?是sleep(2),它可以阻塞线程相应了取消信号。
如果你将sleep拿掉,会发现线程会无休止的运行下去,如果将sleep换成pthread_testcancel(),会发现线程也不会终止,这个函数不是有检测取消点,如果存在就取消线程的功用吗?懵了一会儿,索性将child1函数的while(1)内所有打印去掉,发现线程可以终止了。原来,无延时的频繁打印,会使pthread_testcancel()无法响应取消信号,解决方法是要么取消打印,要么加上延时。
最后是第13行:
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS,&oldtype);//comment 1
这句的作用是将取消类型设置为PTHREAD_CANCEL_ASYNCHRONOUS,即取消请求会被立即响应,加上这句,那么运行结果会变为:
hello, condition variable test PTHREAD_CANCEL_DISABLE before
child1: I am running. pthread_cancel
child1: I am running.
child1: I am running.
child1: I am running.
child1: I am running. PTHREAD_CANCEL_ENABLE
Inside cancellation cleanup handler pthread_join succ,status=-1,rc=0 Main completed!~~
由结果看到,比上次的执行结果在【PTHREAD_CANCEL_ENABLE】后少了一次【child1: I am running. 】打印,可以看出,在线程取消设为有效后,直接终止线程,并不进入下一轮循环。
总结:
创建线程
- 起初,主程序main()包含了一个唯一的默认线程。程序员必须明确创建所有其它线程;
- pthread_create创建一个新的线程并使其执行,这个过程可以在你的代码里的任何地方调用多次;
- 一个进程可以创建的线程的最大数量是依赖于实现的(The maximum number of threads that may be created by a process is implementation dependent. )。
- 线程一旦被创建,他们都是同等的,并且也可以创建其它线程。它们之间没有层次体系和依赖关系。
终止线程
- 一个线程有几种终止的方法:http://www.rosoo.net/a/201101/10809.html
- 线程从它的起始程序中返回:
- 线程调用了pthread_exit()函数;
- 线程被另一个线程调用pthread_cancel()函数所取消;
- 整个进程由于调用了exec或exit而退出。
- pthread_exit经常被用来明确的退出一个线程。通常,pthread_exit()函数在线程完成工作并不在需要时才被调用;
- 如果主程序main()在一个线程被创建之前,使用pthread_exit()函数退出结束,另一些线程继续执行。否则,他们在主程序main()结束的时候会自动终止;
- 清除:pthread_exit()函数不关闭文件,任何在线程中被打开的文件,在线程终止后仍然保持打开;
- 通常,你可以通过pthread_exit()函数来操作程序的执行到结束的过程,当然,除非你想要传递一个返回值。然而,在主程序main()中,在主程序结束时,它所产生的线程有个显而易见的问题。如果你不用pthread_exit(),当main()结束是,主进程以及所有的线程都会终止。如果在main()中调用了pthread_exit(),主进程和它所有的线程将会继续工作,尽管main()中的所有代码已经被执行了。
- thread
- The ID of the thread that you want to cancel, which you can get when you callpthread_create()orpthread_self().
四、POXI线程的连接与分离
线程连接(joining)和分离(detaching)函数:
- pthread_join(threadid,status)
- pthread_detach(threadid,status)
- pthread_attr_setdetachstate(attr,detachstate)
- pthread_attr_getdetachstate(attr,detachstate)
pthread_join(threadid,status)——等待线程终止
语法:
#include <pthread.h>
int pthread_join(pthread_t thread, void ** value_ptr);
描述:
pthread_join()将挂起调用线程的执行直到目标线程终止,除非目标线程已经终止了。
在一次成功调用pthread_join()并有非NULL的参数value_ptr,传给pthread_exit()终止线程的这个值以value_ptr作为引用是可用的。
当pthread_join()成功返回,目标线程就会终止。
对同一目标线程多次同时调用pthread_join()的结果是不确定的。
如果调用pthread_join()的线程被取消,那么目标线程将不会被分离。
一个线程是否退出或者保持为连接状态不明确会与{PTHREAD_THREADS_MAX}相背。
(It is unspecified whether a thread that has exited but remains unjoined counts against {PTHREAD_THREADS_MAX}.)
返回值
如果执行成功,pthread_join()会返回0;否则,会返回错误代码以表明错误类型。
原理
pthread_join()函数在多线程应用程序中的便捷性是令人满意的。如果没有其他状态作为参数传给start_routine(),程序员可以模拟这个函数。
pthread_detach——分离线程
语法:
#include <pthread.h>
int pthread_detach(pthread_t thread);
描述:
pthread_detach()函数在所指出线程终止时,该线程的内存空间可以被回收。
如果线程没有终止,pthread_detach()函数也不会令其终止。
对同一目标线程多次调用pthread_datach()的结果是不确定的。
返回值:
如果调用成功,pthread_detach()返回0;反之,会返回错误代码表明错误。
原理:
每一个被创建的线程,最终都需要调用pthread_join()函数或者pthread_detach()函数,因为与线程相关联的内存空间需要回收。
我们建议调用分离函数是不必要的;把线程创建时的属性置为分离状态就足够了,因为线程永远不需要动态分离。但是,需要的话仅限于如下两个原因:
- 在用pthread_join()进行取消操作时,用pthread_detach()函数分离pthread_join()正在等待的线程是很必要的。没有它,就必须用pthread_join()去试图分离另一个线程,这样,会无限期的延误取消处理,并且会引入新的pthread_join()调用,而它本身有可能需要取消处理。
- 为了脱离“初始线程”(在进程中设立服务线程是一种可取的办法)。
连接:
“连接”是线程间完成同步的一种方法,例如:
pthread_join()过程阻塞调用线程,直到被指定的threadid线程终止;
程序员能够获得目标线程终止的返回状态,如果目标线程调用pthread_exit()时指定;
一个连接线程跟一个pthread_join()调用对应。如果对同一个线程进行多次连接会发生逻辑错误;
还有两个同步方法:互斥锁和条件变量。
是否连接?
当一个线程被创建了,它有一个属性表明了它是否可以被连接和分离。只有线程是可连接的(joinable)才可以被连接。如果一个线程作为分离来创建的,那么它永远不能被连接。
POSIX标准的最终方案线程被设计为可连接被创建;
为了明确创建线程的连接性和分离性,pthread_create()函数可使用attr参数。典型的4个步骤:
- 声明一个pthread_attr_t数据类型的pthread属性变量;
- 使用pthread_init()初始化属性变量;
- 使用pthread_attr_setdetachstate()设置属性为分离状态;
- 当使用完成后,使用pthread_attr_destroy()释放属性用到的资源。
分离
pthread_detach()函数能够明确的分离一个线程,即使被创建为可连接的。这个过程是不可逆的。
建议
- 如果一个线程需要连接,可以考虑创建时明确它是可连接的。这是因为可移动设备中并不是所有的实现,线程默认的是按照可连接创建的。
- 如果你预先知道一个线程永远不需要与另一个线程连接,可以考虑把它创建为分离状态。一些系统资源可能会因此而被释放。
五、POXI线程互斥量的使用
Mutex Variables(互斥量)
- Mutex(互斥量)是“mutual exclusion”的缩写,互斥量最主要的用途是在多线程中对共享数据同进行写操作时同步线程并保护数据。
- 互斥量在保护共享数据资源时可以把它想象成一把锁,在Pthreads库中互斥量最基本的设计思想是在任一时间只有一个线程可以锁定(或拥有)互斥量,因此,即使许多线程尝试去锁定一个互斥量时成功的只会有一个,只有在拥有互斥量的线程开锁后其它的线程才能锁定,就是说,线程必须排队访问被保护的数据。
- 互斥量常被用来防止竞争条件(race conditions),下面是一个涉及银行业务的竞争条件案例:
Thread 1 | Thread 2 | Balance |
---|---|---|
Read balance: $1000 | $1000 | |
Read balance: $1000 | $1000 | |
Deposit $200 | $1000 | |
Deposit $200 | $1000 | |
Update balance $1000+$200 | $1200 | |
Update balance $1000+$200 | $1200 |
- 在上面的案例中,当线程使用共享数据资源Banlance时,互斥量会锁定“Balance”,如果不锁定,就会像表内情况一样,Balance的计算结果会混乱。
- 拥有互斥量的线程常常做的一个事情就是更改全局变量值,这是一个安全方法,确保在多个线程更改同一个变量时,它最终的值与唯一一个线程执行更改操作后的结果是一致的,这个被更改的变量属于一个临界区(critical section)。
- 使用互斥量的一个标准的过程是:
- 创建并初始化互斥量;
- 多个线程试图锁定此互斥量;
- 有且只有一个线程成功的拥有了这个互斥量;
- 拥有互斥量的线程执行了一系列的操作;
- 拥有互斥量的线程解锁互斥量;
- 另一个线程获得此互斥量,并重复上面的过程;
- 最终互斥量被销毁。
- 当许多线程竞争一个互斥量时,在请求时失败的线程会阻塞——无阻塞的请求是用“trylock”而不是用“lock”,所以pthread_mutex_trylock()与pthread_mutex_lock()的区别就是在互斥量还没有解锁的情况下,前者不会阻塞线程。
- 在保护共享数据时,程序员的责任是确定每个线程需要用到互斥量,例如,如果有4个线程正在更改同一个数据,但是只有一个线程使用了互斥量,那么这个数据仍然会损坏。
创建并销毁互斥量
函数:
pthread_mutex_init (mutex,attr)
pthread_mutex_destroy(mutex)
pthread_mutexattr_init(attr)
pthread_mutexattr_destroy(attr)
用法:
- 互斥量必须要声明为pthread_mutex_t类型,并且在使用之前必须要被初始化。有两种方法初始化互斥量:
- 静态初始化,例如:pthread_mutex_t mymutex = PTHREAD_MUTEX_INITIALIZER;
- 动态初始化,使用pthread_mutex_init()过程,此函数可以设置互斥量属性对象attr;
注意,互斥量初始状态是未被锁定的(unlocked)。
- 对象attr常被用来设置互斥量对象的属性值的,而且必须是pthread_mutexattr_t类型(默认值是NULL)。Pthreads标准定义了3个互斥量可设置选项:
- Protocol:为互斥量指定用来防止优先级倒置的规则的;
- Prioceiling:为互斥量指定优先级上限的;
- Process-shared:为互斥量指定进程共享的。
注意,不是所有的实现都要提供这三个可选的互斥量属性的。
- pthread_mutexattr_init()和pthread_mutexattr_destroy()函数分别是用来创建和销毁互斥量属性的。
- pthread_mutex_destroy()在互斥量不再被需要时,用来释放互斥量对象。
锁定和解锁互斥量
函数:
pthread_mutex_lock (mutex)
pthread_mutex_trylock(mutex) pthread_mutex_unlock(mutex) |
用法:
- 线程用pthread_mutex_lock()函数通过指定的互斥量获得一个锁,如果互斥量已经被另一个线程锁定,那么这个请求会阻塞申请的线程,直到互斥量解锁。
- pthread_mutex_trylock()尝试锁定互斥量,然而,如果互斥量已经被锁定,那么此函数立即返回一个‘busy’的错误码,这个函数在优先级反转的情况下用来防止死锁是很有用的。
- 如果拥有锁的线程调用pthread_mutex_unlock()函数,那么将会解锁该互斥量;如果其它线程将要为处理被保护的数据请求互斥量,那么拥有该互斥量的线程在完成数据操作后调用该函数。发生下列情况,会有错误码返回:
- 如果互斥量已经被解锁;
- 如果互斥量被另一个线程拥有。
- 关于互斥量已经再也没有任何神奇的功用了……事实上,它们很类似于所有参与的线程之间的一个“君子协定”。
案例分析
/*****************************************************************************
* 描述:
* 这个例子阐明了互斥变量在线程编程中的用法
* main数据域定义一个全局的可访问结构体,所有线程均可访问,
* 每个线程都为此数据的一部分做运算工作,主线程等待所有线程完成运算,
* 并最后打印出计算结果。
******************************************************************************/
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h> /*下面的结构体包含了dotprod函数要处理的所需要的信息,
* 访问输入数据并把输出写入结构体
* */
typedef struct {
double*a;
double*b;
double sum;//ab的和
int veclen;//结构体个数
} DOTDATA; /*全局变量和一个互斥量*/
#define NUMTHRDS 4
#define VECLEN 100
DOTDATA dotstr;
pthread_t callThd[NUMTHRDS];
pthread_mutex_t mutexsum; /*dotprod函数在线程创建时被激活,从DOTDATA结构体内取出数据,之后把运算结果又写入到结构体中。
* 这种处理方法的好处很明显是多线程编程:线程创建时,我们给被激活的函数传递了一个参数——参数值是线程的序号。
* 函数需要的其它信息是来自全局的结构体。
* */
void*dotprod(void*arg) {
int i, start, end, len;
long offset;
double mysum, *x, *y;
offset = (long) arg;
len = dotstr.veclen;
start = offset * len;
end = start + len;
x = dotstr.a;
y = dotstr.b;
//计算
mysum =0;
for (i = start; i < end; i++) {
mysum += (x[i] * y[i]);
} /*锁定互斥量,更改共享结构的值,解锁 */
pthread_mutex_lock(&mutexsum);
dotstr.sum += mysum;
printf("Thread%ld: mysum=%f,dotstr.sum=%f\n",offset,mysum,dotstr.sum);
pthread_mutex_unlock(&mutexsum); pthread_exit((void*) 0);
} /*主程序创建线程,线程计算数据,然后打印出结果。在创建线程前,创建输入数据。
* 因为所有线程更改同一个结构体,所以我们需要一个互斥量。
* 主线程需要等待所有线程完成,它等待这些线程中的一个,我们为线程赋予属性,允许主线程连接(join)其它被
* 创建的线程。注意,当不再需要它们是要记得释放它们。
* */
int main(int argc, char*argv[]) {
long i;
double*a, *b;
void*status;
pthread_attr_t attr; /* 初始化变量 */
a = (double*) malloc(NUMTHRDS * VECLEN *sizeof(double));
b = (double*) malloc(NUMTHRDS * VECLEN *sizeof(double)); for (i =0; i < VECLEN * NUMTHRDS; i++) {
a[i] =1;
b[i] = a[i];
} dotstr.veclen = VECLEN;
dotstr.a = a;
dotstr.b = b;
dotstr.sum =0; pthread_mutex_init(&mutexsum, NULL); pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); for (i =0; i < NUMTHRDS; i++) {
/*创建线程,激活dotpod函数,传入参数线程序号 */
printf("i=%d\n",i);
pthread_create(&callThd[i], &attr, dotprod, (void*) i);
} pthread_attr_destroy(&attr); /* 等待其它线程 */
for (i =0; i < NUMTHRDS; i++) {
pthread_join(callThd[i], &status);
} /* 线程join后,释放内存,销毁互斥量*/
printf("Sum = %f \n", dotstr.sum);
free(a);
free(b);
pthread_mutex_destroy(&mutexsum);
pthread_exit(NULL);
}
# Pthreads
i=
Thread0: mysum=100.000000,dotstr.sum=100.000000
i=
Thread1: mysum=100.000000,dotstr.sum=200.000000
i=
Thread2: mysum=100.000000,dotstr.sum=300.000000
i=
Thread3: mysum=100.000000,dotstr.sum=400.000000
Sum = 400.000000
指针和内存的图像化
主程序申请了两块内存,各分为四块,上图标明了各个指针的指向,程序共创建四个线程,把两块4*100的内存区域分成四块,做连加运算得到mysum,之后再把所有的运算结果加起来赋值于dostr.sum,由于它是多线程共享的全局变量这里得利用互斥变量排队做加法,才能保证该值结果的正常。
六、POXI线程库条件变量的使用
条件变量(Condition Variables)
条件变量是什么?
- 条件变量为我们提供了另一种线程间同步的方法,然而,互斥量是通过控制线程访问数据来实现同步,条件变量允许线程同步是基于实际数据的值。
- 如果没有条件变量,程序员需要让线程不断地轮询,以检查是否满足条件。由于线程处在一个不间断的忙碌状态,所以这是相当耗资源的。条件变量就是这么一个不需要轮询就可以解决这个问题的方法。
- 条件变量总是跟互斥锁(mutex lock)一起使用。
- 下面是使用条件变量的比较典型的过程:
主线程
|
|
线程A
|
Thread B
|
主线程
|
创建和销毁条件变量
函数:
pthread_cond_init (condition,attr)
pthread_cond_destroy (condition) pthread_condattr_init(attr) pthread_condattr_destroy (attr) |
用法:
- 条件变量必须声明为pthread_cond_t类型,并且在使用前必须要初始化。初始化,有两种方法:
- 静态初始化,像这样声明:pthread_con_t myconvar = PTHREAD_CON_INITIALIZER;
- 动态初始化,使用pthread_cond_init()函数。用创建条件变量的ID作为件参数传给线程,这种方法允许设置条件变量对象属性attr。
- 可设置的attr对象经常用来设置条件变量的属性,条件变量只有一种属性:process-thread,它的作用是允许条件变量被其它进程的线程看到。如果使用属性对象,必须是pthread_condattr_t类型(也可以赋值为NULL,作为默认值)。
注意,不是所有的实现都用得着process-shared属性。
- pthread_condattr_init()和pthread_condattr_destroy()函数是用来创建和销毁条件变量属性对象的。
- 当不再需要某条件变量时,可用pthread_cond_destroy()销毁。
条件变量的等待和信号发送
函数:
pthread_cond_wait(condition,mutex)
pthread_cond_signal (condition) pthread_cond_broadcast (condition) |
使用:
- pthread_cond_wait()阻塞调用线程,直到指定的条件变量收到信号。当互斥量被锁定时,应该调用这个函数,并且在等待时自动释放这个互斥量,在接收到信号后线程被唤醒,线程的互斥量会被自动锁定,程序员在线程中应当在此函数后解锁互斥量。
- pthread_cond_signal()函数常用来发信号给(或唤醒)正在等待条件变量的另一个线程,在互斥量被锁定后应该调用这个函数,并且为了pthread_cond_wait()函数的完成必须要解锁互斥量。
- 如果多个线程处于阻塞等待状态,那么必须要使用pthreads_cond_broadcast()函数,而不是pthread_cond_signal()。
- 在调用pthread_cond_wait()函数之前调用pthread_cond_signal()函数是个逻辑上的错误,所以,在使用这些函数时,正确的锁定和解锁与条件变量相关的互斥量是非常必要的,例如:
- 在调用pthread_cond_wait()之前锁定互斥量失败,可致使其无法阻塞;
- 在调用pthread_cond_signal()之后解锁互斥量失败,则致使与之对应的pthread_cond_wait()函数无法完成,并仍保持阻塞状态。
实例分析
看到下面的一汪代码不要挠头,99行而已,之后会抽丝剥茧,目的是对条件变量的运行机制了解个大概:
/******************************************************************************
* 描述:
* 应用Pthreads条件变量的实例代码,主线程创建三个线程,其中两个为“count”变量做
* 加法运算,第三个线程监视“count”的值。当“count”达到一个限定值,等待线程准备接收来
* 自于两个加法线程中一个的信号,等待 线程唤醒后更改“count”的值。程序继续运行直到加法
* 线程达到TCOUNT的值。最后,主程序打印出count的值。
******************************************************************************/
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h> #define NUM_THREADS 3
#define TCOUNT 5 //单线程轮询次数
#define COUNT_LIMIT 7 //发送信号的次数
int count =0; //全局的累加量 pthread_mutex_t count_mutex;
pthread_cond_t count_threshold_cv; void*inc_count(void*t) {
int i;
long my_id = (long) t; for (i =0; i < TCOUNT; i++) {
pthread_mutex_lock(&count_mutex);
count++;
/*
* 检查count的值,如果条件满足就发信号给等待线程
* 注意,此处是用信号量锁定的。
* */
if (count < COUNT_LIMIT) {
printf("inc_count(): thread %ld, count = %d Threshold reached. ",
my_id, count);
pthread_cond_signal(&count_threshold_cv);
printf("Just sent signal.\n");
}
printf("inc_count(): thread %ld, count = %d, unlocking mutex\n", my_id,
count);
pthread_mutex_unlock(&count_mutex); /*为线程轮询互斥锁增加延时*/
sleep(1);
}
pthread_exit(NULL);
} void*watch_count(void*t) {
long my_id = (long) t;
printf("Starting watch_count(): thread %ld\n", my_id); /*锁定互斥量并等待信号,注意,pthread_cond_wait函数在等待时将自动以自动原子方式
* 解锁互斥量。还有,请注意,如果等待线程运行到等待函数之前已经满足COUNT_LIMIT的
* 条件判断,轮询会忽略掉等待函数,
* */
while (count < COUNT_LIMIT) {
pthread_mutex_lock(&count_mutex);
printf("watch_count(): thread %ld going into wait...\n", my_id);
pthread_cond_wait(&count_threshold_cv, &count_mutex);
printf("watch_count(): thread %ld Condition signal received.\n", my_id); printf("watch_count(): thread %ld count now = %d.\n", my_id, count);
pthread_mutex_unlock(&count_mutex);
}
pthread_exit(NULL);
} int main(int argc, char*argv[]) {
int i;
long t1 =1, t2 =2, t3 =3;
pthread_t threads[3];
pthread_attr_t attr; /*初始化互斥量和条件变量对象*/
pthread_mutex_init(&count_mutex, NULL);
pthread_cond_init(&count_threshold_cv, NULL); /*创建线程时设为可连接状态,便于移植*/
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
pthread_create(&threads[0], &attr, watch_count, (void*) t1);
pthread_create(&threads[1], &attr, inc_count, (void*) t2);
pthread_create(&threads[2], &attr, inc_count, (void*) t3); /* 等待所有线程完成*/
for (i =1; i < NUM_THREADS; i++) {
pthread_join(threads[i], NULL);
}
/*发送信号给监听线程*/
pthread_cond_signal(&count_threshold_cv);
pthread_join(threads[0],NULL);
printf("Main(): Waited on %d threads. Final value of count = %d. Done.\n",
NUM_THREADS, count); /*清除并退出 */
pthread_attr_destroy(&attr);
pthread_mutex_destroy(&count_mutex);
pthread_cond_destroy(&count_threshold_cv);
pthread_exit(NULL);
}
- 主线程创建了3个子线程,一个线程用来监听信号(threads[0],调用watch_count()函数),两个线程用来发送信号(threads[1]、threads[2],调用inc_count()函数);
- 两个发送信号线程,主要负责两件事:
- 两个线程利用互斥量为count做加法运算,两个线程一起做了(2*TCOUNT=)10次运算;
- count值小于COUNT_LIMIT时,发送信号给监听线程;
- 监听线程作用只有一个,就是如果count值小于COUNT_LIMIT则等待线程;
- 整体来讲,就是两个发送信号的线程让count做迭加运算,并在迭加到一定值之前给监听线程发送信号,监听线程收到打印信息
- 两个地方需要注意一下:
- pthread_cond_wait()有解锁和锁定互斥量的操作,它所进行的操作大体有三步:解锁—阻塞监听—锁定,所以在监听线程的循环体里面有两次“锁定-解锁”的操作;
- 主函数main最后的pthread_cond_signal()这句必不可少,因为监听线程运转没有延时,在count的值达到COUNT_LIMIT-1时,已经处于waiting状态。
# Pthreads
Starting watch_count(): thread 1
watch_count(): thread 1 going into wait...
inc_count(): thread 2, count =1 Threshold reached. Just sent signal.
inc_count(): thread 2, count =1, unlocking mutex
watch_count(): thread 1 Condition signal received.
watch_count(): thread 1 count now =1.
watch_count(): thread 1 going into wait...
inc_count(): thread 3, count =2 Threshold reached. Just sent signal.
inc_count(): thread 3, count =2, unlocking mutex
watch_count(): thread 1 Condition signal received.
watch_count(): thread 1 count now =2.
watch_count(): thread 1 going into wait...
inc_count(): thread 2, count =3 Threshold reached. Just sent signal.
inc_count(): thread 2, count =3, unlocking mutex
watch_count(): thread 1 Condition signal received.
watch_count(): thread 1 count now =3.
watch_count(): thread 1 going into wait...
inc_count(): thread 3, count =4 Threshold reached. Just sent signal.
inc_count(): thread 3, count =4, unlocking mutex
watch_count(): thread 1 Condition signal received.
watch_count(): thread 1 count now =4.
watch_count(): thread 1 going into wait...
inc_count(): thread 2, count =5 Threshold reached. Just sent signal.
inc_count(): thread 2, count =5, unlocking mutex
watch_count(): thread 1 Condition signal received.
watch_count(): thread 1 count now =5.
watch_count(): thread 1 going into wait...
inc_count(): thread 3, count =6 Threshold reached. Just sent signal.
inc_count(): thread 3, count =6, unlocking mutex
watch_count(): thread 1 Condition signal received.
watch_count(): thread 1 count now =6.
watch_count(): thread 1 going into wait...
inc_count(): thread 2, count =7, unlocking mutex
inc_count(): thread 3, count =8, unlocking mutex
inc_count(): thread 2, count =9, unlocking mutex
inc_count(): thread 3, count =10, unlocking mutex
watch_count(): thread 1 Condition signal received.
watch_count(): thread 1 count now =10.
Main(): Waited on 3 threads. Final value of count =10. Done.
<!--[endif]-->四、POXI线程的连接与分离
POSIX Threads Programming:https://computing.llnl.gov/tutorials/pthreads/