This is the code to create a thread_group and execute all threads in parallel:
boost::thread_group group;
for (int i = 0; i < 15; ++i)
    group.create_thread(aFunctionToExecute);
group.join_all();
This code will execute all threads at once. What I want is to execute them all, but with at most 4 running in parallel: when one finishes, another one is started, until there are none left to run.
4 Answers
#1
3
Another, more efficient solution would be to have each thread call back to the primary thread when it finishes, and let the handler on the primary thread launch a new thread each time. This avoids the repetitive calls to timed_join, as the primary thread does nothing until the callback is triggered.
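A minimal sketch of that idea, assuming Boost.Thread, where the "callback" is a shared counter plus a condition variable that the primary thread waits on (the names runWorker, launchAll and MAX_PARALLEL are illustrative, not from the answer):

#include <boost/thread.hpp>
#include <boost/bind.hpp>

// Illustrative sketch: the worker decrements a shared counter and notifies the
// primary thread when it is done; the primary thread waits until a slot is free
// before launching the next task.
boost::mutex m;
boost::condition_variable done;
unsigned running = 0;
const unsigned MAX_PARALLEL = 4;    // assumed limit from the question

void runWorker(void (*task)())
{
    task();
    boost::lock_guard<boost::mutex> lock(m);
    --running;
    done.notify_one();              // "callback" to the primary thread
}

void launchAll(void (*task)(), int count)
{
    boost::thread_group group;
    for (int i = 0; i < count; ++i)
    {
        boost::unique_lock<boost::mutex> lock(m);
        while (running >= MAX_PARALLEL)
            done.wait(lock);        // primary thread sleeps until a worker finishes
        ++running;
        lock.unlock();
        group.create_thread(boost::bind(&runWorker, task));
    }
    group.join_all();
}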
#2
0
I have something like this:
// Fragment of a class that throttles concurrent evaluations (a counting semaphore).
boost::mutex mutex_;
boost::condition_variable condition_;
const size_t throttle_;   // maximum number of concurrent evaluations
size_t size_;             // number of evaluations currently running
bool wait_;               // not used in this fragment

template <typename Env, class F>
void eval_(const Env &env, const F &f) {
    {
        boost::unique_lock<boost::mutex> lock(mutex_);
        // Wait for a free slot, then claim it.
        while (throttle_ <= size_) condition_.wait(lock);
        ++size_;
    }
    f.eval(env);
    {
        boost::lock_guard<boost::mutex> lock(mutex_);
        --size_;
    }
    condition_.notify_one();
}
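Packaged as a self-contained sketch, the same mutex/condition-variable throttle applied to the question's scenario (15 tasks, at most 4 at a time) might look like this; the Semaphore class, the worker function and the placeholder task body are illustrative assumptions, not part of the answer above:

#include <boost/thread.hpp>
#include <boost/bind.hpp>

// Minimal counting semaphore built from the same mutex/condition_variable pattern.
class Semaphore
{
    boost::mutex mutex_;
    boost::condition_variable condition_;
    const std::size_t throttle_;
    std::size_t size_;
public:
    explicit Semaphore(std::size_t throttle) : throttle_(throttle), size_(0) {}

    void acquire()
    {
        boost::unique_lock<boost::mutex> lock(mutex_);
        while (size_ >= throttle_) condition_.wait(lock);
        ++size_;
    }

    void release()
    {
        {
            boost::lock_guard<boost::mutex> lock(mutex_);
            --size_;
        }
        condition_.notify_one();
    }
};

void aFunctionToExecute()
{
    // Placeholder for the task from the question.
    boost::this_thread::sleep(boost::posix_time::milliseconds(100));
}

// All 15 threads are created immediately, but only 4 task bodies run at once.
void worker(Semaphore *sem)
{
    sem->acquire();
    aFunctionToExecute();
    sem->release();
}

int main()
{
    Semaphore sem(4);
    boost::thread_group group;
    for (int i = 0; i < 15; ++i)
        group.create_thread(boost::bind(&worker, &sem));
    group.join_all();
}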
#3
0
I think you are looking for a thread_pool implementation, which is available here.
Additionally, I have noticed that if you create a vector of std::future, store the futures of many std::async tasks in it, and there is no blocking code in the function passed to the thread, VS2013 (at least as far as I can confirm) will launch exactly the number of threads your machine can handle, and it reuses the threads once they are created.
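As an illustration (plain C++11, not specific to VS2013; doWork is a made-up task):

#include <future>
#include <iostream>
#include <vector>

// Illustrative task: it must not block, as the answer notes, for the
// implementation to pack many tasks onto a small number of threads.
int doWork(int i)
{
    return i * i;
}

int main()
{
    std::vector<std::future<int>> futures;
    for (int i = 0; i < 15; ++i)
        futures.push_back(std::async(doWork, i));  // default launch policy; the
                                                   // implementation decides how the
                                                   // tasks map onto threads, which is
                                                   // where the thread reuse described
                                                   // above was observed

    int total = 0;
    for (std::size_t i = 0; i < futures.size(); ++i)
        total += futures[i].get();                 // blocks until task i has finished

    std::cout << total << std::endl;
    return 0;
}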
#4
0
I created my own simplified interface of boost::thread_group to do this job:
#include <set>
#include <algorithm>
#include <cassert>
#include <boost/thread.hpp>
#include <boost/noncopyable.hpp>

class ThreadGroup : public boost::noncopyable
{
private:
    boost::thread_group group;
    std::size_t maxSize;
    float sleepStart;
    float sleepCoef;
    float sleepMax;
    std::set<boost::thread*> running;

public:
    ThreadGroup(std::size_t max_size = 0,
                float max_sleeping_time = 1.0f,
                float sleeping_time_coef = 1.5f,
                float sleeping_time_start = 0.001f) :
        boost::noncopyable(),
        group(),
        maxSize(max_size),
        sleepStart(sleeping_time_start),
        sleepCoef(sleeping_time_coef),
        sleepMax(max_sleeping_time),
        running()
    {
        if (max_size == 0)
            this->maxSize = (std::size_t)std::max(boost::thread::hardware_concurrency(), 1u);
        assert(max_sleeping_time >= sleeping_time_start);
        assert(sleeping_time_start > 0.0f);
        assert(sleeping_time_coef > 1.0f);
    }

    ~ThreadGroup()
    {
        this->joinAll();
    }

    // Blocks until a slot is free, then starts the new thread.
    template<typename F> boost::thread* createThread(F f)
    {
        float sleeping_time = this->sleepStart;
        while (this->running.size() >= this->maxSize)
        {
            // Poll each running thread with a short timed_join and erase the finished ones.
            for (std::set<boost::thread*>::iterator it = running.begin(); it != running.end();)
            {
                const std::set<boost::thread*>::iterator jt = it++;
                if ((*jt)->timed_join(boost::posix_time::milliseconds((long int)(1000.0f * sleeping_time))))
                    running.erase(jt);
            }
            // Exponential back-off of the polling interval, capped at sleepMax.
            if (sleeping_time < this->sleepMax)
            {
                sleeping_time *= this->sleepCoef;
                if (sleeping_time > this->sleepMax)
                    sleeping_time = this->sleepMax;
            }
        }
        return *this->running.insert(this->group.create_thread(f)).first;
    }

    void joinAll()
    {
        this->group.join_all();
    }

    void interruptAll()
    {
#ifdef BOOST_THREAD_PROVIDES_INTERRUPTIONS
        this->group.interrupt_all();
#endif
    }

    std::size_t size() const
    {
        return this->group.size();
    }
};
Here is an example of use, very similar to boost::thread_group, with the main difference that the creation of a thread is a waiting point:
{
    ThreadGroup group(4);
    for (int i = 0; i < 15; ++i)
        group.createThread(aFunctionToExecute);
}   // join all at destruction