Rxjava这么强大的类库怎么可能没有多线程切换呢?
其中observeOn()与subscribeOn()就是实现这样的作用的。本文主要讲解observeOn()与subscribeOn()的用法,不去探究其中的原理。
0. 默认情况
在默认情况下,其不做任何线程处理,Observable和Observer处于同一线程,没有做任何线程切换,依次执行,如下图所示:
可以写一个demo测试之:
Observable<String> source = Observable.just("Alpha","Beta","Gamma"); source.subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Integer integer) { Log.i("TAG", "Received " + integer + " on thread:" + Thread.currentThread().getName()); } });
1. subscribeOn()的作用
该方法是指明数据产生的线程,即Observable发射数据所在的线程,如果之后不做任何处理,操作符operator(如map,flatmap等)也在subscribeOn指定的线程做数据处理。
多次使用subscribeOn()并不能频繁地切换线程,只有距离数据源最近的一个subscribeOn()唯一确定数据源发射数据的线程。如代码所示:
Observable<String> source = Observable.just("Alpha","Beta","Gamma"); source .subscribeOn(Schedulers.computation()) .subscribeOn(Schedulers.newThread()) .subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Integer integer) { Log.i("TAG", "onNext: " + "on thread:" + Thread.currentThread().getName()); } }); }
其中只有subscribeOn(Schedulers.computation())对数据源source起作用,该source在Schedulers.computation()指定的线程发射数据。如果后面没有使用observeOn(),操作符operator都会在Schedulers.computation()所指定的线程做数据变换。
2. observeOn()的作用
在Android开发中,我们经常面临这样的场景,在工作者线程中产生数据,在UI线程中更新相应的View,subscribeOn()指定了数据发射的线程,但我们更新UI的操作,不可能在发射数据的线程运行,这会造成ANR的问题。此时就必须通过observeOn()方法做线程的切换:
Observable<String> source = Observable.just("Alpha","Beta","Gamma"); source.map(new Func1<String, Integer>() { @Override public Integer call(String s) { Log.i("TAG", "call: " + Thread.currentThread().getName()); return s.length(); } }).observeOn(Schedulers.newThread()).subscribeOn(Schedulers.computation()). subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Integer integer) { Log.i("TAG", "onNext: " + "on thread:" + Thread.currentThread().getName()); } });
上述代码的运行结果如下:
TAG: call: RxComputationScheduler-1 TAG: onNext: on thread:RxNewThreadScheduler-1 TAG: call: RxComputationScheduler-1 TAG: onNext: on thread:RxNewThreadScheduler-1 TAG: call: RxComputationScheduler-1 TAG: onNext: on thread:RxNewThreadScheduler-1
可以看到,在observeOn()之前的操作,都运行在subscribeOn(Schedulers.computation())指定的线程,即RxComputationScheduler-1线程;而使用了observeOn()之后,在它之后的操作都
运行在了observeOn(Schedulers.newThread())指定的线程。
所以,给出一个结论observeOn()只对其之后的操作起作用;observeOn()可以使用多次,每次使用对其之后的operator起作用,对之前的操作没有影响。
上图很好地诠释了ObserveOn的作用。
3. backpressure的问题
由于ObserveOn的作用,数据流在多个线程中不断的传输,可能存在速度不匹配的情况。如下图所示,当底部的数据流发射速度快于顶部数据流的处理速度,若产生异常,可能导致一部分数据未被顶部的subscriber处理。
废话太多,说不清楚,看下代码吧:
Observable<String> source = Observable.just("Alpha","Beta","Gamma"); source.map(new Func1<String, Integer>() { @Override public Integer call(String s) { Log.i("TAG", "call: " + Thread.currentThread().getName()); if (s.equals("Gamma")) throw new RuntimeException(); return s.length(); } }) .doOnError(new Action1<Throwable>() { @Override public void call(Throwable throwable) { Log.i("TAG", "doOnError: " + Thread.currentThread().getName()); } }) .observeOn(Schedulers.newThread()) .subscribeOn(Schedulers.computation()) .subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { Log.i("TAG", "onError: " + "on thread:" + Thread.currentThread().getName()); } @Override public void onNext(Integer integer) { Log.i("TAG", "onNext: " + "on thread:" + Thread.currentThread().getName()); } }); }
看一下运行结果:
TAG: call: RxComputationScheduler-1 TAG: call: RxComputationScheduler-1 TAG: call: RxComputationScheduler-1 TAG: doOnError: RxComputationScheduler-1 TAG: onError: on thread:RxNewThreadScheduler-1
我们就可以看到,当发射的数据为Gamma时抛出异常,之前发射的数据"Alpha","Beta"还未被subsriber的onNext方法处理,这就是backpressure问题。
4. onErrorResumeNext
onErrorResumeNext是错误恢复处理方法,当我们数据链中某个操作符抛出异常,此时会中断整个数据链,但我们想尝试恢复一下,这时可以使用
onErrorResumeNext。比如Android在过于频繁登录时,系统会弹出一个dialog(弹窗),让用户输入验证码,该逻辑就可以放在onErrorResumeNext中处理。
我们先看一段代码:
Observable<String> source = Observable.just("Alpha","Beta","Gamma"); source.map(new Func1<String, Integer>() { @Override public Integer call(String s) { Log.i("TAG", "call: " + Thread.currentThread().getName()); if (s.equals("Gamma")) throw new RuntimeException(); return s.length(); } }) .observeOn(Schedulers.newThread()) .subscribeOn(Schedulers.computation()) .onErrorResumeNext(new Func1<Throwable, Observable<? extends Integer>>() { @Override public Observable<? extends Integer> call(Throwable throwable) { return Observable.just(1000).map(new Func1<Integer, Integer>() { @Override public Integer call(Integer integer) { Log.i("TAG", "call:onErrorResumeNext: " + Thread.currentThread().getName()); return integer; } }) .subscribeOn(Schedulers.computation()); } }) .subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { Log.i("TAG", "onError: " + "on thread:" + Thread.currentThread().getName()); } @Override public void onNext(Integer integer) { Log.i("TAG", "onNext: " + "on thread:" + Thread.currentThread().getName()); } });
看上段代码中标红的部分,为什么在onErrorResumeNext可以再次使用subscribeOn(),我的猜测(并没有看源码)可能是该段代码产生了新的数据源,所以可以使用subsribeOn()指定数据源发射数据的线程。
它的运行结果:
TAG: call: RxComputationScheduler-1 TAG: call: RxComputationScheduler-1 TAG: call: RxComputationScheduler-1 TAG: onNext: on thread:RxNewThreadScheduler-1 TAG: onNext: on thread:RxNewThreadScheduler-1 TAG: call:onErrorResumeNext: RxComputationScheduler-2 TAG: onNext: on thread:RxComputationScheduler-2
看运行结果在onErrorResumeNext方法中使用了subscribeOn(),线程切换到了RxComputationScheduler-2,在之后没有observeOn的情况下,最后一个onNext也运行在了RxComputationScheduler-2。很神奇!!!!!