协程
- 基本概念
- 定义
- 组成
- 挂起和恢复
- 结构化并发
- 协程构建器
- 作用域构建器
- 挂起函数
- 阻塞与非阻塞
- runBlocking
- 全局协程像守护线程
- Job的生命周期
- 常用函数
- 延时和等待
- 启动和取消
- 启动
- 取消
- 暂停
- 协程启动
- 调度器
- 启动方式
- 启动模式
- 线程上下文
- 继承的定义
- 继承的公式
- 协程取消与超时
- 取消
- 挂起点
- 取消失败
- 可以取消
- 释放资源
- finally块
- 不能取消
- 超时
- withTimeout
- withTimeoutOrNull
- 协程的异常处理
- 异常的传播
- 异常的传播特性
- superviseJob
- 异常的捕获
- 全局异常处理
- 取消与异常
- 异常聚合
- 异步流Flow
- 作用
- 概念
- 冷流
- 流的连续性
- 流构建器
- 流上下文
- 流的取消
- 取消检测
- 背压
- 操作符
- 转换操作符
- 末端操作符
- 组合操作符
- 展平操作符
- 异常处理
- 流的完成
- 通道和多路复用
- channel
- produce与actor
- channel的关闭
- BroadcastChannel
- 多路复用
- SelectClause
- 并发安全
- 避免访问外部可变状态
基本概念
定义
协程基于线程,是轻量级的线程。
我们用GlobalScope启动了一个新的协程,这意味着新协程的生命周期只受整个应用程序的生命周期限制。
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
fun main() {
GlobalScope.launch { // 在后台启动一个新的协程并继续
delay(300) // 等待300毫秒
"rustfisher.com".forEach {
print(it)
delay(200) // 每次打印都等待一下
}
}
println("RustFisher")
Thread.sleep(3000) // 阻塞主线程防止过快退出
}
// 输出结果:
//RustFisher
//rustfisher.com
协程不一定在同一个线程中,它们有在同一个线程的可能性。
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import java.lang.Thread.sleep
fun main() {
println("main线程信息 ${Thread.currentThread().id}")
for (i in 1..20) { // 多启动几次协程
GlobalScope.launch {
println("协程启动#$i 所在线程id: ${Thread.currentThread().id}")
}
}
sleep(5000) // 阻塞主线程防止过快退出
println("RustFisher 示例结束")
}
输出结果:
协程启动#13 所在线程id: 34
协程启动#2 所在线程id: 22 ---
协程启动#9 所在线程id: 30
......
协程启动#20 所在线程id: 27
协程启动#19 所在线程id: 22 ---
RustFisher 示例结束
组成
kotlin的协程实现分为两个层次:
- 基础设施层:标准库的协程API,主要对协程提供了概念和语义上最基本的支持(如:kotlin.contracts.*)
- 业务框架层:协程的上层框架支持(如:kotlinx.contracts.*)
挂起和恢复
常规函数的基本操作包括:invoke(或call)和 return,协程新增了 suspend 和 resume。
- suspend:挂起/暂停,用于暂停当前协程,并保存所有基本变量
- resume:用于让已暂停的协程从暂停处继续执行
结构化并发
结构化并发(Structured Concurrency)是一种编程范式,用于编写易读、易维护的并发程序。在Kotlin协程中,结构化并发特别指的是协程之间的协作是有组织、有纪律的。这种并发模式允许开发者明确地定义协程的入口和出口,并管理协程之间的依赖关系和生命周期。
结构化并发的核心思想是,当一个协程内部创建了其他协程(子协程)时,这些子协程的生命周期应与父协程同步。具体来说,如果所有子协程在父协程的作用域结束前都已完成执行,则认为当前协程具备结构化并发。当父协程结束时,如果其子协程仍在运行,则父协程会阻塞自己,等待子协程运行完成后才退出。这种机制确保了即使在并发环境中,也能保持代码的清晰性和可维护性。
在Kotlin协程中,结构化并发主要依赖于CoroutineScope
来实现。
当我们使用结构化并发后,可以做到:
- 取消任务:当某项任务不再需要时可以取消它
- 追踪任务:当任务正在执行时,可以追踪它
- 发出错误信号:当协程失败时,会发出错误信号表面有错误发生
定义协程时必须指定其 CoroutineScope
,它是一个用于管理协程生命周期的接口,提供了创建和取消协程的方式。
常用的一些CoroutineScope
:
-
GlobalScope:进程级别的
CoroutineScope
,与应用进程同级;在GlobalScope
中启动的协程不受任何特定生命周期的限制,即使Activity被销毁,协程任务也可以继续执行。通常用于后台任务,如网络请求、定时器等。 -
MainScope:它与Activity的生命周期绑定,在
MainScope
中启动的协程会在Activity的onDestroy
生命周期函数中取消。通常用于在UI线程上执行协程,如更新UI - ViewModelScope
- LifecycleScope
注意:使用协程时,虽然它很轻量,并且不使用主线程,但仍会消耗一些内存资源。如果忘记保持对新启动的协程的引用,它还会继续运行,导致内存泄漏、资源泄露等问题。
协程构建器
- launch:它会立即返回一个
Job
对象,并在后台执行协程任务 - async:这个函数用于启动一个异步协程,并返回一个
Deferred
(继承Job)对象,你可以通过调用await()
方法来获取异步协程的结果。 - …
作用域构建器
以下三种作用域构建器可以直接使用,它们都会继承父协程的 coroutineScope
。自己创建的协程作用域对象,则是使用自己的作用域。
使用自己的作用域的例如:
- CoroutineScope实例对象
- Global.launch
可以记住:小写字母开头会继承,大写的不会
以下三种作用域构建器的异同,在 这里 有详细说明。
runBlocking
coroutineScope
supervisorScope
import kotlinx.coroutines.*
fun main() = runBlocking {
val scope = CoroutineScope(Dispatchers.Default)
scope.launch {
delay(1000)
println("runBlocking 会等待CoroutineScope下协程的执行吗")
}
val job = GlobalScope.launch {
delay(1000)
println("runBlocking 会等待GlobalScope下协程的执行吗")
}
}
// 控制台无打印结果,说明这两种方法的作用域没有继承父协程的
fun main() = runBlocking {
coroutineScope {
delay(1000)
println("runBlocking 会等待coroutineScope下协程的执行吗")
}
supervisorScope {
delay(1000)
println("runBlocking 会等待supervisorScope下协程的执行吗")
}
}
// 控制台把两句话都打印了,明这两种方法的作用域会继承父协程的
挂起函数
挂起函数(Suspend Function)是一个特殊类型的函数,它被标记为suspend
,并且只能在协程中调用。挂起函数的主要特点是它们能够在执行过程中挂起(暂停)和恢复(继续执行),而不会阻塞当前线程。这使得挂起函数能够以一种非阻塞的方式执行异步操作,同时保持代码的清晰性和可读性。
挂起函数只能在协程体内或其他挂起函数内调用。
例如:将 launch { …… } 内部的代码块提取到独立的函数中。提取出来的函数需要 suspend 修饰符,它是挂起函数。
阻塞与非阻塞
runBlocking
delay 是非阻塞的, Thread.sleep 是阻塞的。显式使用 runBlocking 协程构建器来阻塞。
import kotlinx.coroutines.*
fun main() {
GlobalScope.launch { // 在后台启动一个新的协程并继续
delay(200)
"rustfisher.com".forEach {
print(it)
delay(280)
}
}
println("主线程中的代码会立即执行")
runBlocking { // 这个表达式阻塞了主线程
delay(3000L) //阻塞主线程防止过快退出
}
println("\n示例结束")
}
可以看到, runBlocking 里使用了 delay 来延迟。用了 runBlocking 的线程会一直阻塞直到 runBlocking 内部的协程执行完毕。 也就是 runBlocking{ delay } 实现了阻塞的效果。
我们也可以用 runBlocking 来包装主函数,runBlocking 中的Unit目前可以省略,并且runBlocking 也可用在测试中。
import kotlinx.coroutines.*
fun main() = runBlocking {
delay(100) // 在这里可以用delay了
GlobalScope.launch {
delay(100)
println("Fisher")
}
print("Rust ")
delay(3000)
}
全局协程像守护线程
我们在线程介绍中知道,如果进程中只剩下了守护线程,那么虚拟机会退出。
前文那个打印 rustfisher.com 的例子,其实也能看到,字符没打印完程序就结束了。 在 GlobalScope 中启动的活动协程并不会使进程保活。它们就像守护线程。
Job的生命周期
对于每一个创建的协程,会返回一个Job实例,该实例是协程的唯一标识,并且负责管理协程的生命周期。
-
New(新创建):协程对象刚刚通过
launch
或async
等函数创建,但尚未开始执行。 - Active(活跃):协程已经开始执行,但尚未完成。在这个阶段,协程可能会执行自己的任务,也可能会启动子协程。
- Completing(完成中):协程已经完成了自己的任务,但可能还在等待其子协程完成。这个阶段是短暂的,通常很快会过渡到下一个状态。
- Completed(已完成):这是协程的最终状态,表示协程已经成功执行完毕或已被取消。
-
Cancelling(取消中):协程正在等待取消操作完成。这通常发生在调用
Job.cancel()
方法后,但取消操作可能需要一些时间才能完成。 -
Cancelled(已取消):协程已经被取消。这可能是因为协程运行出错,或者显式调用了
Job.cancel()
方法。在已取消状态下,协程的执行将被终止,并且不会再次启动。
需要注意的是,虽然上述状态描述了协程的生命周期,但并非所有状态都是直接可访问的。相反,我们可以通过访问Job对象的属性(如isActive
、isCancelled
和 isCompleted
)来了解协程的当前状态。例如,如果 isActive
为 false
且 isCancelled
为 true
,则表示协程处于取消中(Cancelling)状态;
注意:如果 isCompleted
为 true
,则表示协程已完成(Completed)或已取消(Cancelled)。
常用函数
延时和等待
-
delay
:可以达到延时的效果,是一个特殊的挂起函数,它不会造成线程阻塞,但是会挂起协程,并且只能在协程中使用 -
job.join()
:该方法会挂起当前协程,等待job
协程执行完成,可以用于协程之间的顺序执行 -
joinAll(job...)
:可以同时让多个Job调用join方法 -
job.await()
:该方法被join类似,但可以获取协程完成的结果
启动和取消
启动
launch
和 async
协程构建器都用于启动新协程。
- lauch:返回一个
Job
并且不附带任何结果值。 - async:返回一个
Deferred
,它也是Job
,但可以使用await()
来获取协程执行完后的返回值。
取消
-
Job.cancel()
:用于取消协程。 -
Job.cancelAndJoin()
:同时具有cancel()
和join()
的作用。
暂停
yield()
:用于让当前协程暂停执行,并将执行权交还给协程调度器,以便让其他协程有机会运行。
当你在协程中使用 yield()
时,当前协程会进入挂起状态,但不会释放其占用的资源(如内存栈)。当协程调度器决定再次执行该协程时,它会从 yield()
调用点恢复执行。
这个函数通常用于实现非阻塞的并发编程,特别是在处理密集计算的场景时,通过 yield()
可以避免长时间占用线程,从而提高应用的响应性和性能。
注意:yield()
并不保证一定会导致协程切换。它的行为取决于当前的调度策略和协程调度器的实现。
import kotlinx.coroutines.*
fun main() = runBlocking {
launch {
delay(1000L) // 等待1秒
println("World!")
}
repeat(1000) { i ->
println("Hello $i")
//在每次打印500个Hello后,暂停当前协程,让hello有机会输出
if (i % 500 == 0) {
yield()
}
}
}
协程启动
调度器
所有协程必须在调度器中运行。
- Dispatchers.Main:这是主线程调度器,用于处理UI交互和一些轻量级任务(调用挂起函数、UI函数,更新LiveData)
- Dispatchers.Default:这是默认调度器,通常用于CPU密集型任务,如大量计算或数据处理。它会使用共享后台线程的公共池来执行协程。
- Dispatchers.IO:这是用于IO密集型任务的调度器,如文件读写、网络请求等。它同样使用共享线程池,但专注于IO操作。
- Dispatchers.Unconfined:这是一个不限制协程执行线程的调度器。它不会将协程绑定到特定的线程或线程池,而是允许协程在任意线程中执行。通常,这个调度器在某些特殊场景下使用,比如你需要完全控制协程的执行线程。
启动方式
-
launch:这是最常用的启动协程的方式。它会立即返回一个
Job
对象,并在后台执行协程任务。如果在启动协程时使用了try-catch
块,异常会被捕获;否则,异常会传递给未捕获异常处理器进行处理。 -
async:这个函数用于启动一个异步协程,并返回一个
Deferred
对象,你可以通过调用await()
方法来获取异步协程的结果。 -
runBlocking:这是一个阻塞式函数,通常用于测试和调试协程代码。它会启动一个新的协程,并等待其协程体以及所有子协程结束执行完毕后才会返回。在调用
runBlocking
时,当前线程会被阻塞,直到协程执行完毕。 - coroutineScope:它会创建一个协程作用域,并等待其协程体以及所有子协程结束。如果一个子协程失败了,当前域和域内的协程都会被取消。(子协程的异常会传播到父协程)
-
supervisorScope:与
coroutineScope
一样,但它在子协程失败时,不会影响域内其他协程的执行(子协程的异常不会传播到父协程)
注: runBlocking
是常规函数,会堵塞住当前线程;coroutineScope
和 supervisorScope
是挂起函数,不会堵塞住当前线程。
import kotlinx.coroutines.*
import java.lang.RuntimeException
suspend fun main() {
// 换成supervisorScope后,使用log都会被输出
coroutineScope {
val job1 = launch {
delay(5000)
// 以下输出无法执行
println("Job1 完成")
}
launch {
println("job2 执行了")
throw RuntimeException()
}
delay(1000)
// 以下输出无法执行
println("会执行我吗?")
}
}
启动模式
- DEFAULT:这是协程的默认启动模式。当协程创建后,它会立即开始调度。如果在调度前协程被取消,它会直接进入取消状态。
- ATOMIC:协程创建后会立即开始调度,协程在执行到第一个挂起点之前不响应取消。
-
LAZY:协程只有在被需要时才会开始调度,如主动调用协程的
start
、join
或await
等函数时。如果协程在调度前被取消,它将直接进入异常结束状态。 - UNDISPATCHED:协程创建后会在当前函数调用栈中立即执行,直到遇到第一个真正挂起的点。
如果想要你的协程立刻执行,而不是等待调度,可以使用最后一个模式。
注:调度不等于执行。调度(scheduling)是指决定协程何时在哪个线程上开始或恢复执行的过程,而执行(execution)是指协程代码的实际运行。协程什么时候执行取决于调度器的当前状态和其他协程的优先级。
import kotlinx.coroutines.*
fun main() = runBlocking {
// 默认模式下程序立刻就结束了,没有打印出log
//ATOMIC模式下,log被打印
val job1 = launch(start = CoroutineStart.ATOMIC) {
Thread.sleep(5000)
println("Job 完成")
}
// job2立刻就执行了,使用其他模式需要等待job1睡眠结束
// 这是使用runBlocking启动方式的情况,使用coroutineScope则会立刻执行job2
val job2 = launch(start = CoroutineStart.UNDISPATCHED) {
println("立刻执行了")
}
job1.cancel()
}
注:使用UNDISPATCHED
模式可以让你的协程调度器即使为Dispatchers.IO
类型(使用后台线程),仍在主线程中执行。
线程上下文
CoroutineContext
是一个接口,用于描述协程的运行环境,包含了与协程执行相关的各种参数和配置信息。
CoroutineContext
主要包含以下几个方面的元素:
-
Job:代表协程的生命周期。通过 Job,你可以控制协程的启动、取消和等待其完成。
-
CoroutineDispatcher:协程调度器,用于向合适的线程分发任务。它决定了协程在哪个线程上执行。
-
CoroutineName:协程的名称,主要用于调试目的。
-
CoroutineExceptionHandler:处理协程中发生的(未被捕捉)异常。
有时我们需要在协程上下文中定义多个元素,可以用 +
操作符来实现。
例如:为一个协程指定一个调度器和名称
fun main() = runBlocking<Unit> {
launch(Dispatchers.Default + CoroutineName("test")) {
println("我工作在:${Thread.currentThread().name}")
}
}
继承的定义
新创建的协程,它的 CoroutineContext
会包含一个全新的Job,并返回,用于控制新协程的生命周期。而它上下文中剩下的元素会从 创建该协程的CoroutineScope 或 父协程的 CoroutineContext
继承。
import kotlinx.coroutines.*
fun main() = runBlocking<Unit> {
val scope = CoroutineScope(Job() + Dispatchers.IO + CoroutineName("test"))
// 这里调用launch方法时继承了scope的上下文
val job = scope.launch {
println("job-launch:${coroutineContext[CoroutineName]} ${Thread.currentThread().name}")
// 这里调用launch方法时继承了父协程的上下文
val childJob = async {
println("childJob-launch:${coroutineContext[CoroutineName]} ${Thread.currentThread().name}")
coroutineContext[Job]
}
println("新协程的上下文中的Job对象 等于 返回的Job对象吗:" + (childJob == childJob.await())) // 为true
}
job.join()
}
继承的公式
协程的上下文 = 默认值 + 继承的CoroutineScope + 参数
- 默认值:如 CoruoutineDispatchers 的默认值为 Dispatchers.Default
- 继承的继承的CoroutineScope是CoroutineScope或父协程的CoroutineContext
- 传入协程构建器的参数,其优先级高于继承的上下文参数,会覆盖对应的参数值
协程取消与超时
取消
我们可以在协程尚未结束时主动取消协程,协程在处于挂起点的时候就会被取消。
- 取消作用域时,会把它的子协程都取消
- 被取消的子协程不会影响其余兄弟协程
- 协程通过抛出一个异常
CancellationException
来处理取消操作 - kotlinx.coroutines 中的挂起函数都是可被取消的
- 取消协程时,抛出的异常会被静默处理,当作正常完成
挂起点
当挂起函数被调用时,它们会暂停当前协程的执行,直到挂起函数的操作完成或需要等待某个条件满足。这些暂停点被称为挂起点(Suspension Points)。
协程只有在挂起点(即协程暂停执行并等待某些条件满足的点)才会检查其取消状态。这些挂起点通常是由挂起函数(如 delay
、withContext
等)产生的。如果协程在挂起点发现它已经被取消,那么它通常会立即停止执行并抛出 CancellationException
。
需要注意的是,挂起点不仅仅是挂起函数本身产生的,还包括了挂起函数内部可能调用的其他挂起函数。一个协程可能会在多个挂起点之间来回切换,直到最终完成。
另外,不是所有标记为 suspend
的函数都会产生挂起点。有些挂起函数可能会立即返回结果,而不会导致协程挂起。这取决于函数内部的实现和调用时的上下文。
取消失败
如果协程在执行计算(cpu密集型)任务,并且没检查取消的话,那我们的取消尝试会失败。
import kotlinx.coroutines.*
fun main() = runBlocking {
val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
var nextPrintTime = startTime
var i = 0 // 模拟的控制循环数量
while (i < 5) { // 模拟耗时计算
if (System.currentTimeMillis() >= nextPrintTime) {
println("[job] 模拟耗时计算中 ${i++} ...")
nextPrintTime += 500L
}
}
}
delay(800) // 等待一会
println("[rustfisher] 尝试取消协程")
job.cancelAndJoin()
println("程序退出 bye~")
}
[job] 模拟耗时计算中 0 ...
[job] 模拟耗时计算中 1 ...
[rustfisher] 尝试取消协程
[job] 模拟耗时计算中 2 ...
[job] 模拟耗时计算中 3 ...
[job] 模拟耗时计算中 4 ...
程序退出 bye~
可以看到,模拟耗时计算直到4,整个程序退出。而调用 cancelAndJoin() 并没有成功取消掉协程。
可以取消
让协程可被取消的方法
- 显式的检查取消状态,例如检查 isActive 变量
- 使用
ensureActive
方法,如果Job
处于非活跃状态,这个方法就会立即抛出异常CancellationException
- 使用
yield
方法,它会检查协程的状态,如果状态为已取消
,则抛出异常CancellationException
;它还会让出线程的执行权 - 补:使用
delay(值>0)
,它会让协程处于挂起点
注:实际调用取消方法后,如果协程在挂起点则会抛出异常进行取消。原因是 Job 对象的 isCancelled
变为 true
后,调用会使协程挂起的函数时都会抛出异常而成功取消掉协程。
对上面的代码进行一些改进。把 while (i < 5) 循环中的条件改成 while (isActive) 。修改后的代 码如下:
import kotlinx.coroutines.*
fun main() = runBlocking {
val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
var nextPrintTime = startTime
var i = 0
while (i < 5 && isActive) { // 模拟耗时计算
// 或者在这调用ensureActive、yield、delay(1)都会抛出异常取消掉任务,其他会使当前协程处于挂起点的函数
if (System.currentTimeMillis() >= nextPrintTime) {
println("[job] 模拟耗时计算中 ${i++} ...")
nextPrintTime += 500L
}
}
}
delay(800) // 等待一会
println("[rustfisher] 尝试取消协程")
job.cancelAndJoin()
println("程序退出 bye~")
}
[job] 模拟耗时计算中 0 ...
[job] 模拟耗时计算中 1 ...
[rustfisher] 尝试取消协程
程序退出 bye~
释放资源
finally块
取消协程时,挂起函数(使用suspend修饰的函数)会抛出异常:CancellationException。我们可以使用try-catch-finally来处理。并且在 finally块中释放资源。
import kotlinx.coroutines.*
fun main() = runBlocking {
val job = launch {
try {
repeat(1000) { i ->
println("[job]模拟计算次数 $i ...")
delay(300L)
}
} catch (e: CancellationException) {
println("[job] CancellationException ${e.message}")
} finally {
println("[job][finally] 释放资源..")
}
}
delay(800) // 等待一会
println("[rustfisher] 尝试取消协程")
job.cancelAndJoin()
println("[rustfisher] 程序退出 bye~")
}
不能取消
有时候,我们需要运行不能取消的代码块。
withCotext(context){}
:使用给定的协程上下文调用指定的挂起块,挂起直到完成,然后返回结果。
withContext(NonCancellable)
可以创建一个无法取消的协程作用域,确保在这个作用域内执行的挂起函数不会被取消。这通常在资源释放或清理操作的上下文中使用,这些操作可能需要在协程被取消后仍然执行。
实际上,这里在finally块中调用了delay方法,它会检查协程 isCancelled
的值,发现为true
就会抛出异常,导致执行完delay方法后面的代码无法执行。若把delay方法换成Thread.sleep方法,或在finally块中再捕捉一次一次,即使没使用withContext(NonCancellable)
也能保证finally块中的代码都被执行。
import kotlinx.coroutines.*
fun main() = runBlocking {
val job = launch {
try {
repeat(1000) { i ->
println("[job]模拟计算次数 $i ...")
delay(300L)
}
} catch (e: CancellationException) {
println("[job] CancellationException ${e.message}")
} finally {
withContext(NonCancellable) {
println("[job][finally] 进入NonCancellable")
delay(1000) // 假设这里还有一些耗时操作
println("[job][finally] NonCancellable完毕")
}
println("[job][finally] 结束")
}
}
delay(800) // 等待一会
println("[rustfisher] 尝试取消协程")
job.cancelAndJoin()
println("[rustfisher] 程序退出 bye~")
}
运行结果如下: