深度解析sync WaitGroup源码及其实现原理

时间:2024-10-01 14:12:31

在这里插入图片描述

目录

  • WaitGroup介绍
  • WaitGroup的实现
    • Add
    • Done
    • Wait

WaitGroup介绍

waitGroup ,也是在go语言并发中比较常用的语法,所以在这里我们一起剖析 waitGroup 的使用方式及其源码解读。

WaitGroup 也是sync 包下一份子,用来解决任务编排的一个并发原语。它主要解决了并发-等待问题:比如现在有三个goroutine,分别为goroutineAgoroutineBgoroutineC,而goroutineA需要等待goroutineBgoroutineC这一组goroutine全部执行完毕后,才可以执行后续业务逻辑。此时就可以使用 WaitGroup 轻松解决。

在这个场景中,goroutineA为主goroutine,goroutineBgoroutineC为子goroutine。goroutineA则需要在检查点(checkout point) 等待goroutineBgoroutineC全部执行完毕,如果在执行任务的goroutine还没全部完成,那么goroutineA就会阻塞在检查点,直到所有goroutine都完成后才能继续执行。

代码实现:

package main

import (
  "fmt"
  "sync"
)

func goroutineB(wg *sync.WaitGroup) {
  defer wg.Done()
  fmt.Println("goroutineB Execute")
  time.Sleep(time.Second)
}

func goroutineC(wg *sync.WaitGroup) {
  defer wg.Done()
  fmt.Println("goroutineC Execute")
  time.Sleep(time.Second)
}

func main() {
  var wg sync.WaitGroup
  wg.Add(2)
  go goroutineB(&wg)
  go goroutineC(&wg)
  wg.Wait()
  fmt.Println("goroutineB and goroutineC finished...")
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

运行结果:

goroutineC Execute
goroutineB Execute
goroutineB and goroutineC finished...
  • 1
  • 2
  • 3

上述就是WaitGroup 的简单操作,它的语法也是比较简单,提供了三个方法,如下所示:

func (wg *WaitGroup) Add(delta int)
func (wg *WaitGroup) Done()
func (wg *WaitGroup) Wait()
  • 1
  • 2
  • 3
  • Add:用来设置WaitGroup的计数值(子goroutine的数量)

  • Done:用来将WaitGroup的计数值减1,起始就是调用Add(-1)

  • Wait:调用这个方法的goroutine会一直阻塞,直到WaitGroup的技术值变为0

接下来,我们进行剖析 WaitGroup 的源码实现,让其无处可遁,它源码比较少,除去注释,也就几十行,对新手来说也是一种不错的选择。

WaitGroup的实现

首先,我们看看 WaitGroup 的数据结构,它包括了一个noCopy 的辅助字段,一个具有复合意义的state1字段。

  • noCopy 的辅助字段:主要就是辅助 vet 工具检查是否通过 copy 赋值这个 WaitGroup 实例。我会在后面和你详细分析这个字段

  • state1:具有复合意义的字段,包含WaitGroup计数值,阻塞在检查点的主gooutine和信号量

type WaitGroup struct {
    // 避免复制使用的一个技巧,可以告诉vet工具违反了复制使用的规则
    noCopy noCopy
    // 64bit(8bytes)的值分成两段,高32bit是计数值,低32bit是waiter的计数
    // 另外32bit是用作信号量的
    // 因为64bit值的原子操作需要64bit对齐,但是32bit编译器不支持,所以数组中的元素在不同的架构中不一样,具体处理看下面的方法
    // 总之,会找到对齐的那64bit作为state,其余的32bit做信号量
    state1 [3]uint32
}


// 得到state的地址和信号量的地址
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
    if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
        // 如果地址是64bit对齐的,数组前两个元素做state,后一个元素做信号量
        return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
    } else {
        // 如果地址是32bit对齐的,数组后两个元素用来做state,它可以用来做64bit的原子操作,第一个元素32bit用来做信号量
        return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
    }
}    
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

因为对 64 位整数的原子操作要求整数的地址是 64 位对齐的,所以针对 64 位和 32 位环境的 state 字段的组成是不一样的。

在 64 位环境下,state1 的第一个元素是 waiter 数,第二个元素是 WaitGroup 的计数值,第三个元素是信号量。

在这里插入图片描述

在 32 位环境下,如果 state1 不是 64 位对齐的地址,那么 state1 的第一个元素是信号量,后两个元素分别是 waiter 数和计数值。

在这里插入图片描述

接下里,我们一一看 Add 方法、 Done 方法、 Wait 方法的实现原理。

Add

Add方法实现思路:

Add方法主要操作的state1字段中计数值部分。当Add方法被调用时,首先会将delta参数值左移32位(计数值在高32位),然后内部通过原子操作将这个值加到计数值上。需要注意的是,delta的取值范围可正可负,因为调用Done()方法时,内部通过Add(-1)方法实现的。

代码实现如下:

func (wg *WaitGroup) Add(delta int) {
  // statep表示wait数和计数值
  // 低32位表示wait数,高32位表示计数值
   statep, semap := wg.state()
   // uint64(delta)<<32 将delta左移32位
    // 因为高32位表示计数值,所以将delta左移32,增加到技术上
   state := atomic.AddUint64(statep, uint64(delta)<<32)
   // 当前计数值
   v := int32(state >> 32)
   // 阻塞在检查点的wait数
   w := uint32(state)
   if v > 0 || w == 0 {
      return
   }
   
   // 如果计数值v为0并且waiter的数量w不为0,那么state的值就是waiter的数量
    // 将waiter的数量设置为0,因为计数值v也是0,所以它们俩的组合*statep直接设置为0即可。此时需要并唤醒所有的waiter
   *statep = 0
   for ; w != 0; w-- {
      runtime_Semrelease(semap, false, 0)
   }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

Done

内部就是调用Add(-1)方法,这里就不细讲了。

// Done方法实际就是计数器减1
func (wg *WaitGroup) Done() { 
  wg.Add(-1)
}
  • 1
  • 2
  • 3
  • 4

Wait

wait实现思路:

不断检查state值。如果其中的计数值为零,则说明所有的子goroutine已全部执行完毕,调用者不必等待,直接返回。如果计数值大于零,说明此时还有任务没有完成,那么调用者变成等待者,需要加入wait队列,并且阻塞自己。

代码实现如下:

func (wg *WaitGroup) Wait() {
   // statep表示wait数和计数值
   // 低32位表示wait数,高32位表示计数值
   statep, semap := wg.state()
   for {
      state := atomic.LoadUint64(statep)
      // 将state右移32位,表示当前计数值
      v := int32(state >> 32)
      // w表示waiter等待值
      w := uint32(state)
      if v == 0 {
         // 如果当前计数值为零,表示当前子goroutine已全部执行完毕,则直接返回
         return
      }
      // 否则使用原子操作将state值加一。
      if atomic.CompareAndSwapUint64(statep, state, state+1) {
         // 阻塞休眠等待
         runtime_Semacquire(semap)
         // 被唤醒,不再阻塞,返回
         return
      }
   }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

到此,waitGroup的基本使用和实现原理已介绍完毕了,相信大家已有不一样的收获,咱们下期见。

文章也会持续更新,可以微信搜索「 迈莫coding 」第一时间阅读。每天分享优质文章、大厂经验、大厂面经,助力面试,是每个程序员值得关注的平台。

在这里插入图片描述