(1)消息队列的实现
#ifndef NET_FRAME_CONCURRENT_QUEUE_H
#define NET_FRAME_CONCURRENT_QUEUE_H
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <queue>
#include <utility>
/**
 * @brief Mutex-protected FIFO message queue.
 *
 * Every operation locks the internal mutex for as long as it touches the
 * underlying std::queue. A consumer can wait for data (blocking Pop) or
 * poll for it (non-blocking Pop). The queue is neither copyable nor
 * assignable.
 *
 * @tparam Type Element type. It must be move-constructible and
 *              move-assignable; it does not have to be copyable.
 */
template<class Type>
class ConcurrentQueue {
    ConcurrentQueue& operator=(const ConcurrentQueue&) = delete;
    ConcurrentQueue(const ConcurrentQueue& other) = delete;
public:
    /// Creates an empty queue.
    ConcurrentQueue() : _queue(), _mutex(), _condition() { }

    virtual ~ConcurrentQueue() { }

    /**
     * @brief Appends a record to the back of the queue and wakes one waiter.
     * @param record Element to enqueue; it is moved into the queue.
     */
    void Push(Type record) {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _queue.push(std::move(record));
        }
        // Notify after unlocking so the woken consumer does not immediately
        // block again on a mutex that is still held here.
        _condition.notify_one();
    }

    /**
     * @brief Removes the front element and moves it into @p record.
     *
     * The lock is held across the emptiness check AND the removal, so two
     * consumers can never both pass the check and then pop the same element
     * (or pop from an empty queue).
     *
     * @param record    Receives the dequeued element; untouched when the
     *                  function returns false.
     * @param isBlocked true: wait until an element is available;
     *                  false: return immediately when the queue is empty.
     * @return true if an element was dequeued, false only in non-blocking
     *         mode when the queue was empty.
     */
    bool Pop(Type& record, bool isBlocked = true) {
        std::unique_lock<std::mutex> lock(_mutex);
        if (isBlocked) {
            // The predicate form re-checks after every wakeup, which also
            // covers spurious wakeups.
            _condition.wait(lock, [this] { return !_queue.empty(); });
        } else if (_queue.empty()) {
            return false;
        }
        record = std::move(_queue.front());
        _queue.pop();
        return true;
    }

    /**
     * @brief Number of queued elements at the moment of the call.
     * @return Element count, converted to int32_t.
     */
    int32_t Size() const {
        std::lock_guard<std::mutex> lock(_mutex);
        return static_cast<int32_t>(_queue.size());
    }

    /**
     * @brief Whether the queue held no elements at the moment of the call.
     * @return true if the queue was empty.
     */
    bool Empty() const {
        std::lock_guard<std::mutex> lock(_mutex);
        return _queue.empty();
    }

private:
    std::queue<Type> _queue;             ///< Underlying FIFO storage, guarded by _mutex.
    mutable std::mutex _mutex;           ///< Guards _queue; mutable so const observers can lock it.
    std::condition_variable _condition;  ///< Signalled once per Push to wake a blocked Pop.
};
#endif //NET_FRAME_CONCURRENT_QUEUE_H
(2)拥有消息队列的线程池的实现
.h文件如下
#ifndef NET_FRAME_THREAD_POOL_H
#define NET_FRAME_THREAD_POOL_H
#include "ConcurrentQueue.h"
#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>
// Lower bound on the worker count: the ThreadPool constructor raises any
// smaller request to this value.
#define MIN_THREADS 10

/**
 * @brief Fixed-size pool of worker threads that feed queued records to a
 *        single handler.
 *
 * Records handed to Submit() are stored in a ConcurrentQueue; each worker
 * repeatedly takes one record and calls the handler on it. The destructor
 * requests shutdown, lets the workers drain whatever is still queued, and
 * joins them. The pool is neither copyable nor assignable.
 *
 * All queue accesses made by the pool happen while _poolMutex is held, so
 * the shutdown flag and the queue contents are always observed together.
 *
 * @tparam Type Record type; must be default-constructible because each
 *              worker pops into a default-constructed local.
 */
template<class Type>
class ThreadPool {
    ThreadPool& operator=(const ThreadPool&) = delete;
    ThreadPool(const ThreadPool& other) = delete;
public:
    ThreadPool(int32_t threads, std::function<void(Type& record)> handler);
    virtual ~ThreadPool();
    void Submit(Type record);
private:
    void WorkerLoop();
    void StopAndJoin();

    bool _shutdown;                              ///< Set by StopAndJoin(); guarded by _poolMutex.
    int32_t _threads;                            ///< Effective worker count (at least MIN_THREADS).
    std::function<void(Type& record)> _handler;  ///< Invoked by a worker for every record.
    std::vector<std::thread> _workers;           ///< Worker threads, joined by StopAndJoin().
    ConcurrentQueue<Type> _tasks;                ///< Pending records.
    std::mutex _poolMutex;                       ///< Guards _shutdown and serializes the pool's queue accesses.
    std::condition_variable _wakeup;             ///< Signalled on Submit() and on shutdown.
};

/**
 * @brief Starts the worker threads.
 *
 * If starting a thread throws, the workers that were already started are
 * stopped and joined before the exception is rethrown, so no joinable
 * std::thread is ever destroyed.
 *
 * @param threads Requested worker count; values below MIN_THREADS are raised
 *                to MIN_THREADS.
 * @param handler Callable run by a worker for each submitted record.
 */
template<class Type>
ThreadPool<Type>::ThreadPool(int32_t threads, std::function<void(Type &record)> handler)
    : _shutdown(false),
      _threads(threads),
      _handler(std::move(handler)),
      _workers(),
      _tasks(),
      _poolMutex(),
      _wakeup() {
    if (_threads < MIN_THREADS)
        _threads = MIN_THREADS;
    _workers.reserve(static_cast<std::size_t>(_threads));
    try {
        for (int32_t i = 0; i < _threads; ++i)
            _workers.emplace_back([this] { WorkerLoop(); });
    } catch (...) {
        StopAndJoin();
        throw;
    }
}

/**
 * @brief Requests shutdown and blocks until every worker has exited.
 *
 * Workers finish the records that are still queued before they exit.
 */
template<class Type>
ThreadPool<Type>::~ThreadPool() {
    StopAndJoin();
}

/**
 * @brief Queues a record and wakes one idle worker.
 * @param record Record to process; it is moved into the queue.
 */
template<class Type>
void ThreadPool<Type>::Submit(Type record) {
    {
        // Pushing under _poolMutex means a worker that has just evaluated
        // its wait predicate cannot miss this record's notification.
        std::lock_guard<std::mutex> lock(_poolMutex);
        _tasks.Push(std::move(record));
    }
    _wakeup.notify_one();
}

/**
 * @brief Body of every worker thread.
 *
 * Sleeps until a record is queued or shutdown is requested, takes one record
 * and runs the handler on it with no lock held. Returns once shutdown has
 * been requested and the queue is empty.
 */
template<class Type>
void ThreadPool<Type>::WorkerLoop() {
    for (;;) {
        Type record;
        {
            std::unique_lock<std::mutex> lock(_poolMutex);
            _wakeup.wait(lock, [this] { return _shutdown || !_tasks.Empty(); });
            // Non-blocking pop: the wait above only returns with an empty
            // queue when shutdown was requested, which ends this worker.
            if (!_tasks.Pop(record, false))
                return;
        }
        _handler(record);
    }
}

/**
 * @brief Sets the shutdown flag, wakes all workers and joins them.
 */
template<class Type>
void ThreadPool<Type>::StopAndJoin() {
    {
        std::lock_guard<std::mutex> lock(_poolMutex);
        _shutdown = true;
    }
    _wakeup.notify_all();
    for (std::thread &worker: _workers) {
        if (worker.joinable())
            worker.join();
    }
}
#endif //NET_FRAME_THREAD_POOL_H