c++11 线程池学习笔记 (一) 任务队列

时间:2023-12-25 19:45:01

学习内容来自一下地址

http://www.cnblogs.com/qicosmos/p/4772486.html

github https://github.com/qicosmos/cosmos

主要使用c++11的多线程编程的互斥 同步等功能 编写一个生产消费者队列 用于任务的传递

将任务的接受处理进行分离 更加简洁 易于处理和扩展修改

template<typename T> //队列传递的元素  为了适配更多情况 这里使用了模板
class SyncQueue {
...................
}

c++11 线程池学习笔记  (一)  任务队列

//初始化函数 定义队列最大可容纳元素数目和停止表示

SyncQueue(int maxSize):m_maxSize(maxSize),m_needStop(false){}

//Put函数即是提价元素 之所以使用两种函数 是考虑左值和右值 提高元素拷贝效率

void Put(const T& x) {
Add(x);
}
void Put(T&& x) {
Add(std::forward<T>(x));
}

//查看Add函数

template<typename F>
void Add(F&& x) {
std::unique_lock<std::mutex> locker(m_mutex);   //加锁
m_notFull.wait(locker, [this] {return m_needStop || NotFull(); }); // 使条件变量等待 队列未满或者标记停止标识
if (m_needStop) //停止退出
return;
m_queue.push_back(std::forward<F>(x));  //则将元素添加进容器
m_notEmpty.notify_one();
}
//加锁情况下 使条件变量等待 队列未满或者标记停止标识 则将元素添加进容器或者停止退出

//元素取出函数 一个是取出容器内全部元素 一个是取出单个元素

void Take(std::list<T>& list)

void Take(T& t)

//void Take(std::list<T>& list) 取出容器内全部元素 直接将容器去除 并清除队列内的容器

//代码里直接使用了move函数  并notify Take函数中的条件变量

void Take(std::list<T>& list) {
std::unique_lock<std::mutex> locker(m_mutex);
m_notEmpty.wait(locker, [this] {return m_needStop || NotEmpty(); });
if (m_needStop)
return;
list = std::move(m_queue);
m_notFull.notify_one();
}

//void Take(T& t) 加锁情况下 使用条件变量等待停止标识或者容器非空
//将容器内元素弹出 并notify Add函数中的条件变量

void Take(T& t) {
std::unique_lock<std::mutex> locker(m_mutex);
m_notEmpty.wait(locker, [this] {return m_needStop || NotEmpty(); });
if (m_needStop)
return;
t = m_queue.front();
m_queue.pop_front();
m_notFull.notify_one();
}

源码如下 (仅仅队列代码,测试代码将同线程池一同测试)

 #pragma once

 #include <list>
#include <mutex>
#include <thread>
#include <condition_variable>
#include <iostream> using namespace std; template<typename T>
class SyncQueue {
public:
SyncQueue(int maxSize):m_maxSize(maxSize),m_needStop(false){}
void Put(const T& x) {
Add(x);
}
void Put(T&& x) {
Add(std::forward<T>(x));
}
void Take(std::list<T>& list) {
std::unique_lock<std::mutex> locker(m_mutex);
m_notEmpty.wait(locker, [this] {return m_needStop || NotEmpty(); });
if (m_needStop)
return;
list = std::move(m_queue);
m_notFull.notify_one();
} void Take(T& t) {
std::unique_lock<std::mutex> locker(m_mutex);
m_notEmpty.wait(locker, [this] {return m_needStop || NotEmpty(); });
if (m_needStop)
return;
t = m_queue.front();
m_queue.pop_front();
m_notFull.notify_one();
} void Stop() {
{
std::lock_guard<std::mutex> locker(m_mutex);
m_needStop = true;
}
m_notFull.notify_all();
m_notEmpty.notify_all();
} bool Empty() {
std::lock_guard<std::mutex> locker(m_mutex);
return m_queue.empty();
} bool Full() {
std::lock_guard<std::mutex> locker(m_mutex);
return m_queue.size() == m_maxSize;
} size_t Size() {
std::lock_guard<std::mutex> locker(m_mutex);
return m_queue.size();
} int Count() {
return m_queue.size();
}
private:
bool NotFull()const {
bool full = m_queue.size() >= m_maxSize;
if (full)
cout << "buffer area is full,wait..." << endl;
return !full;
}
bool NotEmpty()const {
bool empty = m_queue.empty();
if (empty)
cout << "buffer area is empty,wait... " <<
" threadID: " << this_thread::get_id() <<endl;
return !empty;
} template<typename F>
void Add(F&& x) {
std::unique_lock<std::mutex> locker(m_mutex);
m_notFull.wait(locker, [this] {return m_needStop || NotFull(); });
if (m_needStop)
return;
m_queue.push_back(std::forward<F>(x));
m_notEmpty.notify_one();
}
private:
std::list<T> m_queue;
std::mutex m_mutex;
std::condition_variable m_notEmpty;
std::condition_variable m_notFull;
int m_maxSize;
bool m_needStop;
};