WaitGroup 是常用的 Go 同步原语之一,用来做任务编排。它要解决的就是并发-等待的问题: 现在有一个 goroutine A 在检查点 ( checkpoint ) 等待一组 goroutine 全部完成它们的任务,如果这些 goroutine 还没全部完成任务,那么 goroutine A 就会被阻塞在检查点,直到所有的 goroutine 都完成任务后才能继续执行。
我们来看一个使用 WaitGroup 的场景。
比如,我们要完成一个大任务,需要使用并行的 goroutine 执行三个小任务,只有这三个小任务都完成了,才能执行后面的任务。如果通过轮询的方式定时询问三个小任务是否完成,则会存在两个问题:一是性能比较低,因为三个小任务可能早就完成了,却要等很长时间才能被轮询到;二是会有很多无谓的轮询,空耗CPU资源。
这个时候使用 WaitGroup 同步原语就比较有效了,它可以阻塞等待的 goroutine,等到三个小任务都完成了,再即时唤醒它们。 其实,很多操作系统和编程语言都提供了类似的同步原语,比如 Linux 中的 barrier、Pthread(POSIX 线程)中的 barrier、C ++ 中的 std::barrier、Java 中的 CyclicBarrier 和 CountDownLatch 等。
1. WaitGroup 的使用方法
在 Go 官方提供的同步原中,最常用的几个类型使用起来很简单,这是很不容易的设计。WaitGroup 就是简单且常用的同步原语之一,它只有三个方法。
- Add(delta int): 给 WaitGroup 的计数值增加一个数值, delta 可以是负数。当 WaitGroup 的计数值减小到 0 时,任何阻塞在 Wait( )方法上的 goroutine 都会被解除封印,不再阻塞,可以继续执行。如果计数器的值为负数,则会出现 panic。
- Done( ):表示一个 goroutine 完成了任务,WaitGroup 的计数值减 1。
- Wait( ): 此方法的调用者会被阻塞,直到 WaitGroup 的计数值减小到 0。
WaitGroup 的功能就是等待一组 goroutine 都完成任务。一般主 goroutine 会设置要等待的 goroutine 的数量 n,也就是将计数器的值设置为 n,这些 goroutine 运行完毕后调用 Done 方法,告诉 WaitGroup 自己已经光荣完成任务了。主 goroutine 调用 Wait 方法偶尔会被阻塞,直到这 n 个 goroutine 全部完成任务。
下面是一个访问搜索引擎的例子。
我们首先定义了一个 WaitGroup 的变量wg,然后在访问搜索引擎的groutine启动之前,通过Add(3) 将 wg 计数值设置为3。
访问每个搜索引擎都使用一个独立的 goroutine, 当该 goroutine 执行完毕的时候,调用 ( ),计数器的值减1。
主 goroutine 调用 Wait 方法被阻塞,直到这三个访问搜索引擎的 goroutine 都执行完毕。
-
package main
-
-
import (
-
"log"
-
"net/http"
-
"sync"
-
"time"
-
)
-
-
func main() {
-
var wg
-
-
var urls = []string{"", "", ""}
-
//访问三个搜索引擎
-
=
-
(3) //设置三个子任务
-
for i := 0; i < 3; i++ {
-
go func(url string) { //启动三个子goroutine来执行
-
defer () //执行完毕,标记着自己完成,WaitGroup的计数值减1
-
-
("fetching", url) //以下为正常访问网页的代码
-
resp, err := (url)
-
if err != nil {
-
return
-
}
-
()
-
}(urls[i])
-
}
-
() //等待三个子任务完成。等它们都调用Done之后,WaitGroup的计数值变为0,才会执行下一步
-
("done")
-
}
执行完这个程序,将会得到正确的输出,显示三个 goroutine 都获得到了网页信息.
-
2024/07/24 13:32:27 fetching http://
-
2024/07/24 13:32:27 fetching http://
-
2024/07/24 13:32:27 fetching http://
-
2024/07/24 13:32:28 done
WaitGroup 在使用中有如下一些特点:
- 通常要预先设置计数器的值,也就是预先调用 Add 方法。
- 通常将计数器的值设置为要等待的 goroutine 的数量 如果你偏偏不想这样做,程序也能运行,只不过显得另类而已。
- 你可以多次调用 Wait 方法,只要 WaitGroup 的计数值为 0 ,所有的 Wait 就不再发生阻塞。
- 对于一个零值的 WaitGroup,或者计数值已经为 0 的 WaitGroup,如果直接调用它的 Wait 方法,调用者不会被阻塞。
-
var wg
-
()
如果你想获取等待的那些 goroutine 执行的结果,则需要使用额外的变量,而 WaitGroup 本身是不保存额外信息的。现在把上面访问搜索引擎的例子改一下,收集访问引擎成功与否的结果:
-
package main
-
-
import (
-
"log"
-
"net/http"
-
"sync"
-
"time"
-
)
-
-
func main() {
-
var wg
-
()
-
-
var urls = []string{"", "", ""}
-
var result = make([]bool, len(urls)) //使用result记录三个子任务的结果
-
=
-
-
(3) //设置WaitGroup 的计数值为3
-
-
for i := 0; i < 3; i++ {
-
i := i
-
go func(url string) { //启动三个子任务
-
defer ()
-
-
("fetching", url)
-
resp, err := (url)
-
if err != nil {
-
result[i] = false
-
return
-
}
-
result[i] = ==
-
()
-
}(urls[i])
-
}
-
-
()
-
("done") //子任务完成,result中保证有值
-
for i := 0; i < 3; i++ {
-
(urls[i], ":", result[i]) //输出结果
-
}
-
}
这里定义了一个收集 goroutine 执行结果的变量 result,如果搜索引擎正常返回 200,就认为执行成功了,否则返回 false 。
-
2024/07/24 14:10:57 fetching http://
-
2024/07/24 14:10:57 fetching http://
-
2024/07/24 14:10:57 fetching http://
-
2024/07/24 14:10:58 done
-
2024/07/24 14:10:58 http:// : true
-
2024/07/24 14:10:58 http:// : true
-
2024/07/24 14:10:58 http:// : false
另外,WaitGroup 本身没有控制这些执行任务的 goroutine 中止的能力,它只能傻傻地等待这些 goroutine 执行完毕,把计数器的值降为 0。
2. WaitGroup 的实现
WaitGroup 的实现也没有使用太多的代码,它也是我们学习 Go 语言的好素材,充分体现了 Go 团队的技术能力,值得我们好好钻研
首先看 WaitGroup 的 struct 定义(以 Go 1.20 版本为例)
-
type WaitGroup stuct {
-
noCopy noCopy
-
-
state atomic.Uint64 // 高32位为计数器的值,低 32 位为 waiter 的数量
-
sema uint32 // 信号量
-
}
第一个字段 noCopy 是一个辅助字段,主要用于辅助 vet 工具检查是否通过 copy 复制这个 WaitGroup 实例。
第二个字段是类型 atomic.Uint64 的 state。先前的 WaitGroup 为了 64 位对齐,避免原子操作时出问题,使用了特殊的方法,现在 atomic.Uint64 保证 64 位对齐,所以 state 字段总是能记录计数器的值和 waiter 的数量。
第三个字段 sema 是信号量,用来唤醒 waiter。
接下来,我们继续深入源码,看一下 Add、Done 和 Wait 这三个方法的实现。
下面先梳理 Add 方法的逻辑。Add 方法主要操作的是 state 的计数部分。你可以为计数器的值增加一个 delta 值,内部通过原子操作把这个值加到计数器的值上。需要注意的是,这个 delta 值也可以是负数,相当于计数器的值减去一个值。
先忽略各种 panic 检查。我们看到,如果计数器的值大于 0 或者 waiter 的数量为 0,则不需要做额外的处理,直接返回。
但是,如果计数器的值为 0, 并且还有 waiter 被阻塞,则把 state 的计数清零,也就是把 waiter 的数量置 0, 并且唤醒那些被阻塞的 waiter。
Done 方法的实现非常简单,就是一个辅助方法。
-
func (wg *WaitGroup) Wait(){
-
for {
-
state := ()
-
v := int32(state >> 32) //得到计数器的值
-
w := uint32(state) //得到waiter的数量
-
if v==0 { //计数器的值已经为0,直接返回
-
return
-
}
-
//增加 waiter 的数量
-
if (state,state+1){
-
runtime_Semacquire(&)
-
if ()!=0 {
-
panic("sync:WaitGroup is reused before previous Wait has returned")
-
}
-
return
-
}
-
}
-
}
Wait 方法则尝试检查计数器的值 v,如果计数器的值为 0, 则返回,不会发生阻塞; 否则,原子操作state,把本 goroutine 加入 waiter 中。如果加入成功,它则被阻塞等待唤醒;否则,进行循环检查(因为可能同时有多个 waiter 调用 Wait 方法)。
如果阻塞的Waiter 被唤醒,理论上, state 的计数值应该为 0 (从 Add 方法的实现中可以看到,是先把 state 的计数清零,再唤醒 waiter 的), 那么直接返回就好了,因为 state 的计数值等于 0 就意味着计数器的值也为 0 了。