Windows did not have condition variables before Vista (they were introduced in Vista and are not supported on Windows XP or Server 2003), so implementing a thread-safe queue in C++ is not very easy. For background, see "Strategies for Implementing POSIX Condition Variables on Win32". What I need is a solution that uses only a CriticalSection or Mutex together with Events, without semaphores or condition variables.
I also looked for an existing implementation that uses only the native Win32 API, but had no luck, so I wrote one myself. The problem is that I am not 100% sure the code is thread-safe. Can anyone tell me whether it is OK?
class CEventSyncQueue
{
public:
    CEventSyncQueue(int nCapacity = -1);
    virtual ~CEventSyncQueue();
    virtual void Put(void* ptr);
    virtual void* Get();
protected:
    int m_nCapacity;
    CPtrList m_list;
    CRITICAL_SECTION m_lock;
    HANDLE m_hGetEvent;
    HANDLE m_hPutEvent;
};

CEventSyncQueue::CEventSyncQueue(int nCapacity)
{
    m_nCapacity = nCapacity;
    ::InitializeCriticalSection(&m_lock);
    m_hPutEvent = ::CreateEvent(NULL, FALSE, FALSE, NULL);
    m_hGetEvent = ::CreateEvent(NULL, FALSE, FALSE, NULL);
}

CEventSyncQueue::~CEventSyncQueue()
{
    m_list.RemoveAll();
    ::CloseHandle(m_hGetEvent);
    ::CloseHandle(m_hPutEvent);
    ::DeleteCriticalSection(&m_lock);
}

void CEventSyncQueue::Put(void* ptr)
{
    ::EnterCriticalSection(&m_lock);
    while(m_nCapacity > 0 && m_list.GetCount() >= m_nCapacity)
    {
        ::LeaveCriticalSection(&m_lock);
        //wait
        if(::WaitForSingleObject(m_hPutEvent, INFINITE) != WAIT_OBJECT_0)
        {
            ASSERT(FALSE);
        }
        ::EnterCriticalSection(&m_lock);
    }
    if(m_nCapacity > 0)
    {
        ASSERT(m_list.GetCount() < m_nCapacity);
    }
    m_list.AddTail(ptr);
    ::SetEvent(m_hGetEvent); //notifyAll
    ::LeaveCriticalSection(&m_lock);
}

void* CEventSyncQueue::Get()
{
    ::EnterCriticalSection(&m_lock);
    while(m_list.IsEmpty())
    {
        ::LeaveCriticalSection(&m_lock);
        //wait
        if(::WaitForSingleObject(m_hGetEvent, INFINITE) != WAIT_OBJECT_0)
        {
            ASSERT(FALSE);
        }
        ::EnterCriticalSection(&m_lock);
    }
    ASSERT(!m_list.IsEmpty());
    void* ptr = m_list.RemoveHead();
    ::SetEvent(m_hPutEvent); //notifyAll
    ::LeaveCriticalSection(&m_lock);
    return ptr;
}
3 Solutions
#1
0
On second thoughts, it's hardly necessary to explicitly implement a semaphore. Instead, just think about how you would implement a semaphore using events, and approach your problem that way. My first attempt used manual-reset events, which was inefficient but manifestly correct, and then I optimized.
Please note that I haven't debugged (or even compiled!) either of these code fragments, but they should give you the right idea. Here's the manual-reset version:
class CEventSyncQueue
{
public:
    CEventSyncQueue(int nCapacity = -1);
    virtual ~CEventSyncQueue();
    virtual void Put(void* ptr);
    virtual void* Get();
protected:
    int m_nCapacity;
    CPtrList m_list;
    CRITICAL_SECTION m_lock;
    HANDLE m_queue_not_empty;
    HANDLE m_queue_not_full;
};

CEventSyncQueue::CEventSyncQueue(int nCapacity)
{
    m_nCapacity = nCapacity;
    ::InitializeCriticalSection(&m_lock);
    // Manual-reset events: "not empty" starts unsignaled, "not full" starts signaled.
    m_queue_not_empty = ::CreateEvent(NULL, TRUE, FALSE, NULL);
    m_queue_not_full = ::CreateEvent(NULL, TRUE, TRUE, NULL);
}

CEventSyncQueue::~CEventSyncQueue()
{
    m_list.RemoveAll();
    ::CloseHandle(m_queue_not_empty);
    ::CloseHandle(m_queue_not_full);
    ::DeleteCriticalSection(&m_lock);
}

void CEventSyncQueue::Put(void* ptr)
{
    bool done = false;
    while (!done)
    {
        // If the queue is full, we must wait until it isn't.
        if(::WaitForSingleObject(m_queue_not_full, INFINITE) != WAIT_OBJECT_0)
        {
            ASSERT(FALSE);
        }
        // However, we might not be the first to respond to the event,
        // so we still need to check whether the queue is full and loop
        // if it is.
        ::EnterCriticalSection(&m_lock);
        if (m_nCapacity <= 0 || m_list.GetCount() < m_nCapacity)
        {
            m_list.AddTail(ptr);
            done = true;
            // The queue is definitely not empty.
            SetEvent(m_queue_not_empty);
            // Check whether the queue is now full.
            if (m_nCapacity > 0 && m_list.GetCount() >= m_nCapacity)
            {
                ResetEvent(m_queue_not_full);
            }
        }
        ::LeaveCriticalSection(&m_lock);
    }
}

void* CEventSyncQueue::Get()
{
    void *result = nullptr;
    while (result == nullptr)
    {
        // If the queue is empty, we must wait until it isn't.
        if(::WaitForSingleObject(m_queue_not_empty, INFINITE) != WAIT_OBJECT_0)
        {
            ASSERT(FALSE);
        }
        // However, we might not be the first to respond to the event,
        // so we still need to check whether the queue is empty and loop
        // if it is.
        ::EnterCriticalSection(&m_lock);
        if (!m_list.IsEmpty())
        {
            result = m_list.RemoveHead();
            ASSERT(result != nullptr);
            // The queue shouldn't be full at this point!
            ASSERT(m_nCapacity <= 0 || m_list.GetCount() < m_nCapacity);
            SetEvent(m_queue_not_full);
            // Check whether the queue is now empty.
            if (m_list.IsEmpty())
            {
                ResetEvent(m_queue_not_empty);
            }
        }
        ::LeaveCriticalSection(&m_lock);
    }
    // Hand the dequeued pointer back to the caller.
    return result;
}
And here's the more efficient version, using auto-reset events:
class CEventSyncQueue
{
public:
    CEventSyncQueue(int nCapacity = -1);
    virtual ~CEventSyncQueue();
    virtual void Put(void* ptr);
    virtual void* Get();
protected:
    int m_nCapacity;
    CPtrList m_list;
    CRITICAL_SECTION m_lock;
    HANDLE m_queue_not_empty;
    HANDLE m_queue_not_full;
};

CEventSyncQueue::CEventSyncQueue(int nCapacity)
{
    m_nCapacity = nCapacity;
    ::InitializeCriticalSection(&m_lock);
    // Auto-reset events: "not empty" starts unsignaled, "not full" starts signaled.
    m_queue_not_empty = ::CreateEvent(NULL, FALSE, FALSE, NULL);
    m_queue_not_full = ::CreateEvent(NULL, FALSE, TRUE, NULL);
}

CEventSyncQueue::~CEventSyncQueue()
{
    m_list.RemoveAll();
    ::CloseHandle(m_queue_not_empty);
    ::CloseHandle(m_queue_not_full);
    ::DeleteCriticalSection(&m_lock);
}

void CEventSyncQueue::Put(void* ptr)
{
    // Unbounded queue: never wait, just append and signal.
    if (m_nCapacity <= 0)
    {
        ::EnterCriticalSection(&m_lock);
        m_list.AddTail(ptr);
        SetEvent(m_queue_not_empty);
        ::LeaveCriticalSection(&m_lock);
        return;
    }
    bool done = false;
    while (!done)
    {
        // If the queue is full, we must wait until it isn't.
        if(::WaitForSingleObject(m_queue_not_full, INFINITE) != WAIT_OBJECT_0)
        {
            ASSERT(FALSE);
        }
        // However, under some (rare) conditions we'll get here and find
        // the queue is already full again, so be prepared to loop.
        ::EnterCriticalSection(&m_lock);
        if (m_list.GetCount() < m_nCapacity)
        {
            m_list.AddTail(ptr);
            done = true;
            SetEvent(m_queue_not_empty);
            // Still room left: pass the wake-up on to the next producer.
            if (m_list.GetCount() < m_nCapacity)
            {
                SetEvent(m_queue_not_full);
            }
        }
        ::LeaveCriticalSection(&m_lock);
    }
}

void* CEventSyncQueue::Get()
{
    void *result = nullptr;
    while (result == nullptr)
    {
        // If the queue is empty, we must wait until it isn't.
        if(::WaitForSingleObject(m_queue_not_empty, INFINITE) != WAIT_OBJECT_0)
        {
            ASSERT(FALSE);
        }
        // However, under some (rare) conditions we'll get here and find
        // the queue is already empty again, so be prepared to loop.
        ::EnterCriticalSection(&m_lock);
        if (!m_list.IsEmpty())
        {
            result = m_list.RemoveHead();
            ASSERT(result != nullptr);
            // The queue shouldn't be full at this point!
            // (Only meaningful for a bounded queue, i.e. m_nCapacity > 0.)
            ASSERT(m_nCapacity <= 0 || m_list.GetCount() < m_nCapacity);
            SetEvent(m_queue_not_full);
            // Items remain: pass the wake-up on to the next consumer.
            if (!m_list.IsEmpty())
            {
                SetEvent(m_queue_not_empty);
            }
        }
        ::LeaveCriticalSection(&m_lock);
    }
    // Hand the dequeued pointer back to the caller.
    return result;
}
#2
1
It's trivial to implement a thread-safe queue in Windows. I've done it in Delphi, C++, BCB etc.
Why do you think that a condition variable is required? How do you think that Windows Message Queues work?
Events are the wrong primitive to use for P-C queues. The easiest and clearest way is to use a semaphore.
Simple unbounded producer-consumer queue:
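#include <windows.h>
#include <deque>

template <typename T> class PCSqueue {
    CRITICAL_SECTION access;
    std::deque<T> *objectQueue;
    HANDLE queueSema;   // counts the items currently in the queue
public:
    PCSqueue() {
        objectQueue = new std::deque<T>;
        InitializeCriticalSection(&access);
        queueSema = CreateSemaphore(NULL, 0, MAXINT, NULL);
    }
    ~PCSqueue() {
        // Release everything acquired in the constructor.
        CloseHandle(queueSema);
        DeleteCriticalSection(&access);
        delete objectQueue;
    }
    void push(T ref) {
        EnterCriticalSection(&access);
        objectQueue->push_front(ref);
        LeaveCriticalSection(&access);
        // Signal one waiting consumer that an item is available.
        ReleaseSemaphore(queueSema, 1, NULL);
    }
    bool pop(T *ref, DWORD timeout) {
        // Block until an item is available or the timeout expires.
        if (WAIT_OBJECT_0 == WaitForSingleObject(queueSema, timeout)) {
            EnterCriticalSection(&access);
            *ref = objectQueue->back();
            objectQueue->pop_back();
            LeaveCriticalSection(&access);
            return true;
        }
        return false;
    }
};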
Edit - a bounded queue would not be much more difficult: you need another semaphore to count the empty spaces. I don't use bounded queues, but I'm sure it would be OK. A bounded queue with two semaphores and a mutex/CS is a standard pattern.
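The two-semaphore bounded queue mentioned above might look roughly like the sketch below. It is not code from this answer, and every name in it (CountingSemaphore, BoundedQueue, freeSlots_, usedSlots_, demoTransfer) is made up for illustration. To keep it compilable anywhere, CountingSemaphore is a portable stand-in built on the C++11 standard library; on Windows XP you would presumably use a real semaphore handle instead (CreateSemaphore, ReleaseSemaphore, WaitForSingleObject) and a CRITICAL_SECTION in place of std::mutex.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Assumption: portable stand-in for a Win32 semaphore handle.
class CountingSemaphore {
    std::mutex m_;
    std::condition_variable cv_;
    long count_;
public:
    explicit CountingSemaphore(long initial) : count_(initial) {}
    // Roughly ReleaseSemaphore(h, 1, NULL).
    void release() {
        { std::lock_guard<std::mutex> g(m_); ++count_; }
        cv_.notify_one();
    }
    // Roughly WaitForSingleObject(h, INFINITE).
    void acquire() {
        std::unique_lock<std::mutex> g(m_);
        cv_.wait(g, [this] { return count_ > 0; });
        --count_;
    }
};

template <typename T>
class BoundedQueue {
    std::mutex access_;            // plays the role of the CRITICAL_SECTION
    std::deque<T> items_;
    CountingSemaphore freeSlots_;  // counts the empty spaces
    CountingSemaphore usedSlots_;  // counts the queued items
public:
    explicit BoundedQueue(long capacity)
        : freeSlots_(capacity), usedSlots_(0) {}

    void push(T value) {
        freeSlots_.acquire();      // blocks while the queue is full
        {
            std::lock_guard<std::mutex> g(access_);
            items_.push_back(std::move(value));
        }
        usedSlots_.release();      // wakes one waiting consumer
    }

    T pop() {
        usedSlots_.acquire();      // blocks while the queue is empty
        T value;
        {
            std::lock_guard<std::mutex> g(access_);
            value = std::move(items_.front());
            items_.pop_front();
        }
        freeSlots_.release();      // wakes one waiting producer
        return value;
    }
};

// Hypothetical demo: one producer, one consumer, capacity 2.
std::vector<int> demoTransfer() {
    BoundedQueue<int> q(2);
    std::vector<int> received;
    std::thread consumer([&] {
        for (int i = 0; i < 5; ++i) received.push_back(q.pop());
    });
    for (int i = 0; i < 5; ++i) q.push(i);  // blocks whenever 2 items are pending
    consumer.join();
    return received;
}
```

Each semaphore is acquired before the lock is taken and released after it is dropped, so no thread ever sleeps while holding the lock. With a single producer and a single consumer, demoTransfer() should return the values in the order they were pushed.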
Edit: Use the PostMessage() or PostThreadMessage() API calls - they are explicitly declared to be safe from the 'waveOutProc' callback. MSDN says that calling 'other wave functions' will cause deadlock - semaphore calls are not in that set, and I would be very surprised indeed if SetEvent() was allowed but ReleaseSemaphore() was not. In fact, I would be surprised if there were ANYWHERE in Windows where SetEvent() was allowed but ReleaseSemaphore() was not.
#3
0
Condition variable? Do you mean the Interlocked* functions? These have been around for a long time - I used them in Windows 2000. You can use them to build a concurrency system, but you'll still have to do a bit of work yourself.
Alternatively, try OpenMP. To use this you'll need Visual Studio 2008 or greater.