问题背景
上一篇文章分析了kotlin中flow的冷流、以及热流SharedFlow和StateFlow基本使用和介绍,参考 https://blog.51cto.com/baorant24/6041918 ,分析了热流的特点,并且对SharedFlow和StateFlow进行了对比: StateFlow就是一个replaySize=1的sharedFlow,同时它必须有一个初始值,此外,每次更新数据都会和旧数据做一次比较,只有不同时候才会更新数值。 StateFlow重点在状态... SharedFlow侧重在事件,当某个事件触发,发送到队列之中,按照挂起或者非挂起、缓存策略等将事件发送到接受方,在具体使用时,SharedFlow更适合通知ui界面的一些事件,比如toast等,也适合作为viewModel和repository之间的桥梁用作数据的传输。 基于SharedFlow的特点,我们是不是可以使用flow来实现一个eventBus呢?
问题分析
1、SharedFlow进一步分析
上一篇文章简单介绍了sharedflow的源码和定义,参考https://blog.51cto.com/baorant24/6041918 ,我们这里再看一下SharedFlow的部分源码:
(1)collect()方法
划重点,对shardflow调用flow.collect方法永远不会complete,这一点非常重要,如果忽视了这一点,非常容易导致内存溢出。具体实现:
(2)emit方法
向shared flow发送数据,如果缓存溢出会挂起,查看具体实现: kotlinx.coroutines.flow.SharedFlowImpl#emit kotlinx.coroutines.flow.SharedFlowImpl#tryEmit kotlinx.coroutines.flow.SharedFlowImpl#emitSuspend
2、基于sharedFlow实现eventBus
废话不多说,直接上demo,代码如下:
package composer.util
import androidx.lifecycle.Lifecycle
import androidx.lifecycle.repeatOnLifecycle
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import java.util.concurrent.ConcurrentHashMap
/**
* 基于flow的eventBus类
*
* @author baorant
* @since 2023/02/05
*/
object FlowEventBus {
private val bus: ConcurrentHashMap<String, MutableSharedFlow<out Any>> = ConcurrentHashMap()
private fun <T : Any> with(key: String): MutableSharedFlow<T> {
if (!bus.containsKey(key)) {
bus[key] = MutableSharedFlow<T>()
}
return bus[key] as MutableSharedFlow<T>
}
/**
* 对外暴露方法获取SharedFlow
*
* @param action String
* @return SharedFlow<T>
*/
fun <T> getFlow(action: String): SharedFlow<T> {
return with(action)
}
/**
* 挂起函数
* @param action String
* @param data T
*/
suspend fun <T : Any> post(action: String, data: T) {
with<T>(action).emit(data)
}
/**
* 详见tryEmit和emit的区别
*
* @param action String
* @param data T
* @return Boolean
*/
fun <T : Any> tryPost(action: String, data: T): Boolean {
return with<T>(action).tryEmit(data)
}
/**
* sharedFlow会长久持有,所以要加声明周期限定,不然会出现内存溢出
*
* @param lifecycle Lifecycle
* @param action String
* @param block Function1<T, Unit>
*/
suspend fun <T : Any> subscribe(lifecycle: Lifecycle, action: String, block: (T) -> Unit) {
lifecycle.repeatOnLifecycle(Lifecycle.State.CREATED) {
with<T>(action).collect {
block(it)
}
}
}
/**
* 使用这个方法需要将协程在合适的时候取消,否则会导致内存溢出
*
* @param action String
* @param block Function1<T, Unit>
*/
@Deprecated("需要注意取消,避免内存泄漏")
suspend fun <T : Any> subscribe(action: String, block: (T) -> Unit) {
with<T>(action).collect {
block(it)
}
}
}
到这里,一个基本的基于flow的eventBus就完成了。下面开始订阅和发送数据: 需要注册监听的地方,订阅数据,代码如下:
/**
* 注册订阅event事件
*/
fun subscribeEvent() {
lifecycleScope.launch {
FlowEventBus.subscribe<String>(lifecycle, "testSharedFlow") {
Log.d("MainActivity", "getMsg $it")
}
}
}
发送数据,代码如下:
/**
* 发送event事件
*/
fun testSharedFlow() {
viewModelScope.launch {
FlowEventBus.post("testSharedFlow", "send msg")
}
}
问题总结
本文在上一篇文章的基础上,进一步分析了热流sharedFlow的特点,介绍了emit()和collect()方法的特点和实现,在此基础上,基于sharedFlow实现了一个基本的eventBus,有兴趣的同学可以进一步深入研究。