21 并发编程
目录:
21 并发编程
21.1 概述
21.2 goroutine
21.3 channel
21.3.1 channel类型
21.3.2 无缓冲channel
21.3.3 有缓冲channel
21.3.4 range和close
21.3.5 单方向channel
21.3.6 定时器
Timer
Ticker
21.4 select
21.4.1 select作用
21.4.2 超时
21.1 概述
21.1.1 并行和并发
并行(parallel):指在同一时刻,有多条指令在多个处理器上同时执行。
并发(concurrency):指在同一时刻只能有一条指令执行,但多个进程指令被快速的轮换执行,使得在宏观上具有多个进程同时执行的效果,但在微观上并不是同时执行的,只是把时间分成若干段,使多个进程快速交替的执行。
- 并行是两个队列同时使用两台咖啡机
- 并发是两个队列交替使用一台咖啡机
21.1.2 Go语言并发优势
有人把Go比作21世纪的C语言,第一是因为Go语言设计简单;第二,21世纪最重要的就是并发程序设计,而Go从语言层面就支持了并发。同时,并发程序的内存管理有时候是非常复杂的,而Go语言提供了自动垃圾回收机制。
Go语言为并发编程而内置的上层API基于CSP(communication sequential process,顺序通信进程)模型。这就意味着显式锁都是可以避免的,因为Go语言通过安全的通道发送和接受数据以实现同步,这大大地简化了并发程序的编写。
一般情况下,一个普通的桌面计算机跑十几二十个线程就有点负载过大了,但是同样这台机器却可以轻松地让成百上千甚至过万个goroutine进行资源竞争。
21.2 goroutine
21.2.1 goroutine是什么
goroutine是Go并发设计的核心。goroutine说到底其实就是就是协程,但是它比线程更小,十几个goroutine可能体现在底层就是五六个线程,Go语言内部帮你实现了这些goroutine之间的内存共享。执行goroutine只需极少的内存(大概是4~5KB),当然会根据相应的数据伸缩。也正因为如此,可同时运行成千上万个并发任务。goroutine比thread更易用、更高效、更轻便。
21.2.2 创建goroutine
只需要在函数调用语句前添加go关键字,就可以创建并发执行单元。开发人员无需了解任何执行细节,调度器会自动将其安排到合适的系统线程上执行。
在并发编程里,我们通常想将一个过程切分成几块,然后让每个goroutine各自负责一块工作。当一个程序启动时,其主函数即在一个单独的goroutine中运行,我们叫它main goroutine。新的goroutine会用go语句来创建。
示例:
package main import ( "fmt" "time" ) func main(){ go newTask() //新建一个goroutine for { fmt.Println("this is a main goroutine.") time.Sleep(time.Second) } } func newTask(){ for { fmt.Println("this is a new Task.") time.Sleep(time.Second) //延时1s } }
以上实例运行结果为:
this is a main goroutine. this is a new Task. this is a new Task. this is a main goroutine. this is a main goroutine. this is a new Task. ......
21.2.3 主goroutine先退出
主协程退出了,其他子协程也要跟着退出。
实例:
package main import ( "fmt" "time" ) func main(){ go func (){ i:=0 for { fmt.Println("this is a new Task : ",i) time.Sleep(time.Second) i++ } }() i := 0 for { fmt.Println("this is a main goroutine :",i) time.Sleep(time.Second) i++ if i==2 { break } } }
以上实例运行结果为:
this is a main goroutine : 0 this is a new Task : 0 this is a new Task : 1 this is a main goroutine : 1
主协程先退出导致子协程没有来得及调用:
package main import ( "fmt" "time" ) func main(){ go func (){ i:=0 for { fmt.Println("this is a new Task : ",i) time.Sleep(time.Second) i++ } }() }
11.2.4 runtime包
Gosched
runtime.Gosched()用于让出CPU时间片,让出当前goroutine的执行权限,调度器安排其他等待的任务运行,并在下次某个时候从该位置恢复执行。
这就像跑接力赛,A跑了一会碰到代码runtime.Gosched()就把接力棒交给B了,A歇着了,B继续跑。
实例:
package main import ( "fmt" "runtime" ) func main(){ go func (){ for i:=0;i<5;i++{ fmt.Println("Oh!") } }() for i:=0;i<2;i++{ //让出时间片,先让别的协程执行,执行完了,再回来执行此协程 runtime.Gosched() fmt.Println("Yeah!") } }
Goexit
调用runtime.Goexit()将立即终止当前goroutine执行,调度器确保所有已注册defer延迟调用被执行。
package main import ( "fmt" "runtime" ) func main(){ //创建协程 go func(){ fmt.Println("En...") //调用函数 test() fmt.Println("Oops...") }() //不让主协程结束 for{} } func test() { defer fmt.Println("Yeah!") runtime.Goexit() //终止所在的协程 fmt.Println("Oh!") }
GOMAXPROCS
调用runtime.GOMAXPROCS()用来设置可以并行计算的CPU核数的最大值,并返回之前的值。
package main import ( "fmt" "runtime" ) func main(){ n:=runtime.GOMAXPROCS(1) //把参数改为2试一试 fmt.Println("n=",n) for { go fmt.Print(0) fmt.Print(1) } }
在第一次执行(runtime.GOMAXPROCS(1))时,最多同时只能有一个goroutine被执行。所以会打印很多1。过了一段时间后,Go调度器会将其置为休眠,并唤醒另一个goroutine,这时候就开始打印很多0了,在打印的时候,goroutine是被调度到操作系统线程上的。
在第二次执行(runtime.GOMAXPROCS(2))时,我们使用了两个CPU,所以两个goroutine可以一起被执行,以同样的频率交替打印0和1。
多任务资源竞争问题:
package main import ( "fmt" "time" ) func Printer(str string){ for _,data:=range str { fmt.Printf("%c",data) time.Sleep(time.Second) } fmt.Printf("\n") } func person1(){ Printer("Oh!") } func person2(){ Printer("Yeah!") } func main() { //新建2个协程,代表2个人。两个人共同使用打印机 go person1() go person2() //不让主协程结束 for{} }
21.3 channel
goroutine运行在相同的地址空间,因此访问共享内存必须做好同步。goroutine奉行通过通信来共享内存,而不是共享内存来通信。
引用类型channel是CSP模式的具体实现,用于多个goroutine通讯。其内部实现了同步,确保并发安全。
21.3.1 channel类型
定义一个channel时,也需要定义发送到channel的值的类型。channel可以使用内置的make()函数来创建:
make(chan Type) //等价于make(chan Type,0) make(chan Type,capacity)
当capacity=0时,channel是无缓冲阻塞读写的;当capacity>0时,channel有缓冲、是非阻塞的,直到写满capacity个元素才阻塞写入。
channel通过操作符<-来接收和发送数据,发送和接收数据语法:
channel <- value //发送value到channel <- channel //接收并将其丢弃 x := <-channel //从channel中接收数据,并赋值给x x,ok := <-channel //功能同上,同时检查通道是否已关闭或者是否为空
默认情况下,channel接收和发送数据都是阻塞的,除非另一端已经准备好,这样就使得goroutine同步变得更加简单,而不需要显示的lock。
实例:
package main import ( "fmt" "time" ) var ch = make(chan int) func Printer(str string){ for _,data:=range str { fmt.Printf("%c",data) time.Sleep(time.Second) } fmt.Printf("\n") } //person1执行完成,才到person2执行 func person1(){ Printer("Oh!") ch<-0 //给管道/通道写数据,发送 } func person2(){ <-ch //从管道取数据,接收,如果通道没有数据它就会阻塞 Printer("Yeah!") } func main() { //新建2个协程,代表2个人。两个人共同使用打印机 go person1() go person2() //不让主协程结束 for{} }
以上实例执行结果为:
Oh! Yeah!
通过channel实现同步和数据交互。
实例:
package main import ( "fmt" "time" ) func main() { defer fmt.Println("主协程结束。") ch := make(chan string) go func() { defer fmt.Println("子协程调用完毕。") for i := 0; i < 2; i++ { fmt.Println("子协程 i = ", i) time.Sleep(time.Second) } ch <- "子协程干活儿了。" //把这行注释掉再运行一下,看看什么结果 }() str := <-ch //没有数据前,阻塞 fmt.Println("str = ", str) }
以上实例执行结果为:
子协程 i = 0 子协程 i = 1 子协程调用完毕。 str = 子协程干活儿了。 主协程结束。
21.3.2 无缓冲的channel
无缓冲的通道(unbuffersd channel)是指在接收前没有能力保存任何值的通道。
这种类型的通道要求发送goroutine和接收goroutine同时准备好,才能完成发送和接收操作。如果两个goroutine没有同时准备好,通道会导致先执行发送或接收操作的goroutine阻塞等待。
这种对通道进行发送和接收的交互行为本身就是同步的。其中任意一个操作都无法离开另一个操作单独存在。
下图展示两个goroutine如何利用无缓冲的通道来共享一个值:
- 在第1步,两个goroutine都到达通道,但哪个都没有开始执行发送或者接收。
- 在第2步,左侧的goroutine将它的手伸进了通道,这模拟了向通道发送数据的行为。这时,这个goroutine会在通道中被锁住,直到交换完成。
- 在第3步,右侧的goroutine将它的手放入通道,这模拟了从通道里接收数据。这个goroutine一样也会在通道中被锁住,直到交换完成。
- 在第4步和第5步,进行交换,并最终在第6步,两个goroutine都将它们的手从通道里拿出来,这模拟了被锁住的goroutine得到释放。两个goroutine现在都可以去做别的事情了。
无缓冲的channel创建格式:
make(chan Type) //等价于make(chan Type,0)如果没有指定缓冲区容量,那么该通道就是同步的,因此会阻塞到发送者准备好发送和接收者准备好接收。
实例:
package main import ( "fmt" "time" ) func main() { //创建一个无缓存的channel ch := make(chan int,0) //len(ch)缓冲区剩余数据个数,cap(ch)缓冲区大小 fmt.Printf("len(ch)=%d,cap(ch)=%d\n",len(ch),cap(ch)) //新建协程 go func() { for i:=0;i<3;i++{ fmt.Println("子协程:i=",i) ch <- i } }() //延时 time.Sleep(2*time.Second) for i:=0;i<3;i++{ num := <-ch //读取管道中内容,没有内容前,阻塞 fmt.Println("num =",num) } }
以上实例执行结果为:
len(ch)=0,cap(ch)=0 子协程:i= 0 num = 0 子协程:i= 1 子协程:i= 2 num = 1 num = 2
21.3.3 有缓冲的channel
有缓冲的通道(buffered channel)是一种在被接收前能存储一个或多个值的通道。
这种类型的通道并不强制要求goroutine之间必须同时完成发送和接收。通道会阻塞发送和接收动作的条件也会不同。只有在通道中没有要接收的值时,接收动作才会阻塞。只有在通道没有可用缓冲区容纳被发送的值时,发送动作才会阻塞。
这导致有缓冲的通道和无缓冲的通道之间的一个很大的不同:无缓冲的通道保证进行发送和接收的goroutine会在同一时间进行数据交换;有缓冲的通道没有这种保证。
- 在第1步,右侧的goroutine正在从通道接收一个值。
- 在第2步,右侧的这个goroutine独立完成了接收值的动作,而左侧的goroutine正在发送一个新值到通道里。
- 在第3步,左侧的goroutine还在向通道发送新值,而右侧的goroutine正在从通道接收另外一个值。这个步骤里的两个操作既不是同步的,也不是互相阻塞。
- 在第4步,所有的发送和接收都完成,而通道里还有几个值,也有一些空间可以存更多的值。
有缓冲的channel创建格式:
make(chan Type,capicity)
如果给定了一个缓冲区容量,通道就是异步的。只要缓冲区有未使用空间用于发送数据,或还包含可以接收的数据,那么其通信就会无阻塞地进行。
实例:
package main import "fmt" func main() { //创建一个有缓存的channel,容量为3 ch := make(chan int, 3) fmt.Printf("len(ch)=%d,cap(ch)=%d", len(ch), cap(ch)) }
输出结果为:
len(ch)=0,cap(ch)=3
实例:
package main import ( "fmt" "time" ) func main() { //创建一个有缓存的channel,容量为3 ch := make(chan int, 3) fmt.Printf("len(ch)=%d,cap(ch)=%d\n", len(ch), cap(ch)) //新建协程 go func() { for i := 0; i < 3; i++ { //改成i<10试试 ch <- i //不会阻塞,ch容量为3 fmt.Printf("子协程[%d]:len(ch)=%d,cap(ch)=%d\n", i, len(ch), cap(ch)) } }() //延时 time.Sleep(2 * time.Second) for i := 0; i < 3; i++ { //改成i<10试试 num := <-ch //读取管道中内容,没有内容前,阻塞 fmt.Println("num =", num) } }
输出结果为:
len(ch)=0,cap(ch)=3 子协程[0]:len(ch)=1,cap(ch)=3 子协程[1]:len(ch)=2,cap(ch)=3 子协程[2]:len(ch)=3,cap(ch)=3 num = 0 num = 1 num = 2
21.3.4 range和close
close的用法:
package main import ( "fmt" ) func main() { //创建一个无缓存的channel ch := make(chan int) fmt.Printf("len(ch)=%d,cap(ch)=%d\n", len(ch), cap(ch)) //新建协程 go func() { for i := 0; i < 5; i++ { ch <- i //往通道写数据 } //不需要再写数据,关闭channel close(ch) ch <- 5 //关闭channel后无法再发送数据 }() for { //如果ok为true,说明通道没有关闭 if num,ok:=<-ch;ok==true{ fmt.Println("num = ",num) }else { //通道关闭 //fmt.Println(num) break } } }
上述实例打印结果为:
len(ch)=0,cap(ch)=0 num = 0 num = 1 num = 2 num = 3 num = 4
注意点:
- channel不像文件一样需要经常去关闭,只有当你确定没有任何发送数据了,或者你想显式地结束range循环之类的,才去关闭channel;
- 关闭channel后,无法向channel再发送数据(引发panic错误后导致接收立即返回零值);
- 关闭channel后,可以继续从channel接收数据;
- 对于nil channel,无论收发都会被阻塞。
range的用法:
package main import ( "fmt" ) func main() { //创建一个无缓存的channel ch := make(chan int) fmt.Printf("len(ch)=%d,cap(ch)=%d\n", len(ch), cap(ch)) //新建协程 go func() { for i := 0; i < 5; i++ { ch <- i //往通道写数据 } //不需要再写数据,关闭channel close(ch) //ch <- 5 //关闭channel后无法再发送数据 }() for num:=range ch{ //可以自动跳出循环 fmt.Println("num = ",num) } }
上述实例打印结果为:
len(ch)=0,cap(ch)=0 num = 0 num = 1 num = 2 num = 3 num = 4
21.3.5 单方向的channel
默认情况下,通道是双向的,也就是,既可以往里面发送数据也可以从里面取出数据。
但是,我们经常见一个通道作为参数进行值传递而且希望对方是单向使用的,要么只让它发送数据,要么只让它接收数据,这时候我们可以指定通道的方向。
单向channel变量的声明非常简单,如下:
var ch1 chan int //ch1是一个正常的channel,不是单向的 var ch2 chan<- float64 //ch2是单向channel,只用于写float64数据 var ch3 <-chan int //ch3是单向channel,只用于读取int数据
- chan<- 表示数据进入管道,要把数据写进管道,对于调用者就是输出。
- <-chan 表示数据从管道出来,对于调用者就是得到管道的数据,当然就是输入。
package main func main() { //创建一个channel,双向的 ch := make(chan int) //双向channel能隐式转换为单向channel var writeCh chan<- int = ch //只能写 //var readCh <-chan int = ch //只能读 //writeCh<- 6 //<-writeCh //invalid operation: <-writeCh (receive from send-only type chan<- int) //close(writeCh) //<-readCh //readCh<- 6 //invalid operation: readCh <- 6 (send to receive-only type <-chan int) //单向无法转换为双向 var ch1 chan int = writeCh //cannot use writeCh (type chan<- int) as type chan int in assignment }
可以将channel隐式转换为单向队列,只收或只发,不能将单向channel转换为普通channel。
实例:
package main import "fmt" func main() { //创建一个双向通道 ch := make(chan int) //生产者,生产数字,写入channel //新开一个协程 go producer(ch) //channel传参,引用传递 //消费者,从channel读内容 consumer(ch) } //此channel只能写 func producer(in chan<- int){ for i:=0;i<10;i++{ in<-i } close(in) } //此channel只能读 func consumer(out <-chan int) { for num := range out{ fmt.Println("num = ",num) } }
上述实例打印结果为:
num = 0 num = 1 num = 2 num = 3 num = 4 num = 5 num = 6 num = 7 num = 8 num = 9
21.3.6 定时器
1.Timer
Timer是一个定时器,代表未来的一个单一事件,你可以告诉timer你要等待多长时间,它提供一个channel,在未来的那个时间那个channel提供了一个时间值。
time.MewTimer()方法:
package main import ( "fmt" "time" ) func main() { //创建一个定时器,设置时间为2s,2s后往time通道写内容(当前时间) timer := time.NewTimer(2*time.Second) fmt.Println("Current time :",time.Now()) // 2s后,往timer.C写数据,有数据后,就可以读取 t := <-timer.C //channel没有数据前后阻塞 fmt.Println("t = ",t) }
上述实例打印结果为:
Current time : 2018-05-25 19:06:32.3679043 +0800 CST m=+0.005014201 t = 2018-05-25 19:06:34.3681931 +0800 CST m=+2.005303101
time.NewTimer()时间到了,只会响应一次:
package main import ( "fmt" "time" ) func main() { //创建一个定时器,设置时间为2s,2s后往time通道写内容(当前时间) timer := time.NewTimer(2*time.Second) for { <-timer.C //只会写一次,然后就阻塞,死锁报错 fmt.Println("Time out.") } }
上述实例输出结果为:
Time out. fatal error: all goroutines are asleep - deadlock!
time.Sleep()方法:
package main import ( "fmt" "time" ) func main() { //延时2s后打印 time.Sleep(2*time.Second) fmt.Println("Time out.") }
2s后打印:
Time out.
time.After()方法:
package main import ( "fmt" "time" ) func main() { <-time.After(2*time.Second) //定时2s,阻塞2s,2s后产生一个事件,往channel写内容 fmt.Println("Time out.") }
2s后打印:
Time out.
time的停用:
package main import ( "fmt" "time" ) func main() { timer := time.NewTimer(3*time.Second) go func() { <-timer.C fmt.Println("Time out.") }() timer.Stop() //停止定时器 for { } }
time的重置:
package main import ( "fmt" "time" ) func main() { timer := time.NewTimer(3*time.Second) timer.Reset(1*time.Second) <-timer.C fmt.Println("Time out.") }
2.Ticker
Ticker是一个定时触发的计时器,它会以一个间隔(interval)往channel发送一个事件(当前时间),而channel的接收者可以以固定的时间间隔从channel中读取事件。
实例:
package main import ( "fmt" "time" ) func main(){ ticker := time.NewTicker(1*time.Second) i:=0 for { <-ticker.C i++ fmt.Println(i) } }
上述实例输出结果为:
1 2 3 4 5 6 7 8 9 ...
ticker的停止:
package main import ( "fmt" "time" ) func main(){ ticker := time.NewTicker(1*time.Second) i:=0 for { <-ticker.C fmt.Println(i) i++ if i==5{ ticker.Stop() break } } }
上述实例输出结果为:
0 1 2 3 4
21.4 select
Go里面提供了一个关键字select,通过select可以监听channel上的数据流动。
select的用法与switch语言非常类似,由select开始一个新的选择块,每个选择条件由case语句来描述。
与switch语句可以选择任何可使用相等比较的条件相比,select有比较多的限制,其中最大的一条限制就是每个case语句里必须是一个IO操作,大致的结构如下:
select{ case <-chan1: //如果chan1成功读到数据,则进行case处理语句 case chan2<-1: //如果成功向chan2写入数据,则进行该case处理语句 default: //如果上面都没有成功,则进入default处理流程 }
在一个select语句中,Go语言会按顺序从头至尾评估每一个发送和接收的语句。
如果其中的任意一语句可以继续执行(即没有被阻塞),那么就从那些可以执行的语句中任意选择一条来使用。
如果没有任意一条语句可以执行(即所有的通道都被阻塞),那么有两种可能的情况:
- 如果给出了default语句,那么就会执行default语句,同时程序的执行会从select语句后的语句中恢复。
- 如果没有default语句,那么select语句将被阻塞,直到至少有一个通信可以进行下去。
通过select实现斐波那契数列数列:
package main import ( "fmt" ) func fibonacci(ch chan<- int,quit <-chan bool) { x,y := 1,1 for{ //监听channel数据的流动 select { case ch <- x: x,y = y,x+y case flag:=<-quit: fmt.Println("flag = ",flag) return } } } func main() { ch:=make(chan int) //数字通信 quit:=make(chan bool) //消费者,从channel读取内容 //新建协程 go func() { for i:=0;i<10;i++{ num:=<-ch fmt.Println(num) } //可以停止 quit<-true }() //生产者,生产数字,写入channel fibonacci(ch,quit) }
上述实例输出结果为:
1 1 2 3 5 8 13 21 34 55 flag = true
有时候会出现goroutine阻塞的情况,那么我们如何避免整个程序进入阻塞的情况呢?我们可以利用select来设置超时,通过如下的方式实现:
package main import ( "fmt" "time" ) func main() { ch := make(chan int) quit := make(chan bool) // 新开一个协程 go func() { for ; ; { select { case v := <-ch: fmt.Println(v) case <-time.After(3*time.Second): fmt.Println("Timeout.") quit<-true break } } }() //往ch中存放数据 for i:=0;i<5;i++{ ch<-i time.Sleep(time.Second) } <-quit fmt.Println("It is the end of the program.") }
上述实例输出结果为:
0 1 2 3 4 Timeout. It is the end of the program.
新浪微博:古老医麦
技术交流论坛:http://www.yinchengxueyuan.com/forum.php