(区别于linux内核所用的自旋锁和互斥锁,本文中讨论的锁用于普通编程)
当两个或多个并发线程的执行次序造成了意想不到的错误结果时,“竞态条件”就是会产生。防止“竞态条件”的一个方法是使用同步机制,对访问“共享资源”的代码中关键段实施“品行访问”控制。常用的OS同步机制有:互斥体(mutex)、“多读取者/单写入者”锁(reader/writer locks)、信号量(semaphores)和条件变量(condition variable)。
1.互斥体(mutex,Mutual Exclusion)锁
当共享资源被多个线程并发访问时,为了确保这些资源的完整性,我们可以使用互斥体(mutex)锁。互斥体可用来串行执行多个线程,这需要在代码中确定关键 (critical section)——即,一次只能由一个线程执行的代码。“互斥体”语义像双括号那样具有“对称体”:也就是就说,如果一个线程拥有互斥体,那么,它还和负责释放这个互斥体。这种简单的语义有助于互斥体的高效实现——如通过硬件自旋锁(spinlock)来实现。
常见的互斥体有两种:
非递归互斥体(nonrecursive mutex):如果当前拥有互斥体的线程在没有首先释放它的情况,试图再次获得它,就会导致死锁或失败。
递归互斥体(recursive mutex):拥有互斥体的线程可能多次获得它而不会产生自死锁,只要这个线程最终以相同次数释放这个互斥体即可。
2.Readers/Writer锁
Readers/Writer(多读取者/单写入者)锁可以通过以下方式之一访问共享资源:
多个线程并发读取资源,但不修改它;
一次只一个线程修改资源,此时其它线程都不能对其进行读/写访问。
Readers/Writer锁可以用来保护“读操作比写操作频繁”的资源,从而提高并发应用程序的性能。在实现Readers/Writer锁时,要么给“读取者”以优先权,要么给“写入者”以优先权。
Readers/Writer 锁和互斥体有某些共性,例如:获得锁的线程也必须释放这个锁,如查一个“写入者”希望获得这个锁,那么,这个线程必须等待其它所有“拥有这个锁”的线程释放它;然后,这个“写入者”线程单独占有这个锁。便但和互斥体不同的是,多个线程可以同时获得一个Readers/Writer锁执行“读”操作。
3.信号量锁
从概念上说,信号量(semaphore)是可以原子(automically)递增和背叛的非负整数。如果一个线程试图递减一个信号量,但这个信号量的值已经为0,则线程将会阻塞。另一个线程“发出(post)”这个信号(semaphore),使用信号量大于0之后,被阻塞的线程才会被释放。
信号量维护状态信息,对信号计数值(count)和被阻塞线程的数量进行记录。它们一般是通过“休止锁(sleep lock)”来实现的;休止锁用来触发环境切换,以允许其他线程执行。和互斥体不同的是,释放信号量的线程不必是最初获得这个信号量的线程。这使得信号量适用于更广泛的执行环境,如信号处理程序或中断处理程序。
4.条件变量
和互斥体、Readers/Writer锁、信号量锁不同,条件变量(condtion variable)提供了不同风格的同步方式。在前三种机制中,当“占有锁的线程”在关键段中执行代码时,其他线程会等待。下此相反,使用条件变量,线程可以调整和调度自己的处理过程。
例如,某一数据被其他线程共享;条件变量可能使用处于等待状态,直至“涉及这个数据”的一个条件表达式达到某一状态。当一个“合作线程(cooperation thread)”显示共享数据的状态已经改变时,阻塞在条件上的线程会被唤醒;然后,被唤醒的线程重新计算它的条件表达式,如果共享数据达到预期状态,则恢复处理。再复杂的条件表达式也可以通过条件变量来等待;所以,较之前面所说的同步机制,条件变量允许更复杂的调度决策。
Windows 平台下的同步机制 (1)– 临界区(CriticalSection)
临界区的使用在线程同步中应该算是比较简单,说它简单还是说它同后面讲到的其它方法相比更容易理解。举个简单的例子:比如说有一个全局变量(公共资源)两个线程都会对它进行写操作和读操作,如果我们在这里不加以控制,会产生意想不到的结果。假设线程A正在把全局变量加1然后打印在屏幕上,但是这时切换到线程B,线程B又把全局变量加1然后又切换到线程A,这时候线程A打印的结果就不是程序想要的结果,也就产生了错误。解决的办法就是设置一个区域,让线程A在操纵全局变量的时候进行加锁,线程B如果想操纵这个全局变量就要等待线程A释放这个锁,这个也就是临界区的概念。
使用方法:
CRITICAL_SECTION cs;
InitializeCriticalSection(&cs);
EnterCriticalSection(&cs);
…
LeaveCriticalSection(&cs);
DeleteCriticalSection(&cs);
#include "stdafx.h"
#include <windows.h>
#include <process.h>
#include <iostream>
using namespace std;
/****************************************************************
*在使用临界区的时候要注意,每一个共享资源就有一个CRITICAL_SECTION
*如果要一次访问多个共享变量,各个线程要保证访问的顺序一致,如果不
*一致,很可能发生死锁。例如:
* thread one:
* EnterCriticalSection(&c1)
* EnterCriticalSection(&c2)
* …
* Leave…
* Leave…
*
* thread two:
* EnterCriticalSection(&c2);
* EnterCriticalSection(&c1);
* …
* Leave…
* Leave…
*这样的情况就会发生死锁,应该让线程2进入临界区的顺序同线程1相同
****************************************************************/
封装类:
class Lock;
class CLock
{
public:
CLock() { ::InitializeCriticalSection(&_cs); }
~CLock() { ::DeleteCriticalSection(&_cs); }
private:
friend class Lock;
CRITICAL_SECTION _cs;
CLock(const CLock&);
CLock& operator=(const CLock&);
};
class Lock
{
public:
Lock(CLock& lock) : _lock(lock) { ::EnterCriticalSection(&_lock._cs); }
~Lock() { ::LeaveCriticalSection(&_lock._cs); }
private:
CLock& _lock;
Lock(const Lock&);
Lock& operator=(const Lock&);
};
Windows 平台下的同步机制 (2)– 互斥体(Mutex)
windows api中提供了一个互斥体,功能上要比临界区强大。Mutex是互斥体的意思,当一个线程持有一个Mutex时,其它线程申请持有同一个Mutex会被阻塞,因此可以通过Mutex来保证对某一资源的互斥访问(即同一时间最多只有一个线程访问)。
调用CreateMutex可以创建或打开一个Mutex对象,其原型如下
HANDLE CreateMutex(
LPSECURITY_ATTRIBUTES lpMutexAttributes,
BOOL bInitialOwner,
LPCTSTR lpName
);
其中参数lpMutexAttributes用来设定Mutex对象的安全描述符和是否允许子进程继承句柄。bInitialOwner表明是否将Mutex的持有者设置为调用线程。lpName参数设置Mutex的名字,该名字区分大小写并不能包含"",最大长度为MAX_PATH,可设置为NULL表明该Mutex为匿名对象。
如果调用成功,则返回Mutex的句柄,否则返回NULL,如果lpName不为NULL且调用前同名的Mutex已被创建,则返回同名Mutex的句柄,此时调用GetLastError将返回ERROR_ALREADY_EXISTS,参数bInitialOwner将被忽略。
还可以调用OpenMutex打开创建的非匿名Mutex,原型如下
HANDLE OpenMutex(
DWORD dwDesiredAccess,
BOOL bInheritHandle,
LPCTSTR lpName
);
在成功创建或打开Mutex后,可以使用wait functions来等待并获取Mutex的持有权。
下面的例子用来通过Mutex对象控制某一应用程序只运行一次
int WinMain(HINSTANCE hInstance, HINSTANCE hPrevInstance, LPSTR lpCmdLine, int nCmdShow)
{
HANDLE hMutex = CreateMutex(NULL, FALSE, "Mutex_Only_One_Instance_Allowed");
if (NULL == hMutex)
{
Error("Create mutex error.");
return -1;
}
DWORD dw = WaitForSingleObject(hMutex, 0);
if (WAIT_FAILED == dw)
{
Error("Wait for mutex error.");
CloseHandle(hMutex); // 释放句柄,当指向同一系统对象的所有句柄释放后,该对象将被删除。
return -1;
}
else if (WAIT_TIMEOUT == dw)
{
// 另外一个实例正在运行
CloseHandle(hMutex);
return 1;
}
// 没有其它实例在运行,本实例将继续运行
// 在此实现必要的功能性代码,如创建窗口,进入消息循环
// ……………
ReleaseMutex(hMutex); // 释放hMutex的持有权,注意这并不等同于删除Mutex对象
CloseHandle(hMutex);
return 0;
}
其中WaitForSingleObject是等待特定对象发出信号(signaled),而Mutex对象在没有任何线程持有时会发出信号。
与临界区(critical section)有什么区别,为什么强大?它们有以下几点不一致:
1.critical section是局部对象,而mutex是核心对象。因此像waitforsingleobject是不可以等待临界区的。
2.critical section是快速高效的,而mutex同其相比要慢很多
3.critical section使用范围是单一进程中的各个线程,而mutex由于可以有一个名字,因此它是可以应用于不同的进程,当然也可以应用于同一个进程中的不同线程。
4.critical section 无法检测到是否被某一个线程释放,而mutex在某一个线程结束之后会产生一个abandoned的信息。同时mutex只能被拥有它的线程释放。下面举两个应用mutex的例子,一个是程序只能运行一个实例,也就是说同一个程序如果已经运行了,就不能再运行了;另一个是关于非常经典的哲学家吃饭问题的例子。
互斥体通常用于多进程之间的同步问题
程序运行单个实例:
#include "stdafx.h"
#include <windows.h>
#include <process.h>
#include <iostream>
using namespace std;
//当输入s或者c时候结束程序
void PrintInfo(HANDLE& h, char t)
{
char c;
while (1)
{
cin >> c;
if (c == t)
{
ReleaseMutex(h);
CloseHandle(h);
break;
}
Sleep(100);
}
}
int main(int argc, char* argv[])
{
//创建mutex,当已经程序发现已经有这个mutex时候,就相当于openmutex
HANDLE hHandle = CreateMutex(NULL, FALSE, "mutex_test");
if (GetLastError() == ERROR_ALREADY_EXISTS)
{
cout << "you had run this program!" << endl;
cout << "input c to close this window" << endl;
PrintInfo(hHandle, ‘c’);
return 1;
}
cout << "program run!" << endl;
cout << "input s to exit program" <<endl;
PrintInfo(hHandle, ‘s’);
return 1;
}
封装:
struct _Lock
{
_Lock(HANDLE& mtx) : _mtx(mtx), _ret(WAIT_OBJECT_0)
{
if (_mtx != 0)
_ret = WaitForSingleObject(_mtx, 100000);
}
~_Lock()
{
Release();
}
void Release()
{
if (_mtx != 0 && _ret == WAIT_OBJECT_0)
ReleaseMutex(_mtx);
}
HANDLE& _mtx;
DWORD _ret;
};
const int PHILOSOPHERS = 5; //哲学家人数
const int TIME_EATING = 50; //吃饭需要的时间 毫秒
HANDLE event[PHILOSOPHERS]; //主线程同工作线程保持同步的句柄数组
HANDLE mutex[PHILOSOPHERS]; //mutex数组,这里相当于公共资源筷子
CRITICAL_SECTION cs; //控制打印的临界区变量
UINT WINAPI ThreadFunc(void* arg)
{
int num = (int)arg;
DWORD ret = 0;
while (1)
{
ret = WaitForMultipleObjects(2, mutex, TRUE, 1000);
if (ret == WAIT_TIMEOUT)
{
Sleep(100);
continue;
}
EnterCriticalSection(&cs);
cout << "philosopher " << num << " eatting" << endl;
LeaveCriticalSection(&cs);
Sleep(TIME_EATING);
break;
}
//设置时间为有信号
SetEvent(event[num]);
return 1;
}
int main(int argc, char* argv[])
{
HANDLE hThread;
InitializeCriticalSection(&cs);
//循环建立线程
for (int i = 0; i < PHILOSOPHERS; i++)
{
mutex[i] = CreateMutex(NULL, FALSE, "");
event[i] = CreateEvent(NULL, TRUE, FALSE, "");
hThread = (HANDLE)_beginthreadex(NULL, 0, ThreadFunc, (void*)i, 0, NULL);
if (hThread == 0)
{
cout << "create thread " << i << "failed with code: "
<< GetLastError() << endl;
DeleteCriticalSection(&cs);
return -1;
}
CloseHandle(hThread);
}
//等待所有的哲学家吃饭结束
DWORD ret = WaitForMultipleObjects(PHILOSOPHERS, event, TRUE, INFINITE);
if (ret == WAIT_OBJECT_0)
{
cout << "all the philosophers had a dinner!" << endl;
}
else
{
cout << "WaitForMultipleObjects failed with code: " << GetLastError() << endl;
}
DeleteCriticalSection(&cs);
for (int j = 0; j < PHILOSOPHERS; j++)
{
CloseHandle(mutex[j]);
}
return 1;
}
Windows 平台下的同步机制 (3)– 事件(Event)
事件对象的特点是它可以应用在重叠I/O(overlapped I/0)上,比如说socket编程中有两种模型,一种是重叠I/0,一种是完成端口都是可以使用事件同步。它也是核心对象,因此可以被waitforsingleobje这些函数等待;事件可以有名字,因此可以被其他进程开启。
Event即事件是一种用于进行线程/进程间同步的对象,事件有置位和复位两种状态,当线程通过waiting functions等待Event对象置位时该线程将进入阻塞状态,当该Event对象被置位或等待超时后,等待的线程将恢复执行。Event可以用在一个线程要等待其它线程时。
可以使用CreateEvent创建Event对象
HANDLE WINAPI CreateEvent(
LPSECURITY_ATTRIBUTES lpEventAttributes,
BOOL bManualReset,
BOOL bInitialState,
LPCTSTR lpName
);
lpEventAttributes用于指定Event对象的安全属性,包括句柄是否可被子进程继承和对象的安全描述符。可设置NULL取默认安全属性。
bManualReset表明Event对象是否需要手动复位。如果该参数为TRUE,则Event对象需要通过ResetEvent函数手动复位。如果该参数为FALSE,则Event被创建为自动复位的Event,任何等待的线程被恢复执行后,该Event将被系统自动复位。打个比方,如果有10个线程在等待一个Event,这时将Event置位,如果这是个手动复位Event,那么这10个线程将被依次唤醒直到通过ResetEvent调用将该Event复位;如果Event为自动复位Event,那么10个线程中的第一个被唤醒后Event被自动复位,其它线程将继续等待。
bInitialState参数表明Event对象被创建后默认是否置位。
lpName参数是Event的名字,可以为空表明将创建匿名Event。
CreateEvent函数在调用成功后返回Event句柄。如果同名Event已经存在,则返回这个已经存在了的Event的句柄,此时调用GetLastError函数将返回 ERROR_ALREADY_EXISTS。
还可以通过OpenEvent打开一个已经创建的非匿名Event
HANDLE WINAPI OpenEvent(
DWORD dwDesiredAccess,
BOOL bInheritHandle,
LPCTSTR lpName
);
在创建或打开了Event对象之后,可以使用SetEvent和ResetEvent函数来置位或复位一个Event对象。
BOOL WINAPI SetEvent(
HANDLE hEvent
);
BOOL WINAPI ResetEvent(
HANDLE hEvent
);
要等待一个或多个Event对象置位可以使用wait functions。
简单示例,一个线程不停读取用户输入并放入message列表,另一个线程模拟将message发送出去,如果没有消息,则发送线程处于阻塞状态等待,一旦有消息录入,输入线程将event置位,发送线程即被激活并逐个发送消息。
#include "stdafx.h"
#include <windows.h>
#include <tchar.h>
#include <iostream>
#include <list>
#include <string>
using namespace std;
#ifdef _UNICODE
typedef wstring tstring;
#define tcout wcout
#define tcin wcin
#else
typedef string tstring;
#define tcout cout
#define tcin cin
#endif /* _UNICODE */
typedef list<tstring> StringList;
HANDLE hMutex = NULL;
HANDLE hEvent = NULL;
HANDLE hSendThread = NULL;
StringList messages;
bool isRunning;
DWORD WINAPI SendThreadProc(LPVOID lpThreadParameter)
{
DWORD dw;
while(isRunning)
{
dw = WaitForSingleObject(hEvent, INFINITE);
if(dw != WAIT_OBJECT_0)
{
tcout << _T("Wait error.") << endl;
return -1;
}
dw = WaitForSingleObject(hMutex, INFINITE);
if(WAIT_OBJECT_0 != dw && WAIT_ABANDONED != dw)
{
tcout << _T("Wait error.") << endl;
return -2;
}
StringList list(messages);
messages.clear();
ReleaseMutex(hMutex);
for(StringList::iterator i = list.begin(); i != list.end(); i++)
{
Sleep(1000); //休眠1秒模拟发送所耗时间
tcout << _T("/* Send Message:") << *i << _T(" */");
}
}
return 0;
}
int _tmain(int argc, _TCHAR* argv[])
{
hMutex = CreateMutex(NULL, FALSE, NULL);
hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
isRunning = true;
hSendThread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)SendThreadProc, NULL, 0, NULL);
while(isRunning)
{
tstring s;
tcin >> s;
if(s == _T("quit"))
{
isRunning = true;
break;
}
DWORD dw = WaitForSingleObject(hMutex, INFINITE);
if(WAIT_OBJECT_0 != dw && WAIT_ABANDONED != dw)
{
tcout << _T("Wait error.") << endl;
return -1;
}
messages.push_back(s);
ReleaseMutex(hMutex);
SetEvent(hEvent);
}
CloseHandle(hMutex);
CloseHandle(hEvent);
CloseHandle(hSendThread);
return 0;
}
Windows 平台下的同步机制 (4)– 信号量(Semaphore)
Semaphore是旗语的意思,在Windows中,Semaphore对象用来控制对资源的并发访问数。Semaphore对象具有一个计数值,当值大于0时,Semaphore被置信号,当计数值等于0时,Semaphore被清除信号。每次针对Semaphore的wait functions返回时,计数值被减1,调用ReleaseSemaphore可以将计数值增加 lReleaseCount 参数值指定的值。
CreateSemaphore函数用于创建一个Semaphore
HANDLE CreateSemaphore(
LPSECURITY_ATTRIBUTES lpSemaphoreAttributes,
LONG lInitialCount,
LONG lMaximumCount,
LPCTSTR lpName
);
lpSemaphoreAttributes为安全属性,
lInitialCount为Semaphore的初始值,
lMaximumCount为最大值,
lpName为Semaphore对象的名字,NULL表示创建匿名Semaphore
此外还可以调用OpenSemaphore来打开已经创建的非匿名Semaphore
HANDLE OpenSemaphore(
DWORD dwDesiredAccess,
BOOL bInheritHandle,
LPCTSTR lpName
);
调用ReleaseSemaphore增加Semaphore计算值
BOOL ReleaseSemaphore(
HANDLE hSemaphore,
LONG lReleaseCount,
LPLONG lpPreviousCount
);
lpReleaseCount参数表示要增加的数值,
lpPreviousCount参数用于返回之前的计算值,如果不需要可以设置为NULL
比如我们要控制到服务器的连接数不超过10个,可以创建一个Semaphore,初值为10,每当要连接到服务器时,使用WaitForSingleObject请求Semaphore,当成功返回后再尝试连接到服务器,当连接失败或连接使用完后释放时,调用ReleaseSemaphore增加Semaphore计数值。
看个例子,popo现在好像在本机只能运行三个实例,mutex可以让程序只是运行一个实例,下面我通过信号量机制让程序像popo一样运行三个实例。
#include "stdafx.h"
#include <windows.h>
#include <iostream>
using namespace std;
const int MAX_RUNNUM = 3; //最多运行实例个数
void PrintInfo()
{
char c;
cout << "run program" << endl;
cout << "input s to exit program!" << endl;
while (1)
{
cin >> c;
if (c == ‘s’)
{
break;
}
Sleep(10);
}
}
int main(int argc, char* argv[])
{
HANDLE hSe = CreateSemaphore(NULL, MAX_RUNNUM, MAX_RUNNUM, "semaphore_test");
DWORD ret = 0;
if (hSe == NULL)
{
cout << "createsemaphore failed with code: " << GetLastError() << endl;
return -1;
}
ret = WaitForSingleObject(hSe, 1000);
if (ret == WAIT_TIMEOUT)
{
cout << "you have runned " << MAX_RUNNUM << " program!" << endl;
ret = WaitForSingleObject(hSe, INFINITE);
}
PrintInfo();
ReleaseSemaphore(hSe, 1, NULL);
CloseHandle(hSe);
return 0;
}
From MSDN:
The following example uses a semaphore object to limit the number of threads that can perform a particular task. First, it uses theCreateSemaphore function to create the semaphore and to specify initial and maximum counts, then it uses theCreateThread function to create the threads.
Before a thread attempts to perform the task, it uses the WaitForSingleObject function to determine whether the semaphore’s current count permits it to do so. The wait function’s time-out parameter is set to zero, so the function returns immediately if the semaphore is in the nonsignaled state.WaitForSingleObject decrements the semaphore’s count by one.
When a thread completes the task, it uses the ReleaseSemaphore function to increment the semaphore’s count, thus enabling another waiting thread to perform the task.
#include <windows.h>
#include <stdio.h>
#define MAX_SEM_COUNT 10
#define THREADCOUNT 12
HANDLE ghSemaphore;
DWORD WINAPI ThreadProc( LPVOID );
void main()
{
HANDLE aThread[THREADCOUNT];
DWORD ThreadID;
int i;
// Create a semaphore with initial and max counts of MAX_SEM_COUNT
ghSemaphore = CreateSemaphore(
NULL, // default security attributes
MAX_SEM_COUNT, // initial count
MAX_SEM_COUNT, // maximum count
NULL); // unnamed semaphore
if (ghSemaphore == NULL)
{
printf("CreateSemaphore error: %dn", GetLastError());
return;
}
// Create worker threads
for( i=0; i < THREADCOUNT; i++ )
{
aThread[i] = CreateThread(
NULL, // default security attributes
0, // default stack size
(LPTHREAD_START_ROUTINE) ThreadProc,
NULL, // no thread function arguments
0, // default creation flags
&ThreadID); // receive thread identifier
if( aThread[i] == NULL )
{
printf("CreateThread error: %dn", GetLastError());
return;
}
}
// Wait for all threads to terminate
WaitForMultipleObjects(THREADCOUNT, aThread, TRUE, INFINITE);
// Close thread and semaphore handles
for( i=0; i < THREADCOUNT; i++ )
CloseHandle(aThread[i]);
CloseHandle(ghSemaphore);
}
DWORD WINAPI ThreadProc( LPVOID lpParam )
{
DWORD dwWaitResult;
BOOL bContinue=TRUE;
while(bContinue)
{
// Try to enter the semaphore gate.
dwWaitResult = WaitForSingleObject(
ghSemaphore, // handle to semaphore
0L); // zero-second time-out interval
switch (dwWaitResult)
{
// The semaphore object was signaled.
case WAIT_OBJECT_0:
// TODO: Perform task
printf("Thread %d: wait succeededn", GetCurrentThreadId());
bContinue=FALSE;
// Simulate thread spending time on task
Sleep(5);
// Release the semaphore when task is finished
if (!ReleaseSemaphore(
ghSemaphore, // handle to semaphore
1, // increase count by one
NULL) ) // not interested in previous count
{
printf("ReleaseSemaphore error: %dn", GetLastError());
}
break;
// The semaphore was nonsignaled, so a time-out occurred.
case WAIT_TIMEOUT:
printf("Thread %d: wait timed outn", GetCurrentThreadId());
break;
}
}
return TRUE;
}