kotlin flow介绍(1)
问题背景
kotlin的日常开发和使用过程中,flow是一个很常见的东西,那么问题来了,flow是什么东东呢? Flow的定义:异步流,概念上讲依然是响应式流。****按顺序发出多个值的数据流,本质上就是一个生产者消费者模型,生产者发送数据给消费者进行消费。flow流的话分为以下两种: 冷流:当执行collect等末端操作符的时候(也就是有消费者的时候),生产者才开始发射数据流。 生产者与消费者是一对一的关系。当生产者发送数据的时候,对应的消费者才可以收到数据。 热流:不管有没有执行collect等末端操作符(也就是不管有没有消费者),生产者都会发射数据流到内存中。 生产者与消费者是一对多的关系。当生产者发送数据的时候,多个消费者都可以收到数据
问题分析
1、Flow的简单使用
(1)flow{...}内部可以调用suspend函数; (2)通过emit()方法来发射数据; (3)通过collect()方法来收集结果。简单使用代码如下:
fun main() {
testFlowNormal1()
}
fun testFlowNormal1() {
runBlocking {
val flow = flow {
delay(1000)
emit(1)
delay(1000)
emit(2)
}
flow.collect {
println("collect$it")
}
println("collect ok")
}
}
运行结果如下:
2、flow创建的常用方式
(1)flow{...}中通过emit()方法来发送数据; (2)flowOf()一个发射固定值集的流; (3)asFlow()拓展函数,可能将集合和序列转换成流。 示例代码如下:
fun main() {
testFlowNormal1()
testFlowNormal2()
testFlowNormal3()
}
fun testFlowNormal1() {
runBlocking {
// 1、flow{...}方式
val flow = flow {
delay(1000)
emit(1)
delay(1000)
emit(2)
}
flow.collect {
println("collect$it")
}
println("collect testFlowNormal1 ok")
}
}
fun testFlowNormal2() {
runBlocking {
// 2、flowOf方式
val flow = flowOf(2, 3).onEach { delay(1000) }
flow.collect {
println("collect$it")
}
println("collect testFlowNormal2 ok")
}
}
fun testFlowNormal3() {
runBlocking {
// 3、asFlow方式
val flow = listOf(4, 5).asFlow().onEach { delay(1000) }
flow.collect {
println("collect$it")
}
println("collect testFlowNormal3 ok")
}
}
运行结果如下:
3、Flow是冷流
调用末端流操作符( collect 是其中之一)之前,flow{ ... } 中的代码不会执行。代码如下:
fun main() {
testFlowCold()
}
fun testFlowCold() {
runBlocking {
// 1、flow{...}方式
val flow = flow {
delay(1000)
emit(1)
delay(1000)
emit(2)
}
println("calling collect first...")
flow.collect {
println("collect$it")
}
println("calling collect second...")
flow.collect {
println("collect$it")
}
}
}
运行结果如下:
4、末端流操作符
在流上用于启动流收集的挂起函数。 collect 是最基础的末端操作符。collect /reduce /fold/toList 等都是末端操作符,示例代码如下:
fun main() {
testFlowFinalFun()
}
fun testFlowFinalFun() {
runBlocking {
// 1、flow{...}方式
val flow = flow {
delay(1000)
emit(1)
delay(1000)
emit(2)
}
// collect方法
flow.collect {
println("collect$it")
}
// reduce方式
val reduceSum = flow.reduce { a, b -> a + b }
println("reduceSum: $reduceSum")
// fold方式
val foldNum = flow.fold(100) {
a, b -> a + b
}
println("foldNum $foldNum")
}
}
运行结果如下:
5、过渡操作符
过渡操作符应用于上游流,并返回下游流。这些操作符也是冷操作符,也就是说如果没有 '被订阅'就不会执行。onStart/catch/onCompletion/map/filter 等都是过渡操作符。示例代码如下:
fun main() {
testFlowNormal()
}
fun testFlowNormal() {
GlobalScope.launch {
flow {
emit("good")
}.flowOn(Dispatchers.IO) // 切换线程
.onStart {
println("onStart")
}.catch {
println("catch:${it.message}") // 有异常会进入此方法
}.onCompletion {
println("oniComplete:${it?.message}") // 无论是否有异常都会执行
}.collect { // 收集流
println("result = $it")
}
}
Thread.sleep(6000)
}
运行结果如下:
6、和LiveData的区别
flow是kotlin中类似rxJava中flowable的响应流。同样是观察者模式,和livedata的区别在于 1、livedata是生命周期感知的,在整个mvvm架构中适合使用在view层和viewmodel层之间交互,而flow是没有生命周期感知,适合用在model层和viewmodel之间 2、livedata无法处理背压问题,只能显示最新数据,flow和flowable类似可以处理这类问题 3、livedata都放在主线程处理数据,对于线程控制不太理想,flow可以结合协程实现线程切换
7、flow 的背压
什么是背压? 背压的概念:以自然界的水流为例,当上游的流速大于下游的流速,日积月累,最终导致大坝溢出,此种现象称为背压的出现,对应到Kotlin里的Flow,也有上游(生产者)、下游(消费者)的概念,背压的场景是一样的。 1、如何处理背压? 先来模拟一个生产者消费者速度不一致的场景,代码如下:
fun main() {
runBlocking {
testFlowNormal()
}
}
suspend fun testFlowNormal() {
val flow = flow {
(1..3).forEach {
delay(1000)
println("emit $it")
emit(it)
}
}
val time = measureTimeMillis {
flow.collect {
delay(2000)
println("collect:$it")
}
}
println("use time:${time} ms")
}
运行结果如下: 生产者的速度比消费者的速度快,生产者必须等待消费者消费完毕后才会进行下一次生产。 因此,整个流的耗时=生产者耗时(3 * 1000ms)+消费者耗时(3 * 2000ms)=9s。 显而易见,消费者影响了生产者的速度,这种情况下该怎么优化呢? 2、buffer的使用 代码如下:
fun main() {
runBlocking {
testFlowNormal()
}
}
suspend fun testFlowNormal() {
val flow = flow {
(1..3).forEach {
delay(1000)
println("emit $it")
emit(it)
}
}.buffer(5) // 使用buffer
val time = measureTimeMillis {
flow.collect {
delay(2000)
println("collect:$it")
}
}
println("use time:${time} ms")
}
运行结果如下: 对比没有buffer情况,大概节省了2秒左右。
问题总结
本文主要介绍了flow相关的部分概念,包括什么是flow、如何创建flow、flow的简单使用介绍,以及背压等概念,有兴趣的同学可以进一步深入研究。