RxBus的小试牛刀

时间:2021-01-25 03:30:48

前言

RxBus并不是框架,而是一种类似观察者的模式。类似效果的还有EventBus和otto,
但是如果项目中使用了Rx响应式编程,那么用RxBus这种事件通信可以加深对响应式编程的熟练,以及不需要再引用任何库,它就是一个简单的类:

public class RxBus {
private static RxBus sRxBus;
private final Subject<Object, Object> mBus;

// 注释1:PublishSubject
private RxBus() {
mBus = new SerializedSubject<>(PublishSubject.create());
} //注释2

// 单例
public static RxBus getInstance() {
if (sRxBus == null) {
synchronized (RxBus.class) {
if (sRxBus == null) {
sRxBus = new RxBus();
}
}
}
return sRxBus;
}

// 提供了一个新的事件
public void post(Object o) {
mBus.onNext(o);
}

// 根据传递的 eventType 类型返回特定类型(eventType)的 被观察者
public <T> Observable<T> toObservable(Class<T> eventType) {
return mBus.ofType(eventType);
}
}

看一下注释1:

PublishSubject 分析

查看源码,有PublishSubject的示例:

    PublishSubject<Object> subject = PublishSubject.create();
//observer1 will receive all onNext and onCompleted events
subject.subscribe(observer1);
subject.onNext("one");
subject.onNext("two");
//observer2 will only receive "three" and onCompleted
subject.subscribe(observer2);
subject.onNext("three");
subject.onCompleted();

PublishSubject和 通常用的 Observable.subscribe(observer)或者Observable.subscribe(subscriber)不一样,Observable.subscribe会触发事件,
而PublishSubject.subscribe不会马上触发事件,而是onNext的时候才会触发。
所以订阅的subscriber或者observer 只能收到 它后面的onNext的事件。

看一下注释2:

SerializedSubject 分析

查看源码,有SerializedSubject 有这么一段注释:
Wraps a {@link Subject} so that it is safe to call its various {@code on} methods from different threads. When you use an ordinary {@link Subject} as a {@link Subscriber}, you must take care not to call its {@link Subscriber#onNext} method (or its other {@code on} methods) from multiple threads, as this could lead to non-serialized calls, which violates the Observable contract and creates an ambiguity in the resulting Subject.
意思就是说 你用普通的Subject 发射onNext的时候要注意不能在不同的线程发射,会引发线程安全的。

通常我们用 mySafeSubject = new SerializedSubject( myUnsafeSubject ); 将普通的Subject包裹进SerializedSubject 可以解决线程安全的问题。

继续看SerializedSubject 的源码,看一下它是怎么解决线程安全的问题。

public class SerializedSubject<T, R> extends Subject<T, R> {
private final SerializedObserver<T> observer;
private final Subject<T, R> actual;

public SerializedSubject(final Subject<T, R> actual) {
super(new OnSubscribe<R>() {

@Override
public void call(Subscriber<? super R> child) {
actual.unsafeSubscribe(child);
}

});
this.actual = actual;
this.observer = new SerializedObserver<T>(actual);
}

@Override
public void onCompleted() {
observer.onCompleted();
}

@Override
public void onError(Throwable e) {
observer.onError(e);
}
//原来SerializedSubject把onCompleted、onError、onNext都交给了SerializedObserver去处理。
@Override
public void onNext(T t) {
observer.onNext(t);
}

@Override
public boolean hasObservers() {
return actual.hasObservers();
}
}

SerializedSubject把onNext交给SerializedObserver去处理了,接着看看SerializedObserver的源码。

SerializedObserver 分析

还是看看源码的注释:
When multiple threads are emitting and/or notifying they will be serialized by:
Allowing only one thread at a time to emit
Adding notifications to a queue if another thread is already emitting
Not holding any locks or blocking any threads while emitting
有道词典翻译加个人理解:只允许onNext在同一时间只能在一个线程发射,
如果已经存在一个线程发射onNext,那么另一个发射onNext将添加到队列中,
在发射onNext过程中,不会持有任何锁和阻塞线程。

贴出SerializedObserver.onNext代码

    @Override
public void onNext(T t) {
if (terminated) {
return;
}
synchronized (this) {
if (terminated) {
return;
}
//emitting为true,说明有线程已经正在发射onNext中
if (emitting) {
FastList list = queue;
if (list == null) {
list = new FastList();
queue = list;
}
//把现在准备要发射的事件加到队列中,然后直接返回
list.add(nl.next(t));
return;
}
emitting = true;
}
try {
//没有线程在发射onNext就直接 onNext
actual.onNext(t);
} catch (Throwable e) {
terminated = true;
Exceptions.throwOrReport(e, actual, t);
return;
}
for (;;) {
for (int i = 0; i < MAX_DRAIN_ITERATION; i++) {
FastList list;
synchronized (this) {
list = queue;
//如果 队列 是空的,直接返回,发射ing 也置为false
if (list == null) {
emitting = false;
return;
}
queue = null;
}
for (Object o : list.array) {
if (o == null) {
break;
}
try {
//下面分析
//事件为 COMPLETED或者ERROR ,才终止
if (nl.accept(actual, o)) {
terminated = true;
return;
}
} catch (Throwable e) {
terminated = true;
Exceptions.throwIfFatal(e);
actual.onError(OnErrorThrowable.addValueAsLastCause(e, t));
return;
}
}
}
}
}

上面看到了nl.accept(actual, o),跟进去。 看到只有事件为 COMPLETED或者ERROR,才返回true,如果为ON_NEXT,依旧 死循环 for (;;)

public boolean accept(Observer<? super T> o, Object n) {
if (n == ON_COMPLETED_SENTINEL) {
o.onCompleted();
return true;
} else if (n == ON_NEXT_NULL_SENTINEL) {
o.onNext(null);
return false;
} else if (n != null) {
if (n.getClass() == OnErrorSentinel.class) {
o.onError(((OnErrorSentinel) n).e);
return true;
}
o.onNext((T) n);
return false;
} else {
throw new IllegalArgumentException("The lite notification can not be null");
}
}

回到RxBus,发射事件

    public void post(Object o) {
//bus就是PublishSubject,所以随时发射onNext都可以
mBus.onNext(o);
}

回到RxBus,订阅事件

    public <T> Observable<T> toObservable(Class<T> eventType) {
return mBus.ofType(eventType);
}

ofType = filter+cast

public final <R> Observable<R> ofType(final Class<R> klass) {
return filter(InternalObservableUtils.isInstanceOf(klass)).cast(klass);
}

filter 根据传进来的类过滤,post的时候只发射给对应的订阅者
cast 类型转换

结束

本文是分析源码以及查看资料,然后写的,有哪些地方不对的欢迎指出。
哈哈O(∩_∩)O哈哈~。