基于Boost的数据处理器及线程安全队列、跨平台的信号量和互斥锁

时间:2021-04-05 23:30:04
近半年主要是开发公司行情系统Feedhandler(一共十几个Feedhandler,包括沪深L1、L2,港股,国内期货,国际股票,国际期货等。)。此系统要求跨平台、大吞吐量,超低延迟,属于CPU密集型系统。在项目过程中,有几个比较好的封装类,跟大家一起分享一下。以下所有源代码可至 http://download.csdn.net/detail/great3779/3998262 下载
一。基于Boost的跨平台锁。封装了Boost的mutex,提供了lock和unlock方法。代码示例如下: 
#pragma once

// Author: Huangzhidan | great3779@sina.com
// copyright: @Wind Information Co., Ltd (Wind Info) ShangHai
// Create time: 2011-09-10
// 封装了boost的mutex,能跨平台使用。

#include <boost/thread.hpp>

class CWnLock
{
public:
CWnLock(void) {}
~CWnLock(void) {}
public:
virtual void lock() {m_lock.lock();}
virtual void unlock() {m_lock.unlock();}
protected:
boost::mutex m_lock;
};

二。基于Boost的自动锁。相对之前的互斥锁,自动锁在发生异常时,能自动解锁,避免发生异常后,程序死锁。代码示例如下:
// WnScopedLock.h
#pragma once

// Author: Huangzhidan | great3779@sina.com
// copyright: @Wind Information Co., Ltd (Wind Info) ShangHai
// Create time: 2011-09-10
// 封装了boost的mutex的scoped_lock,能跨平台使用。相对于CWnLock,其优势在于发生异常时能自动解锁,避免线程死锁。

#include "WnLock.h"

class CWnScopedLock : public CWnLock
{
public:
CWnScopedLock(CWnLock& mutex);
virtual ~CWnScopedLock(void);
public:
virtual void lock() {return m_pMutex->lock();}
virtual void unlock() {return m_pMutex->unlock();}
private:
CWnLock* m_pMutex;
};

// WnScopedLock.cpp
#include "WnScopedLock.h"

CWnScopedLock::CWnScopedLock(CWnLock& mutex)
{
m_pMutex = &mutex;
m_pMutex->lock();
}

CWnScopedLock::~CWnScopedLock(void)
{
m_pMutex->unlock();
}

三。基于Boost的事件信号通知。由于Boost原本的事件通知在使用时还需要带一把锁,使用过程也比较麻烦,不好理解。因此对其进行封装,封装后的CWnEvent类使用时类似Windows下的CEvent类,且能跨平台使用,效率也比较高(使用了之前的互斥锁和自动锁)。代码示例如下:
#pragma once

// Author: Huangzhidan | great3779@sina.com
// copyright: @Wind Information Co., Ltd (Wind Info) ShangHai
// Create time: 2011-09-10
// 封装了boost的condition_variable,使其使用方法很接近Windows的Event。其优势在于能跨平台使用。

#include "WnLock.h"
#include "WnScopedLock.h"
#include <boost/thread.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>

class CWnEvent
{
public:
CWnEvent(void){}
virtual ~CWnEvent(void){}

public:
void wait()
{
CWnScopedLock scoped_lock(m_lock);
m_condition.wait(scoped_lock);
}

bool timed_wait(int nMillSec)
{
CWnScopedLock scoped_lock(m_lock);
return m_condition.timed_wait(scoped_lock, boost::posix_time::millisec(nMillSec));
}

void notify_one() { m_condition.notify_one(); }

void notify_all() { m_condition.notify_all(); }

private:
CWnLock m_lock;
boost::condition_variable_any m_condition;
};

四。一个非常高效、使用简单、扩展性强的数据处理器。此数据处理器主要优点如下:
1. 跨平台
2. 将线程通信间比较难的线程安全、信号通知等机制均封装在对象中
3. 由于数据的传递完全依靠事件通知,因此数据的流转效率以及吞吐量均非常高(已经使用在公司海外股票FeedHandler上,吞吐量轻松突破每秒500,000个包)
4. 接口简单,使用非常方便(可参考BoostDemo程序)
5. 由于采用了模板技术以及运行时绑定技术,因此可扩展性非常强。(公司FeedHandler项目,已经在CDataHandler类上派生了数十个类,扩展性非常好)

以下是示例代码:

#pragma once

// Author: Huangzhidan | great3779@sina.com
// copyright: @Wind Information Co., Ltd (Wind Info) ShangHai
// Create time: 2011-09-10

// 一个可用于线程间传递数据的类。此类的优势在于:
// 1. 跨平台
// 2. 将线程通信间比较难的线程安全、信号通知等机制均封装在对象中
// 3. 由于数据的传递完全依靠事件通知,因此数据的流转效率以及吞吐量均非常高(已经使用在公司海外股票FeedHandler上,吞吐量轻松突破每秒500,000个包)
// 4. 接口简单,使用非常方便(可参考BoostDemo程序)

// 使用方法
// CDataHandler是一个基类,使用时定义子类对其进行继承。
// 继承类重写DataThread和DataFunc方法(一般情况下仅需重写DataFunc方法即可)

#include "AtomQueue/WnQueue.h"
#include "Synchronous/WnEvent.h"
#include <boost/thread.hpp>

// a pure virtual function
template <class T> class CDataHandler
{
public:
CDataHandler(void) {Start();}
virtual ~CDataHandler() {}

public:
// 单个通知接口(一般不需调用)
void NotifyOne() {m_event.notify_one();}

// 全部通知接口(一般不需调用)
void NotifyAll() {m_event.notify_all();}

// 推入流数据
void Put(T& t)
{
m_record_set.push(t);

// 发送通知信号
m_event.notify_one();
}

// 获取缓冲区buffer size的接口
int BufferSize() { return m_record_set.size(); }

protected:
// 处理数据的线程,可在运行时绑定
virtual void DataThread()
{
while(true)
{
m_event.wait();
while(!m_record_set.empty())
{
T t;
if(m_record_set.get(t))
{
DataFunc(t);
}
}
}
}

// 处理数据的函数,可在运行时绑定
virtual void DataFunc(T& t) {}

// 以下为内部函数
protected:
// 开始运行
void Start()
{
boost::thread t(&CDataHandler::DataThread, this);
}

protected:
// 流数据集
CWnQueue<T> m_record_set;

// 信号
CWnEvent m_event;
};

基于此类的派生类使用,可以参考源代码:
http://download.csdn.net/detail/great3779/3998262


更新于20130701,在构造函数中,增加了可以设置消费线程数量以及缓冲区大小。

#pragma once

// Author: Huangzhidan | great3779@sina.com
// copyright: @Wind Information Co., Ltd (Wind Info) ShangHai
// Create time: 2011-09-10

// 一个可用于线程间传递数据的类。此类的优势在于:
// 1. 跨平台
// 2. 将线程通信间比较难的线程安全、信号通知等机制均封装在对象中
// 3. 由于数据的传递完全依靠事件通知,因此数据的流转效率以及吞吐量均非常高(已经使用在公司海外股票FeedHandler上,吞吐量轻松突破每秒500,000个包)
// 4. 接口简单,使用非常方便(可参考BoostDemo程序)

// 使用方法
// CDataHandler是一个基类,使用时定义子类对其进行继承。
// 一般情况下仅需重写DataFunc方法即可(可满足99%需求)。如果对发包逻辑有特殊需求,则重写DataThread线程方法。

// Revise: 2012-12-18
// 1. 与CWnQueue及CWnEvent的关系从依赖更新为继承,接口暴露更丰富且清晰.
// 2. 开放了缓冲区阈值设置,避免了系统缓冲区的溢出.

// Revise: 20130731
// 1. 构造函数中能指定消费线程数据,做为一个多消费者数据处理机。
// 2. 增加了手动启动的接口

#include "WnQueue.h"
#include "WnEvent.h"
#include <boost/thread.hpp>

// a data processor handler.
template <class T> class CDataHandler : public CWnQueue<T>, public CWnEvent
{
public:
///< threadNum: 消费线程数目,默认为1
///< threshold: 内部缓冲区阈值,默认为无限
CDataHandler(int threadNum = 1, bool autoStart = true, int threshold = 0) : m_thread_num(threadNum), m_threshold_size(threshold), m_bStart(false)
{
if(autoStart)
{
Start();
m_bStart = true;
}
}
virtual ~CDataHandler() {}

public:
// 推入元数据
void Put(T& t)
{
push(t);

if(m_threshold_size > 0 && size() > m_threshold_size)
clear();
else
notify_one(); // send semaphore
}

public:
// start a run thread.
void Start()
{
if(!m_bStart)
{
for(int i = 0; i < m_thread_num; ++i)
{
boost::thread t(&CDataHandler::DataThread, this);
}
}
}

public:
///< 设置消费线程数
void SetThreadNum(int threadNum) {m_thread_num = threadNum;}

///< 设置队列缓冲区阈值
void SetBufferThreshold(int threshold) {m_threshold_size = threshold;}

protected:
// 数据处理线程. 如有特殊需要,请在子类中重写.
virtual void DataThread()
{
while(true)
{
wait(); // wait semaphore
while(!empty())
{
T t;
if( get(t) )
{
DataFunc(t);
}
}
}
}

// 数据处理核心函数,请在子类中重写!
virtual void DataFunc(T& t) = 0;

private:
// 缓冲区阈值。一旦缓冲区达到此阈值,则清空所有缓冲区内容
int m_threshold_size;

///< 内部消费线程数目
int m_thread_num;

///< 处理机是否已启动的标识
bool m_bStart;
};