RxJava使用详解系列文章
详细的例子可以查看文章末尾的源码
这篇文章主要将RxJava中常见 的转换操作符。
1.Buffer():
定期收集Observable的数据放进一个数据包裹,然后发射这些数据包裹,而不是一次发射一个值。Buffer操作符将一个Observable变换成另一个,原来的Observable正常发射数据,变换产生的Observable发射这些数据的缓存集合。如果原来的Observable发射了一个onError通知,buffer会立即传递这个通知,而不是首先发射缓存的数据,即使已经缓存了数据。buffer有很多的变体,有多个重载方法。
buffer(count):以列表(List集合)的形式发射非重叠(不是指没有重复元素,而是集合的界限)的缓存数据,每一个缓存(List集合)最多包含来自原始Observable的count个数据(最后发射的列表List可能少于count个,最后一个list可能没有count个元素),此处的count相当于集合的size()
//1.buffer(count):以列表(List集合)的形式发射非重叠(不是指没有重复元素,而是集合的界限)的缓存数据,每一个缓存(List集合)最多包含来自原始Observable的count个数据(最后发射的列表List可能少于count个,最后一个list可能没有count个元素),此处的count相当于集合的size()
System.out.println("buffer(count) ");
Observable.range(1, 6).buffer(2).subscribe(new Action1<List<Integer>>() {
@Override
public void call(List<Integer> integers) {
System.out.println("onNnext:" + integers.toString());
}
});
/**
* 2.buffer(count,skip) :有这么一种情况 buffer(count) 等价于 buffer(count,count).
* 详细说一下: buffer(2)输出为 【1,2】【3,4】【5,6】 那么buffer(2,3)就要这么算:首先从第一项开始缓存2个数据,【1,2】,此时的skip=3,意思是第二个集合的第一项是第skip+1项的数据,也就是第四个,集合长度是2,结果就是【4,5】,
* 此时可用的数据是4,5,6,再计算第三个集合的时候,第三个集合的第一项就应该是第skip+1项的数据,然而没有第四个数据,就没有第三个集合了。
*/
System.out.println("- - - - - - - - - - - - - - - - - - - - - ");
System.out.println("buffer(count,skip) ");
Observable.range(1, 6).buffer(3, 2).subscribe(new Action1<List<Integer>>() {
@Override
public void call(List<Integer> integers) {
System.out.println("onNnext:" + integers.toString());
}
});
输出结果:
buffer(count)
onNnext:[1, 2]
onNnext:[3, 4]
onNnext:[5, 6]
onCompleted:完成
- - - - - - - - - - - - - - - - - - - - -
buffer(count,skip)
[1, 2, 3]
[3, 4, 5]
[5, 6]
onCompleted:完成
2.map() flatMap()
map():对Observable发射的每一项数据使用函数执行变换操作,然后在发射出去。返回的对象可以随便指定,可以实现一对一的转换
flatmap():使用指定函数对原始的Observable发射的每一项数据执行变换操作,返回一个Observable,这个Observable也可以发射数据,可以实现一对多转换,可能还会出现次序出错的问题,使用的merge,不能保证原来的顺序
相同点:都是讲传入的参数转化之后返回另一个对象,不同点:flatmap()返回的是Observable对象,flatMap() 的原理是这样的:1. 使用传入的事件对象创建一个 Observable 对象;2. 并不发送这个 Observable, 而是将它激活,于是它开始发送事件;3. 每一个创建出来的 Observable 发送的事件,都被汇入同一个 Observable ,而这个 Observable 负责将这些事件统一交给 Subscriber 的回调方法。这三个步骤,把事件拆成了两级,通过一组新创建的 Observable 将初始的对象『铺平』之后通过统一路径分发了下去。而这个『铺平』就是 flatMap() 所谓的 flat。
定义一个学生类class Student{}
private String name;
private List<String> course = new ArrayList<>();
.....省略
输出结果:
onNext:数学 所在线程:main
onNext:语文 所在线程:main
onNext:英语 所在线程:main
- - - - - - - - - - - -
小明
onNext:小刚 所在线程:main
3.concatMap()
类似于最简单版本的flatMap,唯一不同是concatMap是按次序连接而不是合并那那些生成的Observables,然后产生自己的数据序列,concatMap操作符在处理产生的Observable时,采用的是“concat”连接的方式,不是“merge”,如果需求是要保证顺序的话建议用concatMap()
输出结果:
onNext:1 所在线程:main
onNext:2 所在线程:main
onNext:3 所在线程:main
onNext:4 所在线程:main
onNext:5 所在线程:main
onNext:6 所在线程:main
4.switchMap():
类似于flatMap(),有一点不同,只监视当前Observable,或者是最后发射的数据。需要延时执行,当是延时是0 的时候回发射第一个数据,延时是大于0的任何值都是发射最后一个值
有一个使用场景:搜索的时候,在输入完毕之后再去请求数据,大可不必没每输入一个都要发送请求。
Observable.just(10, 20, 30).switchMap(new Func1<Integer, Observable<Integer>>() {输出结果:
@Override
public Observable<Integer> call(Integer integer) {
return Observable.just(integer).delay(0, TimeUnit.MILLISECONDS);
}
}).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("switchMap onNext:" + integer);
}
});12-18 15:23:08.180 27918-27918/? I/System.out: switchMap onNext:10
12-18 15:23:08.181 27918-27918/? I/System.out: switchMap onNext:30
5.cast():
类似于map操作符,map()操作符可以自定义规则,将值A1编程A2,A1和A2的类型可以一样也可以不一样;而cast()操作符主要是做类型转换的,
传入参数的类型class,如果Observable发射的结果不能转成指定的class,就会抛出ClassCastException运行异常
Observable.just("苹果","香蕉").cast(String.class).subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println("cast onNext " + s);
}
});输出结果:
cast onNext 苹果
cast onNext 香蕉
6.groupBy():对原来Observable发射的数据进行分组,形成一个GroupedObservable的结果集,GroupedObservable.getKey()可以获取键,
注意:由于结果集中的GroupedObservable是把分组结果缓存起来,如果对每一个GroupedObservable不进行处理(既不订阅也不对其进行别的操作符运算)就有可能出现内存泄漏,所以如果不对GroupedObservable进行处理,最好对其使用操作符take(0)处理
Observable.just(1,2,3,4).take(4).groupBy(new Func1<Integer, String>() {
@Override
public String call(Integer value) {
return value < 3 ? "第一组":"第二组"; //这里执行分组的函数
}
}).subscribe(new Action1<GroupedObservable<String, Integer>>() {
@Override
public void call(final GroupedObservable<String, Integer> result) {
result.subscribe(new Action1<Integer>() {
@Override
public void call(Integer value) {
System.out.println("GroupBy onNext " + result.getKey() +":"+ value);
}
});
}
});输出结果:
12-18 15:28:21.825 1776-1776/? I/System.out: GroupBy onNext 第一组:1
12-18 15:28:21.825 1776-1776/? I/System.out: GroupBy onNext 第一组:2
12-18 15:28:21.825 1776-1776/? I/System.out: GroupBy onNext 第二组:3
12-18 15:28:21.825 1776-1776/? I/System.out: GroupBy onNext 第二组:4
7.scan():
通过遍历原来Observable产生的结果,每一次对每一个结果按照指定规则进行运算,
计算的结果作为下一个迭代项参数,每一次都会把迭代项数据发射给订阅者,这里result就是那个迭代项,scan()操作符有个变体,可以设置初始值,下方就是讲初始值设置成了6,发射的第一个值就是6而不是1
Observable.just(1,2,3,4,5).scan(6,new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer result, Integer item2) {
return result + item2 ;//result是上一次计算的结果
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("scan onNext " + integer);
}
});输出结果:
scan onNext 6
scan onNext 7
scan onNext 9
scan onNext 12
scan onNext 16
scan onNext 21
8.window():
类似于buffer()操作符,区别在于buffer操作符产生的结果是List缓存,而window()操作符产生的是一个Observable对象,订阅者可以对这个产生的Observable对象重新进行订阅处理,window操作符有很多重载方法
Observable.interval(1, TimeUnit.SECONDS).take(6)
.window(3,TimeUnit.SECONDS).subscribe(new Action1<Observable<Long>>() {
@Override
public void call(Observable<Long> observable) {
observable.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
System.out.println("window onNext " + aLong);
}
});
}
});输出结果:
12-18 15:38:24.100 12515-12553/com.dingmouren.rxjavademo I/System.out: window onNext 0
12-18 15:38:25.100 12515-12553/com.dingmouren.rxjavademo I/System.out: window onNext 1
12-18 15:38:26.099 12515-12553/com.dingmouren.rxjavademo I/System.out: window onNext 2
12-18 15:38:27.099 12515-12553/com.dingmouren.rxjavademo I/System.out: window onNext 3
12-18 15:38:28.099 12515-12553/com.dingmouren.rxjavademo I/System.out: window onNext 4
12-18 15:38:29.099 12515-12553/com.dingmouren.rxjavademo I/System.out: window onNext 5
更多详细内容,可以查看源码