上一篇中,我们对最基本的使用方法进行了源码分析,可是这远远不能体现rxJava的强大,我们没有看到线程转换,没有看到强大的操作符,还记得我们想通过分析RxJava2源码所达到的目的么?有5个。
我们的目的:
- 知道源头(Observable)是如何将数据发送出去的。
- 知道终点(Observer)是如何接收到数据的。
- 何时将源头和终点关联起来的
- 知道线程调度是怎么实现的
- 知道操作符是怎么实现的
通过上一篇文章的介绍,已经解决了目的中的1、2、3,这篇文章中先分析最常使用的map操作符。
map的使用
老规矩,先附上使用方法,通过它来进行源码的分析。
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("1111");
e.onComplete();
}
});
observable.map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
return Integer.parseInt(s);
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG,value+"");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
这里先简单介绍一下map操作符的作用,它将我们从源头传入的类型转为了另一种类型,这个很简单,不多解释,直接看源码分析。
map源码分析
create方法上一篇文章已经分析过,直接看map
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
//这里是判null操作,我们不用管
ObjectHelper.requireNonNull(mapper, "mapper is null");
//返回值中,RxJavaPlugins.onAssembly()是一个hook方法,暂时不管,需要关注的是ObservableMap。
//记住,他是一个observable
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
注释里解释的很清楚了,这里需要注意两点,第一,作为参数传进的function
public interface Function<T, R> {
R apply(@NonNull T t) throws Exception;
}
它只有一个方法apply,就是将T转换成R,这个方法对于整个map进行类型的转换起到至关重要的作用。
第二,记住返回值中的ObservableMap它是一个observable。记住这两点,接着往下看new ObservableMap()。
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
//存储source,他也是一个Observable
super(source);
//存储function
this.function = function;
}
接着就是subscribe方法,
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
//真正subscribe的地方,这里的Observable取决于之前调
//用了什么方法,如果是调用了map方法,这里就是ObservableMap
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
通过上面的分析我们知道返回的observable对象是ObservableMap。现在很明了了,看ObservableMap。
public void subscribeActual(Observer<? super U> t) {
//源头与终点产生联系的地方,在这个过程中运用了大量的装饰者模式
//ObservableMap与MapObserver分别是对Observable与Observer的装饰
source.subscribe(new MapObserver<T, U>(t, function));
}
在这里t是我们创建的那个observer,然后通过new MapObserver传入到MapObserver中。在mapObserver中重写了onnext(),
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
//hook方法忽略,重点关注mapper.apply(t),在这里讲我们传入的t转为了v.
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
}
这里的actual就是传进来的由我们new出来的observer,在里面通过v=mapper.apply(t)进行了类型的转换,如果用我们的例子来说就是integer=mapper.apply(String)。这是我们observer的onNext的参数由最开始的t转换成了v,即actual.onNext(v),至此整个map的流程结束。
总结
其实map操作符很简单,最核心的方法就是function中的apply方法进行的类型转换。其实无论是map操作符,还是线程转换,因为都是对当前的observable进行包装装饰,我们就得把握到,我们是在哪个Observable中执行的subscribeActual方法的,这点很重要。
线程转换之subscribeOn的使用
使用subscribeOn我们就只增加一行代码,指定一个线程,在下面的例子中将线程转换到了io线程中,看到这里我们应当有一个疑问,这个io线程影响的范围到底是多大,接下来的分析中你们会找到答案。
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("1111");
e.onComplete();
}
}).subscribeOn(Schedulers.io());
Schedulers.io()相信大家都能理解,就是一个io线程。
重点看subscribeOn
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
通过前面的分析相信已经很清楚这两行代码了,一个是判null的,一个是hook,在我们需要new ObservableSubscribeOn(this, scheduler))这行代码中一定也是一个observable的装饰类,里面保存了我们的observable和scheduler信息。
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
果然,就是这样。通过之前对map以及第一篇文章的分析,我们知道subscribe方法后,最终会执行到我们这个ObservableSubscribeOn的subscribeActual方法,接着分析
public void subscribeActual(final Observer<? super T> s) {
//主要是将我们创建的Observer保存起来,其实也就是由SubscribeOnObserver来装饰我们的observer进行操作
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
//SubscribeOnObserver是一个Disposable,这里传入的是Disposable
s.onSubscribe(parent);
//SubscribeTask继承自Runnable,在new SubscribeTask(parent)将SubscribeOnObserver传入,在在
//run方法中进行注册的操作,即source.subscribe(parent)的操作,此时我们的注册以后的操作就会在subsrcribeon指定线程执行
//在这里需要特别注意,我们虽然先创建的observer.create方法并进行一系列重写,可是由源码分析,那些
// 方法是在我们调用subscribe后才开始执行的,也就是说订阅流程是由下游向上游传递的
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
代码中的注释粗略说完了整个过程,也许有点难以理解,我们还是一行行看下去分析,第一行代码很简单,就像注释所说那样,没什么好分析的,
SubscribeOnObserver(Observer<? super T> actual) {
//我们传入的observer保存起来
this.actual = actual;
//用于保存上游的Disposable,以便在自身dispose时,连同上游一起dispose
this.s = new AtomicReference<Disposable>();
}
第二行代码很简单,重点是第三行,首先看new SubscribeTask(parent),
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
记住这里的run方法,这里的source是发射源,parent是我们创建的observer的装饰类,可以视为终点。从这以后重写的subscribe方法才开始执行,都是在subscribeOn指定的线程中
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("1111");
e.onComplete();
}
接着以后的操作就会在我们传入的scheduler中执行。我们继续看scheduler.scheduleDirect(new SubscribeTask(parent))
public Disposable scheduleDirect(@NonNull Runnable run) {
//由方法可知,立刻执行线程
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
依据名字和参数我们也能看出来,就是立即执行这个任务。
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
看这个DisposeTask task = new DisposeTask(decoratedRun, w);
static final class DisposeTask implements Runnable, Disposable {
final Runnable decoratedRun;
final Worker w;
Thread runner;
DisposeTask(Runnable decoratedRun, Worker w) {
this.decoratedRun = decoratedRun;
this.w = w;
}
@Override
public void run() {
runner = Thread.currentThread();
try {
decoratedRun.run();
} finally {
dispose();
runner = null;
}
}
}
}
在它的run方法中执行了我们刚刚的在SubscribeTask的run方法。接下来就不用在深入代码细节继续分析了。
最后,有一个在我们刚接触rxjava最容易忽略的问题是,整个订阅的流程是从下游到上游的,虽然e.onXXX方法写在了前面,可是当source.subscribe(parent)没有调动前这些方法都没有执行,所以当subscribe那一刻,才会有数据的发送。所以我们的e.onXXX方法是执行在subscribeOn指定的线程当中的。至此,整个subscribeOn的流程结束。
总结
- 首先切换了线程,再执行Observale.subscribe(observer)方法
- 我们知道整个事件调用的过程是从下游到上游,根据上一篇文章知道,上游发送数据仅仅是调用下游观察者对应的onXXX()方法而已,所以此时操作是在切换后的线程中进行。
- 最后是一个问题,为什么多次调用subscribeOn总会以第一次为准,或者说是以离数据源最近的那次为准,这个将在最后与observeOn的比较中来进行解释。
线程转换之observerOn的使用
使用observerOn只需要实现一行代码即可
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
Log.d(TAG,"currentThread="+Thread.currentThread().getName());
e.onNext("11");
e.onComplete();
}
}).subscribeOn(Schedulers.io())
//添加这个方法
.observeOn(AndroidSchedulers.mainThread());
这个用法也很简单,没什么好讲的,接下来老规矩,随着方法进入源码分析
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
这里的Scheduler就是我们传入的线程
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
经过前几次的分析,这几行代码的套路我们已经很熟悉了,重点关注hook里的new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize))
在new方法中也是保存了我们需要用到的一些参数,observable以及scheduler等。通过前几个操作符的分析,相信我们已经很清楚注册的流程最终会走到ObservableObserveOn的subscribeActual方法中
protected void subscribeActual(Observer<? super T> observer) {
//false
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
//ObservableSource,在这里由ObserveOnObserver来订阅上游数
//据源,这样当数据从上游push下来,会由ObserveOnObserver对
//应的onXXX()处理。
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
注释已经解释的很清楚了,接下来看ObserveOnObserver对应的方法。首先看它里面比较重要的几个成员变量
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
private static final long serialVersionUID = 6576896619930983584L;
//真实的observer
final Observer<? super T> actual;
//我们传入线程对应的worker
final Scheduler.Worker worker;
final boolean delayError;
final int bufferSize;
//存放上游发送数据的队列
SimpleQueue<T> queue;
接着看onXXX方法,我们以onNext为例
public void onNext(T t) {
if (done) {
return;
}
//默认是同步的
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
因为默认是同步的,所以会执行queue.offer(t)
这行代码将上游发送的数据存入队列中。schedule()
会执行worker的schedule()方法。注意,这时候我们仍然在默认线程中,还未到observerOn方法设置的线程中,且我们真实observer的onNext方法还未调用,我们这个ObserveOnObserver它也是一个Runnable,所以我们可以看看他的run方法
public void run() {
//outputFused默认是false
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
所以接下来看drainNormal方法
void drainNormal() {
int missed = 1;
//存储消息的队列
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
//从队列中取出上游传来的数据
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
//在这里执行了我们new出来的真实observer的onnext方法
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
重要方法我已经进行了注释。接下来的问题是这个run方法是在什么时候执行,还记得我们onnext方法的最后一句么,会执行会执行worker的schedule(),一般来说就会在这个方法中执行,在我们写的案例中,传入的是AndroidSchedulers.mainThread()
对应的Scheduler是HandlerScheduler,我们看他的createWorker方法,
public Worker createWorker() {
return new HandlerWorker(handler);
}
创建的是HandlerWorker,所以看它的schedule方法,会有这样的一行ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
而这个ScheduledRunnable的run方法
public void run() {
try {
delegate.run();
} catch (Throwable t) {
IllegalStateException ie =
new IllegalStateException("Fatal Exception thrown on Scheduler.", t);
RxJavaPlugins.onError(ie);
Thread thread = Thread.currentThread();
thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
}
}
执行了我们ObserveOnObserver中的run,至此整个observerOn的流程结束。
subscribeOn与ObserveOn的比较
我们知道subscribeOn只有第一次调用起效果,ObserveOn每一次调用都起效果。那是因为每一次调用subscribeOn其实就是在包装一次observer与observable,无论包装多少次,都会以最里面一层也就是第一次调用subscribeOn那一层为主,所以只有第一次起效果。而ObserveOn是在subscribe后包装了observer,在observer的onXXX方法中进行的线程转换,所以每一次调用都有作用。下面两幅图其实就能很直观体现出来。
相信这两幅图已经表示的比较清晰了。所以在使用上subscribeOn只使用一次,因为只有第一次才生效,observeOn可以调用多次,且每次都生效。