trinitycore 魔兽服务器源码分析(三) 多线程相关

时间:2022-09-29 17:00:02

先看LockedQueue.h

template <class T, typename StorageType = std::deque<T> >
class LockedQueue{......}

一个带锁的多线程可安全访问的类,容器默认使用std::deque

常规代码 push进T类型的元素 pop出T类型的元素

使用锁定 保证线程安全

相比C++11之前的繁琐做法 现在加锁可以使用 std::lock_guard

当使用std::lock_guard<std::mutex> lock(_lock)后 代码进入加锁模式

脱出lock的变量生存周期时候 自动解锁

代码如下

 /*
* Copyright (C) 2008-2017 TrinityCore <http://www.trinitycore.org/>
* Copyright (C) 2005-2008 MaNGOS <http://getmangos.com/>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* Free Software Foundation; either version 2 of the License, or (at your
* option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
* more details.
*
* You should have received a copy of the GNU General Public License along
* with this program. If not, see <http://www.gnu.org/licenses/>.
*/ #ifndef LOCKEDQUEUE_H
#define LOCKEDQUEUE_H #include <deque>
#include <mutex> template <class T, typename StorageType = std::deque<T> >
class LockedQueue
{
//! Lock access to the queue.
std::mutex _lock; //! Storage backing the queue.
StorageType _queue; //! Cancellation flag.
volatile bool _canceled; public: //! Create a LockedQueue.
LockedQueue()
: _canceled(false)
{
} //! Destroy a LockedQueue.
virtual ~LockedQueue()
{
} //! Adds an item to the queue.
void add(const T& item)
{
lock(); _queue.push_back(item); unlock();
} //! Adds items back to front of the queue
template<class Iterator>
void readd(Iterator begin, Iterator end)
{
std::lock_guard<std::mutex> lock(_lock);
_queue.insert(_queue.begin(), begin, end);
} //! Gets the next result in the queue, if any.
bool next(T& result)
{
std::lock_guard<std::mutex> lock(_lock); if (_queue.empty())
return false; result = _queue.front();
_queue.pop_front(); return true;
} template<class Checker>
bool next(T& result, Checker& check)
{
std::lock_guard<std::mutex> lock(_lock); if (_queue.empty())
return false; result = _queue.front();
if (!check.Process(result))
return false; _queue.pop_front();
return true;
} //! Peeks at the top of the queue. Check if the queue is empty before calling! Remember to unlock after use if autoUnlock == false.
T& peek(bool autoUnlock = false)
{
lock(); T& result = _queue.front(); if (autoUnlock)
unlock(); return result;
} //! Cancels the queue.
void cancel()
{
std::lock_guard<std::mutex> lock(_lock); _canceled = true;
} //! Checks if the queue is cancelled.
bool cancelled()
{
std::lock_guard<std::mutex> lock(_lock);
return _canceled;
} //! Locks the queue for access.
void lock()
{
this->_lock.lock();
} //! Unlocks the queue.
void unlock()
{
this->_lock.unlock();
} ///! Calls pop_front of the queue
void pop_front()
{
std::lock_guard<std::mutex> lock(_lock);
_queue.pop_front();
} ///! Checks if we're empty or not with locks held
bool empty()
{
std::lock_guard<std::mutex> lock(_lock);
return _queue.empty();
}
};
#endif

//=========================================================

MPSCQueue.h

这是一个操作原子类型的元素指针的链表队列

template<typename T>
class MPSCQueue
{
std::atomic<Node*> _head;
std::atomic<Node*> _tail;
}

Node就是一个链表的类型  当我们使用exchange交换Node的指针时 是多线程安全的

而std::memory_order_acq_rel 类似的原子操作的内存模式参数

可参考

http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue

https://www.zhihu.com/question/24301047

源码

 /*
* Copyright (C) 2008-2017 TrinityCore <http://www.trinitycore.org/>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* Free Software Foundation; either version 2 of the License, or (at your
* option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
* more details.
*
* You should have received a copy of the GNU General Public License along
* with this program. If not, see <http://www.gnu.org/licenses/>.
*/ #ifndef MPSCQueue_h__
#define MPSCQueue_h__ #include <atomic>
#include <utility> // C++ implementation of Dmitry Vyukov's lock free MPSC queue
// http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue
template<typename T>
class MPSCQueue
{
public:
MPSCQueue() : _head(new Node()), _tail(_head.load(std::memory_order_relaxed))
{
Node* front = _head.load(std::memory_order_relaxed);
front->Next.store(nullptr, std::memory_order_relaxed);
} ~MPSCQueue()
{
T* output;
while (this->Dequeue(output))
; Node* front = _head.load(std::memory_order_relaxed);
delete front;
} void Enqueue(T* input)
{
Node* node = new Node(input);
Node* prevHead = _head.exchange(node, std::memory_order_acq_rel);
prevHead->Next.store(node, std::memory_order_release);
} bool Dequeue(T*& result)
{
Node* tail = _tail.load(std::memory_order_relaxed);
Node* next = tail->Next.load(std::memory_order_acquire);
if (!next)
return false; result = next->Data;
_tail.store(next, std::memory_order_release);
delete tail;
return true;
} private:
struct Node
{
Node() = default;
explicit Node(T* data) : Data(data) { Next.store(nullptr, std::memory_order_relaxed); } T* Data;
std::atomic<Node*> Next;
}; std::atomic<Node*> _head;
std::atomic<Node*> _tail; MPSCQueue(MPSCQueue const&) = delete;
MPSCQueue& operator=(MPSCQueue const&) = delete;
}; #endif // MPSCQueue_h__

//=========================================================================

使用多线程安全的队列 设计出一个生产消费者队列

template <typename T>
class ProducerConsumerQueue
{

private:
std::mutex _queueLock;        //进行操作时候的锁
std::queue<T> _queue;        // 存储元素的队列
std::condition_variable _condition;   //条件变量
std::atomic<bool> _shutdown;     //原子类型的标记 标记此队列是否关闭

}

//构造函数

ProducerConsumerQueue<T>() : _shutdown(false) { }

// 加锁情况下  生产者push元素进队列 并通过条件变量提示消费者进行处理

void Push(const T& value)
{
std::lock_guard<std::mutex> lock(_queueLock);
_queue.push(std::move(value));

_condition.notify_one();
}

//加锁模式下 判断队列是否为空

bool Empty()
{
std::lock_guard<std::mutex> lock(_queueLock);

return _queue.empty();
}

//加锁模式下 队列不空且没有shuntdown关闭标记 进行元素弹出操作

bool Pop(T& value)
{
std::lock_guard<std::mutex> lock(_queueLock);

if (_queue.empty() || _shutdown)
return false;

value = _queue.front();

_queue.pop();

return true;
}

//消费者调用 加锁模式在未有元素的处理下进行等待元素

void WaitAndPop(T& value)
{
std::unique_lock<std::mutex> lock(_queueLock);

// we could be using .wait(lock, predicate) overload here but it is broken
// https://connect.microsoft.com/VisualStudio/feedback/details/1098841
while (_queue.empty() && !_shutdown)
_condition.wait(lock);

if (_queue.empty() || _shutdown)
return;

value = _queue.front();

_queue.pop();
}

void Cancel()
{
std::unique_lock<std::mutex> lock(_queueLock);

while (!_queue.empty())
{
T& value = _queue.front();

DeleteQueuedObject(value);

_queue.pop();
}

_shutdown = true;

_condition.notify_all();
}

//c++ 模板 template的小技巧 SFINAE (substitution-failure-is-not-an-error) 原则
//根据传入的模板参数是否是指针 而适配到不同的处理函数中 如果是指针类型则在删除时 delete指针
//否则适配到另个函数 删除时候什么都不做

template<typename E = T>
typename std::enable_if<std::is_pointer<E>::value>::type DeleteQueuedObject(E& obj) { delete obj; }

template<typename E = T>
typename std::enable_if<!std::is_pointer<E>::value>::type DeleteQueuedObject(E const& /*packet*/) { }

 /*
* Copyright (C) 2008-2017 TrinityCore <http://www.trinitycore.org/>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* Free Software Foundation; either version 2 of the License, or (at your
* option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
* more details.
*
* You should have received a copy of the GNU General Public License along
* with this program. If not, see <http://www.gnu.org/licenses/>.
*/ #ifndef _PCQ_H
#define _PCQ_H #include <condition_variable>
#include <mutex>
#include <queue>
#include <atomic>
#include <type_traits> template <typename T>
class ProducerConsumerQueue
{
private:
std::mutex _queueLock;
std::queue<T> _queue;
std::condition_variable _condition;
std::atomic<bool> _shutdown; public: ProducerConsumerQueue<T>() : _shutdown(false) { } void Push(const T& value)
{
std::lock_guard<std::mutex> lock(_queueLock);
_queue.push(std::move(value)); _condition.notify_one();
} bool Empty()
{
std::lock_guard<std::mutex> lock(_queueLock); return _queue.empty();
} bool Pop(T& value)
{
std::lock_guard<std::mutex> lock(_queueLock); if (_queue.empty() || _shutdown)
return false; value = _queue.front(); _queue.pop(); return true;
} void WaitAndPop(T& value)
{
std::unique_lock<std::mutex> lock(_queueLock); // we could be using .wait(lock, predicate) overload here but it is broken
// https://connect.microsoft.com/VisualStudio/feedback/details/1098841
while (_queue.empty() && !_shutdown)
_condition.wait(lock); if (_queue.empty() || _shutdown)
return; value = _queue.front(); _queue.pop();
} void Cancel()
{
std::unique_lock<std::mutex> lock(_queueLock); while (!_queue.empty())
{
T& value = _queue.front(); DeleteQueuedObject(value); _queue.pop();
} _shutdown = true; _condition.notify_all();
} private:
template<typename E = T>
typename std::enable_if<std::is_pointer<E>::value>::type DeleteQueuedObject(E& obj) { delete obj; } template<typename E = T>
typename std::enable_if<!std::is_pointer<E>::value>::type DeleteQueuedObject(E const& /*packet*/) { }
}; #endif