目录
第八章 Goroutines和Channels
- 备注:
1.这一部分开始,原书gopl开始出现较多的中文错误
2.这一章节有大量例子,此处省去
Goroutines
- 在Go语言中,每一个并发的执行单元叫做一个goroutine
- 当一个程序启动,其主函数将在一个单独的goroutine中运行,叫做main goroutine
- 可以使用go关键字来创建新的goroutine
f() // call f(); wait for it to return
go f() // create a new goroutine that calls f(); don't wait
- 除了从主函数退出或者直接终止程序之外,没有其它的编程方法能够让一个goroutine来打断另一个的执行
Channels
- 如果说goroutine是Go语言程序的并发体的话,那么channels就是它们之间的通信机制
- 每一个channel都有一个特殊的类型,也就是channel可发送数据的类型
ch := make(chan int) // ch has type 'chan int'
- 和map类似,chan也是使用make创建的底层数据结构的引用
- 当我们复制一个chan或用于函数传参时,只是拷贝了一个chan的引用
- chan类型的零值是nil
- 两个相同类型的chan可以使用==运算符进行比较:
1.如果两个chan引用的是相同的对象,那么比较的结果为真
2.一个chan也可以和nil进行比较 - 一个channel有发送和接受两种操作,都是通信行为:
1.一个发送语句将一个值从一个goroutine通过channel发送到另一个执行接收操作的goroutine
2.发送和接收都用<-运算符
3.发送语句中,channel在左,要发送的值在右
4.接受语句中,channel在右,如果使用变量接收,需要用到=
ch <- x // a send statement
x = <-ch // a receive expression in an assignment statement
<-ch // a receive statement; result is discarded
- channel还有close操作,使用close操作后:
1.基于该channel的任何发送操作都将导致panic
2.1.基于该channel的任何接收操作依然可以接收之前已经成功发送的数据
2.2.如果channel中已经没有数据,将产生一个零值的数据
close(ch)
- 使用make创建chan,可以指定参数来实现一个有缓冲的channel(默认是无缓冲的)
ch = make(chan int) // unbuffered channel
ch = make(chan int, 0) // unbuffered channel
ch = make(chan int, 3) // buffered channel with capacity 3
不带缓存的Channel
- 基于无缓存的Chan之间:
1.如果发送者goroutine的发送操作先执行,那么发送者goroutine将被阻塞,直到另一个接收者goroutine在相同的Chan上执行接收操作。
2.如果接收者goroutine的接收操作先发生,那么接收者goroutine将被阻塞,直到有另一个发送者goroutine在相同的Chan上执行发送操作
3.当发送的值被成功接收后,两个goroutine才可以继续执行后续语句 - 基于无缓存的Chan的发送和接收操作将导致两个goroutine做一次同步操作,因此这个无缓存的Chan也被称为同步Chan
- 当通过一个无缓存的Chan发送数据时,接收者会先被唤醒,然后接收数据(详细需要深究Go语言的并发内存模型,happens before概念)
- 一个基本问题:主的goroutine的完成通常不会等待其它goroutine完成
- 可以使用一个chan来解决这个同步问题
func main() {
conn, err := net.Dial("tcp", "localhost:8000")
if err != nil {
log.Fatal(err)
}
done := make(chan struct{})
go func() {
io.Copy(os.Stdout, conn) // NOTE: ignoring errors
log.Println("done")
done <- struct{}{} // signal the main goroutine
}()
mustCopy(conn, os.Stdin)
conn.Close()
<-done // wait for background goroutine to finish
}
串联的Channels(Pipeline)
- Channels页可以用于将多个goroutine链接在一起
func main() {
naturals := make(chan int)
squares := make(chan int)
// Counter
go func() {
for x := 0; ; x++ {
naturals <- x
}
}()
// Squarer
go func() {
for {
x := <-naturals
squares <- x * x
}
}()
// Printer (in main goroutine)
for {
fmt.Println(<-squares)
}
}
- "像这样的串联Channels的管道(Pipelines)可以用在需要长时间运行的服务中,每个长时间运行的goroutine可能会包含一个死循环,在不同goroutine的死循环内部使用串联的Channels来通信"
- 如何发送有限的数列?可以基于以下条件:
1.当一个chan被关闭,再向chan发送数据会引起panic
2.当一个chan被关闭,已经发送的数据被成功接收后,后续的接收操作会收到一个零值
3.没有办法直接测试一个chan是否被关闭,但是接收操作有一个变体形式:
// Squarer
go func() {
for {
x, ok := <-naturals
if !ok {
break // channel was closed and drained
}
squares <- x * x
}
close(squares)
}()
- 通过ok变量可以知道是否从chan中成功接收到值
- 可以使用range循环是上面处理模式的简洁语法,它依次从channel接收数据,当channel被关闭并且没有值可接收时跳出循环
- 改进版本:
func main() {
naturals := make(chan int)
squares := make(chan int)
// Counter
go func() {
for x := 0; x < 100; x++ {
naturals <- x
}
close(naturals)
}()
// Squarer
go func() {
for x := range naturals {
squares <- x * x
}
close(squares)
}()
// Printer (in main goroutine)
for x := range squares {
fmt.Println(x)
}
}
- 试图关闭一个chan两次或以上,将导致panic
- 试图关闭一个nil值的chan也会引起panic
- 关闭一个channel还会触发一个广播机制
并发的退出(调整了原书的顺序,原书gopl-zh 332页)
- 有时候我们需要通知goroutine停止它正在干的事情
- Go没有提供在一个goroutine中终止另一个goroutine的方法
- 使用内置的close函数会触发一个广播
单方向的Channel
- chan T表示一个既能接收又能发送T类型数据的chan
- chan<- T 类型表示一个只发送T类型数据的chan
- <-chan T类型表示一个只接收T类型数据的chan
- 以上的限制会在编译期检测
func counter(out chan<- int) {
for x := 0; x < 100; x++ {
out <- x
}
close(out)
}
func squarer(out chan<- int, in <-chan int) {
for v := range in {
out <- v * v
}
close(out)
}
func printer(in <-chan int) {
for v := range in {
fmt.Println(v)
}
}
func main() {
naturals := make(chan int)
squares := make(chan int)
go counter(naturals)
go squarer(squares, naturals)
printer(squares)
}
带缓存的Channels
- 带缓存的Chan内部是一个元素队列,队列最大容量在使用make创建chan时指定
ch = make(chan string, 3)
- 对于上面的例子,我们可以在无阻塞的情况下,连续向chan发送三个值
ch <- "A"
ch <- "B"
ch <- "C"
- 如果我们接收一个值
fmt.Println(<-ch) //"A"
- 这时,chan里的缓存队列不是空的,也不是满的,可以在无阻塞的情况下,对其进行发送和接收操作
- 可以使用len函数获取chan的有效元素个数
- 可以使用cap函数获取chan的缓存队列容量
- 如果再发生两次接收操作,队列将变空
fmt.Println(<-ch) // "B"
fmt.Println(<-ch) // "C"
- 之后的接收操作的goroutine将被阻塞
- 不可将带缓存的chan当作同一个goroutine中的队列使用。channel和goroutine的调度器机制是紧密向量的,一个发送操作或许是整个程序,这样会造成永久的阻塞
- 下面有一个bug程序,根据原书例子(gopl-zh 310页)改编
func mirroredQuery() string {
responses := make(chan string)
go func() { responses <- request("asia.gopl.io") }()
go func() { responses <- request("europe.gopl.io") }()
go func() { responses <- request("americas.gopl.io") }()
return <-responses // return the quickest response
}
func request(hostname string) (response string) { /* ... */ }
- 以上程序用的是无缓存的chan,随着函数被执行,必定会有两个goroutine被一直阻塞,这就造成了goroutines泄漏。和垃圾变量不同,泄漏的goroutines并不会被自动回收
- 因此,确保每个不再需要的goroutine能正常退出是重要的
无缓存和有缓存chan的选择
- 无缓存:更强调操作之间的同步接收操作
- 有缓存:接收操作解耦,像cpu,cache,memory之间关系,原书中用了蛋糕店作为例子(gopl-zh 310页)
并发的循环
- "子问题都是完全彼此独立的问题被叫做易并行问题(译注:embarrassingly parallel,直译的话更像是尴尬并行)"
- 回顾匿名函数调用循环变量问题
var rmdirs []func()
for _, dir := range tempDirs() {
os.MkdirAll(dir, 0755)
rmdirs = append(rmdirs, func() {
os.RemoveAll(dir) // NOTE: incorrect!
})
}
- 以上例子,匿名函数中的函数体使用os.RemoveAll(dir),其实质上是使用了循环变量dir的地址,而不是循环变量某一时候的值,而os.MkdirAll(dir, 0755)因为是函数传参,获得dir的一份拷贝所以没问题
- gopl-zh 312页例子很有价值,直接去看即可
基于select的多路复用
select {
case <-ch1:
// ...
case x := <-ch2:
// ...use x...
case ch3 <- y:
// ...
default:
// ...
}
- 上面是select语句的一般形式,每一个分支代表一个通信操作
- "select会等待case中有能够执行的case时去执行。当条件满足时,select才会去通信并执行case之后的语句;这时候其它通信是不会执行的。一个没有任何case的select语句写作select{},会永远地等待下去"