Go语言学习之8 goroutine详解、定时器与单元测试

时间:2022-03-05 15:40:20

主要内容:

1.Goroutine
2. Chanel
3. 单元测试

1. Goroutine

Go 协程(Goroutine)(轻量级的线程,开线程没有数量限制)。
   (1)进程和线程
  A. 进程是程序在操作系统中的一次执行过程,系统进行资源分配和调度的一个独立单位。
  B. 线程是进程的一个执行实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。
  C. 一个进程可以创建和撤销多个线程;同一个进程中的多个线程之间可以并发执行。

Go语言学习之8 goroutine详解、定时器与单元测试

例如:ngix是多进程的单线程程序

内核线程、轻量级进程、用户线程三种线程概念详见:https://blog.csdn.net/gatieme/article/details/51481863

  (2)并发和并行

A. 并发是指立即处理多个任务的能力。多线程程序在一个核的cpu上运行(线程之间通过CPU轮询来执行),就是并发。go多线程的切换都是在用户态操作的,不像其他语言先切换到内核态,完成线程切换,然后返回用户态继续执行程序。
        B. 并行是指同时处理多个任务。多线程程序在多个核的cpu上运行,就是并行。
        例如:假如有一个 web 浏览器。这个 web 浏览器有各种组件。其中两个分别是 web 页面的渲染区和从网上下载文件的下载器。假设各个组件也都可以相互独立地运行。当浏览器在单核处理器中运行时,处理器会在浏览器的两个组件间进行上下文切换。它可能在一段时间内下载文件,转而又对用户请求的 web 页面进行渲染。这就是并发。并发的进程从不同的时间点开始,分别交替运行。在这里,就是在不同的时间点开始进行下载和渲染,并相互交替运行的。
       如果该浏览器在一个多核处理器上运行,此时下载文件的组件和渲染 HTML 的组件可能会在不同的核上同时运行。这称之为并行。

Go 编程语言原生支持并发。Go 使用 Go 协程(Goroutine) 和信道(Channel)来处理并发。

Go语言学习之8 goroutine详解、定时器与单元测试

注意:并行不一定会加快运行速度,因为并行运行的组件之间可能需要相互通信。在我们浏览器的例子里,当文件下载完成后,应当对用户进行提醒,比如弹出一个窗口。于是,在负责下载的组件和负责渲染用户界面的组件之间,就产生了通信。在并发系统上,这种通信开销很小。但在多核的并行系统上,组件间的通信开销就很高了。所以,并行不一定会加快运行速度!

补充:用户线程指的是完全建立在用户空间的线程库,用户线程的建立,同步,销毁,调度完全在用户空间完成,不需要内核的帮助。因此这种线程的操作是极其快速的且低消耗的。

    (3)协程和线程
   协程:独立的栈空间,共享堆空间,调度由用户自己控制,本质上有点类似于用户级线程,这些用户级线程的调度也是自己实现的。
   线程:一个线程上可以跑多个协程,协程是轻量量级的线程。一个线程可以跑多个Goroutine。

Go 协程相比于线程的优势:

  • 相比线程而言,Go 协程的成本极低。堆栈大小只有若干 kb,并且可以根据应用的需求进行增减。而线程必须指定堆栈的大小,其堆栈是固定不变的。
  • Go 协程会复用(Multiplex)数量更少的 OS 线程。即使程序有数以千计的 Go 协程,也可能只有一个线程。如果该线程中的某一 Go 协程发生了阻塞(比如说等待用户输入),那么系统会再创建一个 OS 线程,并把其余 Go 协程都移动到这个新的 OS 线程。所有这一切都在运行时进行,作为程序员,我们没有直接面临这些复杂的细节,而是有一个简洁的 API 来处理并发。
  • Go 协程使用信道(Channel)来进行通信。信道用于防止多个协程访问共享内存时发生竞态条件(Race Condition)。信道可以看作是 Go 协程之间通信的管道.

      GO语言Goroutine与线程的区别:https://baijiahao.baidu.com/s?id=1620972759226100794&wfr=spider&for=pc

    (4)goroutine调度模型

M 代表内核级线程,一个M就是一个线程,goroutine就是跑在M之上的。
        P 全称是Processor,处理器,它的主要用途就是用来执行goroutine的,所以它也维护了一个goroutine队列,里面存储了所有需要它来执行的goroutine。
        G 就是goroutine实现的核心结构了,G维护了goroutine需要的栈、程序计数器以及它所在的M等信息。
        Sched 结构就是调度器,它维护有存储M和G的队列以及调度器的一些状态信息等。

Go语言学习之8 goroutine详解、定时器与单元测试

如果有IO操作时,会新起一个线程等待IO操作的Goroutine

Go语言学习之8 goroutine详解、定时器与单元测试

Go scheduler: https://www.jianshu.com/p/1911b1229a44
       解释Goroutine浅显易懂:https://www.jianshu.com/p/7ebf732b6e1f
       Go语言 Goroutine 浅析:http://baijiahao.baidu.com/s?id=1587634508058779877&wfr=spider&for=pc
       Goroutine并发调度模型深度解析之手撸一个协程池:
       https://www.jianshu.com/p/fa6d82934cb8?utm_campaign=maleskine&utm_content=note&utm_medium=seo_notes&utm_source=recommendation

启动一个go协程?

 package main 

 import "fmt"

 func test_go() {
fmt.Println("hello world")
} func main() {
go test_go()
fmt.Println("main func finished")
}

启动一个go协程

执行结果:

Go语言学习之8 goroutine详解、定时器与单元测试

分析:发现起的go协程go test_go()并没有生效,只打印出 hello world,这是由于,启动一个新的协程时,协程的调用会立即返回。与函数不同,程序控制不会去等待 Go 协程执行完毕。下面使用sleep使主线程处于睡眠之中等待go协程执行结束(实际中该方法不靠谱)。后面会介绍靠谱的方法。

 package main 

 import (
"fmt"
"time"
)
func test_go() {
fmt.Println("hello world")
} func main() {
go test_go()
time.Sleep(time.Second)
fmt.Println("main func finished")
}

sleep阻塞主线程等待go协程执行结束

执行结果:

Go语言学习之8 goroutine详解、定时器与单元测试

启动多个go协程?

 package main

 import (
"fmt"
"time"
) func numbers() {
for i := ; i <= ; i++ {
time.Sleep( * time.Millisecond)
fmt.Printf("%d ", i)
}
}
func alphabets() {
for i := 'a'; i <= 'e'; i++ {
time.Sleep( * time.Millisecond)
fmt.Printf("%c ", i)
}
}
func main() {
go numbers()
go alphabets()
time.Sleep( * time.Millisecond)
fmt.Printf("\nmain terminated")
}

启动多个go协程

读者可以自行分析该程序的时间片打印输出。

    (5)如何设置golang运行的cpu核数
         1.5之前go需要手动设置程序执行的内核数,1.5之后go自动设置

 package main

 import (
"fmt"
"runtime"
) func main() {
num := runtime.NumCPU() //查看有几个内核
fmt.Printf("cpu num:%d\n", num)
runtime.GOMAXPROCS() //设置有程序用几个内核执行
}

获取CPU核数并设置执行程序的核数

    (6)不同goroutine之间进行通讯

        A:全局变量和锁同步

 package main

 import (
"fmt"
"sync"
"time"
) var (
m = make(map[int]uint64)
lock sync.Mutex
) type task struct {
n int
} func calc(t *task) {
var sum uint64
sum =
for i := ; i < t.n; i++ {
sum *= uint64(i)
} fmt.Println(t.n, sum)
lock.Lock() //加锁,不然多个协程修改全局变量会存在竞争
m[t.n] = sum
lock.Unlock()
} func main() {
for i := ; i < ; i++ {
t := &task{n: i}
go calc(t)
} time.Sleep( * time.Second)
lock.Lock()
for k, v := range m {
fmt.Printf("%d! = %v\n", k, v)
}
lock.Unlock()
}

全局变量和锁同步

B:Channel

 package main

 import (
"fmt"
"time"
) func write(ch chan int) {
for i := ; i < ; i++ {
ch <- i
fmt.Println("put data:", i)
}
} func read(ch chan int) {
for {
var b int
b = <-ch
fmt.Println(b)
time.Sleep(time.Second)
}
} func main() {
intChan := make(chan int, )
go write(intChan)
go read(intChan) time.Sleep( * time.Second)
}

channel write and read

    (7)goroutine中使用recover

        如果某个goroutine出现panic,为了不使程序崩溃挂掉,可以在该goroutine中使用recover(类似于python中的try……except)捕获该panic。

 package main

 import (
"fmt"
"time"
) func test() {
defer func() { //defer必须放置在最前面,才能捕获后面所有的panic,程序退出时执行defer
err := recover() //捕获goroutine错误
if err != nil {
fmt.Println(err)
}
}() var p *int
*p = //panic
} func main() {
go test()
time.Sleep(time.Second)
fmt.Println("main progress exit")
}

example

 package main

 import (
"fmt"
"runtime"
"time"
) func test() { defer func() {
if err := recover(); err != nil { //处理panic,calc依然可以正常执行
fmt.Println("panic:", err)
}
}() var m map[string]int //panic: assignment to entry in nil map
m["stu"] =
} func calc() {
for {
fmt.Println("i'm calc")
time.Sleep(time.Second)
}
} func main() {
num := runtime.NumCPU()
runtime.GOMAXPROCS(num - )
go test()
for i := ; i < ; i++ {
go calc()
} time.Sleep(time.Second * )
}

recover示例2

2. 信道(Channel

Channel 可以想像成 Go 协程之间通信的管道。如同管道中的水会从一端流到另一端,通过使用信道,数据也可以从一端发送,在另一端接收。

    (1)channel概念

  • 类似unix中管道(pipe)
  • 先进先出
  • 线程安全,多个goroutine同时访问,不需要加锁
  • channel是有类型的, 一个整数的channel只能存放整数

    (2) channel声明 

var 变量名 chan 类型,例如:
var test chan int
var test chan string
var test chan map[string]string
var test chan stu //stu是一个结构体

注意:所有信道都关联了一个类型。信道只能运输这种类型的数据,而运输其他类型的数据都是非法的。

 package main 

 import "fmt"

 func main() {
var ch chan int
// ch = make(chan int, 1)
ch<-
var num int
num = <-ch
fmt.Println(num)
}

声明未初始化信道

运行结果:出现死锁

Go语言学习之8 goroutine详解、定时器与单元测试

    (3)channel初始化

使用make进行初始化,例如:
var test chan int
test = make(chan int, )
var test chan string
test = make(chan string, )

上面的程序声明了信道但是未初始化,去掉上面程序的注释初始化信道,执行结果:输出1
     注意:chan T 表示 T 类型的信道。
                信道的零值为 nil。信道的零值没有什么用,应该像对 map 和切片所做的那样,用 make 来定义信道。
     例如:

 package main

 import "fmt"

 func main() {
var ch chan int
if ch == nil {
fmt.Println("channel a is nil, going to define it")
ch = make(chan int)
fmt.Printf("Type of a is %T", ch)
}
}

信道的声明

快速声明一个信道:

ch := make(chan int)

    (4)channel基本操作

       信道旁的箭头方向指定了是发送数据还是接收数据

  • 从channel读取数据:

var testChan chan int
          testChan = make(chan int, 10)
          var a int
          a = <- testChan  //箭头对于 testChan 来说是向外指的,因此我们读取了信道 testChan 的值,并把该值存储到变量 a 中。

  • 从channel写 入数据:

var testChan chan int
          testChan = make(chan int, 10)
          var a int = 10
          testChan <- a   //箭头指向了 testChan,因此我们在把数据写入信道 testChan。
    (5)带缓冲区的channel

对于无缓冲信道的发送和接收过程是阻塞的。而对于有缓冲信道,只在缓冲已满的情况,才会阻塞向缓冲信道(Buffered Channel)发送数据。同样,只有在缓冲为空的时候,才会阻塞从缓冲信道接收数据。

ch := make(chan type, capacity)
有缓冲信道:capacity 应该大于
无缓冲信道:capacity为0,或者不设置capacity则容量默认也为
  • testChan只能放 一个元素:

   var testChan chan int
   testChan = make(chan int)
   var a int
   a = <- testChan

  • testChan是带缓冲区的chan, 一次可以放10个元素:

     var testChan chan int
     testChan = make(chan int, 10)
     var a int = 10
     testChan <- a

 package main

 import (
"fmt"
) func main() {
chStr := make(chan string, )
chStr <- "zhangsan"
chStr <- "lisi"
fmt.Println(<-chStr)
fmt.Println(<-chStr) chInt := make(chan int, )
chInt <-
chInt <-
fmt.Println(<-chInt)
fmt.Println(<-chInt)
}

带缓冲区的channel

 package main

 import (
"fmt"
"time"
) func write(ch chan int) {
for i := ; i < ; i++ {
ch <- i //当存入第三个数时会阻塞住,直到信道ch里面有数据被取走
fmt.Printf("Write %d to ch\n", i)
}
close(ch) //关闭信道ch
}
func main() {
ch := make(chan int, ) //信道一次最多存入两个数
go write(ch)
time.Sleep( * time.Second) //等待
for v := range ch {
fmt.Printf("read value %d from ch\n", v)
time.Sleep(time.Second) }
} // 执行结果:
// Write 0 to ch
// Write 1 to ch //先往信道里面写入两个数,阻塞
// read value 0 from ch //取走一个
// Write 2 to ch //立即往信道写入一个数
// read value 1 from ch
// Write 3 to ch
// read value 2 from ch
// Write 4 to ch
// read value 3 from ch
// read value 4 from ch

带缓冲区的chennel2

 package main

 import "fmt"

 type student struct {
name string
} func main() { var stuChan chan interface{}
stuChan = make(chan interface{}, ) stu := student{name: "stu01"} stuChan <- &stu var stu01 interface{}
stu01 = <-stuChan var stu02 *student
stu02, ok := stu01.(*student) //stu01转为*student类型
if !ok {
fmt.Println("can not convert")
return
} fmt.Println(stu02)
}

example

 package main

 import (
"fmt"
"sync"
"time"
) var wg sync.WaitGroup func consumer(goods chan string) {
for i := ; i < ; i++ {
g, ok := <-goods
if !ok {
fmt.Println("produce done ", g)
}
fmt.Println("consumer ", g)
time.Sleep(*time.Millisecond)
} wg.Done()
} func produce(goods chan string) {
for i := ; i < ; i++ {
g := fmt.Sprintf("baozi%d", i)
goods <- g
fmt.Println("produce ", g)
time.Sleep(*time.Millisecond)
}
close(goods) //生产完毕 wg.Done()
} func main() {
var goods chan string
goods = make(chan string, ) wg.Add()
go produce(goods)
go consumer(goods) wg.Wait()
}

生产者消费者模型

 package main

 import (
"fmt"
"time"
) func write(ch chan int) {
for i := ; i < ; i++ {
ch <- i
fmt.Println("put data:", i)
}
} func read(ch chan int) {
for {
var b int
b = <-ch
fmt.Println(b)
time.Sleep(time.Second)
}
} func main() {
intChan := make(chan int, )
go write(intChan)
go read(intChan) time.Sleep( * time.Second)
}

read and write channel

      死锁问题:

从带缓冲区的chennel2例子中我们可以看出,起初向新道里面写入两个数(已满),再写入时只有当信道里面有数据被取走才可以继续写入数据,如果没有数据被取走,则无法向信道里面写入数据,会出现死锁,使程序panic掉,例如

 package main

 import (
"fmt"
) func main() {
ch := make(chan string, )
ch <- "zhansan"
ch <- "lisi"
ch <- "wangwu"
fmt.Println(<-ch)
fmt.Println(<-ch)
}

带缓冲区死锁问题

     长度和容量:

 package main

 import (
"fmt"
) func main() {
ch := make(chan string, )
ch <- "zhangsan"
ch <- "lisi"
fmt.Println("capacity is", cap(ch)) //
fmt.Println("length is", len(ch)) //
fmt.Println("read value", <-ch) //zhangsan
fmt.Println("new length is", len(ch)) //new length is 1
}

带缓冲区信道长度和容量测试

从程序结果可以看出,带缓冲区信道容量是make出来可以存入信道的最大数据量,而信道长度是指信道的实际存入数据量,当从信道中取走数据,则信道长度会减小。

    (6)channel阻塞

信道的发送与接收默认是阻塞的。
        当把数据发送到信道时,程序控制会在发送数据的语句处发生阻塞,直到有其它 Go 协程从信道读取到数据,才会解除阻塞。与此类似,当读取信道的数据时,如果没有其它的协程把数据写入到这个信道,那么读取过程就会一直阻塞着。
        这样的作用?信道的这种特性能够帮助 Go 协程之间进行高效的通信,不需要用到其他编程语言常见的显式锁或条件变量。

 package main

 import (
"fmt"
) func go_test(done chan bool) {
fmt.Println("Hello world")
done <- true //将true写入信道done
}
func main() {
done := make(chan bool) //定义一个信道,里面只能存档放bool型
go go_test(done)
<-done //从信道done中读取并丢弃数据true
fmt.Println("main function")
}

信道的读取与写入

 package main

 import (
"fmt"
"time"
) func go_test(done chan bool) {
fmt.Println("go_test go routine is going to sleep")
time.Sleep( * time.Second)
fmt.Println("go_test go routine awake and going to write to done")
done <- true
}
func main() {
done := make(chan bool)
fmt.Println("Main going to call go_test go goroutine")
go go_test(done)
<-done //此时会阻塞主程序main,等待信道done中有数据写入,如果没有就会出现死锁,程序panic掉
fmt.Println("Main received data")
}

信道阻塞示例

执行结果:

Go语言学习之8 goroutine详解、定时器与单元测试

计算一个数中每一位的平方和与立方和,然后把平方和与立方和相加?

实现方法:在一个协程中计算该数的每位数平方和,在另一个协程中计算每位数立方之和,在主程序中等待上面两个协程的计算结果并计算最终结果。

 package main 

 import (
"fmt"
) //计算num每位平方之和
func calcSquare(num int, squareCh chan int) {
sum :=
for num != {
digit := num%
num /=
sum += digit * digit
}
fmt.Printf("Sum of squares is %d\n", sum)
squareCh <- sum
} //计算num每位立方之和
func calcCube(num int, cubeCh chan int) {
sum :=
for num != {
digit := num%
num /=
sum += digit * digit * digit
}
fmt.Printf("Sum of cubes is %d\n", sum)
cubeCh <- sum
} func main() {
var finalSum int =
squCh := make(chan int, )
cuCh := make(chan int, ) num := go calcSquare(num, squCh)
go calcCube(num, cuCh) squareSum, cubeSum := <-squCh, <-cuCh //阻塞并获取结果
finalSum = squareSum + cubeSum
fmt.Printf("result is %d\n", finalSum)
}

获取某数每位的平方和与立方和之和

    (7)死锁

  • 当 Go 协程给一个信道发送数据时,如果没有其他 Go 协程来接收该信道里面的数据。则程序就会在运行时触发 panic,形成死锁。
  • 同理,当有 Go 协程等着从一个信道接收数据时,如果其他 Go 协程没有向该信道写入数据,则程序就会触发 panic,形成死锁。
 package main

 func main() {
ch := make(chan int)
ch <-
}

写入管道没有被读取形成死锁

 package main

 func main() {
ch := make(chan int)
rch := <-ch
}

没有数据写入管道但是去读取形成的死锁

    (8)单向信道

Go 的信道可以在声明时约束其操作方向,如只发送或是只接收。这种被约束方向的信道被称做单向信道。

单向信道的声明格式
只能发送的信道类型为chan<-,只能接收的信道类型为<-chan,格式如下:
var 信道实例 chan<- 元素类型 // 只能发送信道
var 信道实例 <-chan 元素类型 // 只能接收信道 元素类型:信道包含的元素类型。
信道实例:声明的信道变量。
 package main

 import "fmt"

 func sendData(sendch chan<- int) {
sendch <-
} func main() {
sendch := make(chan<- int)
go sendData(sendch)
receiveData := <-sendch //error 企图去接受信道sendch里面的数据
fmt.Println(receiveData)
}

只能发送的信道

 package main

 func main() {
ch := make(<-chan int)
ch <- //error
}

只能接收信道

    (9)信道的转换

单向信道的作用? 对于信道转换(Channel Conversion),把一个双向信道转换成只能发送信道(send-only)或者接收(Receive Only)信道都是行得通的,但是反过来就不行。

 package main

 import (
"fmt"
"time"
) //send-only
func sendData(sendch chan<- int) {
sendch <-
} //Receive Only
func receiveData(receivech <-chan int) {
rec := <-receivech
fmt.Println(rec)
} func main() {
ch := make(chan int) //双向信道
go sendData(ch)
go receiveData(ch)
time.Sleep(time.Second)
}

信道转换

    (10)关闭信道和使用 for range 遍历信道

  • 数据发送方可以关闭信道,通知接收方这个信道不再有数据发送过来。
  • 当从信道接收数据时,接收方可以多用一个变量来检查信道是否已经关闭。
v, ok := <- ch

注意:上面的语句里,如果成功接收信道所发送的数据,那么 ok 等于 true。而如果 ok 等于 false,说明我们试图读取一个关闭的通道。从关闭的信道读取到的值会是该信道类型的零值。例如,当信道是一个 int 类型的信道时,那么从关闭的信道读取的值将会是 0。

 package main

 import (
"fmt"
) func producer(ch chan int) {
for i := ; i < ; i++ {
ch <- i
}
close(ch)
} func main() {
ch := make(chan int)
go producer(ch)
for {
v, ok := <-ch
if ok == false {
fmt.Println("chnnel closed ", v, ok) //chnnel closed 0 false
break
}
fmt.Println("Received ", v, ok)
}
}

close 信道

 package main

 import (
"fmt"
) func producer(ch chan int) {
for i := ; i < ; i++ {
ch <- i
}
close(ch)
} func main() {
ch := make(chan int)
go producer(ch) for v := range ch {
fmt.Println("Received ", v)
}
}

for range遍历信道

注意:Go 语言中 range 关键字用于 for 循环中迭代数组(array)、切片(slice)、通道(channel)或集合(map)的元素。在数组和切片中它返回元素的索引和索引对应的值,在集合中返回 key-value 对的 key 值。

重新实现计算一个数中每一位的平方和与立方和,然后把平方和与立方和相加?

 package main 

 import (
"fmt"
) func calcDigit(num int, digCh chan int) {
for num != {
digit := num%
digCh <- digit
num /=
}
close(digCh)
} //计算num每位平方之和
func calcSquare(num int, squareCh chan int) {
dch := make(chan int, )
sum :=
go calcDigit(num, dch)
for v := range dch {
sum += v * v
}
fmt.Printf("Sum of squares is %d\n", sum)
squareCh <- sum
} //计算num每位立方之和
func calcCube(num int, cubeCh chan int) {
dch := make(chan int, )
sum :=
go calcDigit(num, dch)
for v := range dch {
sum += v * v * v
}
fmt.Printf("Sum of cubes is %d\n", sum)
cubeCh <- sum
} func main() {
var finalSum int =
squCh := make(chan int, )
cuCh := make(chan int, ) num := go calcSquare(num, squCh)
go calcCube(num, cuCh) squareSum, cubeSum := <-squCh, <-cuCh //阻塞并获取结果
finalSum = squareSum + cubeSum
fmt.Printf("result is %d\n", finalSum)
}

Code

    (11)WaitGroup

          sync 包里的WaitGroup主要用于线程的同步。WaitGroup在WaitGroup这个结构体所实现的方法中,常用的是Add,Done,Wait解释如下:

//A WaitGroup waits for a collection of goroutines to finish. The main goroutine calls Add to set the number of goroutines to wait for.
//Then each of the goroutines runs and calls Done when finished. At the same time, Wait can be used to block until all goroutines have
//finished.
//A WaitGroup must not be copied after first use. // Add adds delta, which may be negative, to the WaitGroup counter.
// If the counter becomes zero, all goroutines blocked on Wait are released.
// If the counter goes negative, Add panics.
func (wg *WaitGroup) Add(delta int) // Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {
wg.Add(-)
} // Wait blocks until the WaitGroup counter is zero.
func (wg *WaitGroup) Wait()

WaitGroup 使用计数器来工作(WaitGoup.Add(i))。当我们调用 WaitGroup 的 Add 并传递一个 int 时,WaitGroup 的计数器会加上 Add 的传参。要减少计数器,可以调用 WaitGroup 的 Done() 方法(WaitGroup.Done())。Wait() 方法会阻塞调用它的 Go 协程,直到计数器变为 0 后才会停止阻塞。

 package main 

 import (
"fmt"
"time"
"sync"
) func go_test(num int, wg *sync.WaitGroup) {
fmt.Printf("Start goroutine %d\n", num)
time.Sleep(time.Second)
fmt.Printf("end goroutine %d\n", num)
wg.Done()
} func main() {
var wg sync.WaitGroup
for i := ; i < ; i++ {
wg.Add()
go go_test(i, &wg)
}
wg.Wait() //等待所有协程执行结束 fmt.Println("main finished")
}

WaitGroup示例

     (12)工作池的实现

缓冲信道的重要应用之一就是实现工作池。
           一般而言,工作池就是一组等待任务分配的线程。一旦完成了所分配的任务,这些线程可继续等待任务的分配。
           下面看两个例子(重点):
            例1: 计算在10000个数里面有多少个素数?
            思路:1)设置一个输入型缓冲信道 intChan 存放这10000个数,每一个数相当于一个任务,10000个数就有一万个任务;
                       2)起多个协程来计算,比如100个,则每个协程相当于处理10000/100=100个任务,并设置一个信道 exitChan用来标记哪个协程处理结束;
                       3)设置一个输出型缓冲信道 resultChan 将 2)的计算结果存入到该信道中。

 package main

 import (
"fmt"
"time"
) func calc(taskChan chan int, resChan chan int, exitChan chan bool) {
for v := range taskChan {
flag := true
for i := ; i < v; i++ {
if v%i == {
flag = false
break
}
} if flag {
resChan <- v
}
} fmt.Println("exit")
exitChan <- true
} func main() {
startTime := time.Now() //开始时间 intChan := make(chan int, )
resultChan := make(chan int, )
exitChan := make(chan bool, ) go func() {
for i := ; i < ; i++ {
intChan <- i
} close(intChan)
}() for i := ; i < ; i++ {
go calc(intChan, resultChan, exitChan)
} //等待所有计算的goroutine全部退出
go func() {
for i := ; i < ; i++ {
<-exitChan
fmt.Println("wait goroutine ", i, " exited")
}
close(resultChan)
}() for v := range resultChan {
fmt.Println(v)
} endTime := time.Now() //结束时间
diff := endTime.Sub(startTime) //耗时
fmt.Println("total time taken ", diff.Seconds(), "seconds")
}

计算素数示例

例2:计算所输入数字的每一位的和。例如,如果输入 234,结果会是 9(即 2 + 3 + 4)
           思路如下:
           1)创建一个 Go 协程池,监听一个等待作业分配的输入型缓冲信道;
           2)将作业添加到该输入型缓冲信道中;
           3)作业完成后,再将结果写入一个输出型缓冲信道;
           4)从输出型缓冲信道读取并打印结果。

 package main

 import (
"fmt"
"math/rand"
"sync"
"time"
) //任务
type Job struct {
id int
randomno int
} //结果
type Result struct {
job Job
sumofdigits int
} var jobs = make(chan Job, ) //输入型缓冲信道
var results = make(chan Result, ) //输出型缓冲信道 //计算随机数每位之和
func digits(number int) int {
sum :=
no := number
for no != {
digit := no %
sum += digit
no /=
}
time.Sleep( * time.Second)
return sum
} //工作者
func worker(wg *sync.WaitGroup) {
for job := range jobs {
output := Result{job, digits(job.randomno)}
results <- output
}
wg.Done()
} //创建工作池
func createWorkerPool(noOfWorkers int) {
var wg sync.WaitGroup
for i := ; i < noOfWorkers; i++ {
wg.Add()
go worker(&wg)
}
wg.Wait() //等待上面起的所有协程计算结束
close(results) //计算结束,关闭信道results
} //分配任务
func allocate(noOfJobs int) {
for i := ; i < noOfJobs; i++ {
randomno := rand.Intn()
job := Job{i, randomno}
jobs <- job
}
close(jobs) //分配完成,关闭信道
} //遍历结果
func result(done chan bool) {
for result := range results {
fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
}
done <- true
} func main() {
startTime := time.Now() noOfJobs :=
go allocate(noOfJobs) done := make(chan bool)
go result(done)
noOfWorkers :=
createWorkerPool(noOfWorkers)
<-done //等待取到所有结果完成 endTime := time.Now()
diff := endTime.Sub(startTime) //总共耗时
fmt.Println("total time taken ", diff.Seconds(), "seconds")
}

计算随机数各位之和

    (13)select

select 语句用于在多个发送/接收信道操作中进行选择。select 语句会一直阻塞,直到发送/接收操作准备就绪。如果有多个信道操作准备完毕,select 会随机地选取其中之一执行。该语法与 switch 类似,所不同的是,这里的每个 case 语句都是信道操作。

以下描述了 select 语句的语法:

  • 每个case都必须是一个信道
  • 所有channel表达式都会被求值
  • 所有被发送的表达式都会被求值
  • 如果任意某个通信可以进行,它就执行;其他被忽略。
  • 如果有多个case都可以运行,Select会随机公平地选出一个执行。其他不会执行。否则: 如果有default子句,则执行该语句。如果没有default字句,select将阻塞,直到某个通信可以运行;Go不会重新对channel或值进行求值。
 package main

 import (
"fmt"
"time"
) func server1(ch chan string) {
time.Sleep( * time.Second)
ch <- "from server1"
} func server2(ch chan string) {
time.Sleep( * time.Second)
ch <- "from server2"
} func main() {
output1 := make(chan string)
output2 := make(chan string)
go server1(output1)
go server2(output2)
select {
case s1 := <-output1:
fmt.Println(s1)
case s2 := <-output2:
fmt.Println(s2)
}
}

select example1

select语法的最后一条作用:在没有 case 准备就绪时,可以执行 select 语句中的默认情况(Default Case)。这通常用于防止 select 语句一直阻塞。

 package main

 import (
"fmt"
"time"
) func process(ch chan string) {
time.Sleep( * time.Millisecond)
ch <- "process successful"
} func main() {
ch := make(chan string)
go process(ch)
for {
time.Sleep( * time.Millisecond)
select {
case v := <-ch:
fmt.Println("received value: ", v)
return
default:
fmt.Println("no value received")
}
} }

select default

死锁:

当select没有匹配到任何一个读取信道的结果,此时要是没有default,则select 语句会一直阻塞,从而发生死锁,导致程序panic掉。

 package main

 func main() {
ch := make(chan string)
select {
case <-ch:
}
}

缺少default 导致死锁问题

对于上面问题,如果加上默认情况,则不会出现死锁,会去执行default。select 只含有值为 nil 的信道(比如声明未初始化的信道),也同样会执行默认情况。

 package main

 import "fmt"

 func main() {
ch := make(chan string)
select {
case <-ch:
default:
fmt.Println("excute default")
}
}

执行default

 package main

 import "fmt"

 func main() {
var ch chan string
select {
case v := <-ch:
fmt.Println("received value", v)
default:
fmt.Println("default case executed") }
}

nil default

如果有多个case都可以运行,Select会随机公平地选出一个执行。其他不会执行。

 package main

 import (
"fmt"
"time"
) func server1(ch chan string) {
ch <- "from server1"
} func server2(ch chan string) {
ch <- "from server2"
} func main() {
output1 := make(chan string)
output2 := make(chan string)
go server1(output1)
go server2(output2)
time.Sleep(time.Second) //等待上面两个协程执行结束,下面随机执行
select {
case s1 := <-output1:
fmt.Println(s1)
case s2 := <-output2:
fmt.Println(s2)
}
}

随机执行select case

      空select:

执行空select会出现死锁。

 package main

 func main() {
select {}
}

空 select

    (14)定时器

  • 定时器的使用
func NewTicker(d Duration) *Ticker
官网对NewTicker的解释:
NewTicker returns a new Ticker containing a channel that will send the time with a period specified by the duration argument.
It adjusts the intervals or drops ticks to make up for slow receivers. The duration d must be greater than zero; if not, NewTicker
will panic. Stop the ticker to release associated resources.
 package main
import (
"fmt"
"time"
) func main() {
t := time.NewTicker(time.Second) //定时器设置为 1s
for v := range t.C {
fmt.Println("hello, ", v)
}
}

example1

 package main

 import (
"fmt"
"time"
) func main() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop() //程序结束关闭定时器
done := make(chan bool)
go func() {
time.Sleep( * time.Second)
done <- true
}()
for {
select {
case <-done:
fmt.Println("Done!")
return
case t := <-ticker.C:
fmt.Println("Current time: ", t)
}
}
}

官网例子

func (t *Ticker) Stop()
官网解释:
Stop turns off a ticker. After Stop, no more ticks will be sent. Stop does not close the channel, to prevent a concurrent goroutine
reading from the channel from seeing an erroneous "tick".
  • 一次定时器
func (t Time) After(u Time) bool
After reports whether the time instant t is after u.
 package main

 import (
"fmt"
"time"
) func main() {
select {
case <- time.After(*time.Second):
fmt.Println("after")
}
}

example

 package main

 import (
"fmt"
"time"
) func main() {
year2000 := time.Date(, , , , , , , time.UTC)
fmt.Println(year2000) year3000 := time.Date(, , , , , , , time.UTC)
fmt.Println(year3000) isYear3000AfterYear2000 := year3000.After(year2000) // True
isYear2000AfterYear3000 := year2000.After(year3000) // False fmt.Printf("year3000.After(year2000) = %v\n", isYear3000AfterYear2000)
fmt.Printf("year2000.After(year3000) = %v\n", isYear2000AfterYear3000) }

官网例子

  • 超时控制

例子是模拟一个数据库操作,如果在1s内未返回结果,则超时退出。

 package main
import (
"fmt"
"time"
)
func queryDb(ch chan int) {
time.Sleep(time.Millisecond) //模拟数据库操作耗时
ch <-
}
func main() {
ch := make(chan int) go queryDb(ch)
t := time.NewTicker(time.Second) //设置超时时间,如果1s内未返回结果则超时退出
defer t.Stop() //程序结束关闭定时器 select {
case v := <-ch:
fmt.Println("result", v)
case <-t.C:
fmt.Println("timeout")
}
}

超时控制

  • 信号处理
 package main

 import (
"fmt"
"os"
"os/signal"
"sync"
"syscall"
) var waitGroup sync.WaitGroup func produce(ch chan<- string, exitChan chan bool) { var i int
var exit bool
for {
str := fmt.Sprintf("hello %d", i)
select { //select检测哪个管道可写或者可读
case ch <- str:
case exit = <-exitChan:
}
if exit {
fmt.Printf("user notify produce exited\n")
break
}
}
close(ch)
waitGroup.Done()
} func consume(ch <-chan string) { for {
str, ok := <-ch
if !ok {
fmt.Printf("ch is closed")
break
}
fmt.Printf("value:%s\n", str)
}
waitGroup.Done()
} func main() {
// 在shell终端输入 kill -SIGUSR2 ID 给程序输入终止信号
var ch chan string = make(chan string)
var exitChan chan bool = make(chan bool, )
var sinalChan chan os.Signal = make(chan os.Signal, )
waitGroup.Add()
signal.Notify(sinalChan, syscall.SIGUSR2)
go produce(ch, exitChan)
go consume(ch) <-sinalChan //读取然丢弃
exitChan <- true
waitGroup.Wait() }

信号处理

3. 单元测试

go test命令是一个按照一定的约定和组织的测试代码的驱动程序。 在包目录内, 所有以_test.go为后缀名的源文件并不是go build构建包的一部分, 它们是go test测试的一部分。
      在*_test.go文件中, 有三种类型的函数: 测试函数、 基准测试函数、 示例函数。 一个测试函数是以Test为函数名前缀的函数, 用于测试程序的一些逻辑行为是否正确; go test命令会调用这些测试函数并报告测试结果是PASS或FAIL。本次只了解测试函数,后面会专门写博客学习其他类型的函数。
      go test命令会遍历所有的*_test.go文件中符合上述命名规则的函数, 然后生成一个临时的main包用于调用相应的测试函数, 然后构建并运行、 报告测试结果, 最后清理测试中生成的临时文件。

1)文件名必须以_test.go结尾
     2)使用go test -v执行单元测试

 package main

 func add(a, b int) int {
return a + b
} func sub(a, b int) int {
return a - b
}

calc.go

 package main

 import (
"testing"
) func TestAdd(t *testing.T) {
r := add(, )
if r != {
t.Fatalf("add(2, 4) error, expect:%d, actual:%d", , r)
}
t.Logf("test add succ")
} func TestSub(t *testing.T) {
r := sub(, )
if r != - {
t.Fatalf("sub(2, 4) error, expect:%d, actual:%d", -, r)
}
t.Logf("test sub succ")
}

calc_test.go

 package main

 import (
"encoding/json"
"io/ioutil"
) type student struct {
Name string
Sex string
Age int
} func (p *student) Save() (err error) {
data, err := json.Marshal(p)
if err != nil {
return
} err = ioutil.WriteFile("C:/stu.dat", data, )
return
} func (p *student) Load() (err error) { data, err := ioutil.ReadFile("C:/stu.dat")
if err != nil {
return
} err = json.Unmarshal(data, p)
return
}

student.go

 package main

 import "testing"
import "time" func TestSave(t *testing.T) {
stu := &student{
Name: "stu01",
Sex: "man",
Age: ,
} err := stu.Save()
if err != nil {
t.Fatalf("save student failed, err:%v", err)
} } func TestLoad(t *testing.T) { stu := &student{
Name: "stu01",
Sex: "man",
Age: ,
}
err := stu.Save()
if err != nil {
t.Fatalf("save student failed, err:%v", err)
}
stu2 := &student{}
time.Sleep( * time.Second)
err = stu2.Load()
if err != nil {
t.Fatalf("load student failed, err:%v", err)
}
if stu.Name != stu2.Name {
t.Fatalf("load student failed, name not equal")
}
if stu.Sex != stu2.Sex {
t.Fatalf("load student failed, Sex not equal")
}
if stu.Age != stu2.Age {
t.Fatalf("load student failed, Age not equal")
}
}

student_test.go

将四个文件放到同一个目录下,执行go test -v,执行结果如下:

Go语言学习之8 goroutine详解、定时器与单元测试

参考文献:

  • https://golang.org/pkg/time/
  • https://studygolang.com/articles/12522 (select)
  • https://studygolang.com/articles/12402 (channel)
  • https://studygolang.com/articles/12342 (Go 协程)
  • https://studygolang.com/articles/12341 (并发入门)
  • https://www.cnblogs.com/domestique/p/8410313.html