一:Critiacal_Section
1:使用临界区的目的是确保资源每次只能被一个线程所使用。一个线程进入某个临界区,另一个线程就不能够再进入同一个临界区。临界区不是核心对象,它只存在进程的内存空间。没有所谓的句柄,只能在同一进程中的线程间完成同步。
2:使用函数
VOID InitializeCriticalSection(LPCRITICAL_SECTION lpCriticalSection);
VOID DeleteCriticalSection(LPCRITICAL_SECTION lpCriticalSection);
VOID EnterCriticalSection(LPCRITICAL_SECTION lpCriticalSection);
VOID LeaveCriticalSection(LPCRITICAL_SECTION lpCriticalSection);
操作顺序:
//开始
CRITICAL_SECTION cs;
InitializeCriticalSection(&cs);
//线程1:
EnterCriticalSection(&cs);
LeaveCriticalSection(&cs);
//线程2:
EnterCriticalSection(&cs);
LeaveCriticalSection(&cs);
//最后:
DeleteCriticalSection(&cs);
3:封装成类:两个线程对同一数据一读一写,因此需要让它们在这里互斥,不能同时访问。
class InstanceLock;
class InstanceLockBase
{
friend class InstanceLock;//可以正确的访问私有函数
CRITICAL_SECTION cs;
void Lock()//私有
{
::EnterCriticalSection(&cs);//加锁
}
void Unlock()//私有
{
::LeaveCriticalSection(&cs);//解锁
}
protected://继承
InstanceLockBase()
{
::InitializeCriticalSection(&cs);//构建时初始化
}
~InstanceLockBase()
{
::DeleteCriticalSection(&cs);//析构时释放
}
};
//为了保证Lock和Unlock能成对调用,
//C++对于构造函数和析构函数的调用是自动成对的,把对Lock和Unlock的调用专门写在一个类的构造函数和析构函数中
class InstanceLock
{
InstanceLockBase* _pObj;
public:
InstanceLock(InstanceLockBase* pObj)
{
_pObj = pObj;
if(NULL != _pObj)
_pObj->Lock();
}
~InstanceLock()
{
if(NULL != _pObj)
_pObj->Unlock();
}
};
void Say(char* text)
{
static int count = 0;
SYSTEMTIME st;
::GetLocalTime(&st);
printf("%03d [%02d:%02d:%02d.%03d]%s /n", ++count, st.wHour, st.wMinute, st.wSecond, st.wMilliseconds, text);
}
//1 让被锁类从InstanceLockBase继承
//2 所有要访问被锁对象的代码前面声明InstanceLock的实例,并传入被锁对象的指针。
class MyClass: public InstanceLockBase{};
MyClass mc;
//将InstanceLockBase中protected改称public后,也可以直接用基类
//InstanceLockBase mc;
DWORD CALLBACK ThreadProc(LPVOID param)
{
InstanceLock il(&mc);
Say("in sub thread, lock");
Sleep(2000);
Say("in sub thread, unlock");
return 0;
}
int _tmain(int argc, _TCHAR* argv[])
{
CreateThread(0, 0, ThreadProc, 0, 0, 0);
{
InstanceLock il(&mc);
Say("in main thread, lock");
Sleep(3000);
Say("in main thread, lock");
}
Sleep(5000);
return 0;
}
4:另一种封装方法
class InstanceLockBase
{
CRITICAL_SECTION cs;
public:
InstanceLockBase()
{
InitializeCriticalSection(&cs);
}
~InstanceLockBase()
{
DeleteCriticalSection(&cs);
}
void Lock()
{
EnterCriticalSection(&cs);
}
void Unlock()
{
LeaveCriticalSection(&cs);
}
};
template <typename LockType>
class AutoLock
{
LockType& lock;
public:
AutoLock(LockType& _lock) : lock(_lock)
{
lock.Lock();
}
~AutoLock()
{
lock.Unlock();
}
};
#endif
//使用方法
InstanceLockBase lock;
AutoLock<InstanceLockBase> tmplock(lock);
//将InstanceLockBase中protected改称public后,也可以直接用基类
InstanceLockBase lock;
InstanceLock il(&lock);
二:Mutex
1:和临界区(Critical Section)的区别
1)锁住一个未被拥有的mutex,比锁住一个未被拥有的critical section,所需花费几乎100倍的时间。因为critical section不需要进入操作系统核心。
2)互斥器可以跨进程使用(此时应指定名称。未命名的互斥器只能在同一进程内使用)。临界区只能够在同一进程内使用。
3)等待一个互斥器,可以指定“结束等待”的时间。临界区不可以。
2:使用函数
HANDLE CreateMutex();
HANDLE OpenMutex();
DWORD WaitForSingleObject();
DWORD WaitForMultipleObjects();
DWORD MsgWaitForMultipleObjects();
BOOL ReleaseMutex();
BOOL CloseHandle();
HANDLE CreateMutex(LPSECURITY_ATTRIBUTES lpMutexAttributes, // 指向安全属性的指针
BOOL bInitialOwner, // 初始化互斥对象的所有者
LPCTSTR lpName); // 指向互斥对象名的指针
注意:一旦不再需要,注意必须用CloseHandle函数将互斥体句柄关闭。从属于它的所有句柄都被关闭后,就会删除对象。
根据lpName,系统中的任何线程都可以使用这个名称来处理该Mutex,Mutex名称对整个系统而言是全局的。
HANDLE WINAPI OpenMutex(DWORD dwDesiredAccess, //打开一个已经存在的Mutex。
BOOL bInheritHandle,
LPCTSTR lpName);
3:下列说明有两个线程需要操作资源,但是一个时刻只能有一个线程操作该资源
#define THREADCOUNT 2
HANDLE ghMutex;
DWORD WINAPI WriteToDatabase( LPVOID );
void main()
{
HANDLE aThread[THREADCOUNT];
DWORD ThreadID;
int i;
ghMutex = CreateMutex( NULL, FALSE, NULL);
if (ghMutex == NULL)
{
printf("CreateMutex error: %d/n", GetLastError());
return;
}
for( i=0; i < THREADCOUNT; i++ )
{
aThread[i] = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE) WriteToDatabase, NULL, 0, &ThreadID);
if( aThread[i] == NULL )
{
printf("CreateThread error: %d/n", GetLastError());
return;
}
}
WaitForMultipleObjects(THREADCOUNT, aThread, TRUE, INFINITE);
for( i=0; i < THREADCOUNT; i++ )
CloseHandle(aThread[i]);
CloseHandle(ghMutex);
}
DWORD WINAPI WriteToDatabase( LPVOID lpParam )
{
DWORD dwCount=0, dwWaitResult;
while( dwCount < 20 )
{
//线程间只有一个能使用该信号量
dwWaitResult = WaitForSingleObject(ghMutex,INFINITE);
switch (dwWaitResult)
{
case WAIT_OBJECT_0:
__try
{
printf("Thread %d writing to database.../n", GetCurrentThreadId());
dwCount++;
}
__finally
{
if (! ReleaseMutex(ghMutex))
{
}
}
break;
case WAIT_ABANDONED:
return FALSE;
}
}
return TRUE;
}
3:多个线程访问同一数据,一部分是读,一部分是写。我们知道只有读-写或写-写同时进行时可能会出现问题,而读-读则可以同时进行,因为它们不会对数据进行修改,所以也有必要在C++中封装一种方便的允许读-读并发、读-写与写-写互斥的锁。要实现这种锁,使用临界区就很困难了,不如改用内核对象,这里我使用的是互斥量(Mutex)
class RWLock;
class _RWLockBase
{
friend class RWLock;
protected:
virtual DWORD ReadLock(int timeout) = 0;
virtual void ReadUnlock(int handleIndex) = 0;
virtual DWORD WriteLock(int timeout) = 0;
virtual void WriteUnlock() = 0;
};
template <int maxReadCount = 3> //这里给一个缺省参数,尽量减少客户端代码量
class RWLockBase: public _RWLockBase
{
//二是为了允许读-读并发,这里只声明一个Mutex是不够的,必须要声明多个Mutex,而且有多少个Mutex就同时允许多少个读线程并发
HANDLE handles[maxReadCount];
DWORD ReadLock(int timeout) //加读锁,只要等到一个互斥量返回即可
{
return ::WaitForMultipleObjects(maxReadCount, handles, FALSE, timeout);
}
void ReadUnlock(int handleIndex) //解读锁,释放已获得的互斥量
{
::ReleaseMutex(handles[handleIndex]);
}
DWORD WriteLock(int timeout) //加写锁,等到所有互斥量,从而与其他所有线程互斥
{
return ::WaitForMultipleObjects(maxReadCount, handles, TRUE, timeout);
}
void WriteUnlock() //解写锁,释放所有的互斥量
{
for(int i = 0; i < maxReadCount; i++)
::ReleaseMutex(handles[i]);
}
protected:
RWLockBase() //构造函数,初始化每个互斥量
{
for(int i = 0; i < maxReadCount; i++)
handles[i] = ::CreateMutex(0, FALSE, 0);
}
~RWLockBase() //析构函数,销毁对象
{
for(int i = 0; i < maxReadCount; i++)
::CloseHandle(handles[i]);
}
};
class RWLock
{
bool lockSuccess; //因为有可能超时,需要保存是否等待成功
int readLockHandleIndex; //对于读锁,需要知道获得的是哪个互斥量
_RWLockBase* _pObj; //目标对象基类指针
public:
//这里通过第二个参数决定是加读锁还是写锁,第三个参数为超时的时间
RWLock(_RWLockBase* pObj, bool readLock = true, int timeout = 3000)
{
_pObj = pObj;
lockSuccess = FALSE;
readLockHandleIndex = -1;
if(NULL == _pObj)
return;
if(readLock) //读锁
{
DWORD retval = _pObj->ReadLock(timeout);
if(retval < WAIT_ABANDONED) //返回值小于WAIT_ABANDONED表示成功
{ //其值减WAIT_OBJECT_0就是数组下标
readLockHandleIndex = retval - WAIT_OBJECT_0;
lockSuccess = TRUE;
}
}
else
{
DWORD retval = _pObj->WriteLock(timeout);
if(retval < WAIT_ABANDONED) //写锁时获得了所有互斥量,无需保存下标
lockSuccess = TRUE;
}
}
~RWLock()
{
if(NULL == _pObj)
return;
if(readLockHandleIndex > -1)
_pObj->ReadUnlock(readLockHandleIndex);
else
_pObj->WriteUnlock();
}
bool IsLockSuccess() const { return lockSuccess; }
};
class MyClass2: public RWLockBase<>
{};
MyClass2 mc2;
void Say(char* text, int index)
{
static int count = 0;
SYSTEMTIME st;
::GetLocalTime(&st);
printf("%03d [%02d:%02d:%02d.%03d]%s %d/n", ++count, st.wHour, st.wMinute, st.wSecond, st.wMilliseconds, text, index);
}
//读线程
DWORD CALLBACK ReadThreadProc(LPVOID param)
{
int i = (int)param;
RWLock lock(&mc2); //加读锁
if(lock.IsLockSuccess()) //如果加锁成功
{
Say("read thread %d started", i); //为了代码短一些,假设Say函数有这种能力
Sleep(1000);
Say("read thread %d ended", i);
}
else //加锁超时,则显示超时信息
{
Say("read thread %d timeout", i);
}
return 0;
}
//写线程
DWORD CALLBACK WriteThreadProc(LPVOID param)
{
int i = (int)param;
RWLock lock(&mc2, false); //加写锁。
if(lock.IsLockSuccess())
{
Say("write thread %d started", i);
Sleep(600);
Say("write thread %d ended", i);
}
else
{
Say("write thread %d timeout", i);
}
return 0;
}
int _tmain(int argc, _TCHAR* argv[])
{
int i;
for(i = 0; i < 5; i++)
::CreateThread(0, 0, ReadThreadProc, (LPVOID)i, 0, 0);
for(i = 0; i < 5; i++)
::CreateThread(0, 0, WriteThreadProc, (LPVOID)i, 0, 0);
Sleep(10000);
return 0;
}
三:Seamphore
1:Semaphore是一件可以容纳N人的房间,如果人不满就可以进去,如果人满了,就要等待有人出来。对于N=1的情况,称为binarysemaphore。一般的用法是,用于限制对于某一资源的同时访问。而Mutex是一把钥匙,一个人拿了就可进入一个房间,出来的时候把钥匙交给队列的第一个。一般的用法是用于串行化对critical section代码的访问,保证这段代码不会被并行的运行。
和Mutex的区别
1)信号量没有所谓的”wait abandoned”状态可被其它线程侦测到;
2)拥有互斥器的线程无论再调用多少次wait…()函数都不会被阻塞。但对于信号量,如果锁定成功也不会收到信号量的拥有权——因为同时可以多个线程同时锁定一个信号量。信号量没有“独占锁定”这种事情,也没有所有权的观念,一个线程可以反复调用wait…()函数产生新的锁定,每锁定一次,信号量的现值就减1。
3)与互斥器不同,调用ReleaseSemaphore()的那个线程,并不一定就得是调用Wait…()函数的那个线程。任何线程都可以在任何时间调用ReleaseSemaphore(),解除被任何线程锁定的信号量。
2:常用函数
HANDLE CreateSemaphore();
HANDLE OpenSemaphore();
DWORD WaitForSingleObject();
DWORD WaitForMultipleObjects();
DWORD MsgWaitForMultipleObjects();
BOOL ReleaseSemaphore();
BOOL CloseHandle();
HANDLE CreateSemaphore(LPSECURITY_ATTRIBUTES lpSemaphoreAttributes, //CE不支持
LONG lInitialCount, //信号量初始化计数值
LONG lMaximumCount, //信号量计数最大值,统一时间能锁定Seamphore之线程的最大个数。
LPCTSTR lpName); //信号量对象名称
BOOL ReleaseSemaphore(HANDLE hSemaphore, //信号量句柄
LONG lReleaseCount, //信号量计数增加的值
LPLONG lpPreviousCount); //输出量,表示上一次信号量计数
3:下列可以理解为有12个人需要使用10个房间 ,当时一个房间同一时刻只能被一个人使用。
#define MAX_SEM_COUNT 10
#define THREADCOUNT 12
HANDLE ghSemaphore;
DWORD WINAPI ThreadProc( LPVOID );
void main()
{
HANDLE aThread[THREADCOUNT];
DWORD ThreadID;
int i;
ghSemaphore = CreateSemaphore( NULL, MAX_SEM_COUNT, MAX_SEM_COUNT, NULL);
if (ghSemaphore == NULL)
{
printf("CreateSemaphore error: %d/n", GetLastError());
return;
}
for( i=0; i < THREADCOUNT; i++ )
{
aThread[i] = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE) ThreadProc, NULL, 0, &ThreadID);
if( aThread[i] == NULL )
{
printf("CreateThread error: %d/n", GetLastError());
return;
}
}
WaitForMultipleObjects(THREADCOUNT, aThread, TRUE, INFINITE);
for( i=0; i < THREADCOUNT; i++ )
CloseHandle(aThread[i]);
CloseHandle(ghSemaphore);
}
DWORD WINAPI ThreadProc( LPVOID lpParam )
{
DWORD dwWaitResult;
BOOL bContinue=TRUE;
while(bContinue)
{
dwWaitResult = WaitForSingleObject( ghSemaphore, 0L);
switch (dwWaitResult)
{
case WAIT_OBJECT_0:
printf("Thread %d: wait succeeded/n", GetCurrentThreadId());
bContinue=FALSE;
Sleep(5);
if (!ReleaseSemaphore(ghSemaphore, 1, NULL) )
{
printf("ReleaseSemaphore error: %d/n", GetLastError());
}
break;
case WAIT_TIMEOUT:
printf("Thread %d: wait timed out/n", GetCurrentThreadId());
break;
}
}
return TRUE;
}
4:对列Queue的同步访问。即生产者消费者概念。
class Queue
{
public:
Queue();
~Queue();
void push(const int* t, size_t count);
int pop();
private:
HANDLE semaphore;
queue<int> queue;
};
//为了保证能顺利释放信号变量,初始队列中没有产品
Queue:Queue() : semaphore(NULL)
{
semaphore = CreateSemaphore(NULL,0,1024*1024*10,NULL);
}
Queue::~Queue()
{
if (semaphore)
CloseHandle(semaphore);
}
//放入了Count了个产品后,Seamphore信号量重置值
void Queue::push(const int* t, size_t count)
{
for (size_t i=0; i<count; i++)
{
queue_.push(t[i]);
}
ReleaseSemaphore(semaphore_, count, NULL);
}
int Queue::pop()
{
WaitForSingleObject(semaphore_, INFINITE);
return queue_.pop();
}
四:Event
1:
如果一个事件是自动事件,那么当它处于激发状态时,可唤醒一个等待它的线程,线程被唤醒后,自动地转入非激发状态;
如果一个事件是手动事件,那么当它处于激发状态时,可唤醒所有等待它的线程,并且一直保持状态为激发状态,直到被明确地ResetEvent()后,才转入非激发状态。
2:常用函数
HANDLE CreateEvent();
HANDLE OpenEvent();
BOOL SetEvent();
BOOL PluseEvent();
BOOL ResetEvent();
DWORD WaitForSingleObject();
DWORD WaitForMultipleObjects();
DWORD MsgWaitForMultipleObjects();
BOOL CloseHandle();
HANDLE CreateEvent(LPSECURITY_ATTRIBUTES lpEventAttributes, // 安全属性
BOOL bManualReset, // 复位方式
BOOL bInitialState, // 初始状态
LPCTSTR lpName); // 对象名称
bManualReset:指定将事件对象创建成手动复原还是自动复原。如果是TRUE,那么必须用ResetEvent函数来手工将事件的状态复原到无信号状态。如果设置为FALSE,当事件被一个等待线程释放以后,系统将会自动将事件状态复原为无信号状态。
bInitialState:指定事件对象的初始状态。如果为TRUE,初始状态为有信号状态;否则为无信号状态。
lpName:指定事件的对象的名称,任何线程或进程都可以根据这个名称使用这一Event对象。
3:以下通过事件控制当写时,四个读线程不可操作队列。
#define THREADCOUNT 4
HANDLE ghWriteEvent;
HANDLE ghThreads[THREADCOUNT];
DWORD WINAPI ThreadProc(LPVOID);
void CreateEventsAndThreads(void)
{
int i;
DWORD dwThreadID;
//自动重置事件
ghWriteEvent = CreateEvent(NULL,TRUE,FALSE,TEXT("WriteEvent"));
if (ghWriteEvent == NULL)
{
printf("CreateEvent failed (%d)/n", GetLastError());
return;
}
for(i = 0; i < THREADCOUNT; i++)
{
ghThreads[i] = CreateThread(NULL, 0, ThreadProc, NULL, 0, &dwThreadID);
if (ghThreads[i] == NULL)
{
printf("CreateThread failed (%d)/n", GetLastError());
return;
}
}
}
void WriteToBuffer(VOID)
{
printf("Main thread writing to the shared buffer.../n");
if (! SetEvent(ghWriteEvent) )
{
printf("SetEvent failed (%d)/n", GetLastError());
return;
}
}
void CloseEvents()
{
CloseHandle(ghWriteEvent);
}
void main()
{
DWORD dwWaitResult;
CreateEventsAndThreads();
//开始写数据,写完后激活事件
WriteToBuffer();
printf("Main thread waiting for threads to exit.../n");
//四个线程都在等待激活事件
dwWaitResult = WaitForMultipleObjects(THREADCOUNT, ghThreads, TRUE, INFINITE);
switch (dwWaitResult)
{
case WAIT_OBJECT_0:
printf("All threads ended, cleaning up for application exit.../n");
break;
default:
printf("WaitForMultipleObjects failed (%d)/n", GetLastError());
return;
}
// Close the events to clean up
CloseEvents();
}
DWORD WINAPI ThreadProc(LPVOID lpParam)
{
DWORD dwWaitResult;
printf("Thread %d waiting for write event.../n", GetCurrentThreadId());
dwWaitResult = WaitForSingleObject( ghWriteEvent, INFINITE);
switch (dwWaitResult)
{
case WAIT_OBJECT_0:
printf("Thread %d reading from buffer/n",GetCurrentThreadId());
break;
default:
printf("Wait error (%d)/n", GetLastError());
return 0;
}
printf("Thread %d exiting/n", GetCurrentThreadId());
return 1;
}
五:Interlocked Variables
interlocked函数没有“等待”机能,它只是保证对某个特定的变量的存取是排他性的、原子性的。这相当于给某个变量设置了一个临界区或互斥量。
1:常用函数
LONG InterlockedIncrement(LPLONG lpAddend); //(*lpAddend)++;
LONG InterlockedDecrement(LPLONG lpAddend); //(*lpAddend)--;
LONG InterlockedExchangeAdd(LPLONG Addend,LONG Increment); //(*lpAddend) += Increment;
LONG InterlockedExchange(LPLONG Target,LONG Value); //(*lpAddend) = Value;
PVOID InterlockedCompareExchange(PVOID *Destination,PVOIDExchange,PVOID Comperand);//if (*Destination == Comperand) *Destination= Exchange;
六:总结
1:Critical Section
Critical section(临界区)用来实现“排他性占有”。适用范围是单一进程的各线程。它是:
• 一个局部对象,不是核心对象;
• 快速而有效率;
• 不能够同时有一个以上的临界区被等待,否则容易陷入死锁;
• 无法侦测是否被某个线程放弃。
2:Mutex
Mutex是一个核心对象,可以在不同的线程之间实现“排他性占有”,甚至那些线程分属不同的进程。它是:
• 一个核心对象;
• 如果mutex拥有的那个线程结束,则会产生一个”abandoned”错误信息;
• 可以使用wait…()函数等待一个mutex;
• 可以具名,因此可以被其它进程开启;
• 只能被拥有它的线程释放。
3:Semaphore
Semaphore用来追踪有限的资源。它是:
• 一个核心对象;
• 没有拥有者;
• 可以具名,因此可以被其它进程开启;
• 可以被任何一个线程释放。
4:Event Object
Event object通常用于overlapped I/O,或用来设计某些自定义的同步对象。它是:
• 一个核心对象;
• 完全在程序掌控之下;
• 适用于设计新的同步对象;
• “要求苏醒”的请求并不会被存储起来,可能会遗失掉;
• 可以具名,因此可以被其它进程开启;
5:Interlocked Variable
Interlocked variable主要用于应用计数,不使用临界区或互斥器之类,对对4字节的数值操作有些基本的同步。