kotlin flow介绍(2)

时间:2023-02-07 15:21:24

kotlin flow介绍(2)

问题背景

上一篇文章,主要介绍了flow相关的部分概念,包括什么是flow、如何创建flow、flow的简单使用介绍,以及背压等概念,参考 https://blog.51cto.com/baorant24/6039206 ,其中提到了Flow包括冷流和热流两种类型并进行了相关介绍: Flow的定义:异步流,概念上讲依然是响应式流。****按顺序发出多个值的数据流,本质上就是一个生产者消费者模型,生产者发送数据给消费者进行消费。flow流的话分为以下两种: 冷流:当执行collect等末端操作符的时候(也就是有消费者的时候),生产者才开始发射数据流。 生产者与消费者是一对一的关系。当生产者发送数据的时候,对应的消费者才可以收到数据。 热流:不管有没有执行collect等末端操作符(也就是不管有没有消费者),生产者都会发射数据流到内存中。 生产者与消费者是一对多的关系。当生产者发送数据的时候,多个消费者都可以收到数据。 本文对冷流和热流进行进一步的研究。

问题分析

1、冷流回顾

回顾上篇文章关于冷流的例子,代码如下:

fun main() {
    testFlowCold()
}

fun testFlowCold() {
    runBlocking {
        // 1、flow{...}方式
        val flow = flow {
            delay(1000)
            emit(1)
            delay(1000)
            emit(2)
        }
        println("calling collect first...")
        flow.collect {
            println("collect$it")
        }
        println("calling collect second...")
        flow.collect {
            println("collect$it")
        }
    }
}

运行结果如下: kotlin flow介绍(2) 从运行结果可知flow每次重新收集都会将所有事件重新发送一次。

2、热流MutableSharedFlow简单使用

直接上代码,代码如下:

fun main() {
    testSharedFlow()
}

fun testSharedFlow() {
    val sharedFlow = MutableSharedFlow<Int>(
        replay = 0, // 事件粘滞数,既当事件发送在订阅者订阅之前,会将订阅之前的第 i 份数据发送给这个新的订阅者。
        extraBufferCapacity = 0, // 接受的慢时候,控制发送的是否入栈
        onBufferOverflow = BufferOverflow.SUSPEND
    )
    runBlocking {
        launch {
            sharedFlow.collect {
                println("collect1 received shared flow $it")
            }
        }
        launch {
            (1..5).forEach {
                println("emit1 send before flow $it")
                sharedFlow.emit(it)
                println("emit1 send after flow $it")
            }
        }
        // wait a 100
        delay(100)
        launch {
            sharedFlow.collect {
                println("collect2 received shared flow $it")
            }
        }
    }
}

运行结果如下: kotlin flow介绍(2) 运行结果分析: 从运行结果很容易看出,sharedFlow有两次collect操作,一次在sharedFlow发射数据前,一次在数据发射后。发射前的collect操作收集到了数据,发射后collect反倒没收到,这是为什么呢? 第二个流收集被延迟,晚了100毫秒后就收不到了,想当于不管是否订阅,流都会发送,只管发,而collect1能够收集到是因为他在发送之前进行了订阅收集。

3、热流MutableSharedFlow简单分析

(1)sharedflow的源码和定义: kotlin flow介绍(2) 这是一个生成MutableShardFlow的顶层函数,有三个参数: reply:事件粘滞数,既当事件发送在订阅者订阅之前,会将订阅之前的第 i 份数据发送给这个新的订阅者。(类似的liveData粘滞事件就相当于reply=1,不同的是sharedflow可以自定义粘滞的数量)默认值=0,代表没有粘滞,比如2中demo,testSharedFlow设置为0,所以不能收到collect之前发射的数据。

extraBufferCapacity:缓存容量,既然存在粘滞,就说明shardflow是有缓存的,缓存一方面用于粘滞事件的发送,另一方面主要是为了处理响应流中常见的背压问题,既当下游的订阅者collector消费速度低于上游生产速度(比如订阅者被挂起),数据流会被放在缓存中,缓存的大小就是由这个参数控制。

onBufferOverflow:由背压就有处理策略,sharedflow默认为suspend,也即是如果当事件数量超过缓存,发送就会被挂起,其他还有drop_oldest和drop_latest两种枚举值:DROP_OLDEST销毁最旧的值,DROP_LATEST销毁最新的值

4、热流StateFlow简单介绍

直接上demo,代码如下:

fun main() {
    testStateFlow()
}

fun testStateFlow() {
    val stateFlow = MutableStateFlow<Int>(-1)
    runBlocking {
        launch {
            stateFlow.collect {
                println("collect1 received shared flow $it")
            }
        }
        launch {
            // 在发送1-5前先发送一次1
            println("emit1 send before flow 0")
            stateFlow.emit(1)
            println("emit1 send after flow 0")
            // 循环发送1-5
            (1..5).forEach {
                println("emit1 send before flow $it")
                stateFlow.emit(it)
                println("emit1 send after flow $it")
            }
        }
    }
}

运行结果如下: kotlin flow介绍(2) 由运行结果可知,一个时间内发送多个事件,会执行最新的一条。我们在发送时间之间增加一个间隔,代码如下:

fun main() {
    testStateFlow()
}

fun testStateFlow() {
    val stateFlow = MutableStateFlow<Int>(-1)
    runBlocking {
        launch {
            stateFlow.collect {
                println("collect1 received shared flow $it")
            }
        }
        launch {
            // 在发送1-5前先发送一次1
            println("emit1 send before flow 0")
            stateFlow.emit(1)
            println("emit1 send after flow 0")
            // 循环发送1-5
            (1..5).forEach {
                delay(100)
                println("emit1 send before flow $it")
                stateFlow.emit(it)
                println("emit1 send after flow $it")
            }
        }
    }
}

运行结果如下: kotlin flow介绍(2) 由运行结果可知:每次更新数据都会和旧数据做一次比较,只有不同时候才会更新数值

5、SharedFlow和StateFlow的对比

StateFlow就是一个replaySize=1的sharedFlow,同时它必须有一个初始值,此外,每次更新数据都会和旧数据做一次比较,只有不同时候才会更新数值。 StateFlow重点在状态,ui永远有状态,所以StateFlow必须有初始值,同时对ui而言,过期的状态毫无意义,所以stateFLow永远更新最新的数据(和liveData相似),所以必须有粘滞度=1的粘滞事件,让ui状态保持到最新。另外在一个时间内发送多个事件,不会管中间事件有没有消费完成都会执行最新的一条.(中间值会丢失)。具体应用时,StateFlow适合那些长期保持某种状态的ui,比如一些开关值之类。 SharedFlow侧重在事件,当某个事件触发,发送到队列之中,按照挂起或者非挂起、缓存策略等将事件发送到接受方,在具体使用时,SharedFlow更适合通知ui界面的一些事件,比如toast等,也适合作为viewModel和repository之间的桥梁用作数据的传输。

问题总结

本文在前一篇文章的基础上,进一步分析了kotlin中flow的冷流、以及热流SharedFlow和StateFlow基本使用和介绍,有兴趣的同学可以进一步深入分析。