对于大多数人来说,如同记住Dijkstra是荷兰人一样,记住P和V本身的含义并不重要。但是,从真正学术的角度来说,P代表prolagen,这是由proberen te verlagen演变而来的杜撰词,其意思是尝试减小。V代表verhogen,其意思是增加。Dijkstra的技术说明EWD 74中介绍了这些含义。sem_wait(3RT)和sem_post(3RT)分别与Dijkstra的P和V操作相对应。sem_trywait(3RT)是P操作的一种条件形式。如果调用线程不等待就不能减小信号的值,则该调用会立即返回一个非零值。
有两种基本信号:二进制信号和计数信号量。二进制信号的值只能是0或1,计数信号量可以是任意非负值。二进制信号在逻辑上相当于一个互斥锁。计数信号量与互斥锁一起使用时的功能几乎与条件变量一样强大。在许多情况下,使用计数信号量实现的代码比使用条件变量实现的代码更为简单。
1、命名信号和未命名信号
Posex信号量接口总结(见下图):
上面一行是有名信号量,可于fifo相类比,其值保存在文件中,可用于进程和线程同步;
下面一行是无名信号量,可与pipe相类比,其值保存在内存中,可用于进程和线程同步;
中间部分,是两者的公用接口。
- sem_open() sem_close(),sem_unlink() //有名信号量
- \ |sem_wait(),sem_post() |/
- / |sem_trywait(),sem_getvalue()|\sem_destroy() //无名信号量
- sem_init()
2、计数信号量概述
从概念上来说,信号量是一个非负整数计数。信号量通常用来协调对资源的访问,其中信号计数会初始化为可用资源的数目。然后,线程在资源增加时会增加计数,在删除资源时会减小计数,这些操作都以原子方式执行。
如果信号计数变为零,则表明已无可用资源。计数为零时,尝试减小信号的线程会被阻塞,直到计数大于零为止。
信号操作:
初始化信号 sem_init
增加信号 sem_post
基于信号计数阻塞 sem_wait
减小信号计数 sem_trywait
销毁信号状态 sem_destroy
读取当前信号量 sem_getvalue
由于信号无需由同一个线程来获取和释放,因此信号可用于异步事件通知,如用于信号处理程序中。同时,由于信号包含状态,因此可以异步方式使用,而不用象条件变量那样要求获取互斥锁。但是,信号的效率不如互斥锁高。
3、无名线程信号量应用实例
(1)初始化信号量 sem_init();
(2)增加信号量 sem_post();
(3)阻塞减少信号量 sem_wait();
(4) 非阻塞减少信号技术 sem_trywait();
(5) 销毁信号量 sem_destory();
(6) 读取当前信号量的值 sem_getvalue();
以下是使用两个创建线程的实例程序的对比。每个程序都创建了多个线程,且在每个线程都打印一条测试信息。
在第一个程序中,采用平常的方式创建线程,从运行的结果可以看出,打印的信息杂乱无章,即个线程输出的信息交叉在一起。
1
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
#include <iostream>
using namespace std; #include <pthread.h> #include <stdio.h> #include <unistd.h> #include <string.h> #include <stdlib.h> #define BUFFERSIZE 1024 #define TEN_MILLION 1000000L void *threadout( void *args); //定义线程函数 int main( int argc, char *argv[]) { int error; int i; int n; pthread_t *tids; if(argc != 2) //要求输入创建线程的个数 { fprintf(stderr, "Usage:%s numthreads\n", argv[ 0]); return 1; } n = atoi(argv[ 1]); //将输入的数字字符串转换成数值 tids = (pthread_t *)calloc(n, sizeof(pthread_t)); //申请空间以存储线程ID if(tids == NULL) { perror( "Failed to allocate memory for thread IDs"); return 1; } for(i = 0; i < n; i++) //创建n个线程,n为用户运行程序时输入的值 { if(error = pthread_create(tids + i, NULL, threadout, NULL)) { fprintf(stderr, "Failed to create thread %s\n", strerror(error)); return 1; } } for(i = 0; i < n; i++) //等待线程结束 { if(error = pthread_join(tids[i], NULL)) { fprintf(stderr, "Failed to join thread %s\n", strerror(error)); return 1; } } return 0; } //创建新线程的程序。 void *threadout( void *args) { char buffer[BUFFERSIZE]; char *c; struct timespec sleeptime; sleeptime.tv_sec = 0; sleeptime.tv_nsec = TEN_MILLION; snprintf(buffer, BUFFERSIZE, "This is a thread %d from process %ld\n", ( unsigned long)pthread_self, ( long)getpid()); c = buffer; //start of critical section while(*c != '\0') { fputc(*c, stderr); c++; nanosleep(&sleeptime, NULL); } //end of critical section return NULL; } |
1
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
#include <iostream>
using namespace std; #include <pthread.h> #include <stdio.h> #include <unistd.h> #include <string.h> #include <stdlib.h> #include <semaphore.h> #include <errno.h> #define BUFFERSIZE 1024 #define TEN_MILLION 1000000L void *threadout( void *args) { char buffer[BUFFERSIZE]; char *c; sem_t *semlockp; //信号量 struct timespec sleeptime; semlockp=(sem_t *)args; //从函数中获取信号量 sleeptime.tv_sec= 0; sleeptime.tv_nsec=TEN_MILLION; snprintf(buffer,BUFFERSIZE, "This is a thread %d from process %ld\n",( unsigned long)pthread_self,( long)getpid()); c=buffer; while(sem_wait(semlockp)==- 1) //进入临界区 if(errno!=EINTR) { fprintf(stderr, "Thread failed to lock semaphore\n"); return NULL; } //start of critical section while(*c!= '\0') { fputc(*c,stderr); c++; nanosleep(&sleeptime, NULL); } //退出临界区 if(sem_post(semlockp)==- 1) fprintf(stderr, "Thread failed to unlock semaphore\n"); return NULL; } int main( int argc, char *argv[]) { int error; int i; int n; sem_t semlock; //信号量定义 pthread_t *tids; if(argc!= 2) { fprintf(stderr, "Usage:%s numthreads\n",argv[ 0]); return 1; } n=atoi(argv[ 1]); tids=(pthread_t *)calloc(n, sizeof(pthread_t)); if(tids== NULL) { perror( "Failed to allocate memory for thread IDs"); return 1; } if(sem_init(&semlock, 0, 1)==- 1) //初始化 { perror( "Failed to initialize semaphore."); return 1; } for(i= 0;i<n;i++) { if(error=pthread_create(tids+i, NULL,threadout,&semlock)) //参数传递线程 { fprintf(stderr, "Failed to create thread %s\n",strerror(error)); return 1; } } for(i= 0;i<n;i++) { if(error=pthread_join(tids[i], NULL)) { fprintf(stderr, "Failed to join thread %s\n",strerror(error)); return 1; } } return 0; } |
4、命名线程信号量管理
(1)打开/创建命令线程信号量 sem_open();
(2) 关闭命令信号量 sem_close();
(3) 删除信号量 sem_unlink();
1
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
#include <iostream>
using namespace std; #include<stdio.h> #include<errno.h> #include <semaphore.h> #include<stdlib.h> #include<unistd.h> #include<sys/wait.h> #include<fcntl.h> #include<sys/stat.h> #define PERMS (mode_t)(S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH) //设置权限 #define FLAGS (O_CREAT|O_EXCL) //设置标识 #define BUFSIZE 1024 int getnamed( char *name, sem_t **sem, int val) { while(((*sem = sem_open(name, FLAGS, PERMS, val)) == SEM_FAILED) && (errno == EINTR)) { //创建 } if(*sem != SEM_FAILED) return 0; if(errno != EEXIST) return - 1; while(((*sem = sem_open(name, 0)) == SEM_FAILED) && (errno == EINTR)); //打开命名信号量 if(*sem != SEM_FAILED) return 0; return - 1; } int main( int argc, char *argv[]) { char buffer[BUFSIZE]; char *c ; pid_t childpid = 0; int delay; volatile int dummy = 0; int i, n; sem_t *semlockp; if(argc != 4) //检测参数个数 { fprintf(stderr, "Usage%s processes delay semaphorename\n", argv[ 0]); return 1; } n = atoi(argv[ 1]); delay = atoi(argv[ 2]); for(i = 1; i < n; i++) { if(childpid = fork()) //创建子进程 break; } snprintf(buffer, BUFSIZE, "i:%d process ID:%d parent ID:%ld child ID:%ld\n", i , ( long)getpid(), ( long)getppid(), ( long)childpid); c = buffer; if(getnamed(argv[ 3], &semlockp, 1) == - 1) { perror( "Failed to create named semaphore"); return 1; } while(sem_wait(semlockp) == - 1) //信号量阻塞 if(errno != EINTR) { perror( "Failed to lock semlock"); return 1; } while(*c != '\0') //临界区 { fputc(*c, stderr); c++; for(i = 0; i < delay; i++) dummy++; } if(sem_post(semlockp) == - 1) { perror( "Failed to unlock semlock"); return 1; } if(wait( NULL) == - 1) //退出 return 1; return 0; } |