第八章 用户模式下的线程同步

时间:2022-11-10 23:33:49
//1.
有以下两个基本情况,线程之间需要相互通信:
(A):让多个线程同时访问一个共享资源并不破坏资源的完整性
(B):一个线程需要通知另一个线程某项任务已经完成

//2.
(A):原子访问:指的是一个线程在访问某个资源的时候确保没有其他线程会在同一时刻访问同一资源

(B):
//加减法
LONG WINAPI InterlockedExchangeAdd (__inout LONG volatile *Addend, __in LONG Value);												
FORCEINLINE LONGLONG InterlockedExchange64 (__inout LONGLONG volatile *Target, __in LONGLONG Value);

//交换
FORCEINLINE unsigned long InterlockedExchange(__inout __drv_interlocked unsigned long volatile *Target, __in unsigned long Value);		
FORCEINLINE LONGLONG InterlockedExchange64 (__inout LONGLONG volatile *Target, __in LONGLONG Value);

//比较后修改 *Destination = Comperand 则 *Destination = Exchange;
LONGLONG WINAPI InterlockedCompareExchange64 (__inout LONGLONG volatile *Destination, __in LONGLONG Exchange, __in LONGLONG Comperand);
FORCEINLINE unsigned InterlockedCompareExchange(__inout __drv_interlocked unsigned volatile *Destination, __in unsigned Exchange, __in unsigned Comperand);
PVOID InterlockedCompareExchangePointer (__inout  PVOID volatile *Destination, __in_opt PVOID Exchange, __in_opt PVOID Comperand);	//用于指针比较后修改

//基于 InterlockedCompareExchange64 的 and or xor
InterlockedAnd64
InterlockedOr64
InterlockedXor64

以上函数都会返回原来的值
没有哪一个 Interlocked 系列函数只用于读取一个值,因为没有必要,如果线程只需读取一个值,而这个值始终通过 Interlocked 系列函数修改该的,那么读取到的值不会有任何问题

(C):我们必须确保传给以上函数的变量地址是经过对齐的,否则这些函数可能会失败
void * __cdecl _aligned_malloc(_In_ size_t _Size, _In_ size_t _Alignment);	此函数可以分配一块对齐后的内存,其第二个参数表示对齐字节边界,必须为2的整数次幂
void   __cdecl _aligned_free(_Post_ptr_invalid_ void * _Memory);			对应的释放内存函数

(D):Interlocked 系列函数,执行的极快。调用一次通常只占用几个CPU周期(通常小于50),而且不需要在内核与用户模式之间进行切换(这个切换通常需要耗费1000个以上CPU周期)

(E):Interlocked 单项链表
InitializeSListHead:创建一个空栈
InterlockedPushEntrySList:栈顶添加一个元素
InterlockedPopEntrySList:移除位于栈顶的元素并返回
InterlockedFlushSList:清空栈
QueryDepthSList:返回栈中的元素

https://msdn.microsoft.com/zh-cn/library/windows/desktop/ms683482(v=vs.85).aspx
备注:All list items must be aligned on a MEMORY_ALLOCATION_ALIGNMENT boundary. Unaligned items can cause unpredictable results

void FunExample()
{
	struct STest
	{
		SLIST_ENTRY sListEntry;	//此成员要作为第一个成员,否则取出来的东西是错误的
		char aBuff[10];
		int nValue;
	};

	vector<STest*> vecRecord;
	SLIST_HEADER* pSlistHead = static_cast<SLIST_HEADER*>(_aligned_malloc(sizeof SLIST_HEADER, MEMORY_ALLOCATION_ALIGNMENT));
	InitializeSListHead(pSlistHead);

	for (int i = 0; i < 5; ++i)
	{
		STest* pTest = static_cast<STest*>(_aligned_malloc(sizeof STest, MEMORY_ALLOCATION_ALIGNMENT));
		vecRecord.emplace_back(pTest);
		InterlockedPushEntrySList(pSlistHead, &pTest->sListEntry);
		sprintf_s(pTest->aBuff, "%d", i);
		pTest->nValue = i + 1;
	}

	unsigned short nSize = QueryDepthSList(pSlistHead);	//nSize = 5

	for (int i = 0; i < 4; ++i)
	{
		STest* pTest = reinterpret_cast<STest*>(InterlockedPopEntrySList(pSlistHead));
		printf("%s %d\n", pTest->aBuff, pTest->nValue);
	}

	/*
	输出值:
	4 5
	3 4
	2 3 
	1 2
	*/

	nSize = QueryDepthSList(pSlistHead);	//nSize = 1
	InterlockedFlushSList(pSlistHead);
	nSize = QueryDepthSList(pSlistHead);	//nSize = 0

	while(!vecRecord.empty())
	{
		_aligned_free(vecRecord.back());
		vecRecord.pop_back();
	}

	_aligned_free(pSlistHead);
}

//3.
高速缓存行:
CPU 为了更快的执行代码。于是当从内存中读取数据时,并不是只读自己想要的部分。而是读取足够的字节来填入高速缓存行。
根据不同的 CPU,高速缓存行大小不同。如 X86 是 32BYTES ,而 ALPHA 是 64BYTES 。并且始终在第 32 个字节或第 64 个字节处对齐。
这样,当 CPU 访问相邻的数据时,就不必每次都从内存中读取,提高了速度。 因为访问内存要比访问高速缓存用的时间多得多。
但是,多核发达的年代。情况就不能那么简单了。试想下面这样一个情况。
	1、CPU1 读取了一个字节,以及它和它相邻的字节被读入 CPU1 的高速缓存。
	2、CPU2 做了上面同样的工作。这样 CPU1 , CPU2 的高速缓存拥有同样的数据。
	3、CPU1 修改了那个字节,被修改后,那个字节被放回 CPU1 的高速缓存行。但是该信息并没有被写入 RAM 。
	4、CPU2 访问该字节,但由于 CPU1 并未将数据写入 RAM ,导致了数据不同步。
为了解决这个问题,芯片设计者制定了一个规则。当一个 CPU 修改高速缓存行中的字节时,计算机中的其它 CPU 会被通知,它们的高速缓存将视为无效。
于是,在上面的情况下, CPU2 发现自己的高速缓存中数据已无效, CPU1 将立即把自己的数据写回 RAM ,然后 CPU2 重新读取该数据。 可以看出,高速缓存行在多处理器上会导致一些不利。
从上面的情况可以看出,在设计数据结构的时候,应该尽量将只读数据与读写数据分开,并具尽量将同一时间访问的数据组合在一起。这样 CPU 能一次将需要的数据读入。

GetLogicalProcessorInformation 可以得到系统的高速缓存行大小
__declspec(align(#)) 可以对字段对其加以控制
最好是只让一个线程访问数据或始终让一个CPU访问数据(函数参数与局部变量),这样就能避免高速缓存行的问题了

//4.
关键段:
(A):关键段是一小段代码,它在执行之前需要独占对一些共享资源的访问性
(B):关键段在内部使用 Interlocked 函数,因此执行速度非常快,但是无法用其在多个进程之间对线程进行同步
(C):
CRITICAL_SECTION:利用此结构来进行线程同步
InitializeCriticalSection:初始化 CRITICAL_SECTION 结构
EnterCriticalSection:
	(a):如果没有线程正在访问资源,那么此函数会更新 CRITICAL_SECTION 中的成员变量,以表示调用线程已获取对资源的访问,并立即返回
	(b):如果成员变量表示调用线程已经获准访问资源,那么此函数会更新变量,以表示调用线程被获准访问的次数,当连续调用此函数时就会发生这种情况
	(c):若其他线程已取得资源的访问权,那么此函数会使用一个事件内核对象来把主调线程切换到等待状态
	(d):其实此函数会引发超时, 注册表中默认超时时间大约是 30 天,此值可以通过修改注册表进行修改
LeaveCriticalSection:
	(a):此函数会检查结构内部的成员变量并将计数器减一,该计数器表示调用线程获准访问资源次数
	(b):如果计数器大于0,此函数直接返回,否则其会更新成员变量来指示当前资源没有被任何线程访问,同时检查有无线程因调用 EnterCriticalSection 而陷入等待,若有则会将其切换为可调度状态
TryEnterCriticalSection:
	(a):此函数不会让主调线程进入等待状态,通过返回值来表示当前是否获准访问资源,当其返回 TRUE 时,即表明主调线程已获取了资源访问权,则需要对应调用 LeaveCriticalSection 来释放资源
DeleteCriticalSection:删除 CRITICAL_SECTION 结构,进行资源清理
(D):当线程试图进入一个关键段,但这个关键段正被另一个线程占用的时候,函数会立刻把调用线程切换到等待状态。这就意味着线程必须由用户模式切换到内核模式(大约1000个CPU周期)
	为了提高关键段的性能,可以将旋转锁结合到关键段中
	InitializeCriticalSectionAndSpinCount:其第二个参数可以传入0-0x00FFFFFF之间的任何一个值,代表旋转锁循环次数,在线程切换到等待状态之前,函数会尝试使用旋转锁来获取资源的访问权
	SetCriticalSectionSpinCount:改变旋转锁的旋转次数。用来保护进程堆的关键段的旋转次数大约是4000
(E):InitializeCriticalSection 此函数有可能会失败,其内部有可能会进行内存分配用于内部调试信息,若此过程失败则函数抛出 STATUS_NO_MEMORY 异常。
	EnterCriticalSection 多个线程在同一时刻争夺同一个关键段时,关键段内部会使用一个事件内核对象,只有在第一次需要此内核对象的时候,系统才会真正创建他,这种情况下也会产生异常
	InitializeCriticalSectionAndSpinCount 将第二个参数最高位设为1,会在初始化的时候就创建好事件内核对象,则使用 EnterCriticalSection 时,就绝不会产生异常
	鉴于 InitializeCriticalSection 以及 EnterCriticalSection 在极低情况下会抛出异常的情况下,以及模式切换时浪费的CPU,所以使用 InitializeCriticalSectionAndSpinCount 是很有好处的

//5.
Slim读写锁:
(A):可以区分读线程和写线程,所有读线程在同一时刻可以同时访问资源,当一个写线程占据资源时,任何其他线程无法进行资源访问
(B):首先分配一个 SRWLOCK 结构,并使用 InitializeSRWLock 进行初始化
(C):写入线程可以调用 AcquireSRWLockExclusive 用于取得资源独占权,稍后可以使用 ReleaseSRWLockExclusive 进行解除资源占用
(D):读取线程可以调用 AcquireSRWLockShared 用于取得资源读取权,稍后使用 ReleaseSRWLockShared 进行解除资源占用
(E):不存在用于删除或销毁 SRWLOCK 的函数,系统会自动执行清理工作
(F):不同于关键代码段,对于已经被占用的锁,不能再次获取
(G):Slim读写锁的性能也是相当高的,利用Slim代替关键段,可以提高程序性能

//6.
对于关键段与Slim锁的注意点:
(A):当涉及多个关键段的时候,为了防止线程死锁,我们必须在代码中的任何地方使用完全相同的顺序来进入这几个资源的锁
(B):为了程序的效率与性能,不要长时间占用锁

//7.
不同同步技术性能比较(执行相同任务,一百万次,具体什么任务不重要,只是为了展示性能)
第八章 用户模式下的线程同步

 

//8.
条件变量:
InitializeConditionVariable	Initializes a condition variable.
SleepConditionVariableCS	Sleeps on the specified condition variable and releases the specified critical section as an atomic operation.
SleepConditionVariableSRW	Sleeps on the specified condition variable and releases the specified SRW lock as an atomic operation.
WakeAllConditionVariable	Wakes all threads waiting on the specified condition variable.
WakeConditionVariable		Wakes a single thread waiting on the specified condition variable.

Condition variables are synchronization primitives that enable threads to wait until a particular condition occurs. 
Condition variables are user-mode objects that cannot be shared across processes.
Condition variables enable threads to atomically release a lock and enter the sleeping state. They can be used with critical sections or slim reader/writer (SRW) locks. 
Condition variables support operations that "wake one" or "wake all" waiting threads. After a thread is woken, it re-acquires the lock it released when the thread entered the sleeping state.
Note that the caller must allocate a CONDITION_VARIABLE structure and initialize it by either calling InitializeConditionVariable (to initialize the structure dynamically) 
or assign the constant CONDITION_VARIABLE_INIT to the structure variable (to initialize the structure statically).

Windows Server 2003 and Windows XP:  Condition variables are not supported.

https://msdn.microsoft.com/en-us/library/windows/desktop/ms682052(v=vs.85).aspx

//一个例子:
#include <Windows.h>
#include <process.h>


struct STest 
{
public:
	STest()
	{
		InitializeCriticalSection(&cs);
		InitializeConditionVariable(&cv);
	}

	~STest()
	{
		DeleteCriticalSection(&cs);
	}

public:
	CRITICAL_SECTION cs;
	CONDITION_VARIABLE cv;
};


unsigned int __stdcall ThdFun(void* pVoid)
{
	STest* pTest = static_cast<STest*>(pVoid);
	while(true)
	{
		EnterCriticalSection(&pTest->cs);
		if (SleepConditionVariableCS(&pTest->cv, &pTest->cs, INFINITE))
		{
			//此处条件满足,可以对占用的资源干点事情
		}
		LeaveCriticalSection(&pTest->cs);
	}
}


int _tmain(int argc, _TCHAR* argv[])
{
	STest Test;

	CloseHandle(reinterpret_cast<HANDLE>(_beginthreadex(nullptr, 0, ThdFun, &Test, 0, nullptr)));
	CloseHandle(reinterpret_cast<HANDLE>(_beginthreadex(nullptr, 0, ThdFun, &Test, 0, nullptr)));

	Sleep(1000);

	while (true)
	{
		EnterCriticalSection(&Test.cs);
		//可以在这里做点什么事情来达成线程工作的条件
		LeaveCriticalSection(&Test.cs);

		//此处,已经干了点事情,导致线程可以开始工作
		WakeConditionVariable(&Test.cv);

		Sleep(1000);
	}

	system("pause");
	return 0;
}