windows操作系统同步

时间:2022-01-19 21:58:53

一:Critiacal_Section

1:使用临界区的目的是确保资源每次只能被一个线程所使用。一个线程进入某个临界区,另一个线程就不能够再进入同一个临界区。临界区不是核心对象,它只存在进程的内存空间。没有所谓的句柄,只能在同一进程中的线程间完成同步。

2:使用函数
    VOID InitializeCriticalSection(LPCRITICAL_SECTION lpCriticalSection);
    VOID DeleteCriticalSection(LPCRITICAL_SECTION lpCriticalSection);
    VOID EnterCriticalSection(LPCRITICAL_SECTION lpCriticalSection);
    VOID LeaveCriticalSection(LPCRITICAL_SECTION lpCriticalSection);
操作顺序:
    //开始
    CRITICAL_SECTION cs;
    InitializeCriticalSection(&cs);
    //线程1:
    EnterCriticalSection(&cs);
    LeaveCriticalSection(&cs);
    //线程2:
    EnterCriticalSection(&cs);
    LeaveCriticalSection(&cs);
    //最后:
    DeleteCriticalSection(&cs);

3:封装成类:两个线程对同一数据一读一写,因此需要让它们在这里互斥,不能同时访问。
    class InstanceLock;
    class InstanceLockBase
    {
        friend class InstanceLock;//可以正确的访问私有函数
        CRITICAL_SECTION cs;
        void Lock()//私有
        {
            ::EnterCriticalSection(&cs);//加锁
        }
        void Unlock()//私有
        {
            ::LeaveCriticalSection(&cs);//解锁
        }
    protected://继承
        InstanceLockBase()
        {
            ::InitializeCriticalSection(&cs);//构建时初始化
        }
        ~InstanceLockBase()
        {
            ::DeleteCriticalSection(&cs);//析构时释放
        }
    };

    //为了保证Lock和Unlock能成对调用,
    //C++对于构造函数和析构函数的调用是自动成对的,把对Lock和Unlock的调用专门写在一个类的构造函数和析构函数中
    class InstanceLock
    {
        InstanceLockBase* _pObj;
    public:
        InstanceLock(InstanceLockBase* pObj)
        {
            _pObj = pObj;
            if(NULL != _pObj)
                _pObj->Lock();
        }
        ~InstanceLock()
        {
            if(NULL != _pObj)
                _pObj->Unlock();
        }
    };

    void Say(char* text)
    {
        static int count = 0;
        SYSTEMTIME st;
        ::GetLocalTime(&st);
        printf("%03d [%02d:%02d:%02d.%03d]%s /n", ++count, st.wHour, st.wMinute, st.wSecond, st.wMilliseconds, text);
    }

    //1 让被锁类从InstanceLockBase继承
    //2 所有要访问被锁对象的代码前面声明InstanceLock的实例,并传入被锁对象的指针。
    class MyClass: public InstanceLockBase{};
    MyClass mc;

    //将InstanceLockBase中protected改称public后,也可以直接用基类
    //InstanceLockBase mc;

    DWORD CALLBACK ThreadProc(LPVOID param)
    {
        InstanceLock il(&mc);
        Say("in sub thread, lock");
        Sleep(2000);
        Say("in sub thread, unlock");
        return 0;
    }

    int _tmain(int argc, _TCHAR* argv[])
    {
        CreateThread(0, 0, ThreadProc, 0, 0, 0);
        {
            InstanceLock il(&mc);
            Say("in main thread, lock");
            Sleep(3000);
            Say("in main thread, lock");
        }

        Sleep(5000);

        return 0;
    }

4:另一种封装方法
    class InstanceLockBase
    {
        CRITICAL_SECTION cs;
    public:
        InstanceLockBase()
        {
            InitializeCriticalSection(&cs);
        }

        ~InstanceLockBase()
        {
            DeleteCriticalSection(&cs);
        }

        void Lock()
        {
            EnterCriticalSection(&cs);
        }

        void Unlock()
        {
            LeaveCriticalSection(&cs);       
        }
    };


    template <typename LockType>
    class AutoLock
    {
        LockType& lock;
    public:
        AutoLock(LockType& _lock) : lock(_lock)
        {
            lock.Lock();
        }

        ~AutoLock()
        {
            lock.Unlock();
        }
    };
    #endif
    //使用方法
    InstanceLockBase lock;
    AutoLock<InstanceLockBase> tmplock(lock);

    //将InstanceLockBase中protected改称public后,也可以直接用基类
    InstanceLockBase lock;
    InstanceLock il(&lock);

二:Mutex

1:和临界区(Critical Section)的区别
1)锁住一个未被拥有的mutex,比锁住一个未被拥有的critical section,所需花费几乎100倍的时间。因为critical section不需要进入操作系统核心。
2)互斥器可以跨进程使用(此时应指定名称。未命名的互斥器只能在同一进程内使用)。临界区只能够在同一进程内使用。
3)等待一个互斥器,可以指定“结束等待”的时间。临界区不可以。

2:使用函数
    HANDLE CreateMutex();
    HANDLE OpenMutex();
    DWORD WaitForSingleObject();
    DWORD WaitForMultipleObjects();
    DWORD MsgWaitForMultipleObjects();
    BOOL ReleaseMutex();
    BOOL CloseHandle();

    HANDLE CreateMutex(LPSECURITY_ATTRIBUTES lpMutexAttributes, // 指向安全属性的指针
                       BOOL bInitialOwner, // 初始化互斥对象的所有者
                         LPCTSTR lpName); // 指向互斥对象名的指针
注意:一旦不再需要,注意必须用CloseHandle函数将互斥体句柄关闭。从属于它的所有句柄都被关闭后,就会删除对象。
     根据lpName,系统中的任何线程都可以使用这个名称来处理该Mutex,Mutex名称对整个系统而言是全局的。

    HANDLE WINAPI OpenMutex(DWORD dwDesiredAccess,    //打开一个已经存在的Mutex。
                            BOOL bInheritHandle,
                            LPCTSTR lpName);


3:下列说明有两个线程需要操作资源,但是一个时刻只能有一个线程操作该资源
    #define THREADCOUNT 2

    HANDLE ghMutex;

    DWORD WINAPI WriteToDatabase( LPVOID );

    void main()
    {
        HANDLE aThread[THREADCOUNT];
        DWORD ThreadID;
        int i;

        ghMutex = CreateMutex( NULL, FALSE, NULL);
        if (ghMutex == NULL)
        {
            printf("CreateMutex error: %d/n", GetLastError());
            return;
        }

        for( i=0; i < THREADCOUNT; i++ )
        {
            aThread[i] = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE) WriteToDatabase, NULL, 0, &ThreadID);
            if( aThread[i] == NULL )
            {
                printf("CreateThread error: %d/n", GetLastError());
                return;
            }
        }

        WaitForMultipleObjects(THREADCOUNT, aThread, TRUE, INFINITE);

        for( i=0; i < THREADCOUNT; i++ )
            CloseHandle(aThread[i]);

        CloseHandle(ghMutex);
    }

    DWORD WINAPI WriteToDatabase( LPVOID lpParam )
    {
        DWORD dwCount=0, dwWaitResult;

        while( dwCount < 20 )
        {
            //线程间只有一个能使用该信号量
            dwWaitResult = WaitForSingleObject(ghMutex,INFINITE);

            switch (dwWaitResult)
            {
            case WAIT_OBJECT_0:
                __try
                {
                    printf("Thread %d writing to database.../n", GetCurrentThreadId());
                    dwCount++;
                }

                __finally
                {
                    if (! ReleaseMutex(ghMutex))
                    {
                    }
                }
                break;
            case WAIT_ABANDONED:
                return FALSE;
            }
        }
        return TRUE;
    }

3:多个线程访问同一数据,一部分是读,一部分是写。我们知道只有读-写或写-写同时进行时可能会出现问题,而读-读则可以同时进行,因为它们不会对数据进行修改,所以也有必要在C++中封装一种方便的允许读-读并发、读-写与写-写互斥的锁。要实现这种锁,使用临界区就很困难了,不如改用内核对象,这里我使用的是互斥量(Mutex)
    class RWLock;
    class _RWLockBase
    {
        friend class RWLock;
    protected:
        virtual DWORD ReadLock(int timeout) = 0;
        virtual void ReadUnlock(int handleIndex) = 0;
        virtual DWORD WriteLock(int timeout) = 0;
        virtual void WriteUnlock() = 0;
    };

    template <int maxReadCount = 3>        //这里给一个缺省参数,尽量减少客户端代码量
    class RWLockBase: public _RWLockBase
    {
        //二是为了允许读-读并发,这里只声明一个Mutex是不够的,必须要声明多个Mutex,而且有多少个Mutex就同时允许多少个读线程并发
        HANDLE handles[maxReadCount];
        DWORD ReadLock(int timeout)   //加读锁,只要等到一个互斥量返回即可
        {
            return ::WaitForMultipleObjects(maxReadCount, handles, FALSE, timeout);
        }
        void ReadUnlock(int handleIndex) //解读锁,释放已获得的互斥量
        {
            ::ReleaseMutex(handles[handleIndex]);
        }
        DWORD WriteLock(int timeout)   //加写锁,等到所有互斥量,从而与其他所有线程互斥
        {
            return ::WaitForMultipleObjects(maxReadCount, handles, TRUE, timeout);
        }
        void WriteUnlock()                      //解写锁,释放所有的互斥量
        {
            for(int i = 0; i < maxReadCount; i++)
                ::ReleaseMutex(handles[i]);
        }
    protected:
        RWLockBase()                            //构造函数,初始化每个互斥量
        {
            for(int i = 0; i < maxReadCount; i++)
                handles[i] = ::CreateMutex(0, FALSE, 0);
        }
        ~RWLockBase()                          //析构函数,销毁对象
        {
            for(int i = 0; i < maxReadCount; i++)
                ::CloseHandle(handles[i]);
        }
    };

    class RWLock
    {
        bool lockSuccess;           //因为有可能超时,需要保存是否等待成功
        int readLockHandleIndex;    //对于读锁,需要知道获得的是哪个互斥量
        _RWLockBase* _pObj;         //目标对象基类指针
    public:
        //这里通过第二个参数决定是加读锁还是写锁,第三个参数为超时的时间
        RWLock(_RWLockBase* pObj, bool readLock = true, int timeout = 3000)
        {
            _pObj = pObj;
            lockSuccess = FALSE;
            readLockHandleIndex = -1;
            if(NULL == _pObj)
                return;

            if(readLock)          //读锁
            {
                DWORD retval = _pObj->ReadLock(timeout);
                if(retval < WAIT_ABANDONED) //返回值小于WAIT_ABANDONED表示成功
                {                                               //其值减WAIT_OBJECT_0就是数组下标
                    readLockHandleIndex = retval - WAIT_OBJECT_0;
                    lockSuccess = TRUE;
                }
            }
            else
            {
                DWORD retval = _pObj->WriteLock(timeout);
                if(retval < WAIT_ABANDONED) //写锁时获得了所有互斥量,无需保存下标
                    lockSuccess = TRUE;
            }
        }
        ~RWLock()
        {
            if(NULL == _pObj)
                return;
            if(readLockHandleIndex > -1)
                _pObj->ReadUnlock(readLockHandleIndex);
            else
                _pObj->WriteUnlock();
        }
        bool IsLockSuccess() const { return lockSuccess; }
    };

    class MyClass2: public RWLockBase<>
    {};
    MyClass2 mc2;

    void Say(char* text, int index)
    {
        static int count = 0;
        SYSTEMTIME st;
        ::GetLocalTime(&st);
        printf("%03d [%02d:%02d:%02d.%03d]%s %d/n", ++count, st.wHour, st.wMinute, st.wSecond, st.wMilliseconds, text, index);
    }

    //读线程
    DWORD CALLBACK ReadThreadProc(LPVOID param)
    {
        int i = (int)param;
        RWLock lock(&mc2);          //加读锁
        if(lock.IsLockSuccess())             //如果加锁成功
        {
            Say("read thread %d started", i);   //为了代码短一些,假设Say函数有这种能力
            Sleep(1000);
            Say("read thread %d ended", i);
        }
        else                                     //加锁超时,则显示超时信息
        {
            Say("read thread %d timeout", i);
        }
        return 0;
    }

    //写线程
    DWORD CALLBACK WriteThreadProc(LPVOID param)
    {
        int i = (int)param;
        RWLock lock(&mc2, false); //加写锁。
        if(lock.IsLockSuccess())
        {
            Say("write thread %d started", i);
            Sleep(600);
            Say("write thread %d ended", i);
        }
        else
        {
            Say("write thread %d timeout", i);
        }
        return 0;
    }


    int _tmain(int argc, _TCHAR* argv[])
    {
        int i;
        for(i = 0; i < 5; i++)
            ::CreateThread(0, 0, ReadThreadProc, (LPVOID)i, 0, 0);
        for(i = 0; i < 5; i++)
            ::CreateThread(0, 0, WriteThreadProc, (LPVOID)i, 0, 0);

        Sleep(10000);

        return 0;
    }

三:Seamphore

1:Semaphore是一件可以容纳N人的房间,如果人不满就可以进去,如果人满了,就要等待有人出来。对于N=1的情况,称为binarysemaphore。一般的用法是,用于限制对于某一资源的同时访问。而Mutex是一把钥匙,一个人拿了就可进入一个房间,出来的时候把钥匙交给队列的第一个。一般的用法是用于串行化对critical section代码的访问,保证这段代码不会被并行的运行。
和Mutex的区别
1)信号量没有所谓的”wait abandoned”状态可被其它线程侦测到;
2)拥有互斥器的线程无论再调用多少次wait…()函数都不会被阻塞。但对于信号量,如果锁定成功也不会收到信号量的拥有权——因为同时可以多个线程同时锁定一个信号量。信号量没有“独占锁定”这种事情,也没有所有权的观念,一个线程可以反复调用wait…()函数产生新的锁定,每锁定一次,信号量的现值就减1。
3)与互斥器不同,调用ReleaseSemaphore()的那个线程,并不一定就得是调用Wait…()函数的那个线程。任何线程都可以在任何时间调用ReleaseSemaphore(),解除被任何线程锁定的信号量。

2:常用函数
    HANDLE CreateSemaphore();
    HANDLE OpenSemaphore();
    DWORD WaitForSingleObject();
    DWORD WaitForMultipleObjects();
    DWORD MsgWaitForMultipleObjects();
    BOOL ReleaseSemaphore();
    BOOL CloseHandle();

    HANDLE CreateSemaphore(LPSECURITY_ATTRIBUTES lpSemaphoreAttributes, //CE不支持
                           LONG lInitialCount, //信号量初始化计数值
                           LONG lMaximumCount, //信号量计数最大值,统一时间能锁定Seamphore之线程的最大个数。
                           LPCTSTR lpName); //信号量对象名称
                          
    BOOL ReleaseSemaphore(HANDLE hSemaphore, //信号量句柄
                          LONG lReleaseCount, //信号量计数增加的值
                          LPLONG lpPreviousCount); //输出量,表示上一次信号量计数
                     
3:下列可以理解为有12个人需要使用10个房间 ,当时一个房间同一时刻只能被一个人使用。
    #define MAX_SEM_COUNT 10
    #define THREADCOUNT 12

    HANDLE ghSemaphore;

    DWORD WINAPI ThreadProc( LPVOID );

    void main()
    {
        HANDLE aThread[THREADCOUNT];
        DWORD ThreadID;
        int i;

        ghSemaphore = CreateSemaphore( NULL, MAX_SEM_COUNT, MAX_SEM_COUNT, NULL);
        if (ghSemaphore == NULL)
        {
            printf("CreateSemaphore error: %d/n", GetLastError());
            return;
        }

        for( i=0; i < THREADCOUNT; i++ )
        {
            aThread[i] = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE) ThreadProc, NULL, 0, &ThreadID);
            if( aThread[i] == NULL )
            {
                printf("CreateThread error: %d/n", GetLastError());
                return;
            }
        }

        WaitForMultipleObjects(THREADCOUNT, aThread, TRUE, INFINITE);
        for( i=0; i < THREADCOUNT; i++ )
            CloseHandle(aThread[i]);

        CloseHandle(ghSemaphore);
    }

    DWORD WINAPI ThreadProc( LPVOID lpParam )
    {
        DWORD dwWaitResult;
        BOOL bContinue=TRUE;

        while(bContinue)
        {
            dwWaitResult = WaitForSingleObject( ghSemaphore, 0L);

            switch (dwWaitResult)
            {
            case WAIT_OBJECT_0:
                printf("Thread %d: wait succeeded/n", GetCurrentThreadId());
                bContinue=FALSE;           
                Sleep(5);
                if (!ReleaseSemaphore(ghSemaphore,  1, NULL) ) 
                {
                    printf("ReleaseSemaphore error: %d/n", GetLastError());
                }
                break;
            case WAIT_TIMEOUT:
                printf("Thread %d: wait timed out/n", GetCurrentThreadId());
                break;
            }
        }
        return TRUE;
    }

4:对列Queue的同步访问。即生产者消费者概念。
    class Queue
    {
    public:
        Queue();
        ~Queue();
        void push(const int* t, size_t count);
        int pop();
    private:
        HANDLE semaphore;
        queue<int> queue;
    };

    //为了保证能顺利释放信号变量,初始队列中没有产品
    Queue:Queue() : semaphore(NULL)
      {
          semaphore = CreateSemaphore(NULL,0,1024*1024*10,NULL);   
      }

      Queue::~Queue()
      {
          if (semaphore)
              CloseHandle(semaphore);   
      }
      //放入了Count了个产品后,Seamphore信号量重置值
      void Queue::push(const int* t, size_t count)
      {
          for (size_t i=0; i<count; i++)
          {
              queue_.push(t[i]);
          }       
          ReleaseSemaphore(semaphore_, count, NULL);
      }

      int Queue::pop()
      {
          WaitForSingleObject(semaphore_, INFINITE);   
          return queue_.pop();   
      }

四:Event

1:
如果一个事件是自动事件,那么当它处于激发状态时,可唤醒一个等待它的线程,线程被唤醒后,自动地转入非激发状态;
如果一个事件是手动事件,那么当它处于激发状态时,可唤醒所有等待它的线程,并且一直保持状态为激发状态,直到被明确地ResetEvent()后,才转入非激发状态。

2:常用函数
    HANDLE CreateEvent();
    HANDLE OpenEvent();
    BOOL SetEvent();
    BOOL PluseEvent();
    BOOL ResetEvent();
    DWORD WaitForSingleObject();
    DWORD WaitForMultipleObjects();
    DWORD MsgWaitForMultipleObjects();
    BOOL CloseHandle();
    HANDLE CreateEvent(LPSECURITY_ATTRIBUTES lpEventAttributes,   // 安全属性
                       BOOL bManualReset,   // 复位方式
                       BOOL bInitialState,   // 初始状态
                       LPCTSTR lpName);   // 对象名称

    bManualReset:指定将事件对象创建成手动复原还是自动复原。如果是TRUE,那么必须用ResetEvent函数来手工将事件的状态复原到无信号状态。如果设置为FALSE,当事件被一个等待线程释放以后,系统将会自动将事件状态复原为无信号状态。
    bInitialState:指定事件对象的初始状态。如果为TRUE,初始状态为有信号状态;否则为无信号状态。
    lpName:指定事件的对象的名称,任何线程或进程都可以根据这个名称使用这一Event对象。

3:以下通过事件控制当写时,四个读线程不可操作队列。
    #define THREADCOUNT 4

    HANDLE ghWriteEvent;
    HANDLE ghThreads[THREADCOUNT];

    DWORD WINAPI ThreadProc(LPVOID);

    void CreateEventsAndThreads(void)
    {
      int i;
      DWORD dwThreadID;
      //自动重置事件
      ghWriteEvent = CreateEvent(NULL,TRUE,FALSE,TEXT("WriteEvent"));
      if (ghWriteEvent == NULL)
      {
          printf("CreateEvent failed (%d)/n", GetLastError());
          return;
      }

      for(i = 0; i < THREADCOUNT; i++)
      {
          ghThreads[i] = CreateThread(NULL,  0,   ThreadProc, NULL, 0, &dwThreadID);
          if (ghThreads[i] == NULL)
          {
              printf("CreateThread failed (%d)/n", GetLastError());
              return;
          }
      }
    }

    void WriteToBuffer(VOID)
    {
      printf("Main thread writing to the shared buffer.../n");
      if (! SetEvent(ghWriteEvent) )
      {
          printf("SetEvent failed (%d)/n", GetLastError());
          return;
      }
    }

    void CloseEvents()
    {
      CloseHandle(ghWriteEvent);
    }

    void main()
    {
      DWORD dwWaitResult;

      CreateEventsAndThreads();
      //开始写数据,写完后激活事件
      WriteToBuffer();

      printf("Main thread waiting for threads to exit.../n");

      //四个线程都在等待激活事件
      dwWaitResult = WaitForMultipleObjects(THREADCOUNT,   ghThreads,  TRUE, INFINITE);

      switch (dwWaitResult)
      {
      case WAIT_OBJECT_0:
          printf("All threads ended, cleaning up for application exit.../n");
          break;

      default:
          printf("WaitForMultipleObjects failed (%d)/n", GetLastError());
          return;
      }

      // Close the events to clean up

      CloseEvents();
    }

    DWORD WINAPI ThreadProc(LPVOID lpParam)
    {
      DWORD dwWaitResult;

      printf("Thread %d waiting for write event.../n", GetCurrentThreadId());

      dwWaitResult = WaitForSingleObject(  ghWriteEvent, INFINITE);  

      switch (dwWaitResult)
      {
      case WAIT_OBJECT_0:
          printf("Thread %d reading from buffer/n",GetCurrentThreadId());
          break;

      default:
          printf("Wait error (%d)/n", GetLastError());
          return 0;
      }

      printf("Thread %d exiting/n", GetCurrentThreadId());
      return 1;
    }

五:Interlocked Variables
interlocked函数没有“等待”机能,它只是保证对某个特定的变量的存取是排他性的、原子性的。这相当于给某个变量设置了一个临界区或互斥量。

1:常用函数
    LONG InterlockedIncrement(LPLONG lpAddend);     //(*lpAddend)++;
    LONG InterlockedDecrement(LPLONG lpAddend);    //(*lpAddend)--;
    LONG InterlockedExchangeAdd(LPLONG Addend,LONG Increment);     //(*lpAddend) += Increment;
    LONG InterlockedExchange(LPLONG Target,LONG Value);     //(*lpAddend) = Value;
    PVOID InterlockedCompareExchange(PVOID *Destination,PVOIDExchange,PVOID Comperand);//if (*Destination == Comperand) *Destination= Exchange;

六:总结
1:Critical Section
    Critical section(临界区)用来实现“排他性占有”。适用范围是单一进程的各线程。它是:
    •    一个局部对象,不是核心对象;
    •    快速而有效率;
    •    不能够同时有一个以上的临界区被等待,否则容易陷入死锁;
    •    无法侦测是否被某个线程放弃。

2:Mutex
    Mutex是一个核心对象,可以在不同的线程之间实现“排他性占有”,甚至那些线程分属不同的进程。它是:
    •    一个核心对象;
    •    如果mutex拥有的那个线程结束,则会产生一个”abandoned”错误信息;
    •    可以使用wait…()函数等待一个mutex;
    •    可以具名,因此可以被其它进程开启;
    •    只能被拥有它的线程释放。

3:Semaphore
    Semaphore用来追踪有限的资源。它是:
    •    一个核心对象;
    •    没有拥有者;
    •    可以具名,因此可以被其它进程开启;
    •    可以被任何一个线程释放。

4:Event Object
    Event object通常用于overlapped I/O,或用来设计某些自定义的同步对象。它是:
    •    一个核心对象;
    •    完全在程序掌控之下;
    •    适用于设计新的同步对象;
    •    “要求苏醒”的请求并不会被存储起来,可能会遗失掉;
    •    可以具名,因此可以被其它进程开启;

5:Interlocked Variable
    Interlocked variable主要用于应用计数,不使用临界区或互斥器之类,对对4字节的数值操作有些基本的同步。