kotlin flow介绍(3)——基于Flow实现eventBus

时间:2023-02-08 08:01:38

问题背景

上一篇文章分析了kotlin中flow的冷流、以及热流SharedFlow和StateFlow基本使用和介绍,参考 https://blog.51cto.com/baorant24/6041918 ,分析了热流的特点,并且对SharedFlow和StateFlow进行了对比: StateFlow就是一个replaySize=1的sharedFlow,同时它必须有一个初始值,此外,每次更新数据都会和旧数据做一次比较,只有不同时候才会更新数值。 StateFlow重点在状态... SharedFlow侧重在事件,当某个事件触发,发送到队列之中,按照挂起或者非挂起、缓存策略等将事件发送到接受方,在具体使用时,SharedFlow更适合通知ui界面的一些事件,比如toast等,也适合作为viewModel和repository之间的桥梁用作数据的传输。 基于SharedFlow的特点,我们是不是可以使用flow来实现一个eventBus呢?

问题分析

1、SharedFlow进一步分析

上一篇文章简单介绍了sharedflow的源码和定义,参考https://blog.51cto.com/baorant24/6041918 ,我们这里再看一下SharedFlow的部分源码:

(1)collect()方法

kotlin flow介绍(3)——基于Flow实现eventBus 划重点,对shardflow调用flow.collect方法永远不会complete,这一点非常重要,如果忽视了这一点,非常容易导致内存溢出。具体实现: kotlin flow介绍(3)——基于Flow实现eventBus

(2)emit方法

kotlin flow介绍(3)——基于Flow实现eventBus 向shared flow发送数据,如果缓存溢出会挂起,查看具体实现: kotlinx.coroutines.flow.SharedFlowImpl#emit kotlin flow介绍(3)——基于Flow实现eventBus kotlinx.coroutines.flow.SharedFlowImpl#tryEmit kotlin flow介绍(3)——基于Flow实现eventBus kotlinx.coroutines.flow.SharedFlowImpl#emitSuspend kotlin flow介绍(3)——基于Flow实现eventBus

2、基于sharedFlow实现eventBus

废话不多说,直接上demo,代码如下:

package composer.util

import androidx.lifecycle.Lifecycle
import androidx.lifecycle.repeatOnLifecycle
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import java.util.concurrent.ConcurrentHashMap

/**
 * 基于flow的eventBus类
 *
 * @author baorant
 * @since 2023/02/05
 */
object FlowEventBus {
    private val bus: ConcurrentHashMap<String, MutableSharedFlow<out Any>> = ConcurrentHashMap()

    private fun <T : Any> with(key: String): MutableSharedFlow<T> {
        if (!bus.containsKey(key)) {
            bus[key] = MutableSharedFlow<T>()
        }
        return bus[key] as MutableSharedFlow<T>
    }

    /**
     * 对外暴露方法获取SharedFlow
     *
     * @param action String
     * @return SharedFlow<T>
     */
    fun <T> getFlow(action: String): SharedFlow<T> {
        return with(action)
    }

    /**
     * 挂起函数
     * @param action String
     * @param data T
     */
    suspend fun <T : Any> post(action: String, data: T) {
        with<T>(action).emit(data)
    }

    /**
     * 详见tryEmit和emit的区别
     *
     * @param action String
     * @param data T
     * @return Boolean
     */
    fun <T : Any> tryPost(action: String, data: T): Boolean {
        return with<T>(action).tryEmit(data)
    }

    /**
     * sharedFlow会长久持有,所以要加声明周期限定,不然会出现内存溢出
     *
     * @param lifecycle Lifecycle
     * @param action String
     * @param block Function1<T, Unit>
     */
    suspend fun <T : Any> subscribe(lifecycle: Lifecycle, action: String, block: (T) -> Unit) {
        lifecycle.repeatOnLifecycle(Lifecycle.State.CREATED) {
            with<T>(action).collect {
                block(it)
            }
        }
    }

    /**
     * 使用这个方法需要将协程在合适的时候取消,否则会导致内存溢出
     *
     * @param action String
     * @param block Function1<T, Unit>
     */
    @Deprecated("需要注意取消,避免内存泄漏")
    suspend fun <T : Any> subscribe(action: String, block: (T) -> Unit) {
        with<T>(action).collect {
            block(it)
        }
    }

}

到这里,一个基本的基于flow的eventBus就完成了。下面开始订阅和发送数据: 需要注册监听的地方,订阅数据,代码如下:

    /**
     * 注册订阅event事件
     */
    fun subscribeEvent() {
        lifecycleScope.launch {
            FlowEventBus.subscribe<String>(lifecycle, "testSharedFlow") {
                Log.d("MainActivity", "getMsg $it")
            }
        }
    }

发送数据,代码如下:

    /**
     * 发送event事件
     */
    fun testSharedFlow() {
        viewModelScope.launch {
            FlowEventBus.post("testSharedFlow", "send msg")
        }
    }

问题总结

本文在上一篇文章的基础上,进一步分析了热流sharedFlow的特点,介绍了emit()和collect()方法的特点和实现,在此基础上,基于sharedFlow实现了一个基本的eventBus,有兴趣的同学可以进一步深入研究。