使用Condition Variables实现一个线程安全队列
测试机:i7-4800MQ 2.7GHz,8 个逻辑核心(logical core),4 个物理核心(physical core),
8G 内存,256GB SSD,
64 位 Windows 7
编译器(compiler):VS2010,boost 1.44
性能测试结果: case1:
9百万数据, 生产者,3消费者,12秒 ::,[INFO]: =============================== Started ====================
===========
::,[INFO]: million
::,[INFO]: [ONLY TEST] all data pop-ed, exitting...
::,[INFO]: [ONLY TEST] all data pop-ed, exitting...
::,[INFO]: [ONLY TEST] all data pop-ed, exitting...
::,[INFO]: =============================== end ===============================
Press any key to continue . . .
Basic Thread Safety 使用mutex实现简单的线程安全
// Naive thread-safe queue: every member function holds the_mutex only for
// the duration of that single call.
//
// NOTE: front() hands out a reference that is still usable after the lock
// has been released. If another thread calls pop() before the caller is done
// with it, the reference dangles, so front() and pop() race as soon as there
// is more than one consumer. As with std::queue, calling front() on an empty
// queue is undefined behaviour.
template<typename Data>
class concurrent_queue
{
private:
    std::queue<Data> the_queue;
    mutable boost::mutex the_mutex;   // mutable so const members can lock it

public:
    // Enqueues a copy of data.
    void push(const Data& data)
    {
        boost::mutex::scoped_lock guard(the_mutex);
        the_queue.push(data);
    }

    // Snapshot only: the answer may be stale once the lock is released.
    bool empty() const
    {
        boost::mutex::scoped_lock guard(the_mutex);
        bool const is_empty = the_queue.empty();
        return is_empty;
    }

    // Reference to the oldest element (escapes the lock - see class note).
    Data& front()
    {
        boost::mutex::scoped_lock guard(the_mutex);
        Data& head = the_queue.front();
        return head;
    }

    // Const overload of front(); same caveat about the escaping reference.
    Data const& front() const
    {
        boost::mutex::scoped_lock guard(the_mutex);
        Data const& head = the_queue.front();
        return head;
    }

    // Removes the oldest element.
    void pop()
    {
        boost::mutex::scoped_lock guard(the_mutex);
        the_queue.pop();
    }
};
front() 和 pop() 各自独立加锁,多个消费者线程同时调用时会互相竞争:一个线程刚通过 front() 拿到的元素(引用),可能已经被另一个线程 pop() 掉了。
但对于只有一个消费者的系统,这就不是问题。不过,如果队列是空的,消费者线程就无事可做,只能进入"等待 -> 检查 -> 等待……"的循环:
while(some_queue.empty())
{
boost::this_thread::sleep(boost::posix_time::milliseconds(50));
}
尽管 sleep() 相较于忙等待避免了大量 CPU 资源的浪费,这个设计还是有些不足。首先,线程必须每隔 50ms(或者其他间隔)唤醒一次,锁定 mutex、检查队列、解锁 mutex,这会强制产生上下文切换。其次,睡眠间隔给响应时间加上了额外的延迟:从数据被加入队列到线程做出响应,延迟可能是 0ms 到 50ms 之间的任意值,平均约 25ms。
使用Condition Variable等待
为此,在 concurrent_queue 中增加一个成员函数 wait_for_data(),并修改 push(),让它在放入数据后通过条件变量唤醒等待的线程
:template<typename Data>
class concurrent_queue
{
private:
boost::condition_variable the_condition_variable;
public:
void wait_for_data()
{
boost::mutex::scoped_lock lock(the_mutex);
while(the_queue.empty())
{
the_condition_variable.wait(lock);
}
}
void push(Data const& data)
{
boost::mutex::scoped_lock lock(the_mutex);
bool const was_empty=the_queue.empty();
the_queue.push(data);
if(was_empty)
{
the_condition_variable.notify_one();
}
}
// rest as before
};
执行唤醒(notify)操作时要小心:如果在仍持有 mutex 时调用 notify_one(),被唤醒的线程可能马上又因为拿不到锁而阻塞。因此更好的做法是先解锁,再唤醒:
template<typename Data>
class concurrent_queue
{
public:
    // Enqueues data and, if the queue was empty, wakes the (single) waiting
    // consumer. The mutex is released *before* notifying: a consumer woken
    // while we still held the lock would immediately block on it again.
    void push(Data const& data)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        bool const was_empty=the_queue.empty();
        the_queue.push(data);
        lock.unlock(); // unlock the mutex
        // The condition must be real code, not part of the comment above;
        // otherwise notify_one() runs unconditionally and was_empty is dead.
        if(was_empty)
        {
            the_condition_variable.notify_one();
        }
    }
    // rest as before
};
减少锁的开销
wait_for_data()、front() 和 pop() 每次调用都要锁一次 mutex,消费者线程仍会在短时间内反复加锁、解锁。
把等待和出队合并为一个操作,可以减少加锁解锁的次数:
template<typename Data>
class concurrent_queue
{
public:
void wait_and_pop(Data& popped_value)
{
boost::mutex::scoped_lock lock(the_mutex);
while(the_queue.empty())
{
the_condition_variable.wait(lock);
} popped_value=the_queue.front();
the_queue.pop();
} // rest as before
};
另外,也可以考虑使用 boost::optional 来避免一些问题(例如把"是否取到了数据"和数据本身一起作为返回值)。
处理多个消费者
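wait_and_pop() 不仅省去了多次加锁的开销,还带来了额外的好处 —— 现在天然地允许多个消费者了。拆开的 front() 和 pop() 做不到这一点,这和 std::vector 不是 thread-safe 的道理一样(你需要用外部锁来完成许多常见操作,这让内部锁变得多余、浪费资源)。
不过有了多个消费者之后,唤醒方式也要调整:如果只在队列由空变为非空时才唤醒,那么在第一个被唤醒的消费者取走数据之前再 push 进来的数据,不会唤醒其他正在等待的消费者。一种办法是用 notify_all()
而不是 notify_one() 来唤醒线程。
更简单的做法是每次 push 都调用 notify_one(),即使之前队列里面不是空的: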
template<typename Data>
class concurrent_queue
{
public:
    // Enqueues data and wakes one waiting consumer on every push, even if
    // the queue was not empty beforehand. That way each pushed item can wake
    // its own waiter when several consumers are blocked.
    void push(Data const& data)
    {
        {
            boost::mutex::scoped_lock guard(the_mutex);
            the_queue.push(data);
        } // guard releases the mutex here, before notifying
        the_condition_variable.notify_one(); // wake exactly one waiter per push
    }
    // rest as before
};
template<typename Data>
class concurrent_queue
{
public:
    // Non-blocking pop. If the queue is empty, returns false and leaves
    // popped_value untouched. Otherwise copies the front element into
    // popped_value, removes it and returns true.
    bool try_pop(Data& popped_value)
    {
        boost::mutex::scoped_lock guard(the_mutex);
        bool const has_item = !the_queue.empty();
        if (has_item)
        {
            popped_value = the_queue.front();
            the_queue.pop();
        }
        return has_item;
    }
    // rest as before
};
加上 try_pop() 并去掉单独的 front()
和 pop()
方法之后,我们这个简单而又朴素的实现,现在已经变成了一个可用的多生产者多消费者队列。
最终方案
多生产者多消费者队列的最终方案:
/*'''
Created on Nov 10, 2014
@author: ScottGu<150316990@qq.com, gu.kai.66@gmail.com>
performance tested:
10 million data (100B each), 3 producers and 3 consumers:
*need about 10 seconds to process
environment: 64bit win7, i7-4800MQ, 8GB
'''*/
//
// There is a spurious-wakeup issue: if there are many consumers (how many
// depends on the environment and performance requirements; usually hundreds
// of consumers), the CPU may waste a lot of time in the wait loop of
// wait_and_pop(), causing performance issues. --scott
//
#pragma once
#include "stdafx.h"

#include <iostream>
#include <string>
#include <vector>

#include <boost/bind.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/thread/condition_variable.hpp>
#include <boost/thread/locks.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/thread.hpp>

#include "log4cxx/logger.h"
#include "log4cxx/xml/domconfigurator.h"
#include <queue>

// Multi-producer / multi-consumer FIFO queue.
//
// All access to the_queue is serialized by the_mutex. Consumers can block
// (wait_and_pop), block with a timeout (timed_wait_and_pop) or poll
// (try_pop). push() wakes one waiting consumer per element.
//
// Limitations:
//  - Unbounded: push() never blocks, so if producers outpace consumers the
//    queue (and memory use) keeps growing.
//  - Elements are copied in (push takes Data const&) and copied out (the pop
//    functions copy-assign into popped_value).
//
// NOTE: `template<typename Data>` must stay on its own line. Placed after an
// #include on the same line, the preprocessor discards it as extra tokens
// (compilers only warn), and the class silently stops being a template.
template<typename Data>
class Concurrent_Queue
{
private:
    std::queue<Data> the_queue;
    mutable boost::mutex the_mutex;                    // guards the_queue
    boost::condition_variable the_condition_variable;  // signalled by push()

public:
    // Appends a copy of data and wakes one waiting consumer (if any).
    void push(Data const& data)
    {
        /*if (data == NULL)
            return;
        */
        boost::mutex::scoped_lock lock(the_mutex);
        the_queue.push(data);
        // Release the lock before notifying so the woken consumer does not
        // immediately block on a mutex we still hold.
        lock.unlock();
        // Notify on every push (not only on empty -> non-empty), so that
        // several blocked consumers are all woken as data keeps arriving.
        the_condition_variable.notify_one();
    }

    // Snapshot only: another thread may push or pop as soon as the lock is
    // released, so do not use this to decide whether a later pop succeeds.
    bool empty() const
    {
        boost::mutex::scoped_lock lock(the_mutex);
        return the_queue.empty();
    }

    // Blocks until an element is available, then copies it into
    // popped_value and removes it from the queue.
    void wait_and_pop(Data& popped_value)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        // A loop, not an if: wait() may return spuriously, and another
        // consumer may have taken the element before we reacquired the lock.
        while (the_queue.empty())
        {
            //log4cxx::Logger::getRootLogger()->debug("conQueue waiting...");
            the_condition_variable.wait(lock);
        }
        //log4cxx::Logger::getRootLogger()->debug("conQueue poping...");
        popped_value = the_queue.front();
        the_queue.pop();
    }

    // Non-blocking pop. Returns false (popped_value untouched) if the queue
    // is empty; otherwise pops the front element into popped_value and
    // returns true.
    bool try_pop(Data& popped_value)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        if (the_queue.empty())
        {
            return false;
        }
        popped_value = the_queue.front();
        the_queue.pop();
        return true;
    }

    //
    // Waits up to wait_duration for an element.
    // return value: pop-ed return true; otherwise false (the queue was still
    // empty when the timeout expired; popped_value is left untouched).
    //
    template<typename Duration>
    bool timed_wait_and_pop(Data& popped_value,
                            Duration const& wait_duration)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        // The predicate overload re-checks the condition after every
        // (possibly spurious) wake-up until the timeout expires.
        if (!the_condition_variable.timed_wait(lock, wait_duration,
                                               queue_not_empty(the_queue)))
            return false;
        popped_value = the_queue.front();
        the_queue.pop();
        return true;
    }

    // Predicate for timed_wait(): true while the referenced queue has data.
    // Only called with the_mutex held (inside timed_wait).
    struct queue_not_empty
    {
        std::queue<Data>& queue;

        explicit queue_not_empty(std::queue<Data>& queue_) :
            queue(queue_)
        {}

        bool operator()() const
        {
            return !queue.empty();
        }
    };
};
需要注意的问题或可以改进的方向:
1. 队列没有容量上限,生产者过快时内存占用会不断增长,甚至耗尽系统内存:可以在队列大小超过设定值时阻塞生产者。
2. 消费者太多会显著降低性能,原因是假唤醒问题。(当生产者、消费者数量都是内核数量的 4 倍时,每毫秒仅能处理 60 条消息;测试使用的是有 8 个逻辑内核的 i7 处理器。)
测试代码:
void publish_data(Concurrent_Queue<std::string>* qData, int msg_cnt){
for (int i = ; i <= msg_cnt; i++)
{
std::string msg = "thread(*) news " + boost::lexical_cast<std::string>(i);
if (i % ( * ) == && i >= ( * ))
{
log4cxx::Logger::getRootLogger()->info("1 million produced..."
+ boost::lexical_cast<std::string>(boost::this_thread::get_id()));
}
qData->push(msg);
}
} void consume_data(Concurrent_Queue<std::string>* qData){
bool hasMore = true;
int consumed_cnt = ;
std::string msg = "s"; while (hasMore)
{
consumed_cnt++; boost::posix_time::time_duration duration = boost::posix_time::milliseconds();
hasMore = qData->timed_wait_and_pop<boost::posix_time::time_duration>(msg, duration); if (consumed_cnt % ( * ) == && consumed_cnt >= ( * ))
{
log4cxx::Logger::getRootLogger()->info("1 Million Consumed !!"
+ boost::lexical_cast<std::string>(boost::this_thread::get_id()));
}
} log4cxx::Logger::getRootLogger()->info("Done 3 sec ago, thread:"
+ boost::lexical_cast<std::string>(boost::this_thread::get_id())+" \nNum consumed:\n"
+ boost::lexical_cast<std::string>(consumed_cnt));
} void run_test_concurrent_queue(){ int max_msg_cnt = * ; int isFinish, publisherCnt, consumerCnt; while (isFinish != )
{
std::cout << "enter isFinish:" << std::endl;
std::cin >> isFinish;
std::cout << "enter publisherCnt:" << std::endl;
std::cin >> publisherCnt;
std::cout << "enter consumerCnt:" << std::endl;
std::cin >> consumerCnt; Concurrent_Queue<std::string>* pQData = new Concurrent_Queue<std::string>();
log4cxx::Logger::getRootLogger()->info("10 million data\nhere we go! ...");
std::vector<boost::thread *> pool; // producer
for (int j = ; j < publisherCnt; ++j)
{
pool.push_back(new boost::thread(boost::bind(publish_data, pQData, max_msg_cnt / publisherCnt)));
} for (int i = ; i < consumerCnt; ++i)
{
pool.push_back(new boost::thread(boost::bind(consume_data, pQData)));
} for (int k = ; k < publisherCnt+consumerCnt; ++k)
{
pool[k]->join();
delete pool[k];
} log4cxx::Logger::getRootLogger()->info("10 million done!!");
delete pQData; } }