golang channel底层结构和实现

时间:2022-10-04 17:10:24

Golang 设计模式: 不要通过共享内存来通信,而要通过通信实现内存共享

通信顺序模型(communication sequential processes, CSP)的并发模式,可以让一个 goroutine 发送特定值到另一个 goroutine 的通信机制

channel中的数据遵循先入先出(First In First Out)的规则,保证收发数据的顺序

 

二、结构

channel的源码在runtime包下的chan.go文件, 参见chan.go

 以下时channel的部分结构:

type hchan struct {
	qcount   uint
	dataqsiz uint
	buf      unsafe.Pointer
	elemsize uint16
	closed   uint32
	elemtype *_type
	sendx    uint
	recvx    uint
	recvq    waitq
	sendq    waitq
	lock mutex
}


type waitq struct {
	first *sudog
	last  *sudog
}

 

其中:

qcount: 队列中剩余的元素个数

dataqsiz: 环形队列长度,即可以存放的元素个数, make初始化时指定

buf: 缓存区,实际上就是环形队列(有环形队列就有缓冲区,否则没有缓冲区),指向环形队列首部的指针,基于环形队列实现,大小等于make初始化channel时指定的环形队列长度,如果make初始化channel时不指定dataqsiz,则buf=0。只有缓冲型的channel才有buf

elemsize: 每个元素的大小

closed: channel关闭标志

elemtype: 元素类型

sendx: 写入数据的索引,即从哪个位置开始写入数据,取值[0, dataqsiz)

recvx: 读取数据的索引,即从哪个位置开始读取数据,取值[0, dataqsiz)

recvq: 接收等待队列,链表结构,长度无限长, 读取数据的goroutine等待队列, 如果channel的缓冲区为空或者没有缓冲区,读取数据的goroutine被阻塞,加入到recvq等待队列中。因读阻塞的goroutine会被向channel写入数据的goroutine唤醒

sendq: 发送等待队列,链表结构,长度无限长, 写入数据的goroutine等待队列, 如果channel的缓冲区为满或者没有缓冲区,写入数据的goroutine被阻塞,加入到sendq等待队列中。因写阻塞的goroutine会被从channel读取数据的goroutine唤醒

lock: 并发控制锁, 同一时刻,只允许一个, channel不允许并发读写

 

1 结构图

golang channel底层结构和实现

 

 

其中:

环形队列中的0表示没有数据,1表示有数据; G表示一个goroutine
dataqsiz表示环形队列的长度为6, 即可缓存6个元素
buf指向环形队列首部,此时还可以缓存2个元素
qcount表示环形队列中有4个元素
sendx表示下一个发送的数据在环形队列index=5的位置写入,取值[0, 6)
recvx表示从环形队列index=1的位置读取数据,取值[0, 6)
sendq, recvq: 虚线表示,此时转态下的channel可能有等待队列

 

三、channel的创建

1 声明channel类型

//同时读写的channel
var 变量 chan 类型

//只能写入数据的channel
var 变量 chan<- 类型

//只能读取数据的channel
var 变量 <-chan 类型 

其中:

类型:channel内的数据类型,golang支持的合法类型

声明的channel此时还是nil,需要配合make函数初始化之后才能使用

 

2 创建channel

//无缓冲的channel
变量 := make(chan 数据类型)

//有缓冲的channel
变量 := make(chan 数据类型, dataqsiz)

  

四、向channel发送数据

1 发送数据的格式

变量 <- 值

 

2 写数据的过程

1) 流程图如下:

golang channel底层结构和实现

 

其中:

G表示一个goroutine
虚线表示sendq中堵塞的G被唤醒的流程,如果G没有被唤醒,则一直堵塞下去,此时关闭channel,会触发panic

 

2) 过程描述:

1) 如果channel是nil(没有初始化), 发送数据则一直会堵塞,这是一个BUG
2) 如果等待接收队列recvq 不为空,说明没有缓冲区或者缓冲区没有数据,直接从recvq取出一个G数据写入,把G唤醒,结束发送过程
3) 如果等待接收队列recvq为空,且缓冲区有空位,那么就直接将数据写入缓冲区sendx位置, sendx++, qcount++, 结束发送过程
4) 如果等待接收队列recvq为空,缓冲区没有空位,将数据写入G,然后把G放到等待发送队列sendq中进行阻塞,等待被唤醒, 结束发送过程。当被唤醒的时候,需要写入的数据已经被读取出来,且已经完成了写入操作

 

五、从channel接收数据

1 接收数据的格式

1) 阻塞接收数据

程序阻塞直到收到数据并赋值

data := <-ch  

 

2) 非阻塞接收数据

非阻塞的通道接收方法可能造成高的 CPU 占用

//ok表示是否接收到数据
data, ok := <-ch

  

3) 接收数据并忽略

程序阻塞直到接收到数据,但接收到的数据会被忽略

<-ch

  

4) 循环接收

channel是可以进行遍历的,遍历的结果就是接收到的数据

for data := range ch {
    //done
}

 

5) SELECT语句接收

select 的特点是只要其中有一个 case 已经完成,程序就会继续往下执行,而不会考虑其他 case 的情况
在一个 select 语句中,Go语言会按顺序从头至尾评估每一个发送和接收的语
如果其中的多条case语句可继续执行(即没有被阻塞),那么就从这些case语句中任意选择一条
如果没有case语句可以执行(即所有的通道都被阻塞):
  1) 如果有 default 语句,执行 default 语句,同时程序的执行会从 select 语句后的语句中恢复
  2) 如果没有 default 语句,那么 select 语句将被阻塞,直到至少有一个case可以进行下去

 

select {
    case <- chan1:
       //done
    case chan2 <- 2:
       //done
    default:
       //done
}

  

2 读取数据的流程

1) 流程图如下:

golang channel底层结构和实现

其中:

G表示一个goroutine
虚线表示recvq中堵塞的G被唤醒的流程,如果G没有被唤醒,则一直堵塞下去,此时关闭channel,会得到channel类型的零值

 

2) 过程描述:

1 如果等待发送队列sendq不为空,且没有缓冲区,直接从sendq中取出G,读取数据,最后把G唤醒,结束读取过程
2 如果等待发送队列sendq不为空,有缓冲区(此时缓冲区满了),从缓冲区中首部读出数据,把sendq出列的G中数据写入缓冲区尾部,把G唤醒,结束读取过程
3 如果等待发送队列sendq为空,且环形队列无元素,将goruntime加入等待接收队列recvq中进行堵塞,等待被唤醒
4 如果等待发送队列sendq为空,环形队列有元素,直接从缓冲区读取数据,结束读取过程

六、关闭channel

1 格式

close(ch)

 

2 过程描述

1) 首先校验chan是否已被初始化,然后加锁之后再校验是否已被关闭过,如果校验都通过了,那么将closed字段设值为1
2) 遍历recvq和sendq,并将所有的goroutine 加入到glist中
3) 将所有glist中的goroutine加入调度队列,等待被唤醒
4) recvq中的goroutine接收到对应数据的零值,sendq中的goroutine会直接panic

 

七、channel发送、接收数据过程可能产生的问题

1 向一个nil的channel发送/读取数据会一直堵塞下去?该如何唤醒?

会一直堵塞下去,不会被唤醒,可能会造成泄露,这是一个BUG

 

2 等待发送队列(sendq)中有数据,如果一直没有gouruntine从channel里面读数据会不会造成泄漏?

会造成泄露,channel用完了,最好要close

 

3 向已经关闭的channel读/写数据会发生什么?

写已经关闭的 channel 会触发panic

读已经关闭的 channel,能一直读到数据:

  1) 如果 channel 关闭前,buf内有元素还未读,会正确读到 channel 内的值,且返回的第二个 bool 值为 true

  2) 如果 channel 关闭前,buf内有元素已经被读完,channel 内无值,返回 channel 元素的零值,第二个 bool 值为 false

 

4 触发 panic 的三种情况

1) 向一个关闭的 channel 进行写操作

2) 关闭一个为 nil 的 channel

3) 重复关闭一个 channel