上周五分享过一篇 RxBus 深入分析,有些朋友第一次接触,可能理解起来颇为费事,恰巧 VipOthershe 也投稿了一篇,偏重于实战,今天分享给大家,希望能加深理解。另外,文末还有 RxJava 相关文章链接,有需要的朋友可以查看。
VipOthershe 的博客地址:
http://www.jianshu.com/users/4235f2b5b350
RxBus 的核心功能是基于 Rxjava 的,既然是模拟 EventBus,我们需要搞清楚 RxJava 满足实现 EventBus 的那些条件,这样才能更好的实现 RxBus。
EventBus 是 Android上的一个事件发布/订阅的事件总线框架,可以充分的解耦,简化了四大组件、UI线程与子线程的间的事件传递等等。它基本工作流程如下:
-
订阅:
EventBus.getDefault().register(this);
-
发送事件:
EventBus.getDefault().post(event);
-
接受、处理事件:
onEventXXX(Object event);
-
取消订阅:
EventBus.getDefault().unregister(this);
根据 EventBus 的工作流程,我们的 RxBus 首先需要自身的实例,这一点我们可以仿照 EventBus的getDefault()方法,通过一个单例来实现。
有了 RxBus 实例就可以进行订阅了,在 RxJava 中有个 Subject类,它继承Observable类,同时实现了 Observer接口,因此 Subject 可以同时担当订阅者和被订阅者的角色,这里我们使用 Subject 的 子类PublishSubject 来创建一个 Subject 对象(PublishSubject 只有被订阅后才会把接收到的事件立刻发送给订阅者),在需要接收事件的地方,订阅该 Subject对象,之后如果 Subject对象 接收到事件,则会发射给该订阅者,此时 Subject对象 充当被订阅者的角色。
完成了订阅,在需要发送事件的地方将事件发送给之前被订阅的 Subject对象,则此时 Subject对象 做为订阅者接收事件,然后会立刻将事件转发给订阅该 Subject对象 的订阅者,以便订阅者处理相应事件,到这里就完成了事件的发送与处理。
最后就是取消订阅的操作了,Rxjava 中,订阅操作会返回一个 Subscription对象,以便在合适的时机取消订阅,防止内存泄漏,如果一个类产生多个 Subscription对象,我们可以用一个 CompositeSubscription 存储起来,以进行批量的取消订阅。
到这里我们已经结合 EventBus 对 RxBus 的可行性以及大概的实现流程进行了分析,接下来结合实现代码再做进一步的解释:
先看一下这个私有的构造函数:
private RxBus() {
mSubject = new SerializedSubject<>(PublishSubject.create());
}
由于 Subject类是非线程安全的,所以我们通过它的 子类SerializedSubject 将 PublishSubject 转换成一个线程安全的Subject对象。之后可通过单例方法 getInstance() 进行 RxBus 的初始化。
在 toObservable() 根据事件类型,通过 mSubject.ofType(type);得到一个 Observable对象,让其它订阅者来订阅。其实 ofType()方法,会过滤掉不符合条件的事件类型,然后将满足条件的事件类型通过 cast()方法,转换成对应类型的 Observable对象,这点可通过源码查看。
同时封装了一个简单的订阅方法 doSubscribe(),只需要传入事件类型,相应的回调即可。其实可以根据需求在 RxBus 中扩展满足自己需求的 doSubscribe()方法,来简化使用时的代码逻辑。
在需要发送事件的地方调用 post()方法,它间接的通过mSubject.onNext(o);将事件发送给订阅者。
同时 RxBus 提供了 addSubscription()、unSubscribe()方法,分别来保存订阅时返回的 Subscription对象,以及取消订阅。
接下我们在具体的场景中测试一下:
1. 我们在 Activity的onCreate()方法 中进行进行订阅操作:
可以看到我们设定事件类型为 String,并且 Subscriber 的回调发生在主线程,同时保存了 Subscription对象。
然后通过一个 Button 发送事件:
我们直接在UI线程发送了 String类型的1024,看效果:
2. 同样在 onCreate()方法 中进行进行订阅操作:
我们使用了 RxBus 中封装好的 doSubscribe()方法,设置事件类型为Integer。这次我们通过 Button 在子线程中发送一个事件:
在子线程发送了一个 Integer类型的2048,看效果:
3. 我们再测试下在广播中发送事件,订阅方式按照场景1的方式:
定义一个检测网络状态的广播:
在网络可用与不可用时发送提示事件,然后在 onCreate()方法 中注册广播:
我们手动打开、关闭网络,可以看到mTv上会显示网络状态的提示信息,看效果:
最后不要忘了在 onDestory() 中对广播进行取消注册,以及取消订阅。
protected void onDestroy() {
super.onDestroy(); unregisterReceiver(mReceiver);
RxBus.getInstance().unSubscribe(this);}
其它场景有兴趣的可自行测试哦!到这里 RxBus 的基本功能就实现了。
但是还不够完善,一般情况我们都是先订阅事件,然后发送事件,如果我们反过来,先发送了事件,再进行订阅操作,怎么保证发送的事件不丢失呢?也就是 EventBus中的StickyEven功能。
其实通过 RxJava 实现类似的功能很简单,Subject 有一个 子类BehaviorSubject,在被订阅之前,它可以缓存最近一个发送给它的事件,当被订阅后,它会立刻将缓存事件发送给订阅者,这样就解决了我们之前的疑问。RxBus 需要做的修改很简单:
private RxBus() {
mSubject = new SerializedSubject<>(BehaviorSubject.create());
}
但是有一点需要注意 BehaviorSubject只能缓存最近的一个事件,如果有多个事件怎么办?对 RxJava 来说都不是事,Subject 还有一个 子类ReplaySubject,在被订阅之前,它可以缓存多个发送给它的事件,在被订阅后会发送所有事件给订阅者,相信如何修改 RxBus 已经很明显了。
有兴趣的话可以下载源码测试:
http://download.csdn.net/detail/shehuan320_/9593121
最后推荐一些 RxJava 的学习资源:
RxJava入门
http://blog.csdn.net/column/details/rxjava.html?&page=1
给 Android 开发者的 RxJava 详解
http://gank.io/post/560e15be2dca930e00da1083
如果你有好的技术文章想和大家分享,欢迎向我的公众号投稿,投稿具体细节请在公众号主页点击“投稿”菜单查看。
欢迎长按下图 -> 识别图中二维码或者扫一扫关注我的公众号: