Go 数据类型 | channel 进阶(上)

时间:2025-03-30 15:47:25
// $GOROOT/src/runtime/ // chanrecv 从通道 c 接收数据并将接收到的数据写入 ep。 // ep 可能为 nil,这种情况表示忽略接收到的数据。 // 如果 block == false (非阻塞模式)并且通道中没有元素可用,返回 (false, false)。 // 否则,如果 c 已关闭,则将 *ep 清零并返回 (true, false)。 // 否则,将 *ep 填充为一个元素并返回 (true, true)。 // 非空的 ep 必须指向堆或调用者的栈。 func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { // raceenabled: 不需要检查 ep,因为它始终在栈上或由 reflect 分配的新内存。 if debugChan { print("chanrecv: chan=", c, "\n") } if c == nil { // c 是 nil,非阻塞模式返回 (false, false)。 if !block { return } gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) throw("unreachable") } // 快速路径:在不获取锁的情况下检查失败的非阻塞操作。 if !block && empty(c) { // 在观察到 channel 没有准备好接收操作之后,我们观察 channel 是否已经关闭。 // // 重新排序这些检查可能导致与关闭冲突时出现错误的行为。 // 例如,如果 channel 是打开的且不为空,被关闭,然后被耗尽,重新排序后的读取可能错误地指示“打开且为空”。 // 为了防止重新排序,我们对这两种检查都使用原子加载,并依赖清空和关闭发生在同一锁下的不同临界区。 // 当关闭带有阻塞发送的无缓冲 channel 时,此假设失败,但无论如何这是一个错误条件。 if atomic.Load(&c.closed) == 0 { // 由于 channel 不可重新打开,所以最近一次观察到 channel 没有关闭也就意味着第一次观察时也没有关闭。 // 我们的行为就好像我们在那个时刻观察到了 channel,并报告接收无法继续。 // // channel 没有关闭,直接返回 (false, false)。 return } // Sequential consistency is also required here, when racing with such a send. // channel 的关闭是不可逆的。这里重新检查 channel 是否有任何待接收的数据,是因为这些数据可能在上述判空和关闭检查之间到达。 // 这里需要顺序一致性,当出现发送竞争时。 if empty(c) { // channel 不可逆地关闭且为空。 if raceenabled { raceacquire(c.raceaddr()) } if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } } var t0 int64 if blockprofilerate > 0 { t0 = cputicks() } lock(&c.lock) // 如果 channel 关闭,并且 channel 中没有元素,那么返回 (true, false)。 if c.closed != 0 && c.qcount == 0 { if raceenabled { raceacquire(c.raceaddr()) } unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } // 如果发送队列存在发送者,返回 (true, true)。 if sg := c.sendq.dequeue(); sg != nil { // 找到一个等待的发送者。如果缓冲区大小为 0,则直接接收该发送者的值。 // 否则,从队列头部接收,并将发送者的值添加到队列的尾部(两者都映射到相同的缓存槽,因为队列已满)。 recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true } // 如果 channel 有缓冲数据,则直接从队列中接收数据,然后返回 (true, true)。 if c.qcount > 0 { // 直接从队列接收 qp := chanbuf(c, c.recvx) if raceenabled { racenotify(c, c.recvx, nil) } if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) return true, true } // 如果为非阻塞,此时没有数据可用,返回 (false, false)。 if !block { unlock(&c.lock) return false, false } // 没有可用的发送者:在该 channel 上阻塞。 gp := getg() // 获取当前的 goroutine mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // 在 上分配 elem 和将 mysg 入队之间没有栈分割, // 这样 copystack 可以找到它。 mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg // sudog 节点记录 goroutine。 mysg.g = gp mysg.isSelect = false // 记录等待的 channel。 mysg.c = c gp.param = nil // 将自己的 sudog(等待节点)放入通道的接收队列中。 c.recvq.enqueue(mysg) // 向任何试图缩小我们的栈的人发出信号,表明我们将要在一个通道上停靠。 // 当这个 G 的状态发生变化和我们设置 之间的窗口对于堆栈收缩是不安全的。 atomic.Store8(&gp.parkingOnChan, 1) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2) // 有人唤醒了我们 if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil gp.activeStackChans = false if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } success := mysg.success gp.param = nil mysg.c = nil releaseSudog(mysg) return true, success }