Linux多线程编程之同步对象编程:线程信号量

时间:2021-09-07 15:13:58
       在计算机版本中,信号以简单整数来表示。线程等待获得许可以便继续运行,然后发出信号,表示该线程已经通过针对信号执行P操作来继续运行。线程必须等到信号的值为正,然后才能通过将信号值减1来更改该值。完成此操作后,线程会执行V操作,即通过将信号值加1来更改该值。这些操作必须以原子方式执行,不能再将其划分成子操作,即,在这些子操作之间不能对信号执行其他操作。在P操作中,信号值在减小之前必须为正,从而确保生成的信号值不为负,并且比该值减小之前小1。在P和V操作中,必须在没有干扰的情况下进行运算。如果针对同一信号同时执行两个V操作,则实际结果是信号的新值比原来大2。
       对于大多数人来说,如同记住Dijkstra是荷兰人一样,记住P和V本身的含义并不重要。但是,从真正学术的角度来说,P代表prolagen,这是由proberen te verlagen演变而来的杜撰词,其意思是尝试减小。V代表verhogen,其意思是增加。Dijkstra的技术说明EWD 74中介绍了这些含义。sem_wait(3RT)和sem_post(3RT)分别与Dijkstra的P和V操作相对应。sem_trywait(3RT)是P操作的一种条件形式。如果调用线程不等待就不能减小信号的值,则该调用会立即返回一个非零值。

       有两种基本信号:二进制信号和计数信号量。二进制信号的值只能是0或1,计数信号量可以是任意非负值。二进制信号在逻辑上相当于一个互斥锁。计数信号量与互斥锁一起使用时的功能几乎与条件变量一样强大。在许多情况下,使用计数信号量实现的代码比使用条件变量实现的代码更为简单。


1、命名信号和未命名信号

       

     Posex信号量接口总结(见下图):
     上面一行是有名信号量,可于fifo相类比,其值保存在文件中,可用于进程和线程同步;
     下面一行是无名信号量,可与pipe相类比,其值保存在内存中,可用于进程和线程同步;
      中间部分,是两者的公用接口。

  1. sem_open()                                sem_close(),sem_unlink()  //有名信号量  
  2.          \ |sem_wait(),sem_post()       |/  
  3.          / |sem_trywait(),sem_getvalue()|\sem_destroy()  //无名信号量  
  4. sem_init()  


2、计数信号量概述

      从概念上来说,信号量是一个非负整数计数。信号量通常用来协调对资源的访问,其中信号计数会初始化为可用资源的数目。然后,线程在资源增加时会增加计数,在删除资源时会减小计数,这些操作都以原子方式执行。

     如果信号计数变为零,则表明已无可用资源。计数为零时,尝试减小信号的线程会被阻塞,直到计数大于零为止。

信号操作:

      初始化信号                      sem_init 
     增加信号                           sem_post 

     基于信号计数阻塞             sem_wait 
     减小信号计数                    sem_trywait  

     销毁信号状态                    sem_destroy 

     读取当前信号量                 sem_getvalue

       由于信号无需由同一个线程来获取和释放,因此信号可用于异步事件通知,如用于信号处理程序中。同时,由于信号包含状态,因此可以异步方式使用,而不用象条件变量那样要求获取互斥锁。但是,信号的效率不如互斥锁高。

     

3、无名线程信号量应用实例

      (1)初始化信号量  sem_init();

       (2)增加信号量    sem_post();

      (3)阻塞减少信号量  sem_wait();

      (4) 非阻塞减少信号技术  sem_trywait();

       (5) 销毁信号量               sem_destory();

        (6) 读取当前信号量的值  sem_getvalue();

      以下是使用两个创建线程的实例程序的对比。每个程序都创建了多个线程,且在每个线程都打印一条测试信息。

     在第一个程序中,采用平常的方式创建线程,从运行的结果可以看出,打印的信息杂乱无章,即个线程输出的信息交叉在一起。

 

 C++ Code 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
#include <iostream>

using  namespace std;
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>

#define BUFFERSIZE  1024
#define TEN_MILLION 1000000L

void *threadout( void *args);   //定义线程函数

int main( int argc,  char *argv[])
{
     int error;
     int i;
     int n;
    pthread_t *tids;

     if(argc !=  2)   //要求输入创建线程的个数
    {
        fprintf(stderr,  "Usage:%s numthreads\n", argv[ 0]);
         return  1;
    }

    n = atoi(argv[ 1]);    //将输入的数字字符串转换成数值
    tids = (pthread_t *)calloc(n,  sizeof(pthread_t));   //申请空间以存储线程ID
     if(tids ==  NULL)
    {
        perror( "Failed to allocate memory for thread IDs");
         return  1;
    }
     for(i =  0; i < n; i++)   //创建n个线程,n为用户运行程序时输入的值
    {
         if(error = pthread_create(tids + i,  NULL, threadout,  NULL))
        {
            fprintf(stderr,  "Failed to create thread %s\n", strerror(error));
             return  1;
        }

    }
     for(i =  0; i < n; i++)   //等待线程结束
    {
         if(error = pthread_join(tids[i],  NULL))
        {
            fprintf(stderr,  "Failed to join thread %s\n", strerror(error));
             return  1;
        }

    }

     return  0;
}
//创建新线程的程序。
void *threadout( void *args)
{
     char buffer[BUFFERSIZE];
     char *c;
     struct timespec sleeptime;
    sleeptime.tv_sec =  0;
    sleeptime.tv_nsec = TEN_MILLION;
    snprintf(buffer, BUFFERSIZE,  "This is a thread %d from process %ld\n", ( unsigned  long)pthread_self, ( long)getpid());
    c = buffer;

     //start of critical section
     while(*c !=  '\0')
    {
        fputc(*c, stderr);
        c++;
        nanosleep(&sleeptime,  NULL);
    }
     //end of critical section
     return  NULL;
}
     在程序2中,采用了线程信号量来同步各线程,从运行结构可以看出,各线程打印的信息是有序的。

     

 C++ Code 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
#include <iostream>

using  namespace std;
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <semaphore.h>
#include <errno.h>

#define BUFFERSIZE  1024
#define TEN_MILLION 1000000L


void *threadout( void *args)
{
     char buffer[BUFFERSIZE];
     char *c;
    sem_t *semlockp;                //信号量
     struct timespec sleeptime;

    semlockp=(sem_t *)args;   //从函数中获取信号量
    sleeptime.tv_sec= 0;
    sleeptime.tv_nsec=TEN_MILLION;
    snprintf(buffer,BUFFERSIZE, "This is a thread %d from process %ld\n",( unsigned  long)pthread_self,( long)getpid());
    c=buffer;

     while(sem_wait(semlockp)==- 1)   //进入临界区
     if(errno!=EINTR)
    {
        fprintf(stderr, "Thread failed to lock semaphore\n");
          return  NULL;
    }


     //start of critical section
     while(*c!= '\0')
    {
        fputc(*c,stderr);
        c++;
        nanosleep(&sleeptime, NULL);
    }
     //退出临界区
     if(sem_post(semlockp)==- 1)
      fprintf(stderr, "Thread failed to unlock semaphore\n");
     return  NULL;
}

int main( int argc, char *argv[])
{
      int error;
     int i;
     int n;
    sem_t semlock;    //信号量定义
    pthread_t *tids;

     if(argc!= 2)
    {
        fprintf(stderr, "Usage:%s numthreads\n",argv[ 0]);
         return  1;
    }

    n=atoi(argv[ 1]);
    tids=(pthread_t *)calloc(n, sizeof(pthread_t));
     if(tids== NULL)
    {
        perror( "Failed to allocate memory for thread IDs");
         return  1;
    }
     if(sem_init(&semlock, 0, 1)==- 1)   //初始化
    {
         perror( "Failed to initialize semaphore.");
         return  1;
    }
      for(i= 0;i<n;i++)
    {
         if(error=pthread_create(tids+i, NULL,threadout,&semlock))   //参数传递线程
        {
            fprintf(stderr, "Failed to create thread %s\n",strerror(error));
             return  1;
        }

    }
     for(i= 0;i<n;i++)
    {
          if(error=pthread_join(tids[i], NULL))
        {
            fprintf(stderr, "Failed to join thread %s\n",strerror(error));
             return  1;
        }

    }

     return  0;
}


4、命名线程信号量管理

    (1)打开/创建命令线程信号量  sem_open();

     (2)   关闭命令信号量   sem_close();

     (3)   删除信号量  sem_unlink();

 

 C++ Code 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
#include <iostream>

using  namespace std;
#include<stdio.h>
#include<errno.h>
#include <semaphore.h>
#include<stdlib.h>
#include<unistd.h>
#include<sys/wait.h>
#include<fcntl.h>
#include<sys/stat.h>

#define PERMS (mode_t)(S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH)   //设置权限
#define FLAGS (O_CREAT|O_EXCL)     //设置标识
#define BUFSIZE  1024            

int getnamed( char *name, sem_t **sem,  int val)
{
     while(((*sem = sem_open(name, FLAGS, PERMS, val)) == SEM_FAILED) && (errno == EINTR))
    {
            //创建
    }
     if(*sem != SEM_FAILED)
         return  0;
     if(errno != EEXIST)
         return - 1;
     while(((*sem = sem_open(name,  0)) == SEM_FAILED) && (errno == EINTR));
   //打开命名信号量
     if(*sem != SEM_FAILED)
         return  0;
     return - 1;
}
int main( int argc,  char *argv[])
{
     char buffer[BUFSIZE];
     char *c ;
    pid_t childpid =  0;
     int delay;
     volatile  int dummy =  0;
     int i, n;
    sem_t *semlockp;
     if(argc !=  4)   //检测参数个数
    {
        fprintf(stderr,  "Usage%s processes delay semaphorename\n", argv[ 0]);
         return  1;
    }
    n = atoi(argv[ 1]);
    delay = atoi(argv[ 2]);
     for(i =  1; i < n; i++)
    {
         if(childpid = fork())   //创建子进程
             break;
    }
    snprintf(buffer, BUFSIZE,  "i:%d process ID:%d parent ID:%ld child ID:%ld\n", i
             , ( long)getpid(), ( long)getppid(), ( long)childpid);
    c = buffer;
     if(getnamed(argv[ 3], &semlockp,  1) == - 1)
    {
        perror( "Failed to create named semaphore");
         return  1;
    }
     while(sem_wait(semlockp) == - 1)   //信号量阻塞
         if(errno != EINTR)
        {
            perror( "Failed to lock semlock");
             return  1;
        }
     while(*c !=  '\0')   //临界区
    {
        fputc(*c, stderr);
        c++;
         for(i =  0; i < delay; i++)
            dummy++;
    }
     if(sem_post(semlockp) == - 1)
    {
        perror( "Failed to unlock semlock");
         return  1;
    }
     if(wait( NULL) == - 1)   //退出
         return  1;

     return  0;
}