I tried to implement a simple barrier in my code that looks like this:
我尝试在我的代码中实现一个简单的屏障,如下所示:
void waitOnBarrier(int* barrier, int numberOfThreads) {
atomicIncrement(barrier); // atomic increment implemented in assembly
while(*barrier < numberOfThreads);
}
And then there is a barrier usage in the code:
然后在代码中使用屏障:
int g_barrier = 0; // a global variable
waitOnBarrier(&g_barrier, someKnownNumberOfThreads);
So far so good, but where should I reset my g_barrier variable back to zero? If I write something like
到目前为止这么好,但我应该在哪里将g_barrier变量重置为零?如果我写的东西像
g_barrier = 0;
right after the waitOnBarrier call, I will have a problem if one of the threads will be released faster than others from the barrier and nullify the g_barrier while all other threads are still performing the loop instructions, so eventually they will get stuck on the barrier forever.
在waitOnBarrier调用之后,如果其中一个线程比屏障中的其他线程释放得更快并且在所有其他线程仍在执行循环指令时使g_barrier无效,我将遇到问题,因此最终它们将永远卡在屏障上。
Explanation: waitOnBarrier will compile into something like this (pseudocode):
说明:waitOnBarrier将编译成这样的东西(伪代码):
1: mov rax, numberOfThreads
2: mov rbx, [barrier]
3: cmp rax, rbx
4: jmp(if smaller) to 2
So if we have 2 threads syncing on the barrier, and thread_1 being slow somewhere at instruction 3 or 4, while a faster thread_2 reaches the barrier, passes it and continues to the g_barrier nullification flow. Which means that after thread_1 will reach instruction 2 it will see a zero value at [barrier] and will stuck on the barrier forever!
因此,如果我们在屏障上同步2个线程,并且thread_1在指令3或4的某处缓慢,而更快的thread_2到达屏障,则传递它并继续到g_barrier无效流程。这意味着在thread_1达到指令2后,它将在[障碍]处看到零值并永远停留在障碍物上!
The question is, how should I nullify the g_barrier, what place for it in the code is "far enough" that I can be sure that by that time all the threads left the barrier? Or is there more correct way to implement a barrier?
问题是,我应该如何使g_barrier无效,它在代码中的位置是“足够远”,以至于我可以确定到那时所有的线程都离开了障碍?或者是否有更正确的方法来实施障碍?
4 个解决方案
#1
2
Barriers are actually quite difficult to implement, the main reason being that new waiters can begin arriving before all the old waiters have had a chance to execute, which precludes any kind of simple count based implementation. My preferred solution is to have the barrier object itself simply point to a "current barrier instance" that exists on the stack of the first thread arriving at the barrier, and which will also be the last thread to leave (since it cannot leave while other threads are still referencing its stack). A very nice sample implementation in terms of pthread primitives (which could be adapted to C11 locking primitives or whatever you have to work with) is included in Michael Burr's answer to my past question on the topic:
障碍实际上很难实施,主要原因是新服务员可以在所有旧服务员都有机会执行之前开始到达,这排除了任何简单的基于计数的实施。我的首选解决方案是让屏障对象本身简单地指向存在于第一个线程的堆栈上的“当前屏障实例”,该第一个线程到达屏障,并且它也将是最后一个离开的线程(因为它不能离开而其他线程仍在引用其堆栈)。 Michael Burr对我过去关于该主题的问题的回答中包含了一个非常好的pthread原语实例(可以适应C11锁定原语或者你必须使用的任何东西)。
https://*.com/a/5902671/379897
Yes it looks like a lot of work, but writing a barrier implementation that actually satisfies the contract of a barrier is non-trivial.
是的,它看起来像很多工作,但编写一个实际上满足障碍合同的屏障实现是非常重要的。
#2
1
Do not reset your barrier
variable back to zero.
不要将屏障变量重置为零。
When any of the thread is about to exit, atomically decrement the barrier
variable by one.
当任何线程即将退出时,将屏障变量原子递减1。
Your barrier looks like you do not want the number of working threads spawned to fall below numberOfThreads
.
你的障碍看起来好像你不希望产生的工作线程数量低于numberOfThreads。
#3
0
Try to implement the Barrier solution that is being explained in this book:
尝试实现本书中解释的Barrier解决方案:
The Little Book of Semaphores ,http://www.greenteapress.com/semaphores/
信号量小书,http://www.greenteapress.com/semaphores/
#4
0
I came across this question when trying to do something similar, so I thought I'd share my solution, in case someone else finds it useful. It's implemented in pure C++11 (sadly not C11, since the multithreading part of the standard is unsupported as of yet in gcc and msvc).
我在尝试做类似的事情时遇到了这个问题,所以我想我会分享我的解决方案,以防其他人发现它有用。它是在纯C ++ 11中实现的(遗憾的是不是C11,因为标准的多线程部分在gcc和msvc中尚不支持)。
Basically, you maintain two counters, whose usage is alternated. Below is the implementation and a usage example:
基本上,您维护两个计数器,其用法是交替使用的。以下是实现和用法示例:
#include <cstdio>
#include <thread>
#include <condition_variable>
// A barrier class; The barrier is initialized with the number of expected threads to synchronize
class barrier_t{
const size_t count;
size_t counter[2], * currCounter;
std::mutex mutex;
std::condition_variable cv;
public:
barrier_t(size_t count) : count(count), currCounter(&counter[0]) {
counter[0] = count;
counter[1] = 0;
}
void wait(){
std::unique_lock<std::mutex> lock(mutex);
if (!--*currCounter){
currCounter += currCounter == counter ? 1 : -1;
*currCounter = count;
cv.notify_all();
}
else {
size_t * currCounter_local = currCounter;
cv.wait(lock, [currCounter_local]{return *currCounter_local == 0; });
}
}
};
void testBarrier(size_t iters, size_t threadIdx, barrier_t *B){
for(size_t i = 0; i < iters; i++){
printf("Hello from thread %i for the %ith time!\n", threadIdx, i);
B->wait();
}
}
int main(void){
const size_t threadCnt = 4, iters = 8;
barrier_t B(threadCnt);
std::thread t[threadCnt];
for(size_t i = 0; i < threadCnt; i++) t[i] = std::thread(testBarrier, iters, i, &B);
for(size_t i = 0; i < threadCnt; i++) t[i].join();
return 0;
}
#1
2
Barriers are actually quite difficult to implement, the main reason being that new waiters can begin arriving before all the old waiters have had a chance to execute, which precludes any kind of simple count based implementation. My preferred solution is to have the barrier object itself simply point to a "current barrier instance" that exists on the stack of the first thread arriving at the barrier, and which will also be the last thread to leave (since it cannot leave while other threads are still referencing its stack). A very nice sample implementation in terms of pthread primitives (which could be adapted to C11 locking primitives or whatever you have to work with) is included in Michael Burr's answer to my past question on the topic:
障碍实际上很难实施,主要原因是新服务员可以在所有旧服务员都有机会执行之前开始到达,这排除了任何简单的基于计数的实施。我的首选解决方案是让屏障对象本身简单地指向存在于第一个线程的堆栈上的“当前屏障实例”,该第一个线程到达屏障,并且它也将是最后一个离开的线程(因为它不能离开而其他线程仍在引用其堆栈)。 Michael Burr对我过去关于该主题的问题的回答中包含了一个非常好的pthread原语实例(可以适应C11锁定原语或者你必须使用的任何东西)。
https://*.com/a/5902671/379897
Yes it looks like a lot of work, but writing a barrier implementation that actually satisfies the contract of a barrier is non-trivial.
是的,它看起来像很多工作,但编写一个实际上满足障碍合同的屏障实现是非常重要的。
#2
1
Do not reset your barrier
variable back to zero.
不要将屏障变量重置为零。
When any of the thread is about to exit, atomically decrement the barrier
variable by one.
当任何线程即将退出时,将屏障变量原子递减1。
Your barrier looks like you do not want the number of working threads spawned to fall below numberOfThreads
.
你的障碍看起来好像你不希望产生的工作线程数量低于numberOfThreads。
#3
0
Try to implement the Barrier solution that is being explained in this book:
尝试实现本书中解释的Barrier解决方案:
The Little Book of Semaphores ,http://www.greenteapress.com/semaphores/
信号量小书,http://www.greenteapress.com/semaphores/
#4
0
I came across this question when trying to do something similar, so I thought I'd share my solution, in case someone else finds it useful. It's implemented in pure C++11 (sadly not C11, since the multithreading part of the standard is unsupported as of yet in gcc and msvc).
我在尝试做类似的事情时遇到了这个问题,所以我想我会分享我的解决方案,以防其他人发现它有用。它是在纯C ++ 11中实现的(遗憾的是不是C11,因为标准的多线程部分在gcc和msvc中尚不支持)。
Basically, you maintain two counters, whose usage is alternated. Below is the implementation and a usage example:
基本上,您维护两个计数器,其用法是交替使用的。以下是实现和用法示例:
#include <cstdio>
#include <thread>
#include <condition_variable>
// A barrier class; The barrier is initialized with the number of expected threads to synchronize
class barrier_t{
const size_t count;
size_t counter[2], * currCounter;
std::mutex mutex;
std::condition_variable cv;
public:
barrier_t(size_t count) : count(count), currCounter(&counter[0]) {
counter[0] = count;
counter[1] = 0;
}
void wait(){
std::unique_lock<std::mutex> lock(mutex);
if (!--*currCounter){
currCounter += currCounter == counter ? 1 : -1;
*currCounter = count;
cv.notify_all();
}
else {
size_t * currCounter_local = currCounter;
cv.wait(lock, [currCounter_local]{return *currCounter_local == 0; });
}
}
};
void testBarrier(size_t iters, size_t threadIdx, barrier_t *B){
for(size_t i = 0; i < iters; i++){
printf("Hello from thread %i for the %ith time!\n", threadIdx, i);
B->wait();
}
}
int main(void){
const size_t threadCnt = 4, iters = 8;
barrier_t B(threadCnt);
std::thread t[threadCnt];
for(size_t i = 0; i < threadCnt; i++) t[i] = std::thread(testBarrier, iters, i, &B);
for(size_t i = 0; i < threadCnt; i++) t[i].join();
return 0;
}