一、本期要点
写博客就应该和鞋小说一样,时常更新,那今天主要讲的是 rxjava 的另一部分内容,线程切换,之前的文章讲的是一些基础的用法,那今天就讲一下 rxjava 中的另一个强大的功能,线程切换,当我们需要在 rxjava 中执行耗时代码的时候,线程切换就很重要了,那今天我们就领略一下线程切换的魅力。
二、Schedulers
Schedulers.newThread();
Schedulers.from(new Executor() {
@Override
public void execute(@android.support.annotation.NonNull Runnable command) {
}
})
Schedulers.computation();
Schedulers.single();
Schedulers.io();
这面一共有五种,我一一说明一下每一种的作用,
三、subscribeOn
我们可以看到这节的标题是 subscribeOn,我想你从这个命名就应该可以知道这个 API 切换的是上游被观察者发送数据的的线程,其实这么说是不准切的,应该是离他最近的 Observable 的执行线程,为什么这么说呢,因为在我们调用 subscriberOn 方法的时候,我们会生成一个 ObservableSubscribeOn 这个类,在这个类中包含了一个 Observable,所以,我们切换的只是它包含的 Observable 中的代码,当我们调用 map 或者 flatmap 等一些操作符的时候都会新生成一个代理的 Observable,也就是说我们可以在调用 map 或者 flatmap 之后调用 subscribeOn 重新切换线程,这部分可能有些抽象,那之后我会在源码分析中详细的介绍一下这个的过程
下面我们看一下我们线程切换的代码:
Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("http://fj.ikafan.com/attachment/forum/201510/16/150108pf2fgjw7i33bq22w.jpg");
}
}).subscribeOn(Schedulers.io())
.map(new Function<String, Bitmap>() {
@Override
public Bitmap apply(@NonNull String s) throws Exception {
// 进行接口请求
//下载图片
return 下载的图片;
}
}).subscribeOn(Schedulers.newThread())
.subscribe(new Observer<Bitmap>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Bitmap bitmap) {
//setBitmap
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
可以看到我们切换了两次线程,当我们第一次调用 subscribeOn 的时候切换的是 subscribe 中的 e.next 这句代码的执行线程,那第二次调用的时候切换的是 map 中的 apply 中的代码的执行线程,所以我们准确的说,subscribeOn 切换的是离它最近的 Observable 的线程,这个地方有个东西需要注意,就是当我们多次调用 subscriberOn 的时候只有第一次生效之所以是这个原因我会在源码分析中写出
四、observeOn
当你看到这个 API 的时候你就应该能想到这个切换的是我们接受数据的观察者的线程,其实这个还是比较简单的,就是切换 Observer 中的代码,下面看一下代码:
Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("http://fj.ikafan.com/attachment/forum/201510/16/150108pf2fgjw7i33bq22w.jpg");
}
}).subscribeOn(Schedulers.io())
.map(new Function<String, Bitmap>() {
@Override
public Bitmap apply(@NonNull String s) throws Exception {
// 进行接口请求
//下载图片
return 下载的图片;
}
}).subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Bitmap>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Bitmap bitmap) {
//setBitmap
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
可以看到,当我们调用 observeOn 之后,除了 onSubscribe 这个方法不是在切换的线程中执行的,其余三个方法都是在切换之后的线程中执行的,至于原因会在后面文章的源码分析中找到答案
五、与 retrofit 的结合
@GET("/token")
public Observable<String> getToken();
public class Network {
private static GankApi gankApi;
private static OkHttpClient okHttpClient = new OkHttpClient();
public static GankApi getGankApi() {
if (gankApi == null) {
Retrofit retrofit = new Retrofit.Builder()
.client(okHttpClient)
.baseUrl("http://gank.io/api/")
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.build();
gankApi = retrofit.create(GankApi.class);
}
return gankApi;
}
}
Network.getGankApi()
.getToken().subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull String s) {
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});