Go 语言任务编排 WaitGroup

时间:2024-09-29 15:49:28

        WaitGroup 是常用的 Go 同步原语之一,用来做任务编排。它要解决的就是并发-等待的问题: 现在有一个 goroutine A 在检查点 ( checkpoint ) 等待一组 goroutine 全部完成它们的任务,如果这些 goroutine 还没全部完成任务,那么 goroutine A 就会被阻塞在检查点,直到所有的 goroutine 都完成任务后才能继续执行。

        我们来看一个使用 WaitGroup 的场景。

        比如,我们要完成一个大任务,需要使用并行的 goroutine 执行三个小任务,只有这三个小任务都完成了,才能执行后面的任务。如果通过轮询的方式定时询问三个小任务是否完成,则会存在两个问题:一是性能比较低,因为三个小任务可能早就完成了,却要等很长时间才能被轮询到;二是会有很多无谓的轮询,空耗CPU资源。

        这个时候使用 WaitGroup 同步原语就比较有效了,它可以阻塞等待的 goroutine,等到三个小任务都完成了,再即时唤醒它们。 其实,很多操作系统和编程语言都提供了类似的同步原语,比如 Linux 中的 barrier、Pthread(POSIX 线程)中的 barrier、C ++ 中的 std::barrier、Java 中的 CyclicBarrier 和 CountDownLatch 等。

1. WaitGroup 的使用方法

        在 Go 官方提供的同步原中,最常用的几个类型使用起来很简单,这是很不容易的设计。WaitGroup 就是简单且常用的同步原语之一,它只有三个方法。

  • Add(delta int): 给 WaitGroup 的计数值增加一个数值, delta 可以是负数。当 WaitGroup 的计数值减小到 0 时,任何阻塞在 Wait( )方法上的 goroutine 都会被解除封印,不再阻塞,可以继续执行。如果计数器的值为负数,则会出现 panic。
  • Done( ):表示一个 goroutine 完成了任务,WaitGroup 的计数值减 1。
  • Wait( ): 此方法的调用者会被阻塞,直到 WaitGroup 的计数值减小到 0。

        WaitGroup 的功能就是等待一组 goroutine 都完成任务。一般主 goroutine 会设置要等待的 goroutine 的数量 n,也就是将计数器的值设置为 n,这些 goroutine 运行完毕后调用 Done 方法,告诉 WaitGroup 自己已经光荣完成任务了。主 goroutine 调用 Wait 方法偶尔会被阻塞,直到这 n 个 goroutine 全部完成任务。

        下面是一个访问搜索引擎的例子。

        我们首先定义了一个 WaitGroup 的变量wg,然后在访问搜索引擎的groutine启动之前,通过Add(3) 将 wg 计数值设置为3。

         访问每个搜索引擎都使用一个独立的 goroutine, 当该 goroutine 执行完毕的时候,调用 ( ),计数器的值减1。

主 goroutine 调用 Wait 方法被阻塞,直到这三个访问搜索引擎的 goroutine 都执行完毕。

  1. package main
  2. import (
  3. "log"
  4. "net/http"
  5. "sync"
  6. "time"
  7. )
  8. func main() {
  9. var wg
  10. var urls = []string{"", "", ""}
  11. //访问三个搜索引擎
  12. =
  13. (3) //设置三个子任务
  14. for i := 0; i < 3; i++ {
  15. go func(url string) { //启动三个子goroutine来执行
  16. defer () //执行完毕,标记着自己完成,WaitGroup的计数值减1
  17. ("fetching", url) //以下为正常访问网页的代码
  18. resp, err := (url)
  19. if err != nil {
  20. return
  21. }
  22. ()
  23. }(urls[i])
  24. }
  25. () //等待三个子任务完成。等它们都调用Done之后,WaitGroup的计数值变为0,才会执行下一步
  26. ("done")
  27. }

执行完这个程序,将会得到正确的输出,显示三个 goroutine 都获得到了网页信息.

  1. 2024/07/24 13:32:27 fetching http://
  2. 2024/07/24 13:32:27 fetching http://
  3. 2024/07/24 13:32:27 fetching http://
  4. 2024/07/24 13:32:28 done

 WaitGroup 在使用中有如下一些特点:

  • 通常要预先设置计数器的值,也就是预先调用 Add 方法。
  • 通常将计数器的值设置为要等待的 goroutine 的数量 如果你偏偏不想这样做,程序也能运行,只不过显得另类而已。
  • 你可以多次调用 Wait 方法,只要 WaitGroup 的计数值为 0 ,所有的 Wait 就不再发生阻塞。
  • 对于一个零值的 WaitGroup,或者计数值已经为 0 的 WaitGroup,如果直接调用它的 Wait 方法,调用者不会被阻塞。
  1. var wg
  2. ()

        如果你想获取等待的那些 goroutine 执行的结果,则需要使用额外的变量,而 WaitGroup 本身是不保存额外信息的。现在把上面访问搜索引擎的例子改一下,收集访问引擎成功与否的结果:

  1. package main
  2. import (
  3. "log"
  4. "net/http"
  5. "sync"
  6. "time"
  7. )
  8. func main() {
  9. var wg
  10. ()
  11. var urls = []string{"", "", ""}
  12. var result = make([]bool, len(urls)) //使用result记录三个子任务的结果
  13. =
  14. (3) //设置WaitGroup 的计数值为3
  15. for i := 0; i < 3; i++ {
  16. i := i
  17. go func(url string) { //启动三个子任务
  18. defer ()
  19. ("fetching", url)
  20. resp, err := (url)
  21. if err != nil {
  22. result[i] = false
  23. return
  24. }
  25. result[i] = ==
  26. ()
  27. }(urls[i])
  28. }
  29. ()
  30. ("done") //子任务完成,result中保证有值
  31. for i := 0; i < 3; i++ {
  32. (urls[i], ":", result[i]) //输出结果
  33. }
  34. }

        这里定义了一个收集 goroutine 执行结果的变量 result,如果搜索引擎正常返回 200,就认为执行成功了,否则返回 false 。

  1. 2024/07/24 14:10:57 fetching http://
  2. 2024/07/24 14:10:57 fetching http://
  3. 2024/07/24 14:10:57 fetching http://
  4. 2024/07/24 14:10:58 done
  5. 2024/07/24 14:10:58 http:// : true
  6. 2024/07/24 14:10:58 http:// : true
  7. 2024/07/24 14:10:58 http:// : false

        另外,WaitGroup 本身没有控制这些执行任务的 goroutine 中止的能力,它只能傻傻地等待这些 goroutine 执行完毕,把计数器的值降为 0。

2. WaitGroup 的实现

        WaitGroup 的实现也没有使用太多的代码,它也是我们学习 Go 语言的好素材,充分体现了 Go 团队的技术能力,值得我们好好钻研

        首先看 WaitGroup 的 struct 定义(以 Go 1.20 版本为例)

  1. type WaitGroup stuct {
  2. noCopy noCopy
  3. state atomic.Uint64 // 高32位为计数器的值,低 32 位为 waiter 的数量
  4. sema uint32 // 信号量
  5. }

        第一个字段 noCopy 是一个辅助字段,主要用于辅助 vet 工具检查是否通过 copy 复制这个 WaitGroup 实例。

        第二个字段是类型 atomic.Uint64 的 state。先前的 WaitGroup 为了 64 位对齐,避免原子操作时出问题,使用了特殊的方法,现在 atomic.Uint64 保证 64 位对齐,所以 state 字段总是能记录计数器的值和 waiter 的数量。

        第三个字段 sema 是信号量,用来唤醒 waiter。

        接下来,我们继续深入源码,看一下 Add、Done 和 Wait 这三个方法的实现。

        下面先梳理 Add 方法的逻辑。Add 方法主要操作的是 state 的计数部分。你可以为计数器的值增加一个 delta 值,内部通过原子操作把这个值加到计数器的值上。需要注意的是,这个 delta 值也可以是负数,相当于计数器的值减去一个值。

        先忽略各种 panic 检查。我们看到,如果计数器的值大于 0 或者 waiter 的数量为 0,则不需要做额外的处理,直接返回。

        但是,如果计数器的值为 0, 并且还有 waiter 被阻塞,则把 state 的计数清零,也就是把 waiter 的数量置 0, 并且唤醒那些被阻塞的 waiter。

        Done 方法的实现非常简单,就是一个辅助方法。

  1. func (wg *WaitGroup) Wait(){
  2. for {
  3. state := ()
  4. v := int32(state >> 32) //得到计数器的值
  5. w := uint32(state) //得到waiter的数量
  6. if v==0 { //计数器的值已经为0,直接返回
  7. return
  8. }
  9. //增加 waiter 的数量
  10. if (state,state+1){
  11. runtime_Semacquire(&)
  12. if ()!=0 {
  13. panic("sync:WaitGroup is reused before previous Wait has returned")
  14. }
  15. return
  16. }
  17. }
  18. }

        Wait 方法则尝试检查计数器的值 v,如果计数器的值为 0, 则返回,不会发生阻塞; 否则,原子操作state,把本 goroutine 加入 waiter 中。如果加入成功,它则被阻塞等待唤醒;否则,进行循环检查(因为可能同时有多个 waiter 调用 Wait 方法)。

        如果阻塞的Waiter 被唤醒,理论上, state 的计数值应该为 0 (从 Add 方法的实现中可以看到,是先把 state 的计数清零,再唤醒 waiter 的), 那么直接返回就好了,因为 state 的计数值等于 0 就意味着计数器的值也为 0 了。