RxJava零基础入门(三)

时间:2022-07-12 17:46:10

本文转载自http://www.jianshu.com/p/5c221c58e141

前言

这篇讲RxJava中强大的Scheduler调度器 ,就是因为它,RxJava才能极其简便的在线程中切换,接着再讲一讲一些常用的操作符。没看过前两篇的,可以去温习下前面的基础。
RxJava零基础入门(一)
RxJava零基础入门(二)

Scheduler

在讲常用操作符前,先看看Scheduler这个东西,名之为调度器,正因为有这个东西,让RxJava可以从主线程和子线程之间轻松切换,各个Scheduler的具体使用效果看以下表解释:

调度器类型 用途
Schedulers.computation( ) 用于计算任务,如事件循环或和回调处理,不要用于IO操作(IO操作请使用Schedulers.io());默认线程数等于处理器的数量
Schedulers.from(executor) 使用指定的Executor作为调度器
Schedulers.immediate( ) 在当前线程立即开始执行任务
Schedulers.io( ) 用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;对于普通的计算任务,请使用Schedulers.computation();Schedulers.io( )默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器
Schedulers.newThread( ) 为每个任务创建一个新线程
Schedulers.trampoline( ) 当其它排队的任务完成后,在当前线程排队开始执行
AndroidSchedulers.mainThread() 此调度器为RxAndroid特有,顾名思义,运行在Android UI线程上

具体如何使用呢,比如从数据库读取数据更新到UI上,假设数据量很大,直接从主线程读取数据,会造成UI卡顿,以前我们常用AnsyTask或者Handler去处理避免出现这类问题,个人认为手写个AnsyTask还是挺麻烦的,但用RxJava就简单多了,线程的切换就两行代码。例如:

Observable.create(new Observable.OnSubscribe<Data>() {
@Override
public void call(Subscriber<? super Data> subscriber) {
Data data = getData();//从数据库获取
subscriber.onNext(data);
subscriber.onCompleted();
}})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Data>() {
@Override
public void call(Data data) {

//更新ui
}
});

上面的代码简单粗暴的解释一下,subscribeOn( )决定了发射数据在哪个调度器上执行,observeOn(AndroidSchedulers.mainThread())则指定数据接收发生在UI线程,简直不要太方便。
总之:RxJava为我们提供了2个方法来切换线程:subscribeOn( )observeOn()
两者的区别在于:

  • subscribeOn(): 指定subscribe()订阅所发生的线程,即call()执行的线程。或者叫做事件产生的线程。
  • observeOn():指定Observer所运行在的线程,即onNext()执行的线程。或者叫做事件消费的线程。

常用操作符

  • Map:最常用且最实用的操作符之一,将对象转换成另一个对象发射出去,应用范围非常广,如数据的转换,数据的预处理等。
    例一:数据类型转换,改变最终的接收的数据类型。假设传入本地图片路径,根据路径获取图片的Bitmap。
Observable.just(filePath).map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String path) {

return getBitmapByPath(path);
}}).subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) {

//获取到bitmap,显示
}});

例二:对数据进行预处理,最后得到理想型数据。实际开发过程中,从后台接口获取到的数据也许不符合我们想要的,这时候可以在获取过程中对得到的数据进行预处理(结合Retrofit)。

Observable.just("12345678").map(new Func1<String, String>() {
@Override
public String call(String s) {
return s.substring(0,4);//只要前四位
}})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.i("mytag",s);
}});

先说明一下,为了方便理解,所以写的例子都比较简单,不要以为明明可以简单用if-else解决的事,没必要用这种方式去写,当你真正将这些操作符使用到数据处理中去的时候,你就会发现有多方便。

  • FlatMap : 和Map很像但又有所区别,Map只是转换发射的数据类型,而flatMap可以将原始Observable转换成另一个Observable。还是举例说明吧。假设要打印全国所有学校的名称,可以直接用Map:
    为了更清晰一点,先贴一下School类:
public class School {

private String name;
private List<Student> studentList;

public List<Student> getStudentList() {
return studentList;
}
public void setStudentList(List<Student> studentList) {
this.studentList = studentList;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public static class Student{
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
}

接着用Map打印学校名称:

List<School> schoolList = new ArrayList<>();
Observable.from(schoolList).map(new Func1<School, String>() {
@Override
public String call(School school) {
return school.getName();
}}).subscribe(new Action1<String>() {
@Override
public void call(String schoolName) {
Log.i("mytag",schoolName);
}});

再进一步,打印学校所有学生的姓名,先考虑用Map实现,将所有School对象直接转成Student:

Observable.from(schoolList).map(new Func1<School, School.Student>() {
@Override
public School.Student call(School school) {
return school.getStudentList();
}}).subscribe(new Action1<School.Student>() {
@Override
public void call(School.Student student) {

Log.i("mytag",student.getName());
}});

看似可行,但事实上,这是一段错误的代码,细心的人就会发现错误的地方

@Override
public School.Student call(School school) {
return school.getStudentList(); //错误,Student 是一个对象,返回的却是一个list
}

所以用Map是无法实现直接打印学校的所有学生名字的,因为Map是一对一的关系,无法将单一的School对象转变成多个Student。前面说到,FlatMap可以改变原始Observable变成另外一个Observable,如果我们能利用from()操作符把school.getStudentList()变成另外一个Observable问题不就迎刃而解了吗,这时候就该FlatMap上场了,来看看它是怎么实现的:

Observable.from(schoolList).flatMap(new Func1<School, Observable<School.Student>>() {
@Override
public Observable<School.Student> call(School school) {

return Observable.from(school.getStudentList()); //关键,将学生列表以另外一个Observable发射出去

}}).subscribe(new Action1<School.Student>() {

@Override
public void call(School.Student student) {
Log.i("mytag",student.getName());
}});

值得注意的是,flatMap并不保证变换后事件的发送顺序,如果要保证其顺序,建议使用concatMap操作符
Map和FlatMap在我看来就像孪生兄弟一样,非常实用,实际开发中也我也经常使用,个人觉得要想上手RxJava,掌握这两个操作符必不可少。
Map 与 flatMap 这两个操作符的共同点在于,他们都是把一个对象转换为另一个对象,但须注意以下这些特点:

1.flatMap 返回的是一个Observable对象,而 map 返回的是一个普通转换后的对象;
2.flatMap 返回的Observable对象并不是直接发送到Subscriber的回调中,而是重新创建一个Observable对象,并激活这个Observable对象,使之开始发送事件;而 map 变换后返回的对象直接发到Subscriber回调中;
3.flatMap 变换后产生的每一个Observable对象发送的事件,最后都汇入同一个Observable,进而发送给Subscriber回调;
4.map返回类型 与 flatMap 返回的Observable事件类型,可以与原来的事件类型一样;
5.可以对一个Observable多次使用 map 和 flatMap;

  • Buffer: 缓存,可以设置缓存大小,缓存满后,以list的方式将数据发送出去;例:
Observable.just(1,2,3).buffer(2).subscribe(new Action1<List<Integer>>() {
@Override
public void call(List<Integer> list) {
Log.i("mytag","size:"+list.size());
}});

运行打印结果如下:

11-02 20:49:58.370 23392-23392/? I/mytag: size:2
11-02 20:49:58.370 23392-23392/? I/mytag: size:1

在开发当中,个人经常将Buffer和Map一起使用,常发生在从后台取完数据,对一个List中的数据进行预处理后,再用Buffer缓存后一起发送,保证最后数据接收还是一个List,如下:

List<School> schoolList = new ArrayList<>();
Observable.from(schoolList).map(new Func1<School, School>() {
@Override
public School call(School school) {
school.setName("NB大学"); //将所有学校改名
return school;
}}).buffer(schoolList.size()) //缓存起来,最后一起发送
.subscribe(new Action1<List<School>>() {
@Override
public void call(List<School> schools) {
}});
  • Take:发射前n项数据,还是用上面的例子,假设不要改所有学校的名称了,就改前四个学校的名称:
Observable.from(schoolList).take(4).map(new Func1<School, School>() {
@Override
public School call(School school) {
school.setName("NB大学");
return school;
}}).buffer(4).subscribe(new Action1<List<School>>() {
@Override
public void call(List<School> schools) {
}});
  • Distinct: 去掉重复的项,比较好理解:
Observable.just(1, 2, 1, 1, 2, 3)
.distinct()
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer item) {
System.out.println("Next: " + item);
}
});

输出

Next: 1
Next: 2
Next: 3
  • Filter:过滤,通过谓词判断的项才会被发射,例如,发射小于4的数据:
Observable.just(1, 2, 3, 4, 5)
.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer item) {
return( item < 4 );
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer item) {
System.out.println("Next: " + item);
}});

输出:

Next: 1
Next: 2
Next: 3

以下内容摘自http://blog.csdn.net/u012124438/article/details/53730717

  • zip

Zip通过一个函数将多个Observable发送的事件结合到一起,然后发送这些组合到一起的事件. 它按照严格的顺序应用这个函数。它只发射与发射数据项最少的那个Observable一样多的数据。

Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emitter 1");
emitter.onNext(1);
Log.d(TAG, "emitter 2");
emitter.onNext(2);
Log.d(TAG, "emitter 3");
emitter.onNext(3);
Log.d(TAG, "emitter 4");
emitter.onNext(4);
Log.d(TAG, "emit complete1");
emitter.onComplete();
}
});

Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d(TAG, "emitter A");
emitter.onNext("A");
Log.d(TAG, "emitter B");
emitter.onNext("B");
Log.d(TAG, "emitter C");
emitter.onNext("C");
Log.d(TAG, "emitter complete2");
emitter.onComplete();
}
});

Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return integer + s;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}

@Override
public void onNext(String value) {
Log.d(TAG, "onNext: " + value);
}

@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}

@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});

我们分别创建了observable, 一个发送1,2,3,4,Complete, 另一个发送A,B,C,Complete, 接着用Zip把发出的事件组合, 来看看运行结果吧:
RxJava零基础入门(三)
观察发现observable1发送事件后,observable2才发送
这是因为我们两个observable都是运行在同一个线程里, 同一个线程里执行代码肯定有先后顺序呀.

Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emit 1");
emitter.onNext(1);
Thread.sleep(1000);

Log.d(TAG, "emit 2");
emitter.onNext(2);
Thread.sleep(1000);

Log.d(TAG, "emit 3");
emitter.onNext(3);
Thread.sleep(1000);

Log.d(TAG, "emit 4");
emitter.onNext(4);
Thread.sleep(1000);

Log.d(TAG, "emit complete1");
emitter.onComplete();
}
}).subscribeOn(Schedulers.io());

Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d(TAG, "emit A");
emitter.onNext("A");
Thread.sleep(1000);

Log.d(TAG, "emit B");
emitter.onNext("B");
Thread.sleep(1000);

Log.d(TAG, "emit C");
emitter.onNext("C");
Thread.sleep(1000);

Log.d(TAG, "emit complete2");
emitter.onComplete();
}
}).subscribeOn(Schedulers.io());

Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return integer + s;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}

@Override
public void onNext(String value) {
Log.d(TAG, "onNext: " + value);
}

@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}

@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});

好了, 这次我们让事件都在IO线程里发送事件, 再来看看运行结果:
RxJava零基础入门(三)
第一个observable明明发送了四个数据+一个Complete, 之前明明还有的, 为啥到这里没了呢?
这是因为我们之前说了, zip发送的事件数量跟observable中发送事件最少的那一个的事件数量是有关的, 在这个例子里我们observable2只发送了三个事件然后就发送了Complete, 这个时候尽管observable1还有事件4 和事件Complete 没有发送, 但是它们发不发送还有什么意义呢?

  • compose

与 flatMap 类似,都是进行变换,返回Observable对象,激活并发送事件。
1.compose 是唯一一个能够从数据流中得到原始Observable的操作符,所以,那些需要对整个数据流产生作用的操作(比如,subscribeOn()和observeOn())需要使用 compose 来实现。相较而言,如果在flatMap()中使用subscribeOn()或者observeOn(),那么它仅仅对在flatMap 中创建的Observable起作用,而不会对剩下的流产生影响。这样就可以简化subscribeOn()以及observeOn()的调用次数了。
2.compose 是对 Observable 整体的变换,换句话说, flatMap 转换Observable里的每一个事件,而 compose 转换的是整个Observable数据流。
3.flatMap 每发送一个事件都创建一个 Observable,所以效率较低。而 compose 操作符只在主干数据流上执行操作。
4.建议使用 compose 代替 flatMap。

这一篇就先讲这么多吧,重点掌握Map和FlatMap操作符,因为真的很实用、很实用、很实用,重要的事讲三遍。

喜欢的点个赞吧。