kotlin中的flow使用,Flow跟生命周期结合

时间:2025-01-17 20:48:18

kotlin的Flow可以连续异步发出多个数据。

1. 普通flow,冷流类似于一个函数,当开始收集时才开始运行

val coldStream = flow {
        for (i in 1..5) {
            delay(100L)
            emit(i)
        }
    }
val collect1 = buildString {
    coldStream.collect { append(it).append(", ") }
}.removeSuffix(", ")
  • 在原来的的 CoroutineContext 中运行, 如果flow{}中运行完毕那流就结束了。
  • flow中的emit需在当前的上下文中调用,不可以切换到其它线程中发送;逻辑可以在其它Context执行,emit需要在当前上下文。
  • 监听者在collect()时是堵塞当前协程的,可以通过独立的上下文job保存,或者withTimeoutOrNull()的方式中途退出。

2. 把Callback转换为Flow流

val flow = callbackFlow<Long> {
    val dis = Observable.interval(200, TimeUnit.MILLISECONDS).subscribe {
        this.offer(it)
    }
    awaitClose {
        println("flow Closed.")
        dis.dispose()
    }
}

withTimeoutOrNull(5000) {
    flow.collect {
        println("Recv $it")
    }
}
  • awaitClose:当收集代码退出时也及时通知数据发送方停止。
  • 上面的offer()提示Deprecated, 但是改为trySend会报错,很奇怪。那就继续使用offer()吧。

3. SharedFlow热流,类似于 rxJava.PublishSubject 或者 LiveData。

val sharedFlow = MutableSharedFlow<Int>()
    scope.launch(Dispatchers.Default) {
        for (i in 1..10) {
            delay(200L)
            sharedFlow.emit(i)
            println("Emit: $i - ${threadName()}")
        }
    }
    scope.launch(Dispatchers.Default) {
        sharedFlow.collect {
            println("Col1: $it - ${threadName()}")
        }
    }

在UI中跟生命周期结合,由UI监听ViewModel的数据变动及时更新界面:

lifecycleScope.launch {
    viewModel.dataListFlow.flowWithLifecycle(lifecycle, Lifecycle.State.STARTED)
        .collect {
            adapter.datas = it
            adapter.notifyDataSetChanged()
        }
}
  • Channel也是有点类似,可以使用这个代替Channel的使用,还不用学习多一套接口。

4. StateFlow也是热流,能保留最后一个状态值,有数据防抖功能,相同数据不会真正发出。

val stateFlow = MutableSharedFlow<Int>()
// 启动一个 Job 来发射数据
val job = launch {
    repeat(100) { count ->
        delay(200) // 模拟一些工作
        val e = count/2*2
        println("Emit $e")
        stateFlow.emit(e) // 更新 StateFlow 的值
    }
}

// 订阅 StateFlow
val subscription = launch {
    stateFlow.collect { value ->
        println("StateFlow collect: $value")
        if (value >= 8) {
            println("Cancelling subscription...")
            cancel() // 取消收集
        }
    }
}

// 等待一段时间,确保 Job 运行
delay(1200)
job.cancelAndJoin() // 取消数据源的 Job
subscription.cancelAndJoin() // 取消收集的 Job
  • Compose中的UI状态很多都是使用 mutableStateOf() 就是这个 StateFlow()。