RxJava作为目前一款超火的框架,它便捷的线程切换一直被人们津津乐道,本文从源码的角度,来对RxJava的线程模型做一次深入理解。(注:本文的多处代码都并非原本的RxJava的源码,而是用来说明逻辑的伪代码)
入手体验
RxJava 中切换线程非常简单,例如最常见的异步线程处理,主线程回调的模型,可以很优雅的用如下代码来做处理:
Observable.just("magic")
.map(str -> doExpensiveWork(str))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(obj -> System.out.print(String.valueOf(obj)));
如上,subscribeOn(Schedulers.io())
保证了doExpensiveWork
函数发生在io线程,observeOn(AndroidSchedulers.mainThread())
保证了subscribe
回调发生在Android 的主线程。所以,这自然而然的引出了本文的关键点,subscribeOn
与observeOn
到底区别在哪里?
流程浅析
要想回答上面的问题,我们首先需要对RxJava的流程有大体了解,一个Observable从产生,到最终执行subscribe,中间可以经历n个变换,每次变换会产生一个新的Observable,就像奥运开幕的传递火炬一样,每次火炬都会传递到下一个人,最终点燃圣火的是最后一个火炬手,即最终执行subscribe操作的是最后一个Observable,所以,每个Observable之间必须有联系,这种关系在代码中的体现就是,每个变换后的Observable都会持有上一个Observable 中OnSubscribe对象的引用(Observable.create 函数所需的参数),最终 Observable的subscribe函数中的关键代码是这一句:
observable.onSubscribe.call(subscriber)
这个observable就是最后一个变换后的observable,那这个onSubscribe对象是谁呢?如何一个observable没有经过任何变换,直接执行了subscribe,当然就是我们在create中传入的onSubscribe, 但如果中间经过map、reduce等变换,这个onSubscribe显然就应该是创建变换后的observable传入的参数,大部分变换最终都交由lift函数:
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));
}
所以,上文所提到的onSubscribe对象应该是OnSubscribeLift的实例,而这个OnSubscribeLift所接收的两个参数,一个是前文提到的,上一个Observable中的OnSubscribe对象,而operator则是每种变换的一个抽象接口。再来看这个OnSubscribeLift对象的call方法:
public void call(Subscriber<? super R> o) {
Subscriber<? super T> st = operator.call(o);
parent.call(st);
}
operator与parent就是前文提到的两个参数,可见,operator接口会拥有call方法,接收一个Subscriber, 并返回一个新的Subscriber对象,而接下来的parent.call(st)
是回调上一层observable的onSubscribe的call方法,这样如此继续,一直到一个onSubscribe截止。这样我们首先理清了一条线路,就是从最后一个observable的subscribe后,OnSubscribe调用的顺序是从后向前的。
这就带来了另外一个疑问,从上面的代码可以看到,在执行parent.call(st)
之前已经执行了operator.call(o)
方法,如果call方法里就把变换的操作执行了的话,那似乎变换也会是从后向前传递的呀?所以这个operator.call
方法绝对不是我们想象的那么简单。这里以map操作符为例,看源码:
public Subscriber<? super T> call(final Subscriber<? super R> s) {
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
o.add(parent);
return parent;
}
这里果然没有执行变换操作,而是生成一个MapSubscriber对象,这里需要注意MapSubscriber构造函数的两个参数,transformer是真正要执行变换的Func1对象,这很好理解,那对于o这个Subscriber是哪一个呢?什么意思?举个