目录
- WaitGroup介绍
- WaitGroup的实现
- Add
- Done
- Wait
WaitGroup介绍
waitGroup
,也是在go语言并发中比较常用的语法,所以在这里我们一起剖析 waitGroup 的使用方式及其源码解读。
WaitGroup
也是sync 包下一份子,用来解决任务编排的一个并发原语。它主要解决了并发-等待问题:比如现在有三个goroutine
,分别为goroutineA
,goroutineB
,goroutineC
,而goroutineA
需要等待goroutineB
和goroutineC
这一组goroutine全部执行完毕后,才可以执行后续业务逻辑。此时就可以使用 WaitGroup
轻松解决。
在这个场景中,goroutineA
为主goroutine,goroutineB
和goroutineC
为子goroutine。goroutineA
则需要在检查点(checkout point) 等待goroutineB
和goroutineC
全部执行完毕,如果在执行任务的goroutine
还没全部完成,那么goroutineA
就会阻塞在检查点,直到所有goroutine
都完成后才能继续执行。
代码实现:
package main
import (
"fmt"
"sync"
)
func goroutineB(wg *sync.WaitGroup) {
defer wg.Done()
fmt.Println("goroutineB Execute")
time.Sleep(time.Second)
}
func goroutineC(wg *sync.WaitGroup) {
defer wg.Done()
fmt.Println("goroutineC Execute")
time.Sleep(time.Second)
}
func main() {
var wg sync.WaitGroup
wg.Add(2)
go goroutineB(&wg)
go goroutineC(&wg)
wg.Wait()
fmt.Println("goroutineB and goroutineC finished...")
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
运行结果:
goroutineC Execute
goroutineB Execute
goroutineB and goroutineC finished...
- 1
- 2
- 3
上述就是WaitGroup 的简单操作,它的语法也是比较简单,提供了三个方法,如下所示:
func (wg *WaitGroup) Add(delta int)
func (wg *WaitGroup) Done()
func (wg *WaitGroup) Wait()
- 1
- 2
- 3
-
Add:用来设置WaitGroup的计数值(子goroutine的数量)
-
Done:用来将WaitGroup的计数值减1,起始就是调用Add(-1)
-
Wait:调用这个方法的goroutine会一直阻塞,直到WaitGroup的技术值变为0
接下来,我们进行剖析 WaitGroup 的源码实现,让其无处可遁,它源码比较少,除去注释,也就几十行,对新手来说也是一种不错的选择。
WaitGroup的实现
首先,我们看看 WaitGroup 的数据结构,它包括了一个noCopy 的辅助字段,一个具有复合意义的state1字段。
-
noCopy 的辅助字段:主要就是辅助 vet 工具检查是否通过 copy 赋值这个 WaitGroup 实例。我会在后面和你详细分析这个字段
-
state1:具有复合意义的字段,包含WaitGroup计数值,阻塞在检查点的主gooutine和信号量
type WaitGroup struct {
// 避免复制使用的一个技巧,可以告诉vet工具违反了复制使用的规则
noCopy noCopy
// 64bit(8bytes)的值分成两段,高32bit是计数值,低32bit是waiter的计数
// 另外32bit是用作信号量的
// 因为64bit值的原子操作需要64bit对齐,但是32bit编译器不支持,所以数组中的元素在不同的架构中不一样,具体处理看下面的方法
// 总之,会找到对齐的那64bit作为state,其余的32bit做信号量
state1 [3]uint32
}
// 得到state的地址和信号量的地址
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
// 如果地址是64bit对齐的,数组前两个元素做state,后一个元素做信号量
return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
} else {
// 如果地址是32bit对齐的,数组后两个元素用来做state,它可以用来做64bit的原子操作,第一个元素32bit用来做信号量
return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
因为对 64 位整数的原子操作要求整数的地址是 64 位对齐的,所以针对 64 位和 32 位环境的 state 字段的组成是不一样的。
在 64 位环境下,state1 的第一个元素是 waiter 数,第二个元素是 WaitGroup 的计数值,第三个元素是信号量。
在 32 位环境下,如果 state1 不是 64 位对齐的地址,那么 state1 的第一个元素是信号量,后两个元素分别是 waiter 数和计数值。
接下里,我们一一看 Add 方法、 Done 方法、 Wait 方法的实现原理。
Add
Add方法实现思路:
Add方法主要操作的state1字段中计数值部分。当Add方法被调用时,首先会将delta参数值左移32位(计数值在高32位),然后内部通过原子操作将这个值加到计数值上。需要注意的是,delta的取值范围可正可负,因为调用Done()方法时,内部通过Add(-1)方法实现的。
代码实现如下:
func (wg *WaitGroup) Add(delta int) {
// statep表示wait数和计数值
// 低32位表示wait数,高32位表示计数值
statep, semap := wg.state()
// uint64(delta)<<32 将delta左移32位
// 因为高32位表示计数值,所以将delta左移32,增加到技术上
state := atomic.AddUint64(statep, uint64(delta)<<32)
// 当前计数值
v := int32(state >> 32)
// 阻塞在检查点的wait数
w := uint32(state)
if v > 0 || w == 0 {
return
}
// 如果计数值v为0并且waiter的数量w不为0,那么state的值就是waiter的数量
// 将waiter的数量设置为0,因为计数值v也是0,所以它们俩的组合*statep直接设置为0即可。此时需要并唤醒所有的waiter
*statep = 0
for ; w != 0; w-- {
runtime_Semrelease(semap, false, 0)
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
Done
内部就是调用Add(-1)方法,这里就不细讲了。
// Done方法实际就是计数器减1
func (wg *WaitGroup) Done() {
wg.Add(-1)
}
- 1
- 2
- 3
- 4
Wait
wait实现思路:
不断检查state值。如果其中的计数值为零,则说明所有的子goroutine已全部执行完毕,调用者不必等待,直接返回。如果计数值大于零,说明此时还有任务没有完成,那么调用者变成等待者,需要加入wait队列,并且阻塞自己。
代码实现如下:
func (wg *WaitGroup) Wait() {
// statep表示wait数和计数值
// 低32位表示wait数,高32位表示计数值
statep, semap := wg.state()
for {
state := atomic.LoadUint64(statep)
// 将state右移32位,表示当前计数值
v := int32(state >> 32)
// w表示waiter等待值
w := uint32(state)
if v == 0 {
// 如果当前计数值为零,表示当前子goroutine已全部执行完毕,则直接返回
return
}
// 否则使用原子操作将state值加一。
if atomic.CompareAndSwapUint64(statep, state, state+1) {
// 阻塞休眠等待
runtime_Semacquire(semap)
// 被唤醒,不再阻塞,返回
return
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
到此,waitGroup的基本使用和实现原理已介绍完毕了,相信大家已有不一样的收获,咱们下期见。
文章也会持续更新,可以微信搜索「 迈莫coding 」第一时间阅读。每天分享优质文章、大厂经验、大厂面经,助力面试,是每个程序员值得关注的平台。