RxJava 学习笔记(四)

时间:2022-07-07 19:37:33

1.线程控制Scheduler(二)

给 Android 开发者的 RxJava 详解

1)Scheduler的API(二)

前面讲到了,可以利用 subscribeOn()结合 observeOn()来实现线程控制,让事件的产生和消费发生在不同的线程。可是在了解了 map() flatMap()等变换方法后,n能不能多切换几次线程?

答案是:能。因为observeOn() 指定的是 Subscriber 的线程,而这个Subscriber并不是(严格说应该为『不一定是』,但这里不妨理解为『不是』)subscribe()参数中的 Subscriber,而是observeOn()执行时的当前 Observable所对应的 Subscriber,即它的直接下级Subscriber 。换句话说,observeOn()指定的是它之后的操作所在的线程。因此如果有多次切换线程的需求,只要在每个想要切换线程的位置调用一次observeOn()即可。上代码:

Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.map(mapOperator) // 新线程,由 observeOn() 指定
.observeOn(Schedulers.io())
.map(mapOperator2) // IO 线程,由 observeOn() 指定
.observeOn(AndroidSchedulers.mainThread)
.subscribe(subscriber); // Android 主线程,由 observeOn() 指定

如上,通过 observeOn()的多次调用,程序实现了线程的多次切换。

不过,不同于 observeOn()subscribeOn() 的位置放在哪里都可以,但它是只能调用一次的。

又有好事的(其实还是当初的我)问了:如果我非要调用多次 subscribeOn() 呢?会有什么效果?

这个问题先放着,我们还是从 RxJava线程控制的原理说起吧。

2)Scheduler的原理(二)

下面这个是 1.1.0的 版本:

public final Observable<T> subscribeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return nest().lift(new OperatorSubscribeOn<T>(scheduler));
}

public final Observable<T> observeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return lift(new OperatorObserveOn<T>(scheduler));
}

可以看出其实 subscribeOn()Observable()的内部实现,也是用的lift()

下面这个 是1.1.6的版本:

public final Observable<T> subscribeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return create(new OperatorSubscribeOn<T>(this, scheduler));
}

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}

具体看图(不同颜色的箭头表示不同的线程):

subscribeOn()原理图:
                                      RxJava 学习笔记(四)

observeOn()原理图:
                                      RxJava 学习笔记(四)

从图中可以看出,subscribeOn()observeOn()都做了线程切换的工作(图中的 "schedule..." 部位)。不同的是, subscribeOn() 的线程切换发生在OnSubscribe中,即在它通知上一级 OnSubscribe 时,这时事件还没有开始发送,因此 subscribeOn()的线程控制可以从事件发出的开端就造成影响;而 observeOn() 的线程切换则发生在它内建的 Subscriber 中,即发生在它即将给下一级Subscriber 发送事件时,因此 observeOn()控制的是它后面的线程。

(1) subscribeOn()

版本为1.1.01.1.6的 内部源码实现不同

compile 'io.reactivex:rxjava:1.1.0'
compile 'io.reactivex:rxandroid:1.1.0'
compile 'io.reactivex:rxjava:1.1.6'
compile 'io.reactivex:rxandroid:1.2.1'
❶ 这边是版本1.1.0的源码

首先判断当前observable是不是ScalarSynchronousObservable类型的,如果是则会直接执行scalarScheduleOn方法,如果不是,则会通过nest()方法创建一个ScalarSynchronousObservable并把当前的Observable包装进去,并且lift一个OperatorSubscribeOn进去。

public final Observable<T> subscribeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}

return nest().lift(new OperatorSubscribeOn<T>(scheduler));
}

subscribeOn 就是把 一个Observable转为 另一个 Observable, 那

一步步分析:

nest()做了什么呢?看下面源码可以知道,他创建了一个ScalarSynchronousObservable

public final Observable<Observable<T>> nest() {
return just(this);
}

public final static <T> Observable<T> just(final T value) {
return ScalarSynchronousObservable.create(value);
}

OperatorSubscribeOn做了什么? 看他构造方法可以知道他,保存了一个scheduler对象 ,注意看他的call方法的 ,它运行时的线程,在inner这个Worker上,于是它的运行线程已经被改了

public class OperatorSubscribeOn<T> implements Operator<T, Observable<T>> {

private final Scheduler scheduler;

public OperatorSubscribeOn(Scheduler scheduler) {
this.scheduler = scheduler;
}

@Override
public Subscriber<? super Observable<T>> call(final Subscriber<? super T> subscriber) {
final Worker inner = scheduler.createWorker();
subscriber.add(inner);
return new Subscriber<Observable<T>>(subscriber) {

@Override
public void onCompleted() {}

@Override
public void onError(Throwable e) {
subscriber.onError(e);
}

@Override
public void onNext(final Observable<T> o) {
// inner.schedule 就是在这边进行线程切换,
inner.schedule(new Action0() {

@Override
public void call() {
final Thread t = Thread.currentThread();

o.unsafeSubscribe(new Subscriber<T>(subscriber) {

@Override
public void onCompleted() {
subscriber.onCompleted();
}

@Override
public void onError(Throwable e) {
subscriber.onError(e);
}

@Override
public void onNext(T t) {
subscriber.onNext(t);
}

@Override
public void setProducer(final Producer producer) {
subscriber.setProducer(new Producer() {

@Override
public void request(final long n) {
if (Thread.currentThread() == t) {
producer.request(n);
} else {
inner.schedule(new Action0() {
@Override
public void call() {
producer.request(n);
}
});
}
}

});
}

});
}
});
}

};
}
}

但是 OperatorSubscribeOn 的call 方法在什么时候调用呢?那我们就要来看下lift(), 就是在hook.onLift(operator).call(o)这边进行调用的,


public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> o) {
try {
// 就是在这边调用的哦!!!!切换了线程
Subscriber<? super T> st = hook.onLift(operator).call(o);
try {
st.onStart();
onSubscribe.call(st);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
st.onError(e);
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
o.onError(e);
}
}
});
}

❷ 这边是版本1.1.6的源码 建议看这个
 public final Observable<T> subscribeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}

return create(new OperatorSubscribeOn<T>(this, scheduler)); // 这边是重点,就是这边修改了
}

Observable.create()创建的称之为Observable_1,OnSubscribe_1。把subscribeOn()创建的称之为Observable_2OnSubscribe_2(= OperatorSubscribeOn)

那么,前两步就是创建了两个的observable,和OnSubscribe,并且OnSubscribe_2中保存了Observable_1的引用,即下面代码的source(上面代码的this)。

public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {

final Scheduler scheduler;
final Observable<T> source;

public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
this.scheduler = scheduler;
this.source = source;
}

@Override
public void call(final Subscriber<? super T> subscriber) {
final Worker inner = scheduler.createWorker();
subscriber.add(inner);

inner.schedule(new Action0() {
@Override
public void call() {
final Thread t = Thread.currentThread();

Subscriber<T> s = new Subscriber<T>(subscriber) {
@Override
public void onNext(T t) {
subscriber.onNext(t);
}

@Override
public void onError(Throwable e) {
try {
subscriber.onError(e);
} finally {
inner.unsubscribe();
}
}

@Override
public void onCompleted() {
try {
subscriber.onCompleted();
} finally {
inner.unsubscribe();
}
}

@Override
public void setProducer(final Producer p) {
subscriber.setProducer(new Producer() {
@Override
public void request(final long n) {
if (t == Thread.currentThread()) {
p.request(n);
} else {
inner.schedule(new Action0() {
@Override
public void call() {
p.request(n);
}
});
}
}
});
}
};

source.unsafeSubscribe(s);
}
});
}
}

我们看到它实现OnSubscribe,并保存了原来的ObservableScheduler。创建一个用于在不同线程执行的Worker对象,然后worker调用schedule()去执行Observable1subscribe()

再来看subscribeOn中调用的create方法

public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(hook.onCreate(f));
}
//hook的onCreate方法
public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f) {
return f;
}

没做什么,就是把使用刚刚创建OperatorSubscribeOn作为参数,返回给方法链下一个。 那到这里我们就清晰了,其实调用了subscribeOn后,返回的Observable已经不是最开始创建那个。我们来总结一下,先把create创建的Observable称为Observable1,把它的OnSubscribe称为OnSubscribe1,把subscribeOn中创建的Observable称为Observable2,把它的OnSubscribe(OperatorSubscribeOn implements OnSubscribe)称为OnSubscribe2

 public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}

在方法链最后调用了subscribe(),应该是调用了Observable2subscribe(),那这时候就会调用到OnSubscribe2call方法,call中的参数就我们在方法链中传进来的。call中会使用worker切换线程去执行,到最后才是调用了OnSubscribe1unsafeSubscribe方法,并把重新封装的Subscriber传过去。而unsafeSubscribe会调用OnSubscribe1call(),所以其他流程是不变的,只是subscribe()执行的线程改了。

那为什么多次调用subscribeOn是无效的呢?因为多调用一次,只是多了一层先把OnSubscribe,但其实会对Observable1subscribe()产生影响的是OnSubscribe2,其他的只会对它前面的产生影响。就好比下面这样

new Thread() {
@Override
public void run() {

new Thread() {
@Override
public void run() {
Log.i(TAG,"run");
}
}.start();

}
}.start();
(2) observeOn()

1.1.6版本源码

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}

observeOn()这边用到的是lift()(这边的lift()1.1.0lift(),略有差别,但是本质call方法是一样的),在这边 创建了一个新的 Onsubscribe(=OnSubsribeLift)

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));
}
public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {

static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();

final OnSubscribe<T> parent;

final Operator<? extends R, ? super T> operator;

public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
this.parent = parent;
this.operator = operator;
}

@Override
public void call(Subscriber<? super R> o) {
try {
Subscriber<? super T> st = hook.onLift(operator).call(o);
try {
st.onStart();
parent.call(st);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
st.onError(e);
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
o.onError(e);
}
}
}

可以看到在lift中调用了OperatorObserveOncall,传进去Subscriber创建了一个新的Subscriber,再将其传入onSubscribe.call()中,说明了这个新的Subscriber.onNext(),会被调用,而且它肯定在里面还调用了原来的Subscriber.onNext()方法。 到OperatorObserveOn中就会发现,这个是新的SubscriberOperatorObserveOn的内部类ObserveOnSubscriber

public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError) {
this.child = child;
this.recursiveScheduler = scheduler.createWorker();
....... }

void init() {
......
Subscriber<? super T> localChild = child;
......
localChild.add(recursiveScheduler);
localChild.add(this);
}

@Override
public void onStart() {
......
}

@Override
public void onNext(final T t) {
......
schedule();
}

@Override
public void onCompleted() {
......
schedule();
}

@Override
public void onError(final Throwable e) {
......
schedule();
}

protected void schedule() {
if (counter.getAndIncrement() == 0) {
recursiveScheduler.schedule(this);
}
}

// only execute this from schedule()
@Override
public void call() {
......

for (;;) {
if (checkTerminated(finished, q.isEmpty(), localChild, q)) {
return;
}

......
localChild.onNext(localOn.getValue(v));
......
}
}

boolean checkTerminated(boolean done, boolean isEmpty, Subscriber<? super T> a, Queue<Object> q) {
if (a.isUnsubscribed()) {
......
if (e != null) {
a.onError(e);
} else {
a.onCompleted();
}
......
return false;
}
}

我们看到onNextonCompletedonError中都调用了scheduleschedule中又调用了recursiveScheduler.schedule(this)recursiveScheduler就是根据我们设置的线程模式生成的Worker,在这里线程就切换到我们指定的那边了。然后又调用到this.call(),在call里面又调用了localChild.onNextonErroronCompleted,这个localChild就是原来的Subscriber,所以onNextonErroronCompleted就会在我们指定的线程调用了。

那为什么会对observeOn后面的mapflatMap起作用,因为map里面其实也是调用了lift去创建一个新的Subscriber,所以也就会在新的线程中执行。

为什么每次observeOn都会起作用,因为它跟下面这个差不多。

new Thread() {
@Override
public void run() {
map();
new Thread() {
@Override
public void run() {
map();
onNext();
}
}.start();

}
}.start();
(3) 延伸:doOnSubscribe()

然而,虽然超过一个的 subscribeOn()对事件处理的流程没有影响,但在流程之前却是可以利用的。

在前面讲Subscriber 的时候,提到过 SubscriberonStart()可以用作流程开始前的初始化。然而 onStart()由于在subscribe()发生时就被调用了,因此不能指定线程,而是只能执行在 subscribe()被调用时的线程。这就导致如果onStart()中含有对线程有要求的代码(例如在界面上显示一个 ProgressBar,这必须在主线程执行),将会有线程非法的风险,因为有时你无法预测 subscribe()将会在什么线程执行。

而与 Subscriber.onStart()相对应的,有一个方法Observable.doOnSubscribe()。它和 Subscriber.onStart()同样是在subscribe() 调用后而且在事件发送前执行,但区别在于它可以指定线程。默认情况下, doOnSubscribe() 执行在 subscribe() 发生的线程;而如果在doOnSubscribe()之后有 subscribeOn()的话,它将执行在离它最近的 subscribeOn()所指定的线程。

示例代码:

Observable.create(onSubscribe)
.subscribeOn(Schedulers.io())
.doOnSubscribe(new Action0() {
@Override
public void call() {
progressBar.setVisibility(View.VISIBLE); // 需要在主线程执行
}
})
.subscribeOn(AndroidSchedulers.mainThread()) // 指定主线程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);

如上,在 doOnSubscribe()的后面跟一个 subscribeOn(),就能指定准备工作的线程了。

参考
给 Android 开发者的 RxJava 详解
RxJava中的线程切换源码分析