Windows核心编程 第八章 用户模式下的线程同步
1、 线程之间通信发生在以下两种情况:
① 需要让多个线程同时访问一个共享资源,同时不能破坏资源的完整性
② 一个线程需要通知其他线程某项任务已经完成
2、 原子访问
所谓原子访问就是一个线程在访问某个资源的同时能够保证没有其他线程会在同一时刻访问同一资源。
CPU一个最小操作单位并不是一条编程语言指令,所以即使在线程中只有一行赋值语句,在CPU看来却不是一条语句,所以如果两个线程中有同样的一行赋值语句,那么也可能会发生不可预料的结果。
Windows提供了一系列原子操作的函数,Interlocked系列函数。
LONG InterlockedExchangeAdd(
PLONG volatile plAddend, //目的操作数
LONG lIncrement //可为正数或负数,正数为加,负数为减
);
函数返回之前修改之前的目的操作数
Eg.
int _tmain(int argc, _TCHAR* argv[])
{
LONG num = 1000;
LONG lOld = InterlockedExchangeAdd(&num,
1000000);
cout<<lOld<<endl<<num<<endl;
return 0;
}
LONG InterlockedExchangeAdd64(
PLONGLONG volatile pllAddend, //目的操作数
LONGLONG llIncrement //正数或负数
);
LONG InterlockedIncrement(
PLONG volatile lAddend //加1的目的操作数
);
Interlocked系列函数的原理:取决于CPU平台,在X86系列CPU,Interlocked函数会在总线上维持一个硬件信号,这个信号会阻止其他CPU访问同一个内存地址。
需要注意的是:传给Interlocked系列函数的变量地址必须是经过对齐的!
C运行库提供了一个_aligned_malloc函数,用这个函数来分配一块对齐过的内存:
Void* _align_malloc(size_t size, size_t alignment);
Size表示分配字节数, alignment表示对齐字节数,必须是2的N次方。
Interlocked函数执行快,通常只占几个CPU周期,而且不需要用户模式和内核模式之间进行切换(这种切换通常需要1000个周期以上)。
LONG InterlockedExchange( //32位程序 64位程序都是替换32位值
PLONG volatile plTarget //被替换的内存单元
LONG lValue //用来替换的值
);
LONG InterlockedExchange64(
PLONGLONG volatile plTarget, //被替换的内存单元
LONGLONG lValue //用来替换的值
);
PVOID InterlockedExchangePointer( //32位程序下替换32位值,64位程序下替换64位值
PVOID* volatile ppvTarget, //被替换的指针的地址
PVOID pvValue //用来替换的指针值
);
关于旋转锁
旋转锁就是两段代码根据一个标志位变量的值类实现类似关键区的作用。 旋转锁假定所有使用旋转锁的线程都拥有同样的优先级,否则低优先级的线程可能无法得到CPU,在旋转锁中等待的时候使用SwitchToThread替换Sleep可能更好,因为SwitchToThread允许优先级低的线程获得CPU。
【旋转锁变量】和【保护的数据】不能在相同的【高速缓存行】,如果在相同的高速缓存行,那么使用资源的CPU就会与任何试图访问资源的CPU发生争夺,从而影响性能。
在单CPU上不要用旋转锁!
PLONG InterlockedCompareExchange( //32/64位程序下都是对32位值操作
PLONG plDestination, //使用交换值替换该该值
LONG lExchange, //用来替换的值
LONG lComparand //用来与第一个参数指向的值比较
);
上面函数的伪代码:
LONG lOld = *plDestination;
If(*plDestination == lComparand)
{
*lpDestination = lExchange;
}
Return lOld;
//////////////////////////////////////////////////////////////
类似的函数
LONG InterlockedCompareExchangePointer(//32位程序下对32为值操作,64位下对64位值进行操作
PVOID* ppvDestination, //被替换的指针的地址
PVOID pvExchange, //用来替换的指针值
PVOID pvComparand //用来比较的指针值
);
LONG InterlockedCompareExchange64( //对已对齐的64位值进行操作
LONGLONG pllDestination,
LONGLONG llExchange,
LONGLONG llComaprand
);
LONG InterlockedIncrement( // 加1
PLONG plAddend
);
LONG InterlockedDecrement( // 减1
PLONG plAddend
);
另外还有一组基于InterlockedCompareExchange64的OR, AND, XOR辅助函数。
LONGLONG InterlockedAnd64(
LONGLONG* Destination,
LONGLONG value,
LONGLONG Old
);
3、高速缓存行
当CPU从内存中读取一个字节的时候,它并不是从内存中只读一个字节,而是取回一个高速缓存行。 高速缓存行可能包含32字节, 64字节甚至是128字节。
他们始终对齐到32字节边界,64字节边界或128字节边界。 高速缓存行的优点是提高了性能。
缺点也很明显,当CPU1和CPU2读取到各自的高速缓存行的数据包含相同的内存单元,而这个内存单元又在某个CPU中被修改时,那么另一个CPU中的该值就是脏数据,所以需要修改数据的CPU将该地址的数据写回内存,然后通知另一个CPU重新的去该地址的值。
在多处理器的机器上他们同样会损伤性能。
所以:我们应该根据高速缓存行的大小来将应用程序的数据组织在一起,并将数据与缓存的边界对齐,这样做的目的是确保不同的CPU能够各自访问不同的内存地址,而且这些地址不在同一个高速缓存行中。 此外,要把只读数据(或不经常读的数据) 与 可写数据分别存放。 我们还应该把差不多会同一时间内访问的数据组织在一起。
获得高速缓存行的大小:
BOOL WINAPI GetLogicalProcessorInformation(
_Out_ PSYSTEM_LOGICAL_PROCESSOR_INFORMATION Buffer,
_Inout_ PDWORD ReturnLength
);
关于__declspec(aling(#)):
该指示器是用来字节对齐,与他类似的还有#pragma pack(N)
两个都是字节对齐,但是当两者都存在的时候前者即__declspec(align(#))有更高的优先级。
对于结构体来说:
每个成员的的起始地址= min(__declspec(align(#))指定的, #pragma pack(N)指定的,成员本身的大小)的整数倍;
整个结构体的大小 = max(__declspec((#))指定的,结构体中占字节数最大的成员的大小 )的整数倍;
MS Visual studio和其他编译器可能会有所不同,因此最后的结构体大小依据编译器而定。
4、 高级线程同步
4.1 volatile, 轮询
不要在某个线程中使用某个变量在while中轮询来检查其值,这样会浪费CPU时间,甚至当该线程优先级不够高的时候可能会得不到CPU。
如果实在要这样做,可以在while中使用Sleep或SwitchToThread, 轮询变量一定要加上限定符volatile, 加上这个关键字后,CPU每次读这个值的时候都会从内存中读取,编译器对这样的变量不会做优化。
4.2 关键段
Critical Secion
将某段代码(资源)以原子形式访问,当某个线程访问该资源时,不会有其他线程同时访问。
Eg.
///////////////////////////////////////////////////////////
CRITICAL_SECTION cs;
InitializeCriticalSection(&cs);
//子线程1
EnterCriticalSection(&cs);
//codes
//access key resource
//…
LeaveCriticalSection(&cs);
//子线程2
EnterCriticalSection(&cs);
//codes
//访问资源
//…
LeaveCriticalSection(&cs);
///////////////////////////////////////////////////////////////
另外一个很关键的函数,
BOOL TryEnterCriticalSection(PCRITICAL_SECTION pcs);
当不能进入关键区的时候该函数不会导致线程阻塞,而是如果可以进入关键区则返回TRUE并进入, 如果不能则返回FALSE, 这个很有用。 对于每个返回TRUE的该函数都要对应一个LeaveCriticalSection(&cs);
CRITICAL_SECTION内部有一些成员变量,EnterCriticalSection/TryEnterCirticalSection和LeaveCriticalSection都会检查这些变量,并且是以原子的方式来检查。
CRITICAL_SECTION内部的【计数器】成员表示调用线程获准访问共享资源的次数, 如果次数大于0,LeaveCriticalSection会直接返回,不执行其他任何操作,如果计数器编程了0, LeaveCriticalSection会更新成员变量以表示没有任何线程正在访问受保护的资源。 同时会检查有没有其他线程由于调用了EnterCriticalSection而处于等待状态,如果至少有一个线程因此而处于等待状态,那么函数会更新成员变量
LeaveCriticalSection不会使线程变为等待状体,他只会立即返回,
TryEnterCriticalSection也不会使线程进入等待状态,而是立即返回,返回TRUE表示可以访问,返回FALSE表示不能访问。
4.3 关于关键段和旋转锁
当调用EnterCriticalSection使线程等待时,线程必须从【用户模式】切换到【内核模式】,这个切换是代价是很大的,大约需要1000个CPU周期,然而可能在线程从【用户模式】切换到【内核模式】之前,占用资源的线程就已经释放了,因此这样会浪费大量的CPU时间。
为了提高性能,可以让线程从【用户模式】切到到【内核模式】之前先等待一段时间,这段等待时间就使用了【旋转锁】,这个时间不宜过长也不宜过短。
BOOL InitializeCriticalSectionAndSpinCount(
PCRITICAL_SECTION pcs, //CRITICAL_SECTION指针
DWORD dwSpinCount //旋转锁循环次数,4000仅供参考
);
dwSpinCount是个经验值。
在单CPU的机器上,函数会忽略dwSpinCount这个参数,因为没有意义,如果不忽略这个参数,那就有点又想当XX又想XXX的意思了。
4.4 关键段和错误处理
使用关键段的时候可能会发生的错误:
① InitializeCriticalSection,该函数会在内部分配内存用来提供一些内部调试信息, 如果函数失败会抛出STATUS_NO_MEMORY异常。
解决方法:使用InitializeCriticalSectionAndSpinCount代替InitializeCriticalSection,这个函数也会分配内存,如果分配不成功会返回FALSE。
② EnterCriticalSection, 如果有多个线程在同一时刻争夺同一个关键段,则关键段会在内部使用一个【事件内核对象】, 由于争夺现象很少发生,因此只有当第一次要用到事件对象的时候才会真正创建它,当发生争夺关键段并且内存不足的时候可能无法创建该【事件内核对象】,从事EnterCriticalSection会抛出EXCEPTION_INVALID_HANDLE异常。
解决方法1:使用结构化异常处理(SEH)来捕捉错误。
解决方法2:使用InitializeCriticalSectionAndSpinCount来创建关键段,并将dwSpinCount参数的最高位设为1。 当函数看到dwSpinCount最高位为1 的时候会在初始化时就创建一个与关键段关联的【事件内核对象】,如果无法创建就返回FALSE。
概述:只有当很苛刻的条件下才会考虑上述问题。
实际应用来说,最好还是使用InitializeCriticalSectionAndSpinCount来代替InitializeCriticalSection。
另有:【有键事件】, 在关键段的情况中,如果内存少到不足以创建一个【事件内核对象】时, 可以将关键段的地址当作键值来使用。 通过将关键段的地址当作键值来使用,系统可以对试图进入这个关键段的线程进行同步,并且在必要的情况下将它们阻挡在外。
5、 Slim读写锁
SRWLock的目的和关键段相同,只是SRWLock允许我们区分【读者线程】和【写入者线程】,允许多个【读者线程】同一时刻访问共享资源,因为仅仅只读不会破坏数据。
SRWLOCK结构,该结构中只有一个PVOID指针,但指向的东西未公开。
SRWLOCK在windef.h中被定义为RTL_SRWLOCK。
Void InitializeSRWLock(PSRWLOCK SWRLock);
Void AcquireSRWLockExclusive(PSRWLOCK SRWLock);
Void ReleaseSRWLockExclusive(PSRWLOCK SRWLock);
Void AcquireSRWLockShared(PSRWLOCK SRWLock);
Void ReleaseSRWLockShared(PSRWLOCK SRWLock);
/////////////////////////////////////////////////
BOOLEAN TryAcquireSRWLockExclusive(
__inout PSRWLOCK SRWLock
);
BOOLEAN TryAcquireSRWLockShared(
__inout PSRWLOCK SRWLock
);
该函数和TryEnterCirticalSection类似,如果不能访问共享资源就返回FALSE,这个非常有用。
Mutex互斥对象用来同步的性能最差,关键段次之,SRWLock再次之, 然后是Interlcoked API, volatile写入, volatile读取。
6、 条件变量
有时我们想让线程以原子方式把锁释放并将自己阻塞,直到某一个条件成立为止。 要实现这样的线程同步,通过如下两个函数:
BOOL SleepConditionVariableCS(
PCONDITION_VARIABLE pConditionVariable, //条件变量,CONDITION_VARIABLE指针
PCRITICAL_SECTION pCriticalContion, //关键段
DWORD dwMilliseconds //等待时间,ms
);
BOOL SleepConditionVariableSRW(
PCONDITION_VARIABLE pConditionVariable, //条件变量
PSRWLOCK pSRWLock, //RTL_SRWLOCK指针
DWORD dwMilliseconds, //等待时间,ms
ULONG Flags //如果是【读者线程】传入0, 否则传入CONDITION_VARIABLE_LOCKMODE_SHARED
);
CONDITION_VARIABLE必须用下面函数初始化:
VOID WINAPI InitializeConditionVariable(
_Out_ PCONDITION_VARIABLE ConditionVariable
);
当dwMilliseconds指定的时间用完的时候条件变量尚未被触发,函数会返回FALSE, 否则函数返回TRUE。 当函数返回FALSE的时候说明线程没有获得【关键段】或【锁】。
///////////////////////////////////////////////////////////////////////////////
#include "stdafx.h"
#include "windows.h"
#include "process.h"
#include <iostream>
#include <vector>
using namespace std;
vector<int> g_iVec;
RTL_SRWLOCK g_srwLock;
CONDITION_VARIABLE g_conditionVariable;
#pragma region 生产者线程 和 消费者线程
unsigned int __stdcall ProduceThread(void* p)
{
for (int i = 0; i < 10; ++ i)
{
AcquireSRWLockExclusive(&g_srwLock);
g_iVec.push_back(i);
ReleaseSRWLockExclusive(&g_srwLock);
WakeConditionVariable(&g_conditionVariable);
Sleep(10);
}
return 0;
}
////////////////////////////////////////////////////
unsigned int __stdcall ConsumerThread1(void* p)
{
while(TRUE)
{
AcquireSRWLockExclusive(&g_srwLock);
while (g_iVec.empty())
{
cout<<"\r\n ConsumerThread1, waiting for writing...\r\n";
SleepConditionVariableSRW(&g_conditionVariable, &g_srwLock, INFINITE, CONDITION_VARIABLE_LOCKMODE_SHARED);
}
cout<<"\r\nConsmer1-----"<<g_iVec.back()<<endl;
g_iVec.pop_back();
ReleaseSRWLockExclusive(&g_srwLock);
}
return 0;
}
unsigned int __stdcall ConsumerThread2(void* p)
{
while (TRUE)
{
AcquireSRWLockExclusive(&g_srwLock);
while (g_iVec.empty())
{
cout<<"\r\n ConsumerThread2, waiting for writing...\r\n";
SleepConditionVariableSRW(&g_conditionVariable, &g_srwLock, INFINITE,
CONDITION_VARIABLE_LOCKMODE_SHARED);
}
cout<<"\r\n Consumer1 ----"<<g_iVec.back()<<endl;
g_iVec.pop_back();
ReleaseSRWLockExclusive(&g_srwLock);
}
return 0;
}
#pragma endregion
int _tmain(int argc, _TCHAR* argv[])
{
InitializeSRWLock(&g_srwLock);
InitializeConditionVariable(&g_conditionVariable);
unsigned int nRetProduce = _beginthreadex(
NULL,
0,
ProduceThread,
NULL,
0,
NULL);
unsigned int nRetConsumer1 = _beginthreadex(
NULL,
0,
ConsumerThread1,
NULL,
0,
NULL);
unsigned int nRetConsumer2 = _beginthreadex(
NULL,
0,
ConsumerThread2,
NULL,
0,
NULL);
if (0 == nRetProduce || 0 == nRetConsumer1 || 0 == nRetConsumer2)
{
cout<<"threads creats failed\r\n";
return -1;
}
HANDLE hProduce = (HANDLE)(nRetProduce);
HANDLE hConsumer1 = (HANDLE)(nRetConsumer1);
HANDLE hConsumer2 = (HANDLE)(nRetConsumer2);
DWORD dwProcudeExitCode = 0;
DWORD dwConmsumer1ExitCode = 0;
DWORD dwConmsumer2ExitCode = 0;
HANDLE hHandles[3] = {hProduce, hConsumer1, hConsumer2};
WaitForMultipleObjects(sizeof(hHandles) / sizeof(hHandles[0]), hHandles, TRUE, INFINITE);
return 0;
}
///////////////////////////////////////////