Rxjava2从入门到源码(二)

时间:2023-01-20 17:44:50

一、本期要点

  
    

写博客就应该和鞋小说一样,时常更新,那今天主要讲的是 rxjava 的另一部分内容,线程切换,之前的文章讲的是一些基础的用法,那今天就讲一下 rxjava 中的另一个强大的功能,线程切换,当我们需要在 rxjava 中执行耗时代码的时候,线程切换就很重要了,那今天我们就领略一下线程切换的魅力。


   

二、Schedulers



 
在讲线程调度之前,我们先来说一下  schedulers ,那这个东西是什么呢,在 rxjava 中这个东西叫做调度器,也就是调度线程的,在这个 schedulers 中有很多线程可以切换列举一下:
  
 
Schedulers.newThread();
Schedulers.from(new Executor() {
@Override
public void execute(@android.support.annotation.NonNull Runnable command) {

}
})
Schedulers.computation();
Schedulers.single();
Schedulers.io();

这面一共有五种,我一一说明一下每一种的作用,
Schedulers.newThread :这个其实不用说了,很明显是创建一个新的线程。
 
Schedulers.from:这个方法其实是需要我们自己去创建一个 Exector 对象,然后每次从这里面获取执行的线程

Schedulers.computatuion:这个线程其实是用于 CPU 密集计算的,例如图形计算等

Schedulers.io : 这个就很明显了,用于我们的 io 操作

Schedulers.single :这个线程的用法就是只能执行一次的操作,可以用这个线程

那我们可以切换的线程已经介绍的差不多了,那下面我们就讲讲线程是如何在观察者和被观察者之间切换的



   

三、subscribeOn 



  

我们可以看到这节的标题是 subscribeOn,我想你从这个命名就应该可以知道这个 API 切换的是上游被观察者发送数据的的线程,其实这么说是不准切的,应该是离他最近的 Observable 的执行线程,为什么这么说呢,因为在我们调用 subscriberOn 方法的时候,我们会生成一个 ObservableSubscribeOn 这个类,在这个类中包含了一个 Observable,所以,我们切换的只是它包含的 Observable 中的代码,当我们调用 map 或者 flatmap 等一些操作符的时候都会新生成一个代理的 Observable,也就是说我们可以在调用 map 或者 flatmap 之后调用 subscribeOn 重新切换线程,这部分可能有些抽象,那之后我会在源码分析中详细的介绍一下这个的过程


下面我们看一下我们线程切换的代码:


  

Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("http://fj.ikafan.com/attachment/forum/201510/16/150108pf2fgjw7i33bq22w.jpg");
}
}).subscribeOn(Schedulers.io())
.map(new Function<String, Bitmap>() {
@Override
public Bitmap apply(@NonNull String s) throws Exception {
// 进行接口请求
//下载图片
return 下载的图片;
}
}).subscribeOn(Schedulers.newThread())
.subscribe(new Observer<Bitmap>() {
@Override
public void onSubscribe(@NonNull Disposable d) {

}

@Override
public void onNext(@NonNull Bitmap bitmap) {
//setBitmap
}

@Override
public void onError(@NonNull Throwable e) {

}

@Override
public void onComplete() {

}
});



可以看到我们切换了两次线程,当我们第一次调用 subscribeOn 的时候切换的是 subscribe 中的 e.next 这句代码的执行线程,那第二次调用的时候切换的是 map 中的 apply 中的代码的执行线程,所以我们准确的说,subscribeOn 切换的是离它最近的 Observable 的线程,这个地方有个东西需要注意,就是当我们多次调用 subscriberOn 的时候只有第一次生效之所以是这个原因我会在源码分析中写出



四、observeOn




当你看到这个 API 的时候你就应该能想到这个切换的是我们接受数据的观察者的线程,其实这个还是比较简单的,就是切换 Observer 中的代码,下面看一下代码:

Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("http://fj.ikafan.com/attachment/forum/201510/16/150108pf2fgjw7i33bq22w.jpg");
}
}).subscribeOn(Schedulers.io())
.map(new Function<String, Bitmap>() {
@Override
public Bitmap apply(@NonNull String s) throws Exception {
// 进行接口请求
//下载图片
return 下载的图片;
}
}).subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Bitmap>() {
@Override
public void onSubscribe(@NonNull Disposable d) {

}

@Override
public void onNext(@NonNull Bitmap bitmap) {
//setBitmap
}

@Override
public void onError(@NonNull Throwable e) {

}

@Override
public void onComplete() {

}
});


可以看到,当我们调用 observeOn 之后,除了 onSubscribe 这个方法不是在切换的线程中执行的,其余三个方法都是在切换之后的线程中执行的,至于原因会在后面文章的源码分析中找到答案




五、与 retrofit 的结合





总所周知 rxjava + retrofit + okhttp 在我们当前的开发中有着不可代替的位置,使用着三个框架可以输出成吨伤害,那它们是怎么结合的呢,其实很简单,真的很简单,我们之前用 okhttp + retrofit 的时候当接口有数据返回的时候我们只需要在 Callback 中去处理一下设置 UI 就可以了,那 retrofit 为了兼容 rxjava 就提供了返回值是 Observable 的请求方法,那当我们请求的时候,返回一个带数据的 Observable 这个对于我们构建观察者模式就非常方便了,我们看一下代码就会很好理解了:
  
@GET("/token")
public Observable<String> getToken();
public class Network {
private static GankApi gankApi;
private static OkHttpClient okHttpClient = new OkHttpClient();

public static GankApi getGankApi() {
if (gankApi == null) {
Retrofit retrofit = new Retrofit.Builder()
.client(okHttpClient)
.baseUrl("http://gank.io/api/")
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.build();
gankApi = retrofit.create(GankApi.class);
}
return gankApi;
}
}
Network.getGankApi()
.getToken().subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {

}

@Override
public void onNext(@NonNull String s) {

}

@Override
public void onError(@NonNull Throwable e) {

}

@Override
public void onComplete() {

}
});



从上面的代码中可以看出,我们的 retrofit 对象返回的是 Observable<string>  而且数据都在这个 Observable 中,这样我就会很好处理的,所以这就是 retrofit 与 rxjava 的无缝连接,其实我们主要记住一个思想就可以了,就是把我们的 Callback 换成 Observable 就可以了



总结:

这篇文章主要讲的是线程切换和与 retrofit 结合,下一篇文章本来想讲背压的,但是背压的东西实在是太多了,等之后有机会再去分享背压的知识吧,后面的文章主要就是分析一下主要流程的源码,敬请期待
今天的内容就这么多,虽然内容有写不多,但是如果完全掌握还是需要一定时间的,所以一定要多找几个例子实践一下,这样记忆就会非常深刻。本文如有什么错误欢迎指出,共同进步