Go Language Design and Implementation -- Channel

Time: 2023-01-12 13:51:21

Usages that need a little care

Type assertion

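package main

import "fmt"

type dog struct {
	Name  string
	Color string
}

func main() {
	allChan := make(chan any, 10)
	allChan <- dog{Name: "lxy", Color: "yellow"}
	// If you write it like this you do get the dog, but dog1 has static type any,
	// so you cannot access any of its fields or methods:
	// dog1 := <-allChan
	// Assert the concrete type instead (this panics if the value is not a dog).
	dog1 := (<-allChan).(dog)
	fmt.Println(dog1.Name)
	fmt.Println(dog1.Color)
	fmt.Println(dog1)
}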
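The single-value assertion above panics when the value is not a dog. When a chan any carries mixed types, the comma-ok assertion or a type switch is safer. A minimal sketch; describe is a hypothetical helper, not part of the original code:

```go
package main

import "fmt"

type dog struct {
	Name  string
	Color string
}

// describe (hypothetical helper) inspects a value taken from a chan any.
// A type switch never panics, unlike a single-value assertion such as v.(dog).
func describe(v any) string {
	switch x := v.(type) {
	case dog:
		return "dog " + x.Name
	case int:
		return fmt.Sprintf("int %d", x)
	default:
		return fmt.Sprintf("other %v", x)
	}
}

func main() {
	allChan := make(chan any, 10)
	allChan <- dog{Name: "lxy", Color: "yellow"}
	allChan <- -1

	// Comma-ok form: ok is false (and d is the zero dog) if the value is not a dog.
	if d, ok := (<-allChan).(dog); ok {
		fmt.Println(d.Name, d.Color)
	}
	fmt.Println(describe(<-allChan))
}
```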

Some problems with for range

package main

import "fmt"

type dog struct {
	Name  string
	Color string
}

func main() {
	allChan := make(chan any, 10)
	allChan <- dog{Name: "lxy", Color: "yellow"}
	allChan <- -1
	for val := range allChan {
		fmt.Println(val)
	}
}

The output is:

{lxy yellow}
-1
fatal error: all goroutines are asleep - deadlock!

Why it happens

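A for range loop over a channel only ends after the channel has been closed (and its buffer drained). Here the channel is never closed, so after printing the two buffered values the main goroutine blocks forever on the next receive. Since no other goroutine could ever send to it, the runtime detects that all goroutines are asleep and reports a deadlock.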

The corrected code:

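package main

import "fmt"

type dog struct {
	Name  string
	Color string
}

func main() {
	allChan := make(chan any, 10)
	allChan <- dog{Name: "lxy", Color: "yellow"}
	allChan <- -1
	// Closing the channel lets the for range loop end once the buffer is drained.
	close(allChan)
	for val := range allChan {
		fmt.Println(val)
	}
}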
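In real programs the values usually come from another goroutine, and the common convention is that the sending side closes the channel when it is done. A minimal sketch of that pattern; produce and collect are hypothetical names:

```go
package main

import "fmt"

// produce (hypothetical) owns the channel: it sends from its own goroutine and
// closes the channel when finished, so a consumer's for range terminates.
func produce(vals ...any) <-chan any {
	ch := make(chan any)
	go func() {
		defer close(ch) // runs after the last send
		for _, v := range vals {
			ch <- v
		}
	}()
	return ch
}

// collect (hypothetical) drains ch; the loop exits once ch is closed and empty.
func collect(ch <-chan any) []any {
	var out []any
	for v := range ch {
		out = append(out, v)
	}
	return out
}

func main() {
	fmt.Println(collect(produce("lxy", -1)))
}
```

Closing from the sender side avoids the deadlock above and also avoids "send on closed channel" panics, because only the goroutine that sends ever closes.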

How it works

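Go also lets goroutines communicate through shared memory protected by mutexes, but it additionally offers a different concurrency model: Communicating Sequential Processes (CSP).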

Goroutines and channels correspond, respectively, to the entities and to the message-passing medium in CSP: goroutines pass data to each other through channels.

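This post covers Channel. Its send and receive operations are both first-in, first-out (FIFO), and it is thread-safe; thread safety is usually achieved with locks.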

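Locks are a common concurrency-control technique, usually divided into optimistic and pessimistic locks, i.e. optimistic and pessimistic concurrency control. A "lock-free queue" is more precisely a queue that uses optimistic concurrency control. The two differ fundamentally: an optimistic lock is not a real lock, only a concurrency-control approach.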

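Optimistic concurrency control is essentially a validation-based protocol: threads synchronize data with the atomic CAS (compare-and-swap) instruction, and lock-free queues rely on this same instruction.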
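To make the "validate, then retry" idea concrete, here is a minimal sketch of optimistic concurrency control on a counter (not a queue) using sync/atomic's CompareAndSwapInt64; casIncrement is a hypothetical name:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// casIncrement (hypothetical) adds delta to *addr without a lock:
// read the current value, compute the new one, and publish it only if no other
// goroutine changed *addr in between; otherwise the CAS fails and we retry.
func casIncrement(addr *int64, delta int64) {
	for {
		old := atomic.LoadInt64(addr)
		if atomic.CompareAndSwapInt64(addr, old, old+delta) {
			return
		}
	}
}

func main() {
	var counter int64
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			casIncrement(&counter, 1)
		}()
	}
	wg.Wait()
	fmt.Println(atomic.LoadInt64(&counter))
}
```

No goroutine ever waits on a lock here; a goroutine that loses the race simply retries, which is the essence of the optimistic approach.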

However, Channel is still a lock-based queue, ha: the effort to make it lock-free ran into problems that have not been resolved to this day.

Data structure

type hchan struct {
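   qcount   uint           // number of elements currently in the buffer
   dataqsiz uint           // length of the circular buffer (the channel's capacity)
   buf      unsafe.Pointer // pointer to the buffer data
   elemsize uint16 // size of each element the channel can send/receive
   closed   uint32 // non-zero once the channel has been closed
   elemtype *_type // type of the elements the channel can send/receive
   sendx    uint   // index where the next send writes
   recvx    uint   // index where the next receive reads
   recvq    waitq  // goroutines blocked receiving (buffer empty, no sender ready)
   sendq    waitq  // goroutines blocked sending (buffer full, no receiver ready)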

   // lock protects all fields in hchan, as well as several
   // fields in sudogs blocked on this channel.
   //
   // Do not change another G's status while holding this lock
   // (in particular, do not ready a G), as this can deadlock
   // with stack shrinking.
   lock mutex
}

The buffer is circular (a ring buffer): sendx and recvx wrap back to 0 when they reach dataqsiz.
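A single-goroutine model can show how qcount, sendx and recvx drive the ring. This is a minimal sketch under stated assumptions: ring, newRing, push and pop are hypothetical names, and there is no locking, no wait queue and no blocking, unlike the real hchan:

```go
package main

import "fmt"

// ring is a hypothetical model of hchan's circular buffer: len(buf) plays the
// role of dataqsiz, and qcount/sendx/recvx have the same meaning as in hchan.
type ring[T any] struct {
	buf          []T
	qcount       uint
	sendx, recvx uint
}

func newRing[T any](size uint) *ring[T] {
	return &ring[T]{buf: make([]T, size)}
}

// push mirrors the buffered-send path: write at sendx, advance, wrap to 0.
func (r *ring[T]) push(v T) bool {
	if r.qcount == uint(len(r.buf)) {
		return false // full: a real channel would block the sender here
	}
	r.buf[r.sendx] = v
	r.sendx++
	if r.sendx == uint(len(r.buf)) {
		r.sendx = 0
	}
	r.qcount++
	return true
}

// pop mirrors the buffered-receive path: read at recvx, clear the slot, advance, wrap.
func (r *ring[T]) pop() (T, bool) {
	var zero T
	if r.qcount == 0 {
		return zero, false // empty: a real channel would block the receiver here
	}
	v := r.buf[r.recvx]
	r.buf[r.recvx] = zero
	r.recvx++
	if r.recvx == uint(len(r.buf)) {
		r.recvx = 0
	}
	r.qcount--
	return v, true
}

func main() {
	r := newRing[int](3)
	r.push(1)
	r.push(2)
	r.push(3)
	fmt.Println(r.push(4)) // false: buffer is full
	v, _ := r.pop()
	fmt.Println(v, r.sendx, r.recvx) // 1 0 1
	r.push(4)                        // sendx had wrapped to 0, so 4 lands in slot 0
	fmt.Println(r.buf)               // [4 2 3]
}
```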

Creating a channel

func makechan(t *chantype, size int) *hchan {
   elem := t.elem

   // compiler checks this but be safe.
   if elem.size >= 1<<16 {
      throw("makechan: invalid channel element type")
   }
   if hchanSize%maxAlign != 0 || elem.align > maxAlign {
      throw("makechan: bad alignment")
   }

   mem, overflow := math.MulUintptr(elem.size, uintptr(size))
   if overflow || mem > maxAlloc-hchanSize || size < 0 {
      panic(plainError("makechan: size out of range"))
   }

   // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
   // buf points into the same allocation, elemtype is persistent.
   // SudoG's are referenced from their owning thread so they can't be collected.
   // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
   var c *hchan
   switch {
   case mem == 0:
      // Queue or element size is zero.
      c = (*hchan)(mallocgc(hchanSize, nil, true))
      // Race detector uses this location for synchronization.
      c.buf = c.raceaddr()
   case elem.ptrdata == 0:
      // Elements do not contain pointers.
      // Allocate hchan and buf in one call.
      c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
      c.buf = add(unsafe.Pointer(c), hchanSize)
   default:
      // Elements contain pointers.
      c = new(hchan)
      c.buf = mallocgc(mem, elem, true)
   }

   c.elemsize = uint16(elem.size)
   c.elemtype = elem
   c.dataqsiz = uint(size)
   lockInit(&c.lock, lockRankHchan)

   if debugChan {
      print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
   }
   return c
}

This function initializes runtime.hchan and the buffer based on the channel's element type and buffer size.

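  • If the buffer needs no memory (the channel is unbuffered, or the element size is zero as with struct{}), only runtime.hchan is allocated
  • If the element type contains no pointers, runtime.hchan and the underlying array are allocated as one contiguous block
  • Otherwise, memory for runtime.hchan and for the buffer is allocated separately
  • Finally, the remaining fields (elemsize, elemtype, dataqsiz and the lock) are initialized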
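The three allocation cases can be restated as a small decision function. This is a hedged sketch: allocStrategy is hypothetical, hasPointers stands in for the runtime's elem.ptrdata != 0 check, and the overflow/maxAlloc checks are left out:

```go
package main

import (
	"fmt"
	"unsafe"
)

// allocStrategy (hypothetical) mirrors the switch in makechan.
// elemSize is the element size in bytes, size is the buffer capacity,
// hasPointers says whether the element type contains pointers.
func allocStrategy(elemSize, size uintptr, hasPointers bool) string {
	mem := elemSize * size // bytes needed for the buffer
	switch {
	case mem == 0:
		return "hchan only"
	case !hasPointers:
		return "hchan+buf in one allocation"
	default:
		return "hchan and buf allocated separately"
	}
}

func main() {
	var s struct{}
	var i int64
	var p *int
	fmt.Println(allocStrategy(unsafe.Sizeof(i), 0, false))   // unbuffered chan int64
	fmt.Println(allocStrategy(unsafe.Sizeof(s), 100, false)) // chan struct{} with capacity 100
	fmt.Println(allocStrategy(unsafe.Sizeof(i), 8, false))   // chan int64, capacity 8
	fmt.Println(allocStrategy(unsafe.Sizeof(p), 8, true))    // chan *int, capacity 8
}
```

Keeping pointer-free buffers in the same allocation as hchan works because the garbage collector does not need to scan them, as the comment in makechan explains.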

Sending data

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
   if c == nil {
      if !block {
         return false
      }
      gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
      throw("unreachable")
   }

   if debugChan {
      print("chansend: chan=", c, "\n")
   }

   if raceenabled {
      racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))
   }

   // Fast path: check for failed non-blocking operation without acquiring the lock.
   //
   // After observing that the channel is not closed, we observe that the channel is
   // not ready for sending. Each of these observations is a single word-sized read
   // (first c.closed and second full()).
   // Because a closed channel cannot transition from 'ready for sending' to
   // 'not ready for sending', even if the channel is closed between the two observations,
   // they imply a moment between the two when the channel was both not yet closed
   // and not ready for sending. We behave as if we observed the channel at that moment,
   // and report that the send cannot proceed.
   //
   // It is okay if the reads are reordered here: if we observe that the channel is not
   // ready for sending and then observe that it is not closed, that implies that the
   // channel wasn't closed during the first observation. However, nothing here
   // guarantees forward progress. We rely on the side effects of lock release in
   // chanrecv() and closechan() to update this thread's view of c.closed and full().
   if !block && c.closed == 0 && full(c) {
      return false
   }

   var t0 int64
   if blockprofilerate > 0 {
      t0 = cputicks()
   }

   lock(&c.lock)

   if c.closed != 0 {
      unlock(&c.lock)
      panic(plainError("send on closed channel"))
   }

   if sg := c.recvq.dequeue(); sg != nil {
      // Found a waiting receiver. We pass the value we want to send
      // directly to the receiver, bypassing the channel buffer (if any).
      send(c, sg, ep, func() { unlock(&c.lock) }, 3)
      return true
   }

   if c.qcount < c.dataqsiz {
      // Space is available in the channel buffer. Enqueue the element to send.
      qp := chanbuf(c, c.sendx)
      if raceenabled {
         racenotify(c, c.sendx, nil)
      }
      typedmemmove(c.elemtype, qp, ep)
      c.sendx++
      if c.sendx == c.dataqsiz {
         c.sendx = 0
      }
      c.qcount++
      unlock(&c.lock)
      return true
   }

   if !block {
      unlock(&c.lock)
      return false
   }

   // Block on the channel. Some receiver will complete our operation for us.
   gp := getg()
   mysg := acquireSudog()
   mysg.releasetime = 0
   if t0 != 0 {
      mysg.releasetime = -1
   }
   // No stack splits between assigning elem and enqueuing mysg
   // on gp.waiting where copystack can find it.
   mysg.elem = ep
   mysg.waitlink = nil
   mysg.g = gp
   mysg.isSelect = false
   mysg.c = c
   gp.waiting = mysg
   gp.param = nil
   c.sendq.enqueue(mysg)
   // Signal to anyone trying to shrink our stack that we're about
   // to park on a channel. The window between when this G's status
   // changes and when we set gp.activeStackChans is not safe for
   // stack shrinking.
   atomic.Store8(&gp.parkingOnChan, 1)
   gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
   // Ensure the value being sent is kept alive until the
   // receiver copies it out. The sudog has a pointer to the
   // stack object, but sudogs aren't considered as roots of the
   // stack tracer.
   KeepAlive(ep)

   // someone woke us up.
   if mysg != gp.waiting {
      throw("G waiting list is corrupted")
   }
   gp.waiting = nil
   gp.activeStackChans = false
   closed := !mysg.success
   gp.param = nil
   if mysg.releasetime > 0 {
      blockevent(mysg.releasetime-t0, 2)
   }
   mysg.c = nil
   releaseSudog(mysg)
   if closed {
      if c.closed == 0 {
         throw("chansend: spurious wakeup")
      }
      panic(plainError("send on closed channel"))
   }
   return true
}

This function is fairly complex, so here is a brief summary:

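  • Before any send logic runs, the channel is locked so that multiple threads cannot modify it concurrently; if the channel is already closed, the send panics with "send on closed channel"
  • If a receiver is already waiting, the value is handed directly to it through runtime.send, bypassing the buffer
  • Otherwise, if the buffer has free space, the value is written into the channel's buffer
  • If there is no buffer or the buffer is full, the sender waits for another goroutine to receive from the channel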
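The block parameter of chansend is what a select with a default case uses: as far as I know, the compiler lowers a single-case send select with default into a call that ends in chansend(..., block=false), which takes the early `return false` paths instead of parking. A minimal sketch; trySend is a hypothetical name:

```go
package main

import "fmt"

// trySend (hypothetical) performs a non-blocking send: it succeeds only if a
// receiver is waiting or the buffer has room, and never parks the goroutine.
func trySend(ch chan<- int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(trySend(ch, 1)) // true: buffer had space
	fmt.Println(trySend(ch, 2)) // false: buffer full and no waiting receiver
	fmt.Println(<-ch)           // 1
}
```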

Direct send

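If the target channel is not closed and a goroutine is already waiting to receive, runtime.chansend takes the goroutine that has been waiting longest from the receive queue recvq and sends the data directly to it, as shown in the figure: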

[Figure: sending directly to a receiver waiting in recvq. Source: 面向信仰编程]

As covered above, recvq is of type waitq. Here is the waitq struct:

type waitq struct {
   first *sudog
   last  *sudog
}

runtime.sudog represents a goroutine in a wait list; it stores pointers to the previous and next runtime.sudog, which link the waiters into a list.
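The first/last pointers make waitq a FIFO: new waiters go to the tail and dequeue takes the head, which is why chansend wakes the receiver that has waited longest. A minimal sketch with a hypothetical, stripped-down sudog (only an id and the prev/next links); the real dequeue also skips sudogs whose select has already been won by another case, which is omitted here:

```go
package main

import "fmt"

// sudog is a hypothetical stand-in for runtime.sudog; id represents the waiting goroutine.
type sudog struct {
	id         int
	prev, next *sudog
}

// waitq has the same shape as the runtime's: first and last waiter.
type waitq struct {
	first, last *sudog
}

// enqueue appends sgp at the tail.
func (q *waitq) enqueue(sgp *sudog) {
	sgp.next = nil
	x := q.last
	if x == nil { // empty queue
		sgp.prev = nil
		q.first = sgp
		q.last = sgp
		return
	}
	sgp.prev = x
	x.next = sgp
	q.last = sgp
}

// dequeue removes and returns the head, or nil if the queue is empty.
func (q *waitq) dequeue() *sudog {
	sgp := q.first
	if sgp == nil {
		return nil
	}
	q.first = sgp.next
	if q.first == nil {
		q.last = nil
	} else {
		q.first.prev = nil
	}
	sgp.next = nil
	return sgp
}

func main() {
	var q waitq
	for i := 1; i <= 3; i++ {
		q.enqueue(&sudog{id: i})
	}
	// Prints the ids in arrival order.
	for sg := q.dequeue(); sg != nil; sg = q.dequeue() {
		fmt.Println(sg.id)
	}
}
```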

In this case chansend calls runtime.send:

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
   if raceenabled {
      if c.dataqsiz == 0 {
         racesync(c, sg)
      } else {
         // Pretend we go through the buffer, even though
         // we copy directly. Note that we need to increment
         // the head/tail locations only when raceenabled.
         racenotify(c, c.recvx, nil)
         racenotify(c, c.recvx, sg)
         c.recvx++
         if c.recvx == c.dataqsiz {
            c.recvx = 0
         }
         c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
      }
   }
   if sg.elem != nil {
      sendDirect(c.elemtype, sg, ep)
      sg.elem = nil
   }
   gp := sg.g
   unlockf()
   gp.param = unsafe.Pointer(sg)
   sg.success = true
   if sg.releasetime != 0 {
      sg.releasetime = cputicks()
   }
   goready(gp, skip+1)
}

The function does two things:

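  • It calls runtime.sendDirect to copy the value being sent directly to the memory address of the variable x in the expression x = <-c.
  • It calls runtime.goready to mark the goroutine waiting for the data as runnable (_Grunnable) and put it in the runnext slot of the processor the sender is running on, so that processor wakes the receiver at its next scheduling point. Sending only places the receiver's goroutine in the processor's runnext; it does not run it immediately.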

Sending into the buffer

This path applies when the channel was created with a buffer and the buffer is not yet full.

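First, runtime.chanbuf computes the address of the next slot that can hold data; then runtime.typedmemmove copies the value being sent into the buffer, and the sendx index and the qcount counter are incremented.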

if c.qcount < c.dataqsiz {
   // Space is available in the channel buffer. Enqueue the element to send.
   qp := chanbuf(c, c.sendx)
   if raceenabled {
      racenotify(c, c.sendx, nil)
   }
   typedmemmove(c.elemtype, qp, ep)
   c.sendx++
   if c.sendx == c.dataqsiz {
      c.sendx = 0
   }
   c.qcount++
   unlock(&c.lock)
   return true
}

[Figure: sending into the channel buffer. Source: 面向信仰编程]

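If the channel's buffer is not full, the value sent to the channel is stored at the position given by sendx, and sendx is incremented. Because buf is a circular array, sendx goes back to the start of the array when it reaches dataqsiz.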

Blocking send

Channel没有接收能够处理数据时,向Channel发送数据会被下游阻塞。

if !block {
   unlock(&c.lock)
   return false
}

// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
   mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
// Ensure the value being sent is kept alive until the
// receiver copies it out. The sudog has a pointer to the
// stack object, but sudogs aren't considered as roots of the
// stack tracer.
KeepAlive(ep)

// someone woke us up.
if mysg != gp.waiting {
   throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
if mysg.releasetime > 0 {
   blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
if closed {
   if c.closed == 0 {
      throw("chansend: spurious wakeup")
   }
   panic(plainError("send on closed channel"))
}
return true

The function proceeds as follows:

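  • Call runtime.getg to get the goroutine performing the send;
  • Call runtime.acquireSudog to obtain a runtime.sudog and record the details of this blocking send, such as the channel, whether it is part of a select, and the memory address of the value to send;
  • Add the newly initialized runtime.sudog to the send wait queue and store it in the current goroutine's waiting field, meaning the goroutine is waiting for this sudog to become ready;
  • Call runtime.gopark (with chanparkcommit, which releases the channel lock) to put the current goroutine to sleep until it is woken;
  • After the scheduler wakes it, do some cleanup: reset several fields and release the runtime.sudog; if the goroutine was woken because the channel was closed (mysg.success is false), panic with "send on closed channel";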

Finally, the function returns true, meaning the data was successfully sent to the channel.
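The closed check after wake-up means a sender parked on a channel that then gets closed does not return true; it panics. A minimal sketch; sendAndRecover is a hypothetical helper that turns the panic into a string. Whether close runs before the sender takes the lock or while it is parked, the outcome is the same panic:

```go
package main

import "fmt"

// sendAndRecover (hypothetical) sends v on ch and reports either "sent"
// or the panic message raised by the runtime.
func sendAndRecover(ch chan int, v int) (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	ch <- v
	return "sent"
}

func main() {
	ch := make(chan int) // unbuffered and no receiver: the sender can only park
	done := make(chan string)
	go func() { done <- sendAndRecover(ch, 1) }()
	close(ch)           // wakes the parked sender (or beats it to the lock)
	fmt.Println(<-done) // send on closed channel
}
```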

Receiving data

Next we turn to the other side of channel operations: receiving data. Go offers two ways to receive data from a channel:

i := <-ch
i, ok := <-ch

After processing by the compiler, both forms become ORECV nodes; the second is converted into an OAS2RECV node during type checking. Receiving follows the roadmap below:

[Figure 6-22: Roadmap of the Channel receive operation. Source: 面向信仰编程]

Although the two receive forms are turned into calls to two different functions, runtime.chanrecv1 and runtime.chanrecv2, both eventually call runtime.chanrecv.
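The ok value of the two-result form is what tells a closed channel apart from a real zero value. A minimal sketch; recvPair is a hypothetical wrapper around the comma-ok receive. Values still in the buffer are delivered after close with ok == true; only once the buffer is drained does the receive return the zero value and false:

```go
package main

import "fmt"

// recvPair (hypothetical) performs a two-result receive.
func recvPair(ch <-chan int) (int, bool) {
	v, ok := <-ch
	return v, ok
}

func main() {
	ch := make(chan int, 2)
	ch <- 7
	close(ch)

	fmt.Println(recvPair(ch)) // 7 true: buffered value survives close
	fmt.Println(recvPair(ch)) // 0 false: closed and drained
	fmt.Println(<-ch)         // 0: single-result form cannot tell the difference
}
```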

Receiving covers three scenarios:

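  • If a sender is waiting, runtime.recv takes the data from the blocked sender or from the buffer;
  • If the buffer contains data, the data is received from the channel's buffer;
  • If the buffer contains no data, the receiver waits for another goroutine to send to the channel;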

Direct receive

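When the channel's sendq contains a waiting goroutine, the function takes the goroutine at the head of the queue. The logic is almost the same as for sending, except that sending calls runtime.send while receiving uses runtime.recv.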

Here is the source:

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
   if c.dataqsiz == 0 {
      if raceenabled {
         racesync(c, sg)
      }
      if ep != nil {
         // copy data from sender
         recvDirect(c.elemtype, sg, ep)
      }
   } else {
      // Queue is full. Take the item at the
      // head of the queue. Make the sender enqueue
      // its item at the tail of the queue. Since the
      // queue is full, those are both the same slot.
      qp := chanbuf(c, c.recvx)
      if raceenabled {
         racenotify(c, c.recvx, nil)
         racenotify(c, c.recvx, sg)
      }
      // copy data from queue to receiver
      if ep != nil {
         typedmemmove(c.elemtype, ep, qp)
      }
      // copy data from sender to queue
      typedmemmove(c.elemtype, qp, sg.elem)
      c.recvx++
      if c.recvx == c.dataqsiz {
         c.recvx = 0
      }
      c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
   }
   sg.elem = nil
   gp := sg.g
   unlockf()
   gp.param = unsafe.Pointer(sg)
   sg.success = true
   if sg.releasetime != 0 {
      sg.releasetime = cputicks()
   }
   goready(gp, skip+1)
}
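
  • If the channel has no buffer

    runtime.recvDirect copies the data held by the goroutine at the head of the send queue to the destination memory address

  • If the channel has a buffer

    • Copy the data at the head of the buffer (index recvx) to the receiver's memory address
    • Copy the data of the sender at the head of the send queue into the buffer, releasing one blocked sender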

[Figure 6-23: Receiving data from the send queue]

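The figure shows what happens when <-ch runs while the channel's buffer is full and goroutines are waiting in the send queue: the element held by the runtime.sudog at the head of the send queue replaces the element at the receive index recvx, and the original element is copied to the memory of the receiving variable.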
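Because the blocked sender's value goes into the slot just vacated at recvx (the tail of the queue once recvx advances), FIFO order is preserved even when senders are parked. A minimal sketch observing this from user code; fifoWithBlockedSender is a hypothetical name, and whether the goroutine actually parks depends on scheduling, but the order is the same either way:

```go
package main

import "fmt"

// fifoWithBlockedSender (hypothetical) fills a 1-slot buffer, lets another
// goroutine send two more values, and then receives three times.
func fifoWithBlockedSender() []int {
	ch := make(chan int, 1)
	ch <- 1 // the buffer is now full
	go func() {
		ch <- 2 // parks in sendq if the buffer is still full
		ch <- 3
	}()
	out := make([]int, 0, 3)
	for i := 0; i < 3; i++ {
		out = append(out, <-ch)
	}
	return out
}

func main() {
	fmt.Println(fifoWithBlockedSender()) // [1 2 3]
}
```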

Receiving from the buffer

When the buffer already contains data, receiving from the channel takes the data directly from the buffer at index recvx:

if c.qcount > 0 {
   // Receive directly from queue
   qp := chanbuf(c, c.recvx)
   if raceenabled {
      racenotify(c, c.recvx, nil)
   }
   if ep != nil {
      typedmemmove(c.elemtype, ep, qp)
   }
   typedmemclr(c.elemtype, qp)
   c.recvx++
   if c.recvx == c.dataqsiz {
      c.recvx = 0
   }
   c.qcount--
   unlock(&c.lock)
   return true, true
}

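If the receiving memory address is not nil, typedmemmove copies the data from the buffer to it; then typedmemclr clears the slot in the buffer, and the function finishes its bookkeeping.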

[Figure 6-24: Receiving data from the buffer]

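The bookkeeping increments recvx and resets it to 0 once it reaches the channel's capacity, wrapping the ring index; in addition, the function decrements the qcount counter and releases the channel's lock.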
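qcount and dataqsiz are visible from user code: as I understand the runtime, len on a channel reports qcount and cap reports dataqsiz. A minimal sketch; lenCapAfter is a hypothetical helper that assumes recvs <= sends <= capacity so nothing blocks:

```go
package main

import "fmt"

// lenCapAfter (hypothetical) sends `sends` values into a channel of the given
// capacity, receives `recvs` of them, and reports len (qcount) and cap (dataqsiz).
func lenCapAfter(capacity, sends, recvs int) (int, int) {
	ch := make(chan int, capacity)
	for i := 0; i < sends; i++ {
		ch <- i
	}
	for i := 0; i < recvs; i++ {
		<-ch
	}
	return len(ch), cap(ch)
}

func main() {
	fmt.Println(lenCapAfter(3, 2, 0)) // 2 3
	fmt.Println(lenCapAfter(3, 2, 1)) // 1 3
}
```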

Blocking receive

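When no goroutine is waiting in the channel's send queue and the buffer holds no data, receiving from the channel blocks. Not every receive blocks, however: combined with a select statement, a non-blocking receive may be used: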

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	...
	if !block {
		unlock(&c.lock)
		return false, false
	}

	gp := getg()
	mysg := acquireSudog()
	mysg.elem = ep
	gp.waiting = mysg
	mysg.g = gp
	mysg.c = c
	c.recvq.enqueue(mysg)
	goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)

	gp.waiting = nil
	closed := gp.param == nil
	gp.param = nil
	releaseSudog(mysg)
	return true, !closed
}

In the normal receive path, a runtime.sudog wraps the current goroutine as a waiting goroutine, which is then added to the receive queue.

After enqueuing, the code calls runtime.goparkunlock to trigger scheduling immediately, giving up the processor and waiting to be scheduled again.
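The `!block` branch above is what a select with a default case reaches on the receive side. A minimal sketch; tryRecv is a hypothetical name. Note that its second result means "a receive happened", not "the channel is open": on a closed channel the receive case is always ready and yields the zero value:

```go
package main

import "fmt"

// tryRecv (hypothetical) receives without blocking: it returns (v, true) if a
// value (or, for a closed channel, the zero value) was received, else (0, false).
func tryRecv(ch <-chan int) (int, bool) {
	select {
	case v := <-ch:
		return v, true
	default:
		return 0, false
	}
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(tryRecv(ch)) // 0 false: empty buffer and no sender
	ch <- 5
	fmt.Println(tryRecv(ch)) // 5 true
}
```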

Closing a channel

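runtime.closechan panics if the channel is nil or already closed. Otherwise it marks the channel as closed, adds the goroutines waiting in both recvq and sendq to a goroutine list gList, and at the same time clears any unprocessed elements on their runtime.sudog structures. Finally it calls runtime.goready on every goroutine in gList, so blocked receivers return the zero value and blocked senders panic.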
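Because closechan wakes every waiter at once, close works as a broadcast to all blocked receivers. A minimal sketch; closeWakesAll is a hypothetical name. Receivers that were parked in recvq are readied by close, and any that arrive later see the closed flag immediately; either way each gets ok == false:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// closeWakesAll (hypothetical) starts n receivers on an unbuffered channel,
// closes it, and counts how many receivers observed ok == false.
func closeWakesAll(n int) int32 {
	ch := make(chan int)
	var wg sync.WaitGroup
	var woken int32
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if _, ok := <-ch; !ok {
				atomic.AddInt32(&woken, 1)
			}
		}()
	}
	close(ch) // readies every receiver parked in recvq
	wg.Wait()
	return atomic.LoadInt32(&woken)
}

func main() {
	fmt.Println(closeWakesAll(5)) // 5
}
```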

Reference: 《Go语言设计与实现 – Channel》 (Go Language Design and Implementation: Channel)