〇、什么是协程 Coroutines ?
进程和线程太常见,本文就不再赘述了,直接一起看下什么是协程。如下图,先看下协程的定位:
关于用户空间和内核空间:进程运行起来就涉及到对内存资源的管理,然而内存资源有限,因此操作系统采用虚拟内存技术,把进程虚拟地址空间划分成用户空间和内核空间。所有用户空间要访问硬件或执行 I/O 操作,必须经过内核进行操作,这样无论再多的请求,也可保证了系统的高效稳定。
操作系统在线程等待 IO 的时候,会阻塞当前线程,切换到其它线程,这样在当前线程等待 IO 的过程中,其它线程可以继续执行。当系统线程较少的时候没有什么问题,但是当线程数量非常多的时候,就会出问题。
- 一是,系统线程会占用非常多的内存空间;
- 二是,过多的线程切换会占用大量的系统时间。
协程刚好可以解决上述两个问题。协程运行在线程之上,当一个协程执行完成后,会主动让出,让另一个协程运行在当前线程之上。协程并没有增加线程数量,只是在线程的基础之上通过分时复用的方式运行多个协程,而且协程的切换在用户态完成,切换的代价比线程从用户态到内核态的代价小很多。
因此,在有大量 I/O 操作业务的情况下,我们采用协程替换线程,可以到达很好的效果,一是降低了系统内存,二是减少了系统切换开销,同时系统的性能也会提升。对于计算密集型操作,一般无需频繁切换线程,此时协程就是累赘,白白增加代码复杂度。
另外,在协程中尽量不要调用阻塞 I/O 的方法,比如打印,读取文件,Socket 接口等,除非改为异步调用的方式,并且协程只有在 I/O 密集型的任务中才会发挥作用。
详细可参考:https://zhuanlan.zhihu.com/p/172471249 https://zhuanlan.zhihu.com/p/337978321
一、Goroutine 是啥?
Goroutine 就是 Go 语言中的协程,也可以叫做轻量级线程,由 Go 运行时(runtime)管理,它会智能地将 Goroutine 中的任务合理地分配给每个 CPU,这主要靠的是 Go 在语言层面已经内置了调度和上下文切换的机制。
Go 协程调度器有三个重要数据结构,简称为 GMP,这也是其并发调度的核心:
- G(Goroutine):它是一个待执行的任务。Goroutine 可以快速创建和销毁,它们被设计用于执行并发任务。Goroutine 在逻辑上可以看作是一个函数或者一段代码,它们被放入队列中等待执行。
- M(Machine):即操作系统的工作线程,是实际执行 Goroutine 的实体。每个 M 都与一个 P 绑定,从 P 的本地队列中获取 Goroutine 来执行。如果本地队列为空,M 会尝试从全局队列获取 Goroutine 或者从其他 P 的本地队列中“窃取”Goroutine 来执行。
- P(Processor):表示处理器,它可以被看做运行在线程上的本地调度器。P 的数量通常与系统的 CPU 核心数相同,但这个数量可以通过环境变量来调整。P 负责管理 Goroutine 的调度,将 Goroutine 分配给与之绑定的 M 来执行。
大概的运行流程就是:当一个 M 线程可以执行任务时,它会首先检查与其关联的 P 的本地队列。如果本地队列为空,M 会尝试从全局队列中获取一批 Goroutine 放入本地队列,或者从其他 P 的本地队列中偷取一半 Goroutine 放入自己的本地队列。当 Goroutine 执行完成后,M 会从 P 获取下一个 Goroutine 继续执行。
单从线程调度讲,Go 语言相比起其他语言的优势在于,操作系统(OS)线程是由 OS 内核来调度的,Goroutine 则是由 Go 运行时(runtime)自己的调度器调度的,这个调度器使用一个称为 m:n 调度的技术(调度 m 个 Goroutine 到 n 个 OS 线程),多对多。其一大特点是 Goroutine 的调度是在用户空间下完成的,不涉及与内核空间之间的频繁切换,包括内存的分配与释放,都是在用户空间维护的一块大的内存池中,不直接调用系统的 malloc 函数(malloc 函数用于动态分配内存,并返回分配内存的首地址),除非内存池需要改变,成本比调度 OS 线程低很多。另一方面充分利用了多核的硬件资源,近似的把若干 Goroutine 均分在物理线程上, 再加上本身 Goroutine 的超轻量(初始版本为 2kb,后续升到 4kb、8kb,在 1.4 版本又减小到 2kb,不断优化),以上种种保证了 Go 调度方面的性能。
Go 语言中使用 Goroutine 非常简单,只需要在调用函数的时候在前面加上 go 关键字,就可以为一个函数创建一个 Goroutine。一个 Goroutine 必定对应一个函数,可以创建多个 Goroutine 去执行相同的函数。
下面将通过简单的示例看下如何创建 Goroutine。
1.1 启动单个 Goroutine 的示例
情况一:顺序执行,程序开始运行时开启一个 Goroutine,然后按照代码顺序逐个执行。
package main
import "fmt"
func hello() {
fmt.Println("Hello Goroutine!")
}
func main() {
fmt.Println("begin-------------------!")
hello()
fmt.Println("main goroutine done!")
fmt.Println("end---------------------!")
}
输出结果:
情况二:执行 hello() 函数时,通过 go 关键字,另开一个 Goroutine,其后代码仍然在主协程中运行。
package main
import "fmt"
func hello() {
fmt.Println("Hello Goroutine!")
}
func main() {
fmt.Println("begin-------------------!")
go hello()
fmt.Println("main goroutine done!")
fmt.Println("end---------------------!")
}
这一次的执行结果只打印了“main goroutine done!”,当 main() 函数返回的时候,该 Goroutine 就结束了,所有在 main() 函数中启动的 Goroutine 会一同结束,main 函数所在的 Goroutine 就像是权利的游戏中的夜王,其他的 Goroutine 都是异鬼,夜王一死它转化的那些异鬼也就全部 GG 了。
为了让分支协程能够正常输出,主协程需要等一等,就是下边的第三种情况。
情况三:在另开协程后,通过主协程等待一定时间,让副协程执行完成。
package main
import (
"fmt"
"time"
)
func hello() {
fmt.Println("Hello Goroutine!")
}
func main() {
fmt.Println("begin-------------------!")
go hello()
fmt.Println("main goroutine done!")
time.Sleep(time.Second * 1)
fmt.Println("end---------------------!")
}
可以看到,先打印了“main goroutine done!”,是因为我们在创建新的 Goroutine 的时候需要花时间,而此时 main 函数所在的 Goroutine 是继续执行的。
1.2 启动多个 Goroutine 的示例
如下示例中的 sync.WaitGroup 是为了实现 Goroutine 的同步,本文后续章节会另外详细介绍。
package main
import (
"fmt"
"sync"
)
var wg sync.WaitGroup
func hello(i int) {
defer wg.Done() // Goroutine 结束就登记-1
fmt.Println("Hello Goroutine!", i)
}
func main() {
for i := 0; i < 5; i++ {
wg.Add(1) // 启动一个 Goroutine 就登记+1
go hello(i)
}
wg.Wait() // 等待所有登记的 Goroutine 都结束
}
多次执行上面的代码,会发现每次打印的数字的顺序都不一致。这是因为多个 Goroutine 是并发执行的,而 Goroutine 的调度是随机的。
1.3 关于 Goroutine 池
下面是一个代码示例:(目的是:随机生成一串数字,并计算一个数字的各个位数之和,例如数字 123,结果为 1+2+3=6)
注:由于示例中涉及管道等内容,本文后续章节会介绍。
package main
import (
"fmt"
"math/rand"
"runtime"
)
type Job struct {
Id int // id
RandNum int // 需要计算的随机数
}
type Result struct {
job *Job // 这里必须传对象实例
sum int // 求和
}
func main() {
// 需要2个管道
jobChan := make(chan *Job, 128) // 1.job管道
resultChan := make(chan *Result, 128) // 2.结果管道
createPool(10, jobChan, resultChan) // 3.创建工作池,允许十个协程同时操作
go func(resultChan chan *Result) { // 4.开个打印的协程
for result := range resultChan { // 监控结果管道,并实时打印
fmt.Printf("job id:%v randnum:%v result:%d\n", result.job.Id,
result.job.RandNum, result.sum)
}
}(resultChan)
var id int
for { // 循环创建job,输入到管道
id++
r_num := rand.Int() // 生成随机数
job := &Job{
Id: id,
RandNum: r_num,
}
jobChan <- job // 将新鲜的 Job 对象,填入管道
if id == 10 { // 仅输出前十个
runtime.Goexit()
}
}
}
// 创建工作池 参数1:指定开几个协程
func createPool(num int, jobChan chan *Job, resultChan chan *Result) {
for i := 0; i < num; i++ { // 根据开协程个数,去跑运行
go func(jobChan chan *Job, resultChan chan *Result) {
// 执行运算
for job := range jobChan { // 遍历 job 管道所有数据,进行相加
r_num := job.RandNum // 随机数接过来
var sum int // 定义返回值
for r_num != 0 { // 随机数逐位相加
tmp := r_num % 10
sum += tmp
r_num /= 10
}
r := &Result{ // 结果 Result
job: job,
sum: sum,
}
resultChan <- r //运算结果填入结果管道
}
}(jobChan, resultChan)
}
}
输出结果:
1.4 主协程退出后,其他全部副协程也将中断
如下示例代码,在后边的主协程退出后,由于副协程睡的时间(5秒)超过的主协程运行时间(2秒),“Sleep-end”将不会被打印:
package main
import (
"fmt"
"time"
)
func main() {
fmt.Println("begin-------------------!")
go func() { // 直接在 main 中写函数体
i := 0
for {
i++
fmt.Printf("new goroutine: i = %d\n", i)
time.Sleep(time.Second * 5)
fmt.Println("new goroutine: Sleep-end")
}
}()
i := 0
for {
i++
fmt.Printf("main goroutine: i = %d\n", i)
time.Sleep(time.Second)
if i == 2 {
break
}
}
fmt.Println("end---------------------!")
}
在 Go 语言中,主协程(main Goroutine)的退出通常会导致程序的终止,即使还有其他的 Goroutine 正在运行。这是因为 Go 语言的设计是为了让程序的生命周期与主协程的生命周期绑定。当主协程结束时,它不会等待其他 Goroutine 完成任务,因此程序会直接退出,除非有额外的同步机制(比如:sync.WaitGroup)来确保主协程等待其他 Goroutine 完成。
二、runtime 包
2.1 runtime.Gosched() 切换到下一个待执行的 Goroutine
由于 Goroutine 的调度是由 Go 运行时管理的,因此在某些情况下,一个 Goroutine 可能会长时间占用 CPU 资源,导致其他 Goroutine 无法得到足够的执行时间。
为了解决这个问题,Go 语言提供了 runtime.Gosched() 函数。当调用该函数时,当前 Goroutine 会主动放弃 CPU 的控制权,并将执行权交给其他正在等待的 Goroutine。这样,其他 Goroutine 就可以获得更多的执行时间,从而提高程序的整体性能。
需要注意的是,runtime.Gosched() 函数只是让出 CPU 的控制权,并不会阻塞当前 Goroutine 的执行。也就是说,即使调用了 runtime.Gosched(),当前 Goroutine 仍然会继续执行下去,直到完成其任务或遇到阻塞操作。
下面是一个简单的示例代码:
package main
import (
"fmt"
"runtime"
)
func main() {
fmt.Println("begin-----------------!")
go func(s string) { // 开启副协程
for i := 0; i < 2; i++ {
fmt.Println(s)
}
}("new Goroutine!")
for i := 0; i < 2; i++ { // 主协程
runtime.Gosched() // 切换到下一个 Goroutine,当前 Goroutine 进入等待状态
fmt.Println("main Goroutine!")
}
fmt.Println("end-------------------!")
}
由输出结果可知,副协程先执行,主协程暂停后最终也执行了。
2.2 runtime.Goexit() 终止当前 Goroutine
在某些情况下,一个 Goroutine 可能会遇到错误或异常情况,需要立即停止执行并释放资源。这时,可以使用 runtime.Goexit() 函数来终止当前 Goroutine 的执行。
当调用 runtime.Goexit() 函数时,当前 Goroutine 会立即退出,不再执行后续的代码。同时,该函数还会将当前 Goroutine 的状态设置为“已结束”,并将其从调度队列中移除。这样,其他正在等待的 Goroutine 就可以继续执行,而不需要等待已经出现异常的 Goroutine 完成。
需要注意的是,runtime.Goexit() 函数只会终止当前 Goroutine 的执行,并不会关闭整个程序。也就是说,如果还有其他正在运行的 Goroutine,它们仍然会继续执行直到完成或遇到阻塞操作。
下面是一个简单的示例代码:
package main
import (
"fmt"
"runtime"
)
func main() {
fmt.Println("begin-----------------!")
go func() {
defer fmt.Println("A.defer") // defer:延迟执行,等协程退出当前代码块时执行
fmt.Println("A")
func() {
defer fmt.Println("B.defer")
fmt.Println("B")
runtime.Goexit() // 结束协程
defer fmt.Println("C.defer")
fmt.Println("C")
}()
}()
for { // 阻塞主协程,等待副协程运行完成
}
}
由输出结果可知,在函数 runtime.Goexit() 之前的 A 和 B 都正常输出了,结束协程后的 C 未输出。
2.3 runtime.GOMAXPROCS() 设置程序运行时可以使用的最大 CPU 核心数
在某些情况下,一个 Goroutine 可能会长时间占用 CPU 资源,导致其他 Goroutine 无法得到足够的执行时间。为了解决这个问题,Go 语言提供了 runtime.GOMAXPROCS 参数。当设置该变量的值时,Go 运行时会限制同时运行的 Goroutine 数量,使其不超过指定的 CPU 核心数。这样,每个 CPU 核心都可以被充分利用,从而提高程序的整体性能。
需要注意的是,runtime.GOMAXPROCS 变量的值应该根据实际的硬件配置和程序需求来设置。如果设置得过高,可能会导致系统资源的浪费;如果设置得过低,则可能无法充分发挥多核处理器的性能优势。
Go1.5 版本之前,默认使用的是单核心执行,从 Go1.5 版本开始,默认使用全部的 CPU 逻辑核心数,但都手动配置。
下面是一个简单的示例代码:
package main
import (
"fmt"
"runtime"
"time"
)
func a() {
fmt.Println("begin--a!")
for i := 1; i < 5; i++ {
fmt.Println("A:", i)
}
fmt.Println("end--a!")
}
func b() {
fmt.Println("begin--b!")
for i := 1; i < 5; i++ {
fmt.Println("B:", i)
}
fmt.Println("end--b!")
}
func main() {
fmt.Println("begin-----------------!")
runtime.GOMAXPROCS(1) // 通过配置数字,允许多个系统线程处理操作
go a()
go b()
time.Sleep(time.Second * 11)
fmt.Println("end-------------------!")
}
配置为 1 时的输出示例:(当 runtime.GOMAXPROCS 为 1 时,方法 a() 和 b() 是串行的,顺序执行)
配置为 2 时的输出示例:(当 runtime.GOMAXPROCS 为 2 时,方法 a() 和 b() 是并行的,同时开始执行)
注意:并行执行有时候不会生效。由于调度器的优化、操作系统的多任务处理能力以及运行时环境的调度策略,仍然可能观察到看似“按顺序”执行的输出。这是因为虽然指令并行发射和执行,但打印操作(fmt.Println)本身是同步的,并且多个 Goroutine 的输出可能在内部缓冲,然后以某种顺序刷新到标准输出。
三、Channel 通道
如果使用共享内存进行数据交换,那么共享内存存在不同的 Goroutine 中容易发生竞态问题。为了保证数据交换的正确性,必须使用互斥量对内存进行加锁,这种做法势必造成性能问题。
Go 语言的并发模型是 CSP(Communicating Sequential Processes 通信顺序进程),提倡通过通信共享内存,而不是通过共享内存而实现通信。CSP 是一种并发编程模型,它强调了并发实体之间的通信顺序,而不是它们执行的具体时序。
关于 CSP:在 CSP 模型中,进程是并发执行的基本单位,每个进程都有自己的独立执行序列。进程之间不共享内存,而是通过通道进行通信。通道是一种特殊的数据结构,用于在进程之间传递消息。进程通过发送和接收操作与通道进行交互,从而实现数据的交换和同步。CSP模型提供了一种清晰、简洁的并发编程范式,适用于构建各种类型的并发系统。它强调了并发实体之间的通信顺序,而不是它们执行的具体时序,这有助于提高并发程序的正确性和可靠性。
如果说 Goroutine 是 Go 程序并发的执行体,Channel 就是它们之间的连接。Channel 是可以让一个 Goroutine 发送特定值到另一个 Goroutine 的通信机制。
Go 语言中的通道(Channel)是一种特殊的类型。通道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。每一个通道都是一个具体类型的导管,也就是声明 Channel 的时候需要为其指定元素类型。
3.1 声明和创建 Channel
Channel 是一种类型,一种引用类型,空值是 nil。声明通道类型的格式如下:
// 声明
var 变量 chan 元素类型
// 示例
var ch1 chan int // 声明一个传递整型的通道
var ch2 chan bool // 声明一个传递布尔型的通道
var ch3 chan []int // 声明一个传递 int切片 的通道
fmt.Println(ch1) // 输出:<nil>
声明的通道后需要使用 make 函数初始化之后才能使用。
创建 Channel 的格式如下:
make(chan Type, value)
其中,Type 表示要传递的数据类型,value 表示 Channel 的缓冲区大小。如果不指定缓冲区大小,默认为 0,表示非缓冲通道,这个本文后续章节会详细介绍。
// 创建通道的几个示例:
ch1 := make(chan int) // 无缓冲
ch2 := make(chan int, 2) // 缓冲区为 2
ch3 := make(chan bool)
ch4 := make(chan []int)
3.2 Channel 的操作 send、receive、close
通道有发送(send)、接收(receive)和关闭(close)三种操作。
发送和接收都使用 <- 符号。
// 创建 int 类型的通道
ch := make(chan int)
// send 发送数据进入通道
ch <- 10 // 把 10 发送到 ch 中
// receive 接收通道中的数据
x := <- ch // 从 ch 中接收值,并赋值给变量 x
<-ch // 从 ch 中接收值,忽略结果
// close 关闭通道,是指关闭了入口,取值不受影响
close(ch)
关于关闭通道,需要注意的是,只有在通知接收方 Goroutine 所有的数据都发送完毕的时候才需要关闭通道。通道是可以被垃圾回收机制回收的,所以关闭通道不是必须的。
关闭后的通道有以下特点:
- 对一个关闭的通道再发送值就会导致 panic;
- 对一个关闭的通道进行接收,会一直获取值,直到通道为空;
- 对一个关闭的并且没有值的通道执行接收操作会得到对应类型的零值;
- 关闭一个已经关闭的通道会导致panic。
下面是一个关闭通道的示例代码:
package main
import (
"fmt"
"time"
)
func recv(c chan int) {
for {
ret := <-c
time.Sleep(time.Second) // 一秒钟接收一个值
fmt.Println("接收成功", ret)
}
}
func main() {
fmt.Println("begin-----------------!")
ch := make(chan int, 5)
go recv(ch) // 启用goroutine从通道接收值
for i := 1; i < 6; i++ {
ch <- i
}
fmt.Println("发送成功")
time.Sleep(time.Second * 2) // 两秒钟后关闭通道 ch
close(ch)
time.Sleep(time.Second * 6)
fmt.Println("end-------------------!")
}
由输出结果可知,在关闭通道后,接收方仍可以从通道内取值,当通道中没值时,返回零值。
3.3 Channel 通道的有缓冲和无缓冲
3.3.1 无缓冲的通道
无缓冲的通道又称为阻塞的通道。
func main() {
ch := make(chan int)
ch <- 10
fmt.Println("发送成功")
}
上面这段代码能够通过编译,但是执行的时候会出现以下错误:
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:
main.main()
.../main.go:9 +0x28
exit status 2
为什么会报错?因为上边使用 ch := make(chan int) 创建的是无缓冲的通道,只有在有对象接收值的时候才能发送值。就像快递员给你送大件的货,是必须要有人接收的,家里没人的话他就不送了。
由于代码中只有发送没有接收,所以在 ch <- 10 这一行代码形成死锁。
若想不发生死锁,就得再开一个协程来接收通道的值:
package main
import (
"fmt"
"time"
)
func recv(c chan int) {
ret := <-c
time.Sleep(time.Second) // 模拟耗时操作
fmt.Println("接收成功", ret)
}
func main() {
fmt.Println("begin-----------------!")
ch := make(chan int)
go recv(ch) // 启用 Goroutine 从通道接收值
ch <- 10
fmt.Println("发送成功")
time.Sleep(time.Second * 2) // 等待副协程操作完成
fmt.Println("end-------------------!")
}
无缓冲的通道如下图,在管道的两端必须一个发送,一个接收。
无缓冲通道上的发送操作会阻塞,直到另一个 Goroutine 在该通道上执行接收操作,这时值才能发送成功,两个 Goroutine 将继续执行。相反,如果接收操作先执行,接收方的 Goroutine 将阻塞,直到另一个 Goroutine 在该通道上发送一个值。
使用无缓冲通道进行通信将导致发送和接收的goroutine同步化。因此,无缓冲通道也被称为同步通道。
3.3.2 有缓冲的通道
在无解暂时无接收方的时候,要想不报错就可以创建有缓冲的通道。
在使用 make 函数初始化通道的时候为其指定通道的容量,例如:
package main
import (
"fmt"
)
func main() {
fmt.Println("begin-----------------!")
ch := make(chan int, 1) // 创建一个容量为 1 的有缓冲区通道
ch <- 10
fmt.Println("发送成功")
fmt.Println("end-------------------!")
}
只要通道的容量大于零,那么该通道就是有缓冲的通道,通道的容量表示通道中能存放元素的数量。
另外,可以使用内置的 len 函数获取通道内元素的数量,使用 cap 函数获取通道的容量。
package main
import (
"fmt"
)
func main() {
fmt.Println("begin-----------------!")
ch := make(chan int, 2) // 创建一个容量为 2 的有缓冲区通道
ch <- 10
fmt.Println("len:", len(ch))
fmt.Println("cap:", cap(ch))
fmt.Println("end-------------------!")
}
3.4 判断通道是否已经关闭
当通道已经关闭,再取值时,会返回零值,若程序仍然按照正常的流程去处理就会有问题,所以,就需要添加判断,来实时了解通道是否已经关闭。
下面是一段示例代码,其中包含了两种判断的方法:
package main
import (
"fmt"
)
func main() {
fmt.Println("begin-----------------!")
ch1 := make(chan int)
ch2 := make(chan int)
// 开启 Goroutine 将 0~5 的数发送到 ch1 中
go func() {
for i := 0; i < 6; i++ {
ch1 <- i
}
close(ch1)
}()
// 开启 Goroutine 从 ch1 中接收值,并将该值的平方发送到 ch2 中
go func() {
for {
i, ok := <-ch1 // 【方法一】通道关闭后再取值 ok=false
fmt.Println(ok)
if !ok {
break
}
ch2 <- i * i
}
close(ch2)
}()
// 在主 Goroutine 中从 ch2 中接收值打印
for i := range ch2 { // 【方法二】通道关闭后会退出 for range 循环
fmt.Println(i)
}
fmt.Println("end-------------------!")
}
其中 for range 方式更加常用。
3.5 单向通道
有的时候我们会将通道作为参数在多个任务函数间传递,很多时候我们在不同的任务函数中使用通道都会对其进行限制,比如限制通道在函数中只能发送或只能接收。Go 语言中提供了单向通道来处理这种情况。
下面是一段的示例代码:
package main
import (
"fmt"
)
// chan<- int 是一个只能发送的通道,可以发送但是不能接收
func counter(out chan<- int) {
for i := 0; i < 5; i++ {
out <- i
}
close(out)
}
// <-chan int 是一个只能接收的通道,可以接收但是不能发送
func squarer(out chan<- int, in <-chan int) {
for i := range in {
out <- i * i
}
close(out)
}
func printer(in <-chan int) { // <-chan int 是一个只能接收的通道,可以接收但是不能发送
for i := range in {
fmt.Println(i)
}
}
func main() {
fmt.Println("begin-----------------!")
ch1 := make(chan int)
ch2 := make(chan int)
go counter(ch1)
go squarer(ch2, ch1)
printer(ch2)
fmt.Println("end-------------------!")
}
在函数传参及任何赋值操作中,将双向通道转换为单向通道是可以的,但反过来是不可以的。
3.6 通道操作各种情况汇总
四、并发安全和锁
4.1 并发操作导致数据问题的出现
有时候在 Go 代码中可能会存在多个 Goroutine 同时操作一个资源(临界区),这种情况会发生竞态问题(数据竞态)。
下面是一段示例代码,新分配两个副协程一起执行 add() 函数:
package main
import (
"fmt"
"sync"
)
var x int64
var wg sync.WaitGroup
func add() {
for i := 0; i < 5000; i++ {
x = x + 1
}
wg.Done()
}
func main() {
wg.Add(2)
go add() // 副协程一
go add() // 副协程二
wg.Wait()
fmt.Println(x)
}
执行结果:
上述代码执行了三次,每次的结果都不同,明显已经偏离了目标。
在 add() 函数中,对全局变量 x 的访问和修改没有进行同步保护,这可能导致竞态条件(race condition)。当两个 Goroutine 同时访问和修改 x 时,它们的操作可能会相互干扰,导致结果不正确。如何解决呢,下面来介绍下互斥锁。
4.2 互斥锁
互斥锁是一种常用的控制共享资源访问的方法,它能够保证同一时间,仅有一个 Goroutine 可以访问共享资源,其他 Goroutine 等待。当有多个 Goroutine 同时等待一个锁时,唤醒的策略是随机的。
Go 语言中的互斥锁(Mutex)是一种同步原语,用于保护共享资源的访问,可以使用 sync.Mutex 结构体来实现互斥锁。
下面代码就是对 4.1 中代码的优化,添加了 sync.Mutex 互斥锁:
package main
import (
"fmt"
"sync"
)
var x int64
var wg sync.WaitGroup
var mutex sync.Mutex // 互斥锁的声明
func add() {
for i := 0; i < 5000; i++ {
mutex.Lock() // 加锁
x = x + 1
mutex.Unlock() // 解锁
}
wg.Done()
}
func main() {
wg.Add(2)
go add()
go add()
wg.Wait()
fmt.Println(x)
}
输出结果始终为正确数值:
4.3 互斥锁的另一种情况:读写锁
互斥锁是完全互斥的,但是有很多实际的场景下是读多写少的,当我们并发的去读取一个资源不涉及资源修改的时候是没有必要加锁的,这种场景下使用读写锁是更好的一种选择。读写锁在 Go 语言中使用 sync.RWMutex 类型。
读写锁分为两种:读锁和写锁。
- 当一个 Goroutine 获取读锁之后,其他的 Goroutine 如果是获取读锁会继续获得锁,如果是获取写锁就会等待;
- 当一个 Goroutine 获取写锁之后,其他的 Goroutine 无论是获取读锁还是写锁都会等待。
下面是一个可以操作互斥锁和读写锁操作效率对比的示例代码:
package main
import (
"fmt"
"sync"
"time"
)
var (
x int64
wg sync.WaitGroup
mutex sync.Mutex
rwmutex sync.RWMutex
)
func write() {
// mutex.Lock() // 加互斥锁
rwmutex.Lock() // 加写锁
x = x + 1
time.Sleep(10 * time.Millisecond) // 假设写操作耗时 10 毫秒
rwmutex.Unlock() // 解写锁
// mutex.Unlock() // 解互斥锁
wg.Done()
}
func read() {
// mutex.Lock() // 加互斥锁
rwmutex.RLock() // 加读锁
time.Sleep(time.Millisecond) // 假设读操作耗时1毫秒
rwmutex.RUnlock() // 解读锁
// mutex.Unlock() // 解互斥锁
wg.Done()
}
func main() {
start := time.Now()
for i := 0; i < 10; i++ {
wg.Add(1)
go write()
}
for i := 0; i < 1000; i++ {
wg.Add(1)
go read()
}
wg.Wait()
end := time.Now()
// fmt.Println("互斥锁---------:",end.Sub(start))
fmt.Println("互斥锁-读写锁--:", end.Sub(start))
}
如下图,分别是用互斥锁和读写锁的时效,明显看出,当读多写少(1000:10)时,效果还是比较明显的:
特别注意,读写锁非常适合读多写少的场景,如果读和写的操作差别不大,读写锁的优势就发挥不出来。
五、sync 包
5.1 sync.WaitGroup:等待一组 Goroutine 执行完毕
在代码中生硬的使用 time.Sleep 肯定是不合适的,Go 语言中可以使用 sync.WaitGroup 来实现并发任务的同步。
sync.WaitGroup有以下几个方法:
方法名 | 功能 |
(wg * WaitGroup) Add(delta int) | 计数器 +delta |
(wg *WaitGroup) Done() | 计数器 -1 |
(wg *WaitGroup) Wait() | 阻塞直到计数器变为 0 |
sync.WaitGroup 内部维护着一个计数器,计数器的值可以增加和减少。例如当我们启动了 N 个并发任务时,就将计数器值增加 N。每个任务完成时通过调用 Done() 方法将计数器减 1。通过调用 Wait() 来等待并发任务执行完,当计数器值为 0 时,表示所有并发任务已经全部完成。
下面是一段示例代码:
package main
import (
"fmt"
"sync"
)
var wg sync.WaitGroup
func hello() {
defer wg.Done() // 执行完后续代码后,计数器 -1
fmt.Println("Hello Goroutine!")
}
func main() {
wg.Add(1) // 计数器 +1
go hello() // 启动另外一个 Goroutine 去执行 hello() 函数
fmt.Println("main Goroutine Done!")
wg.Wait() // 等待计数器为 0
}
另外,要注意 sync.WaitGroup 是一个结构体,传递的时候要传递指针。
5.2 sync.Once:使得指定函数在并发环境下仅执行一次
在编程的很多场景下,我们需要确保某些操作在高并发的场景下只执行一次,例如只加载一次配置文件、连接数据库、只关闭一次通道等。
Go 语言中的 sync 包中提供了一个针对只执行一次场景的解决方案,就是:sync.Once。
sync.Once 只有一个 Do 方法,其签名如下:
func (o *Once) Do(f func()) {}
延迟执行一个开销很大的初始化操作,到真正用到它的时候再执行,是一个很好的思路。因为预先初始化一个变量(比如在 init 函数中完成初始化)会增加程序的启动耗时,而且有可能实际执行过程中这个变量没有用上,那么这个初始化操作就是非必须的。
下面是一个加载配置文件的示例代码:
package main
import (
"encoding/json"
"fmt"
"io"
"os"
"sync"
)
var once sync.Once
var config map[string]string
var wait sync.WaitGroup
func main() {
fmt.Println("begin-----------------!")
for i := 0; i < 5; i++ {
wait.Add(1)
go loadConfig()
}
wait.Wait()
fmt.Println("end-------------------!")
}
func loadConfig() {
fmt.Println("尝试加载配置文件")
once.Do(func() {
file, err := os.Open("config.json") // 配置文件和 main.go 同目录,直接读取
if err != nil {
panic(err)
}
defer file.Close()
bytes, err := io.ReadAll(file)
if err != nil {
panic(err)
}
config = make(map[string]string) // 声明保存配置文件的内容,键值对形式
err = json.Unmarshal(bytes, &config)
fmt.Println(config)
fmt.Println("配置文件已加载")
if err != nil {
panic(err)
}
})
wait.Done()
}
json 配置文件的内容示例:
{"key1":"value1","key2":"value2","key3":"value3"}
输出结果:(由结果可知,尽管有多次尝试加载配置文件,但是最后还是只加载了一次)
sync.Once 其实内部包含一个互斥锁和一个布尔值,互斥锁保证布尔值和数据的安全,而布尔值用来记录初始化是否完成。这样设计就能保证初始化操作的时候是并发安全的,并且初始化操作也不会被执行多次。
5.3 sync.Map:并发安全的无序键值对 map
Go 语言中内置的 map(map 是一种无序的键值对数据结构) 不是并发安全的。
请看下面关于 map 并发测试报异常的示例:
package main
import (
"fmt"
"strconv"
"sync"
)
var m = make(map[string]int)
func get(key string) int {
return m[key]
}
func set(key string, value int) {
m[key] = value
}
func main() {
fmt.Println("begin-----------------!")
wg := sync.WaitGroup{}
for i := 0; i < 3; i++ {
wg.Add(1)
go func(n int) {
key := strconv.Itoa(n)
set(key, n)
fmt.Printf("k=:%v,v:=%v\n", key, get(key))
wg.Done()
}(i)
}
wg.Wait()
fmt.Println("end-------------------!")
}
如下输出结果:(当同时开三个协程时,有时可以正常输出,有时就报错了)
像上边这种场景下,就需要为 map 加锁来保证并发的安全性了,Go 语言的 sync 包中提供了一个开箱即用的并发安全版 map:sync.Map。开箱即用表示不用像内置的 map 一样使用 make 函数初始化才能使用。
同时 sync.Map 内置了必要的操作方法和属性,如下:
- Store(key, value interface{}):存储键值对。如果键已经存在,更新其对应的值;
- Load(key interface{}) (value interface{}, ok bool):获取键对应的值。如果键不存在,返回 nil 和一个布尔值 false;如果键存在,返回键对应的值和一个布尔值 true;
- Delete(key interface{}):删除键及其对应的值;
- Range(f func(key, value interface{}) bool):遍历,对于每个键值对,调用传入的函数 f。如果函数返回 false,停止遍历;
- Len() int:键值对的数量。
下面是运用 sync.Map 对上部分代码的优化,确保了程序的稳定运行:
package main
import (
"fmt"
"strconv"
"sync"
)
var m = sync.Map{}
func main() {
fmt.Println("begin-----------------!")
wg := sync.WaitGroup{}
for i := 0; i < 5; i++ {
wg.Add(1)
go func(n int) {
key := strconv.Itoa(n)
m.Store(key, n) // Store 存储
value, _ := m.Load(key) // Load 获取
fmt.Printf("k=:%v,v:=%v\n", key, value)
wg.Done()
}(i)
}
wg.Wait()
fmt.Println("end-------------------!")
}
参考: https://www.topgoer.com/%E5%B9%B6%E5%8F%91%E7%BC%96%E7%A8%8B/