利用信号量实现线程同步

时间:2022-10-31 15:13:48

本篇使用信号量机制实现对全局资源的正确使用,包括以下两点:

  • 各个子线程对全局资源的互斥使用
  • 主线程对子线程的同步

信号量

简单的说,信号量内核对象,也是多线程同步的一种机制,它可以对资源访问进行计数,包括最大资源计数和当前资源计数,是两个32位的值;另外,计数是以原子访问的方式进行,由操作系统维护;

  • 最大资源计数,表示可以控件的最大资源数量
  • 当前资源计数,表示当前可用资源的数量

信号量的规则:

  • 如果当前资源计数器大于0,那么信号量处于触发状态
  • 如果当前资源计数器等于0,那么信号量处于未触发状态
  • 系统绝对不会让当前资源计数器变为负数
  • 当前资源计数器决定不会大于最大资源计数

信号量机制:

以一个停车场的运作为例。假设停车场只有三个车位,一开始三个车位都是空的。这时如果同时来了五辆车,看门人允许其中三辆直接进入,然后放下车拦,剩下的车则必须在入口等待,此后来的车也都不得不在入口处等待。这时,有一辆车离开停车场,看门人得知后,打开车拦,放入外面的一辆进去,如果又离开两辆,则又可以放入两辆,如此往复。

在这个停车场系统中,车位是公共资源,每辆车好比一个线程,看门人起的就是信号量的作用,当当前资源计数大于0,信号量处于触发状态,线程编程可调度;

相关API

  • 创建信号量
HANDLE WINAPI CreateSemaphore(
LPSECURITY_ATTRIBUTES lpSemaphoreAttributes,//more设置为NULL
LONG lInitialCount, //当前可用资源的数量
LONG lMaximumCount, //最大资源数量
LPCTSTR lpName //信号量名字,可以设置为NULL
);
  • 释放信号量
BOOL WINAPI ReleaseSemaphore(
HANDLE hSemaphore, //信号量句柄
LONG lReleaseCount, //该函数返回时,当前可用资源的数增加lReleaseCount个
LPLONG lpPreviousCount
);
  • 打开已经存在的信号量
HANDLE WINAPI OpenSemaphore(
DWORD dwDesiredAccess,
BOOL bInheritHandle,
LPCTSTR lpName //打开信号量的名字
);
  • wait函数
DWORD WINAPI WaitForSingleObject(
HANDLE hHandle, //当等待成功时,当前可用信号量递减
DWORD dwMilliseconds //等待时间
);

线程同步举例

线程同步包含两层含义:

  1. 对全局资源互斥访问
  2. 线程间的有序、协同执行(比如:线程A执行完后,再进行线程B的操作)

例子1 互斥访问

#include "stdafx.h"
#include <iostream>
#include <afxmt.h>
using namespace std;

int g_nIndex = 0;
const int nMaxCnt = 20;

#define MAX_SEM_COUNT 1
#define THREADCOUNT 12

HANDLE ghSemaphore;

DWORD WINAPI ThreadProcBySEM( LPVOID );

int _tmain(int argc, _TCHAR* argv[])
{
HANDLE aThread[THREADCOUNT];
int i;

// Create a semaphore with initial and max counts of MAX_SEM_COUNT
// 备注:实现多个子线程资源互斥范围,最大信号量需要设置为1
ghSemaphore = CreateSemaphore(
NULL, // default security attributes
1, // initial count
1, // maximum count
NULL); // unnamed semaphore
if (ghSemaphore == NULL)
{
printf("CreateSemaphore error: %d\n", GetLastError());
return 0;
}

// Create worker threads
for( i=0; i < THREADCOUNT; i++ )
{
aThread[i] = CreateThread(
NULL, // default security attributes
0, // default stack size
(LPTHREAD_START_ROUTINE) ThreadProcBySEM,
NULL, // no thread function arguments
0, // default creation flags
NULL); // receive thread identifier

if( aThread[i] == NULL )
{
printf("CreateThread error: %d\n", GetLastError());
return 0;
}
}

//Wait for all threads to terminate
WaitForMultipleObjects(THREADCOUNT, aThread, TRUE, INFINITE);

//Close thread and semaphore handles
for( i=0; i < THREADCOUNT; i++ )
CloseHandle(aThread[i]);

CloseHandle(ghSemaphore);

return 0;
}


DWORD WINAPI ThreadProcBySEM(LPVOID lpParam)
{
DWORD dwWaitResult;
BOOL bContinue = TRUE;
while(bContinue)
{
//Try to enter the semaphore gate.
dwWaitResult = WaitForSingleObject(ghSemaphore,INFINITE);
if (WAIT_OBJECT_0 == dwWaitResult)
{
if (g_nIndex++ < nMaxCnt)
{
//Simulate thread spending time on task
Sleep(5);
printf("Thread %d: Index = %d\n", GetCurrentThreadId(), g_nIndex);
}
else
{
bContinue = FALSE;
}

// Relase the semaphore when task is finished
if (!ReleaseSemaphore(
ghSemaphore, // handle to semaphore
1, // increase count by one
NULL)) // not interested in previous count
{
printf("ReleaseSemaphore error: %d\n", GetLastError());
}
}
else
{
break;
}
}

return TRUE;
}

运行结果:

利用信号量实现线程同步

例子2 线程同步

下面我们将线程创建时的顺序编号,也打印出来,实现方式是将变量i传递给线程,以实现打印;

错误代码:

我们将编号i的地址作为CreateThread入参,传递给线程函数ThreadProcBySEM;主要修改如下:

 // Create worker threads
for( i=0; i < THREADCOUNT; i++ )
{
aThread[i] = CreateThread(
NULL, // default security attributes
0, // default stack size
(LPTHREAD_START_ROUTINE) ThreadProcBySEM,
&i, //线程编号地址
0, // default creation flags
NULL); // receive thread identifier

if( aThread[i] == NULL )
{
printf("CreateThread error: %d\n", GetLastError());
return 0;
}
}
//线程函数代码:
DWORD WINAPI ThreadProcBySEM(LPVOID lpParam)
{
DWORD dwWaitResult;
BOOL bContinue = TRUE;
if (NULL == lpParam)
{
return FALSE;
}
//获取编号,线程创建有时间开销,当函数执行到,赋值语句时
//lpParam所指向的是变量i,但是主线程已经对i进行++,不再是创建时的值了
int nThreadNum = *(int*)lpParam;

if (WAIT_OBJECT_0 == dwWaitResult)
{
if (g_nIndex++ < nMaxCnt)
{
//Simulate thread spending time on task
Sleep(5);
//打印线程编号
printf("NO %d = Thread %d: Index = %d\n", nThreadNum, GetCurrentThreadId(), g_nIndex);
}
}
}

运行结果:

从下图中可以看出,不同的线程句柄,却有相同的线程编号,属于异常线程;

利用信号量实现线程同步

原因分析:

线程创建到线程函数执行存在时间开销,线程函数不会第一时间开始执行,而此时主线程还处于非阻塞状态,主线程仍然可以对变量i不断进行++操作,导致当前线程进行访问时,其内容已经不再是当前那个值了;

正确做法:

1.引入关键代码段同步对象,用于各个子线间,访问全局资源g_nIndex,
2.在创建新线程前,需要将当前线程的编号保存到,临时变量;
3.将信号量对象用于主线程和子线程间的同步;

全部代码如下:

#include "stdafx.h"
//#include <windows.h>
#include <iostream>
#include <afxmt.h>
using namespace std;

#define MAX_SEM_COUNT 1
#define THREADCOUNT 12

int g_nIndex = 0;
const int nMaxCnt = 20;

HANDLE ghSemaphore;
CRITICAL_SECTION g_csLockA;

DWORD WINAPI ThreadProcBySEM( LPVOID );

int _tmain(int argc, _TCHAR* argv[])
{
HANDLE aThread[THREADCOUNT];
int i;

// Create a semaphore with initial and max counts of MAX_SEM_COUNT
ghSemaphore = CreateSemaphore(
NULL, // default security attributes
0, // initial count,modify:当前处于非触发状态
1, // maximum count
NULL); // unnamed semaphore
if (ghSemaphore == NULL)
{
printf("CreateSemaphore error: %d\n", GetLastError());
return 0;
}

//初始化关键代码段对象,用于访问全局资源的互斥
InitializeCriticalSection(&g_csLockA);

// Create worker threads
for( i=0; i < THREADCOUNT; i++ )
{

aThread[i] = CreateThread(
NULL, // default security attributes
0, // default stack size
(LPTHREAD_START_ROUTINE) ThreadProcBySEM,
&i, // no thread function arguments
0, // default creation flags
NULL); // receive thread identifier

if( aThread[i] == NULL )
{
printf("CreateThread error: %d\n", GetLastError());
return 0;
}
//modify 创建新线程前,需要将编号保存起来,才创建新线程
//用于同步主线程和子线程
WaitForSingleObject(ghSemaphore,INFINITE);
}

//Wait for all threads to terminate
WaitForMultipleObjects(THREADCOUNT, aThread, TRUE, INFINITE);

//Close thread and semaphore handles
for( i=0; i < THREADCOUNT; i++ )
CloseHandle(aThread[i]);

CloseHandle(ghSemaphore);

DeleteCriticalSection(&g_csLockA);

return 0;
}


DWORD WINAPI ThreadProcBySEM(LPVOID lpParam)
{
DWORD dwWaitResult;
BOOL bContinue = TRUE;
if (NULL == lpParam)
{
return FALSE;
}
int nThreadNum = *(int*)lpParam;

// Relase the semaphore
if (!ReleaseSemaphore(
ghSemaphore, // handle to semaphore
1, // increase count by one
NULL)) // not interested in previous count
{
printf("ReleaseSemaphore error: %d\n", GetLastError());
}

while(bContinue)
{
Sleep(10);//modify:当前线程处于休眠,给其他线程执行机会

//Try to enter the cs gate. //modify:保护资源的唯一性访问
EnterCriticalSection(&g_csLockA);
if (g_nIndex++ < nMaxCnt)
{
printf("NO %02d = Thread %d: Index = %d\n", nThreadNum, GetCurrentThreadId(), g_nIndex);
}
else
{
bContinue = FALSE;
}

LeaveCriticalSection(&g_csLockA);
}

return TRUE;
}

运行结果:

利用信号量实现线程同步

从运行结果看出,线程句柄和编号可以一一对应,并且资源访问也正常;