Go 数据类型 | channel 进阶(上)
// $GOROOT/src/runtime/
// chanrecv 从通道 c 接收数据并将接收到的数据写入 ep。
// ep 可能为 nil,这种情况表示忽略接收到的数据。
// 如果 block == false (非阻塞模式)并且通道中没有元素可用,返回 (false, false)。
// 否则,如果 c 已关闭,则将 *ep 清零并返回 (true, false)。
// 否则,将 *ep 填充为一个元素并返回 (true, true)。
// 非空的 ep 必须指向堆或调用者的栈。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// raceenabled: 不需要检查 ep,因为它始终在栈上或由 reflect 分配的新内存。
if debugChan {
print("chanrecv: chan=", c, "\n")
}
if c == nil {
// c 是 nil,非阻塞模式返回 (false, false)。
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// 快速路径:在不获取锁的情况下检查失败的非阻塞操作。
if !block && empty(c) {
// 在观察到 channel 没有准备好接收操作之后,我们观察 channel 是否已经关闭。
//
// 重新排序这些检查可能导致与关闭冲突时出现错误的行为。
// 例如,如果 channel 是打开的且不为空,被关闭,然后被耗尽,重新排序后的读取可能错误地指示“打开且为空”。
// 为了防止重新排序,我们对这两种检查都使用原子加载,并依赖清空和关闭发生在同一锁下的不同临界区。
// 当关闭带有阻塞发送的无缓冲 channel 时,此假设失败,但无论如何这是一个错误条件。
if atomic.Load(&c.closed) == 0 {
// 由于 channel 不可重新打开,所以最近一次观察到 channel 没有关闭也就意味着第一次观察时也没有关闭。
// 我们的行为就好像我们在那个时刻观察到了 channel,并报告接收无法继续。
//
// channel 没有关闭,直接返回 (false, false)。
return
}
// Sequential consistency is also required here, when racing with such a send.
// channel 的关闭是不可逆的。这里重新检查 channel 是否有任何待接收的数据,是因为这些数据可能在上述判空和关闭检查之间到达。
// 这里需要顺序一致性,当出现发送竞争时。
if empty(c) {
// channel 不可逆地关闭且为空。
if raceenabled {
raceacquire(c.raceaddr())
}
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
lock(&c.lock)
// 如果 channel 关闭,并且 channel 中没有元素,那么返回 (true, false)。
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
// 如果发送队列存在发送者,返回 (true, true)。
if sg := c.sendq.dequeue(); sg != nil {
// 找到一个等待的发送者。如果缓冲区大小为 0,则直接接收该发送者的值。
// 否则,从队列头部接收,并将发送者的值添加到队列的尾部(两者都映射到相同的缓存槽,因为队列已满)。
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
// 如果 channel 有缓冲数据,则直接从队列中接收数据,然后返回 (true, true)。
if c.qcount > 0 {
// 直接从队列接收
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
// 如果为非阻塞,此时没有数据可用,返回 (false, false)。
if !block {
unlock(&c.lock)
return false, false
}
// 没有可用的发送者:在该 channel 上阻塞。
gp := getg() // 获取当前的 goroutine
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// 在 上分配 elem 和将 mysg 入队之间没有栈分割,
// 这样 copystack 可以找到它。
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
// sudog 节点记录 goroutine。
mysg.g = gp
mysg.isSelect = false
// 记录等待的 channel。
mysg.c = c
gp.param = nil
// 将自己的 sudog(等待节点)放入通道的接收队列中。
c.recvq.enqueue(mysg)
// 向任何试图缩小我们的栈的人发出信号,表明我们将要在一个通道上停靠。
// 当这个 G 的状态发生变化和我们设置 之间的窗口对于堆栈收缩是不安全的。
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
// 有人唤醒了我们
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success
}