信号量和事件---信号量

时间:2021-02-21 20:38:00
关于复杂的多线程和多进程应用程序的映射指南

级别: 高级

Srinivasan S. Muthuswamy, 软件工程师, IBM Global Services Group
Kavitha Varadarajan, 软件工程师, IBM India Software Lab

2005 年 6 月 27 日

随着开发人员将一些普及的 Windows® 应用程序迁移到 Linux™ 平台,企业中正在进行的向开放源码迁移的浪潮有可能引发极大的移植问题。这个由三部分组成的系列文章提供了一个映射指南,并附有一些例子,以简化从 Windows 到 Linux 的转移。本文是系列文章的第 2 部分,将介绍两种同步对象类型:信号量和事件。

当前,很多全球商务和服务都正在走向开源 —— 业界的所有主要参与者都在争取实现此目标。这一趋势催生了一个重要的迁移模式:为不同平台(Windows、OS2、Solaris 等)维持的许多现有产品都将被移植到开放源码的 Linux 平台。

很多应用程序在设计时并未考虑到需要将它们移植到 Linux。这有可能使移植成为一件痛苦的事情,但并非绝对如此。本系列文章的目的是,帮助您将涉及到 IPC 和线程原语的复杂应用程序从 Windows 迁移到 Linux。我们与您分享迁移这些关键应用程序的经验,其中包括要求线程同步的多线程应用程序以及要求进程间同步的多进程应用程序。

简言之,可以将此系列文章看作是一个映射文档 —— 它提供了与线程、进程和进程间通信元素(互斥、信号量等等)相关的各种 Windows 调用到 Linux 调用的映射。我们将那些映射分为三部分:

  • 在 第 1 部分 中,我们已经对进程和线程进行了讨论。
  • 本文将介绍信号量和事件。
  • 第 3 部分将介绍互斥、临界区和等待函数。

在本文中,我们将从同步技术入手,继续从 Windows 到 Linux 的映射指导。

同步
在 Windows 上,同步是使用等待函数中的同步对象来实现的。同步对象可以有两种状态:有信号(signaled)状态和无信号(non-signaled)状态。当在一个等待函数中使用同步对象时,等待函数就会阻塞调用线程,直到同步对象的状态被设置为有信号为止。

下面是在 Windows 上可以使用的一些同步对象:

  • 事件(Event)
  • 信号量(Semaphore)
  • 互斥(Mutexe)
  • 临界区(Critical section)

在 Linux 中,可以使用不同的同步原语。Windows 与 Linux 的不同之处在于每个原语都有自己的等待函数(所谓等待函数就是用来修改同步原语状态的函数);在 Windows 中,有一些通用的等待函数来实现相同的目的。以下是 Linux 上可以使用的一些同步原语:

  • 信号量(Semaphore)
  • 条件变量(Conditional variable)
  • 互斥(Mutexe)

通过使用上面列出的这些原语,各种库都可以用于 Linux 之上,以提供同步机制。

表 1. 同步映射

Windows Linux —— 线程 Linux —— 进程
互斥 互斥 - pthread 库 System V 信号量
临界区 互斥 - pthread 库 不适用,因为临界区只用于同一进程的不同线程之间
信号量 具有互斥的条件变量 - pthreads
POSIX 信号量
System V 信号量
事件 具有互斥的条件变量 - pthreads System V 信号量

信号量
Windows 信号量是一些计数器变量,允许有限个线程/进程访问共享资源。Linux POSIX 信号量也是一些计数器变量,可以用来在 Linux 上实现 Windows 上的信号量功能。

在对进程进行映射时,我们需要考虑以下问题:

  • 信号量的类型: Windows 提供了有名(named)信号量和无名(unnamed)信号量。有名信号量可以在进程之间进行同步。在 Linux 上,在相同进程的不同线程之间,则只使用 POSIX 信号量。在进程之间,可以使用 System V 信号量。
  • 等待函数中的超时: 当在一个等待函数中使用时,可以为 Windows 信号量对象指定超时值。在 Linux 中,并没有提供这种功能,只能通过应用程序逻辑处理超时的问题。

表 2. 信号量映射

Windows Linux 线程 Linux 进程 类别
CreateSemaphore sem_init semget
semctl
与上下文相关
OpenSemaphore 不适用 semget 与上下文相关
WaitForSingleObject sem_wait
sem_trywait
semop 与上下文相关
ReleaseSemaphore sem_post semop 与上下文相关
CloseHandle sem_destroy semctl 与上下文相关

创建信号量
在 Windows 中,可以使用 CreateSemaphore() 创建或打开一个有名或无名的信号量。


            HANDLE CreateSemaphore(
            LPSECURITY_ATTRIBUTES lpSemaphoreAttributes,
            LONG lInitialCount,
            LONG lMaximumCount,
            LPCTSTR lpName
            );
            

在这段代码中:

  • lpSemaphoreAttributes 是一个指向安全性属性的指针。如果这个指针为空,那么这个信号量就不能被继承。
  • lInitialCount 是该信号量的初始值。
  • lMaximumCount 是该信号量的最大值,该值必须大于 0。
  • lpName 是信号量的名称。如果该值为 NULL,那么这个信号量就只能在相同进程的不同线程之间共享。否则,就可以在不同的进程之间进行共享。

这个函数创建信号量,并返回这个信号量的句柄。它还将初始值设置为调用中指定的值。这样就可以允许有限个线程来访问某个共享资源。

在 Linux 中,可以使用 sem_init() 来创建一个无名的 POSIX 信号量,这个调用可以在相同进程的线程之间使用。它还会对信号量计数器进行初始化:int sem_init(sem_t *sem, int pshared, unsigned int value)。在这段代码中:

  • value(信号量计数器)是这个信号量的初始值。
  • pshared 可以忽略,因为在目前的实现中,POSIX 信号量还不能在进程之间进行共享。

这里要注意的是,最大值基于 demaphore.h 头文件中定义的 SEM_VALUE_MAX。

在 Linux 中,semget() 用于创建 System V 信号量,它可以在不同集成的线程之间使用。可以用它来实现与 Windows 中有名信号量相同的功能。这个函数返回一个信号量集标识符,它与一个参数的键值关联在一起。当创建一个新信号量集时,对于与 semid_ds 数据结构关联在一起的信号量,semget() 要负责将它们进行初始化,方法如下:

  • sem_perm.cuid 和 sem_perm.uid 被设置为调用进程的有效用户 ID。
  • sem_perm.cgid 和 sem_perm.gid 被设置为调用进程的有效组 ID。
  • sem_perm.mode 的低 9 位被设置为 semflg 的低 9 位。
  • sem_nsems 被设置为 nsems 的值。
  • sem_otime 被设置为 0。
  • sem_ctime 被设置为当前时间。

用来创建 System V 信号量使用的代码是:int semget(key_t key, int nsems, int semflg)。下面是对这段代码的一些解释:

  • key 是一个惟一的标识符,不同的进程使用它来标识这个信号量集。我们可以使用 ftok() 生成一个惟一的键值。IPC_PRIVATE 是一个特殊的 key_t 值;当使用 IPC_PRIVATE 作为 key 时,这个系统调用就会只使用 semflg 的低 9 位,但却忽略其他内容,从而新创建一个信号量集(在成功时)。
  • nsems 是这个信号量集中信号量的数量。
  • semflg 是这个新信号量集的权限。要新创建一个信号量集,您可以将使用 IPC_CREAT 来设置位操作或访问权限。如果具有该 key 值的信号量集已经存在,那么 IPC_CREAT/IPC_EXCL 标记就会失败。

注意,在 System V 信号量中,key 被用来惟一标识信号量;在 Windows 中,信号量是使用一个名称来标识的。

为了对信号量集数据结构进行初始化,可以使用 IPC_SET 命令来调用 semctl() 系统调用。将 arg.buf 所指向的 semid_ds 数据结构的某些成员的值写入信号量集数据结构中,同时更新这个结构的 sem_ctime member 的值。用户提供的这个 arg.buf 所指向的 semid_ds 结构如下所示:

  • sem_perm.uid
  • sem_perm.gid
  • sem_perm.mode (只有最低 9 位有效)

调用进程的有效用户 ID 应该是超级用户,或者至少应该与这个信号量集的创建者或所有者匹配: int semctl(int semid, int semnum, int cmd = IPC_SET, ...)。在这段代码中:

  • semid 是信号量集的标识符。
  • semnum 是信号量子集偏移量(从 0 到 nsems -1,其中 n 是这个信号量集中子集的个数)。这个命令会被忽略。
  • cmd 是命令;它使用 IPC_SET 来设置信号量的值。
  • args 是这个信号量集数据结构中要通过 IPC_SET 来更新的值(在这个例子中会有解释)。

最大计数器的值是根据在头文件中定义的 SEMVMX 来决定的。

打开信号量
在 Windows 中,我们使用 OpenSemaphore() 来打开某个指定信号量。只有在两个进程之间共享信号量时,才需要使用信号量。在成功打开信号量之后,这个函数就会返回这个信号量的句柄,这样就可以在后续的调用中使用它了。


            HANDLE OpenSemaphore(
            DWORD dwDesiredAccess,
            BOOL bInheritHandle,
            LPCTSTR lpName
            )
            

在这段代码中:

  • dwDesiredAccess 是针对该信号量对象所请求的访问权。
  • bInheritHandle 是用来控制这个信号量句柄是否可继承的标记。如果该值为 TRUE,那么这个句柄可以被继承。
  • lpName 是这个信号量的名称。

在 Linux 中,可以调用相同的 semget() 来打开某个信号量,不过此时 semflg 的值为 0:int semget(key,nsems,0)。在这段代码中:

  • key 应该指向想要打开的信号量集的 key 值。
  • 为了打开一个已经存在的信号量,可以将 nsems 和标记设置为 0。semflg 值是在返回信号量集标识符之前对访问权限进行验证时设置的。

获取信号量
在 Windows 中,等待函数提供了获取同步对象的机制。可以使用的等待函数有多种类型;在这一节中,我们只考虑 WaitForSingleObject()(其他类型将会分别进行讨论)。这个函数使用一个信号量对象的句柄作为参数,并会一直等待下去,直到其状态变为有信号状态或超时为止。

DWORD WaitForSingleObject( HANDLE hHandle, DWORD dwMilliseconds );

在这段代码中:

  • hHandle 是指向互斥句柄的指针。
  • dwMilliseconds 是超时时间,以毫秒为单位。如果该值是 INFINITE,那么它阻塞调用线程/进程的时间就是不确定的。

在 Linux 中,sem_wait() 用来获取对信号量的访问。这个函数会挂起调用线程,直到这个信号量有一个非空计数为止。然后,它可以原子地减少这个信号量计数器的值:int sem_wait(sem_t * sem)

在 POSIX 信号量中并没有超时操作。这可以通过在一个循环中执行一个非阻塞的 sem_trywait() 实现,该函数会对超时值进行计算:int sem_trywait(sem_t * sem)

在使用 System V 信号量时,如果通过使用 IPC_SET 命令的 semctl() 调用设置初始的值,那么必须要使用 semop() 来获取信号量。semop() 执行操作集中指定的操作,并阻塞调用线程/进程,直到信号量值为 0 或更大为止:int semop(int semid, struct sembuf *sops, unsigned nsops)

函数 semop() 原子地执行在 sops 中所包含的操作 —— 也就是说,只有在这些操作可以同时成功执行时,这些操作才会被同时执行。sops 所指向的数组中的每个 nsops 元素都使用 struct sembuf 指定了一个要对信号量执行的操作,这个结构包括以下成员:

  • unsigned short sem_num; (信号量个数)
  • short sem_op; (信号量操作)
  • short sem_flg; (操作标记)

要获取信号量,可以通过将 sem_op 设置为 -1 来调用 semop();在使用完信号量之后,可以通过将 sem_op 设置为 1 来调用 semop() 释放信号量。通过将 sem_op 设置为 -1 来调用 semop(),信号量计数器将会减小 1,如果该值小于 0(信号量的值是不能小于 0 的),那么这个信号量就不能再减小,而是会让调用线程/进程阻塞,直到其状态变为有信号状态为止。

sem_flg 中可以识别的标记是 IPC_NOWAIT 和 SEM_UNDO。如果某一个操作被设置了 SEM_UNDO 标记,那么在进程结束时,该操作将被取消。如果 sem_op 被设置为 0,那么 semop() 就会等待 semval 变成 0。这是一个“等待为 0” 的操作,可以用它来获取信号量。

记住,超时操作在 System V 信号量中并不适用。这可以在一个循环中使用非阻塞的 semop()(通过将 sem_flg 设置为 IPC_NOWAIT)实现,这会计算超时的值。

释放信号量
在 Windows 中,ReleaseSemaphore() 用来释放信号量。


            BOOL ReleaseSemaphore(
            HANDLE hSemaphore,
            LONG lReleaseCount,
            LPLONG lpPreviousCount
            );
            

在这段代码中:

  • hSemaphore 是一个指向信号量句柄的指针。
  • lReleaseCount 是信号量计数器,可以通过指定的数量来增加计数。
  • lpPreviousCount 是指向上一个信号量计数器返回时的变量的指针。如果并没有请求上一个信号量计数器的值,那么这个参数可以是 NULL。

这个函数会将信号量计数器的值增加在 lReleaseCount 中指定的值上,然后将这个信号量的状态设置为有信号状态。

在 Linux 中,我们使用 sem_post() 来释放信号量。这会唤醒对这个信号量进行阻塞的所有线程。信号量的计数器同时被增加 1。要为这个信号量的计数器添加指定的值(就像是 Windows 上一样),可以使用一个互斥变量多次调用以下函数:int sem_post(sem_t * sem)

对于 System V 信号量来说,只能使用 semop() 来释放信号量:int semop(int semid, struct sembuf *sops, unsigned nsops)

函数 semop() 原子地执行 sops 中包含的一组操作(只在所有操作都可以同时成功执行时,才会将所有的操作同时一次执行完)。sops 所指向的数组中的每个 nsops 元素都使用一个 struct sembuf 结构指定了一个要对这个信号量执行的操作,该结构包含以下元素:

  • unsigned short sem_num;(信号量个数)
  • short sem_op; (信号量操作)
  • short sem_flg; (操作标记)

要释放信号量,可以通过将 sem_op 设置为 1 来调用 semop()。通过将 semop() 设置为 1 来调用 semop(),这个信号量的计数器会增加 1,同时用信号通知这个信号量。

关闭/销毁信号量
在 Windows 中,我们使用 CloseHandle() 来关闭或销毁信号量对象。


            BOOL CloseHandle(
            HANDLE hObject
            );
            

hObject 是指向这个同步对象句柄的指针。

在 Linux 中,sem_destroy() 负责销毁信号量对象,并释放它所持有的资源: int sem_destroy(sem_t *sem)。对于 System V 信号量来说,只能使用 semctl() 函数的 IPC_RMID 命令来关闭信号量集:int semctl(int semid, int semnum, int cmd = IPC_RMID, ...)

这个命令将立即删除信号量集及其数据结构,并唤醒所有正在等待的进程(如果发生错误,则返回,并将 errno 设置为 EIDRM)。调用进程的有效用户 ID 必须是超级用户,或者可以与该信号量集的创建者或所有者匹配的用户。参数 semnum会被忽略。

例子
下面是信号量的几个例子。

清单 1. Windows 无名信号量的代码

            HANDLE hSemaphore;
            LONG   lCountMax = 10;
            LONG   lPrevCount;
            DWORD  dwRetCode;
            // Create a semaphore with initial and max. counts of 10.
            hSemaphore = CreateSemaphore(
            NULL,        // no security attributes
            0,           // initial count
            lCountMax,   // maximum count
            NULL);       // unnamed semaphore
            // Try to enter the semaphore gate.
            dwRetCode = WaitForSingleObject(
            hSemaphore,  // handle to semaphore
            2000L);   // zero-second time-out interval
            switch (dwRetCode)
            {
            // The semaphore object was signaled.
            case WAIT_OBJECT_0:
            // Semaphore is signaled
            // go ahead and continue the work
            goto success:
            break;
            case WAIT_TIMEOUT:
            // Handle the time out case
            break;
            }
            Success:
            // Job done, release the semaphore
            ReleaseSemaphore(
            hSemaphore,  // handle to semaphore
            1,           // increase count by one
            NULL)        // not interested in previous count
            // Close the semaphore handle
            CloseHandle(hSemaphore);
            

清单 2. Linux 使用 POSIX 信号量的等效代码
// Main thread
            #define TIMEOUT 200  /* 2 secs */
            // Thread 1
            sem_t sem     ; // Global Variable
            int   retCode ;
            // Initialize event semaphore
            retCode = sem_init(
            sem,   // handle to the event semaphore
            0,     // not shared
            0);    // initially set to non signaled state
            while (timeout < TIMEOUT ) {
            delay.tv_sec = 0;
            delay.tv_nsec = 1000000;  /* 1 milli sec */
            // Wait for the event be signaled
            retCode = sem_trywait(
            &sem); // event semaphore handle
            // non blocking call
            if (!retCode)  {
            /* Event is signaled */
            break;
            }
            else {
            /* check whether somebody else has the mutex */
            if (retCode == EPERM ) {
            /* sleep for delay time */
            nanosleep(&delay, NULL);
            timeout++ ;
            }
            else{
            /* error  */
            }
            }
            }
            // Completed the job,
            // now destroy the event semaphore
            retCode = sem_destroy(
            &sem);   // Event semaphore handle
            // Thread 2
            // Condition met
            // now signal the event semaphore
            sem_post(
            &sem);    // Event semaphore Handle
            

清单 3. Linux 使用 System V 信号量的等效代码
// Process 1
            #define TIMEOUT 200
            //Definition of variables
            key_t key;
            int semid;
            int Ret;
            int timeout = 0;
            struct sembuf operation[1] ;
            union semun
            {
            int val;
            struct semid_ds *buf;
            USHORT *array;
            } semctl_arg,ignored_argument;
            key = ftok(); // Generate a unique key, U can also supply a value instead
            semid = semget(key,             // a unique identifier to identify semaphore set
            1,              // number of semaphore in the semaphore set
            0666 | IPC_CREAT // permissions (rwxrwxrwx) on the new
            //    semaphore set and creation flag
            );
            //Set Initial value for the resource
            semctl_arg.val = 0; //Setting semval to 0
            semctl(semid, 0, SETVAL, semctl_arg);
            //Wait for Zero
            while(timeout < TIMEOUT)
            {
            delay.tv_sec = 0;
            delay.tv_nsec = 1000000;  /* 1 milli sec */
            //Call Wait for Zero with IPC_NOWAIT option,so it will be non blocking
            operation[0].sem_op = -1; // Wait until the semaphore count becomes 0
            operation[0].sem_num = 0;
            operation[0].sem_flg = IPC_NOWAIT;
            ret = semop(semid, operation,1);
            if(ret < 0)
            {
            /* check whether somebody else has the mutex */
            if (retCode == EPERM )
            {
            /* sleep for delay time */
            nanosleep(&delay, NULL);
            timeout++ ;
            }
            else
            {
            printf("ERROR while wait ");
            break;
            }
            }
            else
            {
            /*semaphore got triggered */
            break;
            }
            }
            //Close semaphore
            iRc = semctl(semid, 1, IPC_RMID , ignored_argument);
            }
            // Process 2
            key_t key = KEY; // Process 2 should know key value in order to open the
            //    existing semaphore set
            struct sembuf operation[1] ;
            //Open semaphore
            semid = semget(key, 1, 0);
            operation[0].sem_op = 1; // Release the resource so Wait in process 1 will
            //    be triggered
            operation[0].sem_num = 0;
            operation[0].sem_flg = SEM_UNDO;
            //Release semaphore
            semop(semid, operation,0);
            }