从零开始构建一个Reactor模式的网络库(一) 线程同步Mutex和Condition

时间:2023-03-09 04:01:38
从零开始构建一个Reactor模式的网络库(一) 线程同步Mutex和Condition

最近在学习陈硕大神的muduo库,感觉写的很专业,以及有一些比较“高级”的技巧和设计方式,自己写会比较困难。

于是打算自己写一个简化版本的Reactor模式网络库,就取名叫mini吧,同样只基于Linux平台,不使用boost库,去掉一些比较复杂的部分,只实现比较基本的功能。

写作的过程中,参考了https://github.com/chenshuo/muduo(原始版本的实现),以及https://github.com/AlexStocks/muduo(去掉boost库的依赖,改用C++11)

就先从用于线程同步的互斥锁和条件变量的封装开始吧,基础部分还会包括一个很简单的日志类、线程封装和简单的线程池。

Linux环境下线程同步的方式有很多,互斥锁、读写锁、自旋锁、条件变量、屏障等都可以作为同步的方式,muduo库使用的是互斥锁+条件变量的方式,原因也很简单,就是简单易用,同时也不失高效性。

为了通用性,使用的都是POSIX的同步原语以及线程实现。

首先是对互斥量的封装:

 #ifndef MUTEX_H
#define MUTEX_H
#include <pthread.h>
namespace mini
{
//used as class member
class MutexLock
{
public:
MutexLock()
{
pthread_mutex_init(&mutex_,NULL);
}
~MutexLock()
{
pthread_mutex_destroy(&mutex_);
}
void lock()
{
pthread_mutex_lock(&mutex_);
}
void unlock()
{
pthread_mutex_unlock(&mutex_);
} pthread_mutex_t* getPthreadMutex()
{
return &mutex_;
} private:
pthread_mutex_t mutex_;
};
//used as RAII obj
class MutexLockGuard
{
public:
MutexLockGuard(MutexLock& mutex)
:mutex_(mutex)
{
mutex_.lock();
}
~MutexLockGuard()
{
mutex_.unlock();
} private:
MutexLock& mutex_;
};
} #endif // MUTEX_H

MutexLock是对pthread_mutex的简单封装,包括初始化、加锁、解锁以及销毁,主要用作类的成员变量(比如Condition类、ThreadPool类等)。

MutexLockGuard是一个RAII类,构造时自动加锁,析构时自动解锁,一般用在整个过程都需要加锁的块内(比如一个作用于临界区的函数),可以避免忘记解锁引起的死锁。

然后是对于条件变量的封装:

 #ifndef CONDITION_H
#define CONDITION_H
#include "Mutex.h"
#include <pthread.h>
namespace mini
{
class Condition{
public:
Condition(MutexLock& mutex)
:mutex_(mutex)
{
pthread_cond_init(&cond_,NULL);
} ~Condition()
{
pthread_cond_destroy(&cond_);
} void wait()
{
pthread_cond_wait(&cond_,mutex_.getPthreadMutex());
} void notify()
{
pthread_cond_signal(&cond_);
} void notifyAll()
{
pthread_cond_broadcast(&cond_);
} private:
MutexLock& mutex_;//reference, not hold
pthread_cond_t cond_;
};
} #endif // CONDITION_H

Condition类是对pthread_cond的封装,因为条件变量本来就要与mutex配合使用,故而持有一个MutexLock的引用,主要操作是lock()、notify()和notifyAll()。

下面通过一个简单的例子来看看这两个类的使用:

 #include <iostream>
#include "Condition.h"
#include <unistd.h> using namespace std;
using namespace mini; mini::MutexLock mutex;
mini::Condition cond(mutex); int count=; void* threadFuncAdd(void*)
{
sleep();
cout<<"ThreadAdd run!"<<endl;
while(count<=)
count++;
cout<<"ThreadAdd finish!"<<endl;
cond.notify();
}
void* threadFuncPrint(void*)
{
mutex.lock();
while(count<=)
{
cout<<"ThreadPrint wait!"<<endl;
cond.wait();
} cout<<"ThreadPrint wake up!"<<endl;
} int main()
{
//count=0;
pthread_t p1,p2;
pthread_create(&p1,NULL,threadFuncAdd,NULL);
pthread_create(&p2,NULL,threadFuncPrint,NULL); sleep(); return ;
}

简单来说,主线程创建了两个线程,一个用来对count进行自加,一个用来等待count值到达100并输出一句话。为了确保print线程会先等待条件,让add线程睡眠了1s。

从零开始构建一个Reactor模式的网络库(一) 线程同步Mutex和Condition

结果与预期一致,threadprint先进入等待状态,然后threadadd开始执行,并在count自加到1000后,notify() threadprint,threadprint被唤醒。