并发相关
CAS
CAS算法(Compare And Swap),是原⼦操作的⼀种,,CAS 算法是⼀种有名的⽆锁算法。⽆锁编程,即不使⽤锁的情况下实现多线程之间的变量同步。可⽤于在多线程编程中实现不被打断的数据交换操作,从⽽避免多线程同时改写某⼀数据时由于执⾏顺序不确定性以及中断的不可预知性产⽣的数据不⼀致问题。
Go中的CAS操作是借⽤了CPU提供的原⼦性指令来实现。 CAS操作修改共享变量时候不需要对共享变量加锁,⽽是通过类似乐观锁的⽅式进⾏检查,本质还是不断的占⽤CPU 资源换取加锁带来的开销(⽐如上下⽂切换销)。
package main
import (
"fmt"
"sync"
"sync/atomic"
)
var (
counter int32 //计数器
wg sync.WaitGroup //信号量
)
func main() {
threadNum := 5
wg.Add(threadNum)
for i := 0; i < threadNum; i++ {
go incCounter(i)
}
wg.Wait()
}
func incCounter(index int) {
defer wg.Done()
spinNum := 0
for {
// 原⼦操作
old := counter
ok := atomic.CompareAndSwapInt32(&counter, old, old+1)
if ok {
break
} else {
spinNum++
}
}
fmt.Printf("thread,%d,spinnum,%d\n", index, spinNum)
}
当主函数 main ⾸先创建了5个信号量,然后开启五个线程执⾏ incCounter ⽅法,incCounter 内部执⾏, 使⽤ CAS 操作递增 counter 的值,atomic.CompareAndSwapInt32 具有三个参数,第⼀个是变量的地址,第⼆个是变量当前值,第三个是要修改变量为多少,该函数如果发现传递的 old 值等于当前变量的值,则使⽤第三个变量替换变量的值并返回 true,否则返回 false。
这⾥之所以使⽤⽆限循环是因为在⾼并发下每个线程执⾏ CAS 并不是每次都成功,失败了的线程需要重写获取变量当前的值,然后重新执⾏ CAS 操作。
go 中 CAS 操作可以有效的减少使⽤锁所带来的开销,但是需要注意在⾼并发下这是使⽤ cpu 资源做交换的。
什么是并发编程
首先,要明确并行和并发的概念。
并行,是指两个或者多个事件在同一时刻发生。并发,是指两个或多个事件在同一时间间隔发生。
并发偏重于多个任务交替执行,而多个任务之间还可能是串行的。而并行才是真正意义上的“同时执行”。
**并发编程是指在一台机器上“同时”处理多个任务。**并发是在同一实体上的多个事件,多个事件在同一时间间隔发生。并发编程的目标是充分的利用处理器的每一个核,以达到最高的处理性能。
go 并发机制及 CSP 并发模型
CSP 模型
CSP 模型是上个世纪七十年代提出的,不同于传统的多线程通过共享内存来通信,CSP 讲究的是“以通信的方式来共享内存”。用于描述两个独立的并发实体通过共享的通讯 channel 进行通信的并发模型。CSP 中的 channel 是第一类对象,它不关注发送消息的实体,而是关注发送消息时使用的 channel。
go 中 channel 一般创建后,用于协程之间传递信息,类似于一个消息队列。
go 并发机制
goroutine 是 go 实际并发执行的实体,它底层使用协程实现并发,协程是一种运行在用户态的用户线程,类似于 greenthread,go 底层选择使用协程的原因如下:
- 它在用户空间操作,避免了内核态和用户态的切换成本
- 可以由语言和框架层进行调度
- 相较于线程,拥有更小的栈空间,允许创建大量的协程
goroutine 的特性
go 中有三个对象:
- G(goroutine):我们说的协程,是用户级的轻量级线程,每个 goroutine 对象中的 sched 保存着它的上下文信息
- M(work thread):对 OS 内核级线程的封装,数量对应真实的 CPU 数量(一般远大于)
- P(processor):逻辑处理器,即为 G 和 M 的调度对象,用来调度 G 和 M 之间的关联关系,数量可以通过 GOMAXPROCS() 来设置,默认为核心数
正常情况下,一个 CPU 对象启动一个 M,M 检查并执行 G。遇到 G 阻塞的时候,会启动一个新的 M,以充分利用 CPU 资源。因此,有时候 M 数量会远大于 CPU个数。
GPM 如何调度
- 新创建的 G 会先保存在 P 的本地队列中,如果本地队列已满,那么就会保存在全局的 G 队列中,等待被 P 执行。
- M 和 P 绑定之后,M 会不断从 P 的本地队列中无锁地取出 G,并切换到这个 G 的堆栈执行。当 P 的本地 G 队列空了,就会从全局 G 队列获取一批 G。当全局队列中也没有待运行的 G 时,就会尝试从其他的 P 窃取部分 G 来执行,相当于 P 之间的负载均衡。
一轮调度
其实,一般我们知道上面的调度逻辑就差不多了。不过,如果需要深入了解的话,还是从最开始的一轮调度开始理一下吧。
引导程序会为 Go 程序的运行建立必要的环境,在完成一系列初始化工作之后,Go 程序的 main 函数才会真正执行。引导程序会在最后让调度器进行一轮调度,这样才能让封装了 main 函数的 G 能马上执行。
-
**调度器先判断当前 M 是否被锁定。**一般来说,Go 的调度器会按照一定策略动态地关联 M、P 和 G,并以此高效的执行并发程序,一般来说并不需要用户程序的任何干预。但是在极少情况下,用户程序不得已需要对 Go 的运行时调度进行干预。
**锁定 M 和 G 可以说是为了 CGO 准备的。**因为有些 C 语言的函数库(如 OpenGL)会用到线程本地存储技术,将数据存储在当前内核线程的私有缓存中。因此,包含了调用此类 C 函数库的代码的 G 会变得特殊:**在特定时间只能与同一个 M 产生关联,这样会对调度效率造成负面影响。**从上面的流程图可以看出,就算锁定了,一定要尽量减少锁定的时间。
-
如果发现当前 M 已经和某个 G 锁定了,那么会立即停止调度并停止当前 M。一旦和它锁定的 G 处于可运行状态,它就会被唤醒并继续运行这个 G。
-
如果判断当前 M 未与任何 G 锁定,那么调度器会检查是否有运行时串行任务正在等待执行(此类任务执行时需要停止 Go 调度器,STW)。如果 gcwaiting 不为 0,那么会停止并阻塞当前 M 以等待运行时串行任务执行完成。
-
否则,开始真正的可运行 G 寻找之旅。一旦找到一个可运行的 G,调度器会在判定该 G 未与任何 M 锁定之后,立即让当前 M 运行它。(查找的过程:先尝试获取执行 GC 任务的 G -> 从全局可运行 G 队列获取 G -> 从本地 P 得可运行 G 队列获取 G,最后到全力查找可运行的 G)
全力查找可运行的 G
概括来说,就是会多次尝试从各个地方查找可运行的 G:
- **获取执行终结器的 G:**所有的终结函数的执行都会由一个专用的 G 负责,如果这个专用 G 已完成任务,调度器就会获取它,把它置为 Grunable 状态并放入本地 P 的可运行 G 队列
- **从本地 P 得可运行 G 队列获取 G:**调度器尝试从这里获取一个 G,并返回
- **从调度器的可运行 G 队列获取 G:**调度器尝试从这里获取一个 G,并返回
- **从网络 I/O 轮询器获取 G:**如果 netpoller 已经被初始化且有过网络 I/O 操作,那么调度器就会尝试从这里获取一个 G 列表,并且把表头的 G 返回,同时把其他的 G 都放入调度器的可运行 G 队列。如果没有初始化或者没有过网络 I/O 操作,就跳过。这里没有获取成功也不会阻塞。
- **从其他 P 的可运行 G 队列获取 G:**条件允许时,调度器会使用伪随机算法从全局 P 列表中选取 P,然后尝试从它们的可运行 G 队列中盗取一半的 G 到本地 P 的可运行 G 队列。选取 P 和盗取 G 会重复多次,成功就停止。如果一直没有成功,那么第一阶段结束。
- **获取执行 GC 标记任务的 G:**第二阶段,调度器会先判断是否正处于 GC 的标记阶段,以及本地 P 是否可以用于 GC 标记任务,如果都是 true,就将本地 P 持有的 GC 标记专用 G 置为 Grunnable 并返回
- 从调度器的可运行 G 队列获取 G:调度器再次尝试从这里获取一个 G,并返回。如果还是找不到,就会解除本地 P 与当前 M 地关联,并把该 P 放入调度器的空闲 P 列表
- **从全局 P 列表中的每个 P 得可运行 G 队列获取 G:**遍历全局 P 中的所有 P,检查他们的可运行 G 队列。只要某个 P 的可运行 G 队列不为空,那就从调度器的空闲 P 列表中取一个 P,判定其可用后,将其和当前 M 关联在一起,然后返回第一阶段(第五步),否则继续后续;
- **获取执行 GC 标记任务的 G:**判断是否正处于 GC 的标记阶段,以及与 GC 标记任务相关的全局资源是否可用,如果都是 true,调度器从空闲 P 列表拿出一个 P。如果这个 P 持有一个 GC 标记专用 G,就关联该 P 与当前 M,然后再次执行第二阶段(从6 开始)
- **从网络 I/O 轮询器获取 G:**此步骤与步骤4 基本相同,但是这里的获取是阻塞的。只有当 netpoller 那里有可用的 G 时,阻塞才会解除。同样的,如果没有初始化或者没有过网络 I/O 操作,就跳过。
如果经过上述10个步骤之后,还没有找到可运行的 G,调度器就会停止当前 M。后续重新唤醒时,会重新进入查找可运行的 G 的子流程。
自旋状态: 意味着它还没有找到 G 来运行。一般来说,运行时系统中至少会有一个自旋的 M,调度器也会尽量保证有一个自旋的 M 存在。除非发现没有自旋的 M,调度器是不会新启用或恢复一个 M 去运行新 G 的。
为什么要有 P
- GM 模型的问题
- 存在单一的全局 mutex 和集中状态管理:mutex 需要保护所有与 goroutine 相关的操作,导致锁竞争严重;
- goroutine 交接问题:M 之间经常交接可运行的 goroutine,而且可能会导致延迟增加和额外开销。每个 M 必须能够执行任何可运行的 G;
- 每个 M 都需要做内存缓存:会导致资源消耗过大,数据局部性差
- 存在系统调用的情况下,线程经常会被阻塞和解阻塞。这增加了很多额外的性能开销
- GMP 模型的优化
- 每个 P 都有自己的本地队列,大幅度减轻了对全局队列的直接依赖,所带来的效果就是锁竞争的减少,而 GM 模型的性能开销大头就是锁竞争;
- 每个 P 相对的平衡上,在 GMP 模型中也实现了 Work Stealing 算法,如果 P 的本地队列为空,会从全局队列或者其他 P 得本地队列窃取可运行的 G 来运行,减少空转,提高了资源利用率;
一般来说,M 的数量都会多于 P。在 Go 中,M 的数量默认是 10000,P 的默认数量是 CPU 核数。由于 M 的属性,也就是如果存在系统阻塞调用,阻塞了 M,又不够用的情况下,M 会不断增加。
M 不断增加的话,如果本地队列挂载在 M 上,那么就意味着本地队列也会随之增加。这显然是不合理的,因为本地队列的管理会变得复杂,且 Work Stealing 性能会大幅度下降。
M 被系统调用阻塞之后,我们是期望他能将未执行的任务分配给其他 M 继续运行,而不是全部停止。因此,使用 M 是不合理的,引入新的组件 P,把本地队列关联到 P 上,就能很好的解决这个问题。
go 的 CSP 并发模型
go 的 CSP 并发模型,是通过 goroutine 和 channel 来实现的。
goroutine 是 go 中并发的执行单位,channel 用于各个并发结构体之间的通信。
go 中常用的并发模型
通过 channel 通知实现并发控制
无缓冲的通道指的是通道的大小为 0,也就是说,这种类型的通道在接收前没有能力保存任何值,它要求发送 goroutine 和接收 goroutine 同时准备好,才可以完成发送和接收操作。无缓冲的通道我们也称之为同步通道,通过它接可以简单实现并发控制。
func main() {
ch := make(chan struct{})
go func() {
fmt.Println("start working")
time.Sleep(time.Second * 1)
ch <- struct{}{}
}()
<- ch
fmt.Println("finished")
}
通过 sync 包中的 WaitGroup 实现并发控制
goroutine 是异步执行的,有的时候为了防止在结束 main 函数的时候结束掉 goroutine,所以需要同步等待,这时候就需要使用 WaitGroup 了。在 Sync 包中,提供了 WaitGroup,它会等待它收集的所有 goroutine 任务全部完成。
需要注意的是,WaitGroup 在第一次使用后,不能被拷贝。
func main() {
wg := sync.WaitGroup{}
for i := 0; i < 5; i++ {
wg.Add(1)
go func(wg sync.WaitGroup, i int) {
fmt.Printf("i:%d", i)
wg.Done()
}(wg, i)
}
wg.Wait()
fmt.Println("exit")
}
上面代码运行之后会出现死锁。是因为 wg 给拷贝传递到了 goroutine 中,导致只有 Add 操作,其实 Done 操作是在 wg 的副本执行的。因此,Wait 就会死锁。
有两个修改方式:1️⃣ 将匿名函数中的 wg 传入类型改为 *sync.WaitGroup,这样就能引用到正确的 WaitGroup 了。2️⃣ 将匿名函数中的 wg 的的传入参数去掉,因为 go 支持闭包类型,在匿名函数中可以直接使用外面的 wg 变量。
Go 1.7 之后的 Context
通常,一些简单场景下使用 channel 和 WaitGroup 已经足够了,但是当面临一些复杂多变的网络并发场景下 channel 和 WaitGroup 就显得有些力不从心了。
⽐如⼀个⽹络请求 Request,每个 Request 都需要开启⼀个 goroutine 做⼀些事情,这些 goroutine ⼜可能会开启其他的 goroutine,⽐如数据库和RPC服务。
所以我们需要⼀种可以跟踪 goroutine 的⽅案,才可以达到控制他们的⽬的,这就是 Go 语⾔为我们提供的 Context,称之为上下⽂⾮常贴切,它就是 goroutine 的上下⽂。
它是包括⼀个程序的运⾏环境、现场和快照等。每个程序要运⾏时,都需要知道当前程序的运⾏状态,通常Go 将这些封装在⼀个 Context ⾥,再将它传给要执⾏的 goroutine 。context 包主要是⽤来处理多个 goroutine 之间共享数据,及多个 goroutine 的管理。
context 包的核⼼是 struct Context,接⼝声明如下:
// A Context carries a deadline, cancelation signal, and requestscoped values
// across API boundaries. Its methods are safe for simultaneous use by multiple
// goroutines.
type Context interface {
// Done returns a channel that is closed when this `Context` is canceled
// or times out.
// Done() 返回⼀个只能接受数据的channel类型,当该context关闭或者超时时间到了的时候,该channel就会有⼀个取消信号
Done() <-chan struct{}
// Err indicates why this Context was canceled, after the Done channel
// is closed.
// Err() 在Done() 之后,返回context 取消的原因。
Err() error
// Deadline returns the time when this Context will be canceled, if any.
// Deadline() 设置该context cancel的时间点
Deadline() (deadline time.Time, ok bool)
// Value returns the value associated with key or nil if none.
// Value() ⽅法允许 Context 对象携带request作⽤域的数据,该数据必须是线程安全的。
Value(key interface{}) interface{}
}
Context 对象是协程安全的,你可以把⼀个 Context 对象传递给任意个数的 gorotuine,对它执⾏取消操作时,所有 goroutine 都会接收到取消信号。
⼀个 Context 不能拥有 Cancel ⽅法,同时我们也只能 Done channel 接收数据。其中的原因是⼀致的:接收取消信号的函数和发送信号的函数通常不是⼀个。
典型的场景是:⽗操作为⼦操作操作启动 goroutine,⼦操作也就不能取消⽗操作。