从零开始构建一个Reactor模式的网络库(一) 线程同步Mutex和Condition

时间:2022-09-09 15:02:32

最近在学习陈硕大神的muduo库,感觉写的很专业,以及有一些比较“高级”的技巧和设计方式,自己写会比较困难。

于是打算自己写一个简化版本的Reactor模式网络库,就取名叫mini吧,同样只基于Linux平台,不使用boost库,去掉一些比较复杂的部分,只实现比较基本的功能。

写作的过程中,参考了https://github.com/chenshuo/muduo(原始版本的实现),以及https://github.com/AlexStocks/muduo(去掉boost库的依赖,改用C++11)     

就先从用于线程同步的互斥锁和条件变量的封装开始吧,基础部分还会包括一个很简单的日志类、线程封装和简单的线程池。

 

Linux环境下线程同步的方式有很多,互斥锁、读写锁、自旋锁、条件变量、屏障等都可以作为同步的方式,muduo库使用的是互斥锁+条件变量的方式,原因也很简单,就是简单易用,同时也不失高效性。

为了通用性,使用的都是POSIX的同步原语以及线程实现。

首先是对互斥量的封装:

 1 #ifndef MUTEX_H
 2 #define MUTEX_H
 3 #include <pthread.h>
 4 namespace mini
 5 {
 6 //used as class member
 7 class MutexLock
 8 {
 9 public:
10     MutexLock()
11     {
12         pthread_mutex_init(&mutex_,NULL);
13     }
14     ~MutexLock()
15     {
16         pthread_mutex_destroy(&mutex_);
17     }
18     void lock()
19     {
20         pthread_mutex_lock(&mutex_);
21     }
22     void unlock()
23     {
24         pthread_mutex_unlock(&mutex_);
25     }
26 
27     pthread_mutex_t* getPthreadMutex()
28     {
29         return &mutex_;
30     }
31 
32 private:
33     pthread_mutex_t mutex_;
34 };
35 //used as RAII obj
36 class MutexLockGuard
37 {
38 public:
39     MutexLockGuard(MutexLock& mutex)
40         :mutex_(mutex)
41     {
42         mutex_.lock();
43     }
44     ~MutexLockGuard()
45     {
46         mutex_.unlock();
47     }
48 
49 private:
50     MutexLock& mutex_;
51 };
52 }
53 
54 #endif // MUTEX_H

MutexLock是对pthread_mutex的简单封装,包括初始化、加锁、解锁以及销毁,主要用作类的成员变量(比如Condition类、ThreadPool类等)。

MutexLockGuard是一个RAII类,构造时自动加锁,析构时自动解锁,一般用在整个过程都需要加锁的块内(比如一个作用于临界区的函数),可以避免忘记解锁引起的死锁。

然后是对于条件变量的封装:

 1 #ifndef CONDITION_H
 2 #define CONDITION_H
 3 #include "Mutex.h"
 4 #include <pthread.h>
 5 namespace mini
 6 {
 7 class Condition{
 8 public:
 9     Condition(MutexLock& mutex)
10         :mutex_(mutex)
11     {
12         pthread_cond_init(&cond_,NULL);
13     }
14 
15     ~Condition()
16     {
17         pthread_cond_destroy(&cond_);
18     }
19 
20     void wait()
21     {
22         pthread_cond_wait(&cond_,mutex_.getPthreadMutex());
23     }
24 
25     void notify()
26     {
27         pthread_cond_signal(&cond_);
28     }
29 
30     void notifyAll()
31     {
32         pthread_cond_broadcast(&cond_);
33     }
34 
35 private:
36     MutexLock& mutex_;//reference, not hold
37     pthread_cond_t cond_;
38 };
39 }
40 
41 #endif // CONDITION_H

Condition类是对pthread_cond的封装,因为条件变量本来就要与mutex配合使用,故而持有一个MutexLock的引用,主要操作是lock()、notify()和notifyAll()。

下面通过一个简单的例子来看看这两个类的使用:

 1 #include <iostream>
 2 #include "Condition.h"
 3 #include <unistd.h>
 4 
 5 using namespace std;
 6 using namespace mini;
 7 
 8 mini::MutexLock mutex;
 9 mini::Condition cond(mutex);
10 
11 int count=0;
12 
13 
14 void* threadFuncAdd(void*)
15 {
16     sleep(1);
17     cout<<"ThreadAdd run!"<<endl;
18     while(count<=1000)
19         count++;
20     cout<<"ThreadAdd finish!"<<endl;
21     cond.notify();
22 }
23 void* threadFuncPrint(void*)
24 {
25     mutex.lock();
26     while(count<=1000)
27     {
28         cout<<"ThreadPrint wait!"<<endl;
29         cond.wait();
30     }
31 
32     cout<<"ThreadPrint wake up!"<<endl;
33 }
34 
35 
36 int main()
37 {
38     //count=0;
39     pthread_t p1,p2;
40     pthread_create(&p1,NULL,threadFuncAdd,NULL);
41     pthread_create(&p2,NULL,threadFuncPrint,NULL);
42 
43     sleep(2);
44 
45     return 0;
46 }

 

 

简单来说,主线程创建了两个线程,一个用来对count进行自加,一个用来等待count值到达100并输出一句话。为了确保print线程会先等待条件,让add线程睡眠了1s。

从零开始构建一个Reactor模式的网络库(一) 线程同步Mutex和Condition

结果与预期一致,threadprint先进入等待状态,然后threadadd开始执行,并在count自加到1000后,notify() threadprint,threadprint被唤醒。