go语言圣经第八章(读书笔记)

时间:2021-11-16 17:53:28

第八章 Goroutines和Channels

  • 备注:
    1.这一部分开始,原书gopl开始出现较多的中文错误
    2.这一章节有大量例子,此处省去

Goroutines

  • 在Go语言中,每一个并发的执行单元叫做一个goroutine
  • 当一个程序启动,其主函数将在一个单独的goroutine中运行,叫做main goroutine
  • 可以使用go关键字来创建新的goroutine
f() // call f(); wait for it to return
go f() // create a new goroutine that calls f(); don't wait
  • 除了从主函数退出或者直接终止程序之外,没有其它的编程方法能够让一个goroutine来打断另一个的执行

Channels

  • 如果说goroutine是Go语言程序的并发体的话,那么channels就是它们之间的通信机制
  • 每一个channel都有一个特殊的类型,也就是channel可发送数据的类型
ch := make(chan int) // ch has type 'chan int'
  • 和map类似,chan也是使用make创建的底层数据结构的引用
  • 当我们复制一个chan或用于函数传参时,只是拷贝了一个chan的引用
  • chan类型的零值是nil
  • 两个相同类型的chan可以使用==运算符进行比较:
    1.如果两个chan引用的是相同的对象,那么比较的结果为真
    2.一个chan也可以和nil进行比较
  • 一个channel有发送和接受两种操作,都是通信行为:
    1.一个发送语句将一个值从一个goroutine通过channel发送到另一个执行接收操作的goroutine
    2.发送和接收都用<-运算符
    3.发送语句中,channel在左,要发送的值在右
    4.接受语句中,channel在右,如果使用变量接收,需要用到=
ch <- x // a send statement
x = <-ch // a receive expression in an assignment statement
<-ch // a receive statement; result is discarded
  • channel还有close操作,使用close操作后:
    1.基于该channel的任何发送操作都将导致panic
    2.1.基于该channel的任何接收操作依然可以接收之前已经成功发送的数据
    2.2.如果channel中已经没有数据,将产生一个零值的数据
close(ch)
  • 使用make创建chan,可以指定参数来实现一个有缓冲的channel(默认是无缓冲的)
ch = make(chan int) // unbuffered channel
ch = make(chan int, 0) // unbuffered channel
ch = make(chan int, 3) // buffered channel with capacity 3

不带缓存的Channel

  • 基于无缓存的Chan之间:
    1.如果发送者goroutine的发送操作先执行,那么发送者goroutine将被阻塞,直到另一个接收者goroutine在相同的Chan上执行接收操作。
    2.如果接收者goroutine的接收操作先发生,那么接收者goroutine将被阻塞,直到有另一个发送者goroutine在相同的Chan上执行发送操作
    3.当发送的值被成功接收后,两个goroutine才可以继续执行后续语句
  • 基于无缓存的Chan的发送和接收操作将导致两个goroutine做一次同步操作,因此这个无缓存的Chan也被称为同步Chan
  • 当通过一个无缓存的Chan发送数据时,接收者会先被唤醒,然后接收数据(详细需要深究Go语言的并发内存模型,happens before概念)
  • 一个基本问题:主的goroutine的完成通常不会等待其它goroutine完成
  • 可以使用一个chan来解决这个同步问题
func main() {
    conn, err := net.Dial("tcp", "localhost:8000")
    if err != nil {
        log.Fatal(err)
    }
    done := make(chan struct{})
    go func() {
        io.Copy(os.Stdout, conn) // NOTE: ignoring errors
        log.Println("done")
        done <- struct{}{} // signal the main goroutine
    }()
    mustCopy(conn, os.Stdin)
    conn.Close()
    <-done // wait for background goroutine to finish
}

串联的Channels(Pipeline)

  • Channels页可以用于将多个goroutine链接在一起
    go语言圣经第八章(读书笔记)
func main() {
    naturals := make(chan int)
    squares := make(chan int)
    // Counter
    go func() {
    for x := 0; ; x++ {
        naturals <- x
    }
    }()
    // Squarer
    go func() {
    for {
        x := <-naturals
        squares <- x * x
    }
    }()
    // Printer (in main goroutine)
    for {
        fmt.Println(<-squares)
    }
}
  • "像这样的串联Channels的管道(Pipelines)可以用在需要长时间运行的服务中,每个长时间运行的goroutine可能会包含一个死循环,在不同goroutine的死循环内部使用串联的Channels来通信"
  • 如何发送有限的数列?可以基于以下条件:
    1.当一个chan被关闭,再向chan发送数据会引起panic
    2.当一个chan被关闭,已经发送的数据被成功接收后,后续的接收操作会收到一个零值
    3.没有办法直接测试一个chan是否被关闭,但是接收操作有一个变体形式:
// Squarer
go func() {
    for {
        x, ok := <-naturals
        if !ok {
            break // channel was closed and drained
        }
        squares <- x * x
    }
    close(squares)
}()
  • 通过ok变量可以知道是否从chan中成功接收到值
  • 可以使用range循环是上面处理模式的简洁语法,它依次从channel接收数据,当channel被关闭并且没有值可接收时跳出循环
  • 改进版本:
func main() {
    naturals := make(chan int)
    squares := make(chan int)
    // Counter
    go func() {
        for x := 0; x < 100; x++ {
            naturals <- x
        }
        close(naturals)
    }()
    // Squarer
    go func() {
        for x := range naturals {
            squares <- x * x
        }
        close(squares)
    }()
    // Printer (in main goroutine)
    for x := range squares {
        fmt.Println(x)
    }
}
  • 试图关闭一个chan两次或以上,将导致panic
  • 试图关闭一个nil值的chan也会引起panic
  • 关闭一个channel还会触发一个广播机制

并发的退出(调整了原书的顺序,原书gopl-zh 332页)

  • 有时候我们需要通知goroutine停止它正在干的事情
  • Go没有提供在一个goroutine中终止另一个goroutine的方法
  • 使用内置的close函数会触发一个广播

单方向的Channel

  • chan T表示一个既能接收又能发送T类型数据的chan
  • chan<- T 类型表示一个只发送T类型数据的chan
  • <-chan T类型表示一个只接收T类型数据的chan
  • 以上的限制会在编译期检测
func counter(out chan<- int) {
    for x := 0; x < 100; x++ {
        out <- x
    }
    close(out)
}
func squarer(out chan<- int, in <-chan int) {
    for v := range in {
        out <- v * v
    }
    close(out)
}
func printer(in <-chan int) {
    for v := range in {
        fmt.Println(v)
    }
}
func main() {
    naturals := make(chan int)
    squares := make(chan int)
    go counter(naturals)
    go squarer(squares, naturals)
    printer(squares)
}

带缓存的Channels

  • 带缓存的Chan内部是一个元素队列,队列最大容量在使用make创建chan时指定
ch = make(chan string, 3)

go语言圣经第八章(读书笔记)

  • 对于上面的例子,我们可以在无阻塞的情况下,连续向chan发送三个值
ch <- "A"
ch <- "B"
ch <- "C"

go语言圣经第八章(读书笔记)

  • 如果我们接收一个值
fmt.Println(<-ch) //"A"

go语言圣经第八章(读书笔记)

  • 这时,chan里的缓存队列不是空的,也不是满的,可以在无阻塞的情况下,对其进行发送和接收操作
  • 可以使用len函数获取chan的有效元素个数
  • 可以使用cap函数获取chan的缓存队列容量
  • 如果再发生两次接收操作,队列将变空
fmt.Println(<-ch) // "B"
fmt.Println(<-ch) // "C"
  • 之后的接收操作的goroutine将被阻塞
  • 不可将带缓存的chan当作同一个goroutine中的队列使用。channel和goroutine的调度器机制是紧密向量的,一个发送操作或许是整个程序,这样会造成永久的阻塞
  • 下面有一个bug程序,根据原书例子(gopl-zh 310页)改编
func mirroredQuery() string {
    responses := make(chan string)
    go func() { responses <- request("asia.gopl.io") }()
    go func() { responses <- request("europe.gopl.io") }()
    go func() { responses <- request("americas.gopl.io") }()
    return <-responses // return the quickest response
}
func request(hostname string) (response string) { /* ... */ }
  • 以上程序用的是无缓存的chan,随着函数被执行,必定会有两个goroutine被一直阻塞,这就造成了goroutines泄漏。和垃圾变量不同,泄漏的goroutines并不会被自动回收
  • 因此,确保每个不再需要的goroutine能正常退出是重要的

无缓存和有缓存chan的选择

  • 无缓存:更强调操作之间的同步接收操作
  • 有缓存:接收操作解耦,像cpu,cache,memory之间关系,原书中用了蛋糕店作为例子(gopl-zh 310页)

并发的循环

  • "子问题都是完全彼此独立的问题被叫做易并行问题(译注:embarrassingly parallel,直译的话更像是尴尬并行)"
  • 回顾匿名函数调用循环变量问题
var rmdirs []func()
for _, dir := range tempDirs() {
    os.MkdirAll(dir, 0755)
    rmdirs = append(rmdirs, func() {
        os.RemoveAll(dir) // NOTE: incorrect!
    })
}
  • 以上例子,匿名函数中的函数体使用os.RemoveAll(dir),其实质上是使用了循环变量dir的地址,而不是循环变量某一时候的值,而os.MkdirAll(dir, 0755)因为是函数传参,获得dir的一份拷贝所以没问题
  • gopl-zh 312页例子很有价值,直接去看即可

基于select的多路复用

select {
case <-ch1:
// ...
case x := <-ch2:
// ...use x...
case ch3 <- y:
// ...
default:
// ...
}
  • 上面是select语句的一般形式,每一个分支代表一个通信操作
  • "select会等待case中有能够执行的case时去执行。当条件满足时,select才会去通信并执行case之后的语句;这时候其它通信是不会执行的。一个没有任何case的select语句写作select{},会永远地等待下去"