I'll preface this by saying that I'm delving into multithreading for the first time. Despite a lot of reading on concurrency and synchronization, I'm not readily seeing a solution for the requirements I've been given.
我先说这是我第一次钻研多线程。尽管在并发和同步方面有很多阅读,但我并没有很容易地看到我已经给出的要求的解决方案。
Using C++11 and Boost, I'm trying to figure out how to send data from a worker thread to a main thread. The worker thread is spawned at the start of the application and continuously monitors a lock free queue. Objects populate this queue at various intervals. This part is working.
使用C ++ 11和Boost,我试图弄清楚如何将数据从工作线程发送到主线程。工作线程在应用程序启动时生成,并持续监视无锁队列。对象以不同的间隔填充此队列。这部分正在运作。
Once the data is available, it needs to be processed by the main thread since another signal will be sent to the rest of the application which cannot be on a worker thread. This is what I'm having trouble with.
一旦数据可用,它就需要由主线程处理,因为另一个信号将被发送到不能在工作线程上的应用程序的其余部分。这就是我遇到的麻烦。
If I have to block the main thread through a mutex or a condition variable until the worker thread is done, how will that improve responsiveness? I might as well just stay with a single thread so I have access to the data. I must be missing something here.
如果我必须通过互斥锁或条件变量阻塞主线程直到工作线程完成,那么如何提高响应能力呢?我不妨留下一个线程,所以我可以访问数据。我必须在这里遗漏一些东西。
I have posted a couple questions, thinking that Boost::Asio was the way to go. There is an example of how signals and data can be sent between threads, but as the responses indicate, things get quickly overly-complicated and it's not working perfectly:
我发布了几个问题,认为Boost :: Asio是要走的路。有一个例子可以说明如何在线程之间发送信号和数据,但正如响应所示,事情变得过于复杂并且不能完美地运行:
How to connect signal to boost::asio::io_service when posting work on different thread?
如何在不同线程上发布工作时将信号连接到boost :: asio :: io_service?
Boost::Asio with Main/Workers threads - Can I start event loop before posting work?
使用Main / Workers线程增强:: Asio - 我可以在发布工作之前启动事件循环吗?
After speaking with some colleagues, it was suggested that two queues be used -- one input, one output. This would be in shared space and the output queue would be populated by the worker thread. The worker thread is always going but there would need to be a Timer, probably at the application level, that would force the main thread to examine the output queue to see if there were any pending tasks.
在与一些同事交谈后,建议使用两个队列 - 一个输入,一个输出。这将在共享空间中,输出队列将由工作线程填充。工作线程总是在运行,但需要有一个Timer,可能是在应用程序级别,它会强制主线程检查输出队列以查看是否有任何挂起的任务。
Any ideas on where I should direct my attention? Are there any techniques or strategies that might work for what I'm trying to do? I'll be looking at Timers next.
关于我应该引起注意的任何想法?是否有任何技术或策略可以用于我正在尝试做的事情?我接下来会看着计时器。
Thanks.
Edit: This is production code for a plugin system that post-processes simulation results. We are using C++11 first wherever possible, followed by Boost. We are using Boost's lockfree::queue. The application is doing what we want on a single thread but now we are trying to optimize where we see that there are performance issues (in this case, a calculation happening through another library). The main thread has a lot of responsibilities, including database access, which is why I want to limit what the worker thread actually does.
编辑:这是一个后处理模拟结果的插件系统的生产代码。我们首先使用C ++ 11,然后是Boost。我们正在使用Boost的lockfree :: queue。应用程序在单个线程上执行我们想要的操作,但现在我们正在尝试优化我们看到存在性能问题的位置(在这种情况下,通过另一个库进行计算)。主线程有很多职责,包括数据库访问,这就是我想限制工作线程实际执行的操作的原因。
Update: I have already been successful in using std::thread to launch a worker thread that examines a Boost lock::free queue and processes tasks placed it in. It's step 5 in @Pressacco's response that I'm having trouble with. Any examples returning a value to the main thread when a worker thread is finished and informing the main thread, rather than simply waiting for the worker to finish?
更新:我已经成功地使用std :: thread来启动一个工作线程来检查一个Boost lock :: free队列并处理放入它的任务。这是@Pressacco的回复中的第5步,我遇到了麻烦。任何一个示例在工作线程完成时将值返回给主线程并通知主线程,而不是简单地等待工作者完成?
2 个解决方案
#1
If your objective is develop the solution from scratch (using native threads, queues, etc.):
如果您的目标是从头开发解决方案(使用本机线程,队列等):
- create a thread save queue queue (Mutex/CriticalSection around add/remove)
- create a counting semaphore that is associated with the queue
- have one or more worker threads wait on the counting semaphore (i.e. the thread will block)
- the semaphore is more efficient than having the thread constantly poll the queue
信号量比线程不断轮询队列更有效
- as messages/jobs are added to the queue, increment the semaphore
- a thread will wake up
- the thread should remove one message
一个线程会醒来
线程应删除一条消息
- if a result needs to be returned...
- setup another:
Queue
+Semaphore
+WorkerThread
s
设置另一个:Queue + Semaphore + WorkerThreads
- setup another:
创建一个线程保存队列队列(Mutex / CriticalSection周围添加/删除)
创建与队列关联的计数信号量
有一个或多个工作线程等待计数信号量(即线程将阻塞)信号量比线程不断轮询队列更有效
当消息/作业被添加到队列中时,增加信号量线程将唤醒线程应删除一条消息
如果需要返回结果...设置另一个:Queue + Semaphore + WorkerThreads
ADDITIONAL NOTES
If you decide to implement a thread safe queue from scratch, take a look at:
如果您决定从头开始实现线程安全队列,请查看:
- Synchronization between threads using Critical Section
使用Critical Section在线程之间进行同步
With that said, I would take another look at BOOST. I haven't used the library, but from what I hear it will most likely contain some relevant data structures (e.g. a thread safe queue).
话虽如此,我会再看看BOOST。我没有使用过库,但据我所知,它很可能包含一些相关的数据结构(例如线程安全队列)。
My favorite quote from the MSDN:
我最喜欢的MSDN引用:
"When you use multithreading of any sort, you potentially expose yourself to very serious and complex bugs"
“当你使用任何类型的多线程时,你可能会暴露自己非常严重和复杂的错误”
SIDEBAR
Since you are looking at concurrent programming for the first time, you may wish to consider:
由于您是第一次查看并发编程,因此您可能需要考虑:
- Is your objective to build production worthy code , or is this simply a learning exercise?
- production? consider us existing proven libraries
- learning? consider writing the code from scratch
生产?考虑我们现有的成熟图书馆
学习?考虑从头开始编写代码
- Consider using a thread pool with an asynchronous callback instead of native threads.
- more threads != better
- Are threads really needed?
- Follow the KISS principle.
您的目标是建立生产有价值的代码,还是仅仅是一个学习练习?生产?考虑我们现有的成熟图书馆学习考虑从头开始编写代码
考虑使用带有异步回调的线程池而不是本机线程。
更多线程!=更好
真的需要线程吗?
遵循KISS原则。
#2
The feedback above led me in the right direction for what I needed. The solution was definitely simpler than having to use signals/slots or Boost::Asio as I had previously attempted. I have two lock-free queues, one for input (on a worker thread) and one for output (on the main thread, populated by the worker thread). I use a timer to schedule when the output queue is processed. The code is below; perhaps it is of use to somebody:
上面的反馈使我朝着正确的方向前进,满足了我的需求。该解决方案绝对比我之前尝试使用信号/插槽或Boost :: Asio更简单。我有两个无锁队列,一个用于输入(在工作线程上)和一个用于输出(在主线程上,由工作线程填充)。我使用计时器来安排何时处理输出队列。代码如下;也许这对某人有用:
//Task.h
#include <iostream>
#include <thread>
class Task
{
public:
Task(bool shutdown = false) : _shutdown(shutdown) {};
virtual ~Task() {};
bool IsShutdownRequest() { return _shutdown; }
virtual int Execute() = 0;
private:
bool _shutdown;
};
class ShutdownTask : public Task
{
public:
ShutdownTask() : Task(true) {}
virtual int Execute() { return -1; }
};
class TimeSeriesTask : public Task
{
public:
TimeSeriesTask(int value) : _value(value) {};
virtual int Execute()
{
std::cout << "Calculating on thread " << std::this_thread::get_id() << std::endl;
return _value * 2;
}
private:
int _value;
};
// Main.cpp : Defines the entry point for the console application.
#include "stdafx.h"
#include "afxwin.h"
#include <boost/lockfree/spsc_queue.hpp>
#include "Task.h"
static UINT_PTR ProcessDataCheckTimerID = 0;
static const int ProcessDataCheckPeriodInMilliseconds = 100;
class Manager
{
public:
Manager()
{
//Worker Thread with application lifetime that processes a lock free queue
_workerThread = std::thread(&Manager::ProcessInputData, this);
};
virtual ~Manager()
{
_workerThread.join();
};
void QueueData(int x)
{
if (x > 0)
{
_inputQueue.push(std::make_shared<TimeSeriesTask>(x));
}
else
{
_inputQueue.push(std::make_shared<ShutdownTask>());
}
}
void ProcessOutputData()
{
//process output data on the Main Thread
_outputQueue.consume_one([&](int value)
{
if (value < 0)
{
PostQuitMessage(WM_QUIT);
}
else
{
int result = value - 1;
std::cout << "Final result is " << result << " on thread " << std::this_thread::get_id() << std::endl;
}
});
}
private:
void ProcessInputData()
{
bool shutdown = false;
//Worker Thread processes input data indefinitely
do
{
_inputQueue.consume_one([&](std::shared_ptr<Task> task)
{
std::cout << "Getting element from input queue on thread " << std::this_thread::get_id() << std::endl;
if (task->IsShutdownRequest()) { shutdown = true; }
int result = task->Execute();
_outputQueue.push(result);
});
} while (shutdown == false);
}
std::thread _workerThread;
boost::lockfree::spsc_queue<std::shared_ptr<Task>, boost::lockfree::capacity<1024>> _inputQueue;
boost::lockfree::spsc_queue<int, boost::lockfree::capacity<1024>> _outputQueue;
};
std::shared_ptr<Manager> g_pMgr;
//timer to force Main Thread to process Manager's output queue
void CALLBACK TimerCallback(HWND hWnd, UINT nMsg, UINT nIDEvent, DWORD dwTime)
{
if (nIDEvent == ProcessDataCheckTimerID)
{
KillTimer(NULL, ProcessDataCheckPeriodInMilliseconds);
ProcessDataCheckTimerID = 0;
//call function to process data
g_pMgr->ProcessOutputData();
//reset timer
ProcessDataCheckTimerID = SetTimer(NULL, ProcessDataCheckTimerID, ProcessDataCheckPeriodInMilliseconds, (TIMERPROC)&TimerCallback);
}
}
int main()
{
std::cout << "Main thread is " << std::this_thread::get_id() << std::endl;
g_pMgr = std::make_shared<Manager>();
ProcessDataCheckTimerID = SetTimer(NULL, ProcessDataCheckTimerID, ProcessDataCheckPeriodInMilliseconds, (TIMERPROC)&TimerCallback);
//queue up some dummy data
for (int i = 1; i <= 10; i++)
{
g_pMgr->QueueData(i);
}
//queue a shutdown request
g_pMgr->QueueData(-1);
//fake the application's message loop
MSG msg;
bool shutdown = false;
while (shutdown == false)
{
if (GetMessage(&msg, NULL, 0, 0))
{
TranslateMessage(&msg);
DispatchMessage(&msg);
}
else
{
shutdown = true;
}
}
return 0;
}
#1
If your objective is develop the solution from scratch (using native threads, queues, etc.):
如果您的目标是从头开发解决方案(使用本机线程,队列等):
- create a thread save queue queue (Mutex/CriticalSection around add/remove)
- create a counting semaphore that is associated with the queue
- have one or more worker threads wait on the counting semaphore (i.e. the thread will block)
- the semaphore is more efficient than having the thread constantly poll the queue
信号量比线程不断轮询队列更有效
- as messages/jobs are added to the queue, increment the semaphore
- a thread will wake up
- the thread should remove one message
一个线程会醒来
线程应删除一条消息
- if a result needs to be returned...
- setup another:
Queue
+Semaphore
+WorkerThread
s
设置另一个:Queue + Semaphore + WorkerThreads
- setup another:
创建一个线程保存队列队列(Mutex / CriticalSection周围添加/删除)
创建与队列关联的计数信号量
有一个或多个工作线程等待计数信号量(即线程将阻塞)信号量比线程不断轮询队列更有效
当消息/作业被添加到队列中时,增加信号量线程将唤醒线程应删除一条消息
如果需要返回结果...设置另一个:Queue + Semaphore + WorkerThreads
ADDITIONAL NOTES
If you decide to implement a thread safe queue from scratch, take a look at:
如果您决定从头开始实现线程安全队列,请查看:
- Synchronization between threads using Critical Section
使用Critical Section在线程之间进行同步
With that said, I would take another look at BOOST. I haven't used the library, but from what I hear it will most likely contain some relevant data structures (e.g. a thread safe queue).
话虽如此,我会再看看BOOST。我没有使用过库,但据我所知,它很可能包含一些相关的数据结构(例如线程安全队列)。
My favorite quote from the MSDN:
我最喜欢的MSDN引用:
"When you use multithreading of any sort, you potentially expose yourself to very serious and complex bugs"
“当你使用任何类型的多线程时,你可能会暴露自己非常严重和复杂的错误”
SIDEBAR
Since you are looking at concurrent programming for the first time, you may wish to consider:
由于您是第一次查看并发编程,因此您可能需要考虑:
- Is your objective to build production worthy code , or is this simply a learning exercise?
- production? consider us existing proven libraries
- learning? consider writing the code from scratch
生产?考虑我们现有的成熟图书馆
学习?考虑从头开始编写代码
- Consider using a thread pool with an asynchronous callback instead of native threads.
- more threads != better
- Are threads really needed?
- Follow the KISS principle.
您的目标是建立生产有价值的代码,还是仅仅是一个学习练习?生产?考虑我们现有的成熟图书馆学习考虑从头开始编写代码
考虑使用带有异步回调的线程池而不是本机线程。
更多线程!=更好
真的需要线程吗?
遵循KISS原则。
#2
The feedback above led me in the right direction for what I needed. The solution was definitely simpler than having to use signals/slots or Boost::Asio as I had previously attempted. I have two lock-free queues, one for input (on a worker thread) and one for output (on the main thread, populated by the worker thread). I use a timer to schedule when the output queue is processed. The code is below; perhaps it is of use to somebody:
上面的反馈使我朝着正确的方向前进,满足了我的需求。该解决方案绝对比我之前尝试使用信号/插槽或Boost :: Asio更简单。我有两个无锁队列,一个用于输入(在工作线程上)和一个用于输出(在主线程上,由工作线程填充)。我使用计时器来安排何时处理输出队列。代码如下;也许这对某人有用:
//Task.h
#include <iostream>
#include <thread>
class Task
{
public:
Task(bool shutdown = false) : _shutdown(shutdown) {};
virtual ~Task() {};
bool IsShutdownRequest() { return _shutdown; }
virtual int Execute() = 0;
private:
bool _shutdown;
};
class ShutdownTask : public Task
{
public:
ShutdownTask() : Task(true) {}
virtual int Execute() { return -1; }
};
class TimeSeriesTask : public Task
{
public:
TimeSeriesTask(int value) : _value(value) {};
virtual int Execute()
{
std::cout << "Calculating on thread " << std::this_thread::get_id() << std::endl;
return _value * 2;
}
private:
int _value;
};
// Main.cpp : Defines the entry point for the console application.
#include "stdafx.h"
#include "afxwin.h"
#include <boost/lockfree/spsc_queue.hpp>
#include "Task.h"
static UINT_PTR ProcessDataCheckTimerID = 0;
static const int ProcessDataCheckPeriodInMilliseconds = 100;
class Manager
{
public:
Manager()
{
//Worker Thread with application lifetime that processes a lock free queue
_workerThread = std::thread(&Manager::ProcessInputData, this);
};
virtual ~Manager()
{
_workerThread.join();
};
void QueueData(int x)
{
if (x > 0)
{
_inputQueue.push(std::make_shared<TimeSeriesTask>(x));
}
else
{
_inputQueue.push(std::make_shared<ShutdownTask>());
}
}
void ProcessOutputData()
{
//process output data on the Main Thread
_outputQueue.consume_one([&](int value)
{
if (value < 0)
{
PostQuitMessage(WM_QUIT);
}
else
{
int result = value - 1;
std::cout << "Final result is " << result << " on thread " << std::this_thread::get_id() << std::endl;
}
});
}
private:
void ProcessInputData()
{
bool shutdown = false;
//Worker Thread processes input data indefinitely
do
{
_inputQueue.consume_one([&](std::shared_ptr<Task> task)
{
std::cout << "Getting element from input queue on thread " << std::this_thread::get_id() << std::endl;
if (task->IsShutdownRequest()) { shutdown = true; }
int result = task->Execute();
_outputQueue.push(result);
});
} while (shutdown == false);
}
std::thread _workerThread;
boost::lockfree::spsc_queue<std::shared_ptr<Task>, boost::lockfree::capacity<1024>> _inputQueue;
boost::lockfree::spsc_queue<int, boost::lockfree::capacity<1024>> _outputQueue;
};
std::shared_ptr<Manager> g_pMgr;
//timer to force Main Thread to process Manager's output queue
void CALLBACK TimerCallback(HWND hWnd, UINT nMsg, UINT nIDEvent, DWORD dwTime)
{
if (nIDEvent == ProcessDataCheckTimerID)
{
KillTimer(NULL, ProcessDataCheckPeriodInMilliseconds);
ProcessDataCheckTimerID = 0;
//call function to process data
g_pMgr->ProcessOutputData();
//reset timer
ProcessDataCheckTimerID = SetTimer(NULL, ProcessDataCheckTimerID, ProcessDataCheckPeriodInMilliseconds, (TIMERPROC)&TimerCallback);
}
}
int main()
{
std::cout << "Main thread is " << std::this_thread::get_id() << std::endl;
g_pMgr = std::make_shared<Manager>();
ProcessDataCheckTimerID = SetTimer(NULL, ProcessDataCheckTimerID, ProcessDataCheckPeriodInMilliseconds, (TIMERPROC)&TimerCallback);
//queue up some dummy data
for (int i = 1; i <= 10; i++)
{
g_pMgr->QueueData(i);
}
//queue a shutdown request
g_pMgr->QueueData(-1);
//fake the application's message loop
MSG msg;
bool shutdown = false;
while (shutdown == false)
{
if (GetMessage(&msg, NULL, 0, 0))
{
TranslateMessage(&msg);
DispatchMessage(&msg);
}
else
{
shutdown = true;
}
}
return 0;
}