To give some background: I am processing a saved file, and after using a regular expression to split the file into its component objects, I then need to process each object's data based on which type of object it is.
My current thought is to use parallelism to get a bit of a performance gain, since loading each object is independent of the others. So I was going to define a LoadObject function accepting a std::string for each type of object I'm going to be handling, and then call std::async as follows:
void LoadFromFile( const std::string& szFileName )
{
    static const std::regex regexObject( "=== ([^=]+) ===\\n((?:.|\\n)*)\\n=== END \\1 ===", std::regex_constants::ECMAScript | std::regex_constants::optimize );

    std::ifstream inFile( szFileName );
    inFile.exceptions( std::ifstream::failbit | std::ifstream::badbit );
    std::string szFileData( (std::istreambuf_iterator<char>(inFile)), (std::istreambuf_iterator<char>()) );
    inFile.close();

    std::vector<std::future<void>> vecFutures;

    for( std::sregex_iterator itObject( szFileData.cbegin(), szFileData.cend(), regexObject ), end; itObject != end; ++itObject )
    {
        // Determine what type of object we're loading:
        if( (*itObject)[1] == "Type1" )
        {
            vecFutures.emplace_back( std::async( LoadType1, (*itObject)[2].str() ) );
        }
        else if( (*itObject)[1] == "Type2" )
        {
            vecFutures.emplace_back( std::async( LoadType2, (*itObject)[2].str() ) );
        }
        else
        {
            throw std::runtime_error( "Unexpected type encountered whilst reading data file." );
        }
    }

    // Make sure all our tasks completed:
    for( auto& future : vecFutures )
    {
        future.get();
    }
}
Note that there will be more than 2 types in the application (this was just a short example) and potentially thousands of objects in the file to be read.
I am aware that creating too many threads is often bad for performance once their number exceeds the maximum hardware concurrency, due to context switches. But if my memory serves me correctly, the C++ runtime is supposed to monitor the number of threads created and schedule std::async appropriately (I believe in Microsoft's case their ConcRT library is responsible for this?), so the above code may still result in a performance improvement?

Thanks in advance!
1 Answer
the C++ runtime is supposed to monitor the number of threads created and schedule std::async appropriately
No. If the asynchronous tasks are in fact run asynchronously (rather than deferred) then all that's required is that they are run as if on a new thread. It is perfectly valid for a new thread to be created and started for every task, without any regard for the hardware's limited capacity for parallelism.
There's a note:
[ Note: If this policy is specified together with other policies, such as when using a policy value of launch::async | launch::deferred, implementations should defer invocation or the selection of the policy when no more concurrency can be effectively exploited. —end note ]
However, this is non-normative and in any case it indicates that once no more concurrency can be exploited the tasks may become deferred, and therefore get executed when someone waits on the result, rather than still being asynchronous and running immediately after one of the previous asynchronous tasks is finished, as would be desirable for maximum parallelism.
That is, if we have 10 long running tasks and the implementation can only execute 4 in parallel, then the first 4 will be asynchronous and then the last 6 may be deferred. Waiting on the futures in sequence would execute the deferred tasks on a single thread in sequence, eliminating parallel execution for those tasks.
The note does also say that, instead of deferring invocation, the selection of the policy may be deferred. That is, the function may still run asynchronously, but that decision may be delayed, say, until one of the earlier tasks completes, freeing up a core for a new task. But again, this is not required, the note is non-normative, and as far as I know Microsoft's implementation is the only one that behaves this way. When I looked at another implementation, libc++, it simply ignores this note altogether, so that either the std::launch::async or the std::launch::any policy results in asynchronous execution on a new thread.
(I believe in Microsoft's case their ConcRT library is responsible for this?)
Microsoft's implementation does indeed behave as you describe, however this is not required and a portable program cannot rely on that behavior.
One way to portably limit how many threads are actually running is to use something like a semaphore:
#include <future>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <cstdio>

// a semaphore class
//
// All threads can wait on this object. When a waiting thread
// is woken up, it does its work and then notifies another waiting thread.
// In this way only n threads will be doing work at any time.
//
class Semaphore {
private:
    std::mutex m;
    std::condition_variable cv;
    unsigned int count;

public:
    Semaphore(int n) : count(n) {}
    void notify() {
        std::unique_lock<std::mutex> l(m);
        ++count;
        cv.notify_one();
    }
    void wait() {
        std::unique_lock<std::mutex> l(m);
        cv.wait(l, [this]{ return count != 0; });
        --count;
    }
};

// an RAII class to handle waiting and notifying the next thread
// Work is done between when the object is created and destroyed
class Semaphore_waiter_notifier {
    Semaphore &s;
public:
    Semaphore_waiter_notifier(Semaphore &s) : s{s} { s.wait(); }
    ~Semaphore_waiter_notifier() { s.notify(); }
};

// some inefficient work for our threads to do
int fib(int n) {
    if (n < 2) return n;
    return fib(n-1) + fib(n-2);
}

// for_each algorithm for iterating over a container but also
// making an integer index available.
//
// f is called like f(index, element)
template<typename Container, typename F>
F for_each(Container &c, F f) {
    typename Container::size_type i = 0;
    for (auto &e : c)
        f(i++, e);
    return f;
}

// global semaphore so that lambdas don't have to capture it
Semaphore thread_limiter(4);

int main() {
    std::vector<int> input(100);
    for_each(input, [](int i, int &e) { e = (i%10) + 35; });

    std::vector<std::future<int>> output;
    for_each(input, [&output](int i, int e) {
        output.push_back(std::async(std::launch::async, [](int task, int n) -> int {
            Semaphore_waiter_notifier w(thread_limiter);
            std::printf("Starting task %d\n", task);
            int res = fib(n);
            std::printf("\t\t\t\t\t\tTask %d finished\n", task);
            return res;
        }, i, e));
    });

    for_each(output, [](int i, std::future<int> &e) {
        std::printf("\t\t\tWaiting on task %d\n", i);
        int res = e.get();
        std::printf("\t\t\t\t\t\t\t\t\tTask %d result: %d\n", i, res);
    });
}