kotlin flow介绍(2)
问题背景
上一篇文章,主要介绍了flow相关的部分概念,包括什么是flow、如何创建flow、flow的简单使用介绍,以及背压等概念,参考 https://blog.51cto.com/baorant24/6039206 ,其中提到了Flow包括冷流和热流两种类型并进行了相关介绍: Flow的定义:异步流,概念上讲依然是响应式流。****按顺序发出多个值的数据流,本质上就是一个生产者消费者模型,生产者发送数据给消费者进行消费。flow流的话分为以下两种: 冷流:当执行collect等末端操作符的时候(也就是有消费者的时候),生产者才开始发射数据流。 生产者与消费者是一对一的关系。当生产者发送数据的时候,对应的消费者才可以收到数据。 热流:不管有没有执行collect等末端操作符(也就是不管有没有消费者),生产者都会发射数据流到内存中。 生产者与消费者是一对多的关系。当生产者发送数据的时候,多个消费者都可以收到数据。 本文对冷流和热流进行进一步的研究。
问题分析
1、冷流回顾
回顾上篇文章关于冷流的例子,代码如下:
fun main() {
testFlowCold()
}
fun testFlowCold() {
runBlocking {
// 1、flow{...}方式
val flow = flow {
delay(1000)
emit(1)
delay(1000)
emit(2)
}
println("calling collect first...")
flow.collect {
println("collect$it")
}
println("calling collect second...")
flow.collect {
println("collect$it")
}
}
}
运行结果如下: 从运行结果可知flow每次重新收集都会将所有事件重新发送一次。
2、热流MutableSharedFlow简单使用
直接上代码,代码如下:
fun main() {
testSharedFlow()
}
fun testSharedFlow() {
val sharedFlow = MutableSharedFlow<Int>(
replay = 0, // 事件粘滞数,既当事件发送在订阅者订阅之前,会将订阅之前的第 i 份数据发送给这个新的订阅者。
extraBufferCapacity = 0, // 接受的慢时候,控制发送的是否入栈
onBufferOverflow = BufferOverflow.SUSPEND
)
runBlocking {
launch {
sharedFlow.collect {
println("collect1 received shared flow $it")
}
}
launch {
(1..5).forEach {
println("emit1 send before flow $it")
sharedFlow.emit(it)
println("emit1 send after flow $it")
}
}
// wait a 100
delay(100)
launch {
sharedFlow.collect {
println("collect2 received shared flow $it")
}
}
}
}
运行结果如下: 运行结果分析: 从运行结果很容易看出,sharedFlow有两次collect操作,一次在sharedFlow发射数据前,一次在数据发射后。发射前的collect操作收集到了数据,发射后collect反倒没收到,这是为什么呢? 第二个流收集被延迟,晚了100毫秒后就收不到了,想当于不管是否订阅,流都会发送,只管发,而collect1能够收集到是因为他在发送之前进行了订阅收集。
3、热流MutableSharedFlow简单分析
(1)sharedflow的源码和定义: 这是一个生成MutableShardFlow的顶层函数,有三个参数: reply:事件粘滞数,既当事件发送在订阅者订阅之前,会将订阅之前的第 i 份数据发送给这个新的订阅者。(类似的liveData粘滞事件就相当于reply=1,不同的是sharedflow可以自定义粘滞的数量)默认值=0,代表没有粘滞,比如2中demo,testSharedFlow设置为0,所以不能收到collect之前发射的数据。
extraBufferCapacity:缓存容量,既然存在粘滞,就说明shardflow是有缓存的,缓存一方面用于粘滞事件的发送,另一方面主要是为了处理响应流中常见的背压问题,既当下游的订阅者collector消费速度低于上游生产速度(比如订阅者被挂起),数据流会被放在缓存中,缓存的大小就是由这个参数控制。
onBufferOverflow:由背压就有处理策略,sharedflow默认为suspend,也即是如果当事件数量超过缓存,发送就会被挂起,其他还有drop_oldest和drop_latest两种枚举值:DROP_OLDEST销毁最旧的值,DROP_LATEST销毁最新的值
4、热流StateFlow简单介绍
直接上demo,代码如下:
fun main() {
testStateFlow()
}
fun testStateFlow() {
val stateFlow = MutableStateFlow<Int>(-1)
runBlocking {
launch {
stateFlow.collect {
println("collect1 received shared flow $it")
}
}
launch {
// 在发送1-5前先发送一次1
println("emit1 send before flow 0")
stateFlow.emit(1)
println("emit1 send after flow 0")
// 循环发送1-5
(1..5).forEach {
println("emit1 send before flow $it")
stateFlow.emit(it)
println("emit1 send after flow $it")
}
}
}
}
运行结果如下: 由运行结果可知,一个时间内发送多个事件,会执行最新的一条。我们在发送时间之间增加一个间隔,代码如下:
fun main() {
testStateFlow()
}
fun testStateFlow() {
val stateFlow = MutableStateFlow<Int>(-1)
runBlocking {
launch {
stateFlow.collect {
println("collect1 received shared flow $it")
}
}
launch {
// 在发送1-5前先发送一次1
println("emit1 send before flow 0")
stateFlow.emit(1)
println("emit1 send after flow 0")
// 循环发送1-5
(1..5).forEach {
delay(100)
println("emit1 send before flow $it")
stateFlow.emit(it)
println("emit1 send after flow $it")
}
}
}
}
运行结果如下: 由运行结果可知:每次更新数据都会和旧数据做一次比较,只有不同时候才会更新数值
5、SharedFlow和StateFlow的对比
StateFlow就是一个replaySize=1的sharedFlow,同时它必须有一个初始值,此外,每次更新数据都会和旧数据做一次比较,只有不同时候才会更新数值。 StateFlow重点在状态,ui永远有状态,所以StateFlow必须有初始值,同时对ui而言,过期的状态毫无意义,所以stateFLow永远更新最新的数据(和liveData相似),所以必须有粘滞度=1的粘滞事件,让ui状态保持到最新。另外在一个时间内发送多个事件,不会管中间事件有没有消费完成都会执行最新的一条.(中间值会丢失)。具体应用时,StateFlow适合那些长期保持某种状态的ui,比如一些开关值之类。 SharedFlow侧重在事件,当某个事件触发,发送到队列之中,按照挂起或者非挂起、缓存策略等将事件发送到接受方,在具体使用时,SharedFlow更适合通知ui界面的一些事件,比如toast等,也适合作为viewModel和repository之间的桥梁用作数据的传输。
问题总结
本文在前一篇文章的基础上,进一步分析了kotlin中flow的冷流、以及热流SharedFlow和StateFlow基本使用和介绍,有兴趣的同学可以进一步深入分析。