Rxbus的使用

时间:2023-03-08 16:08:01

Rxbus是一种模式,在RxJava中

一、添加依赖

 compile 'io.reactivex:rxandroid:1.2.0'
compile 'io.reactivex:rxjava:1.1.5'

二、包装 --- 创建类

 public class RxBus {
private static volatile RxBus sRxBus;
// 主题
private final Subject<Object, Object> mBus; // PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者
public RxBus() {
mBus = new SerializedSubject<>(PublishSubject.create());
} // 单例RxBus
public static RxBus getInstance() {
if (sRxBus == null) {
synchronized (RxBus.class) {
if (sRxBus == null) {
sRxBus = new RxBus();
}
}
}
return sRxBus;
} // 提供了一个新的事件
public void post(Object o) {
mBus.onNext(o);
} // 根据传递的 eventType 类型返回特定类型(eventType)的 被观察者
public <T> Observable<T> toObservable(Class<T> eventType) {
return mBus.ofType(eventType);
// ofType = filter + cast
// return mBus.filter(new Func1<Object, Boolean>() {
// @Override
// public Boolean call(Object o) {
// return eventType.isInstance(o);
// }
// }) .cast(eventType);
}
}

说明:

1、Subject同时充当了Observer和Observable的角色,Subject是非线程安全的,要避免该问题,需要将 Subject转换为一个 SerializedSubject,上述RxBus类中把线程非安全的PublishSubject包装成线程安全的Subject。
2、PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者。
3、ofType操作符只发射指定类型的数据,其内部就是filter+cast

三、创建发送的事件类

  发送 toObservable(Class<T> eventType)  事件.class

 public class ChannelItemMoveEvent {
private int fromPosition;
private int toPosition; public int getFromPosition() {
return fromPosition;
} public int getToPosition() {
return toPosition;
} public ChannelItemMoveEvent(int fromPosition, int toPosition) {
this.fromPosition = fromPosition;
this.toPosition = toPosition; }
}

四、发送事件

 RxBus.getInstance().post(new ChannelItemMoveEvent(fromPosition, toPosition));

五、接收事件

 mSubscription = RxBus.getInstance().toObservable(ChannelItemMoveEvent.class)
.subscribe(new Action1<ChannelItemMoveEvent>() {
@Override
public void call(ChannelItemMoveEvent channelItemMoveEvent) {
int fromPosition = channelItemMoveEvent.getFromPosition();
int toPosition = channelItemMoveEvent.getToPosition(); }
});

六、取消订阅

一般会在基类有一个成员变量 mSubscription,并在OnDestroy的时机取消订阅。

  @Override
protected void onDestroy() {
if (mSubscription!= null && !mSubscription.isUnsubscribed()) {
mSubscription.unsubscribe();
}
}