RxJava使用详解--转换操作符

时间:2022-06-12 17:49:09

RxJava使用详解系列文章

RxJava使用详解--创建操作符

RxJava使用详解--过滤操作符

详细的例子可以查看文章末尾的源码

RxJava使用详解--转换操作符

这篇文章主要将RxJava中常见 的转换操作符。


1.Buffer():

定期收集Observable的数据放进一个数据包裹,然后发射这些数据包裹,而不是一次发射一个值。Buffer操作符将一个Observable变换成另一个,原来的Observable正常发射数据,变换产生的Observable发射这些数据的缓存集合。如果原来的Observable发射了一个onError通知,buffer会立即传递这个通知,而不是首先发射缓存的数据,即使已经缓存了数据。buffer有很多的变体,有多个重载方法。

RxJava使用详解--转换操作符

buffer(count):以列表(List集合)的形式发射非重叠(不是指没有重复元素,而是集合的界限)的缓存数据,每一个缓存(List集合)最多包含来自原始Observable的count个数据(最后发射的列表List可能少于count个,最后一个list可能没有count个元素),此处的count相当于集合的size()
//1.buffer(count):以列表(List集合)的形式发射非重叠(不是指没有重复元素,而是集合的界限)的缓存数据,每一个缓存(List集合)最多包含来自原始Observable的count个数据(最后发射的列表List可能少于count个,最后一个list可能没有count个元素),此处的count相当于集合的size()
System.out.println("buffer(count) ");
Observable.range(1, 6).buffer(2).subscribe(new Action1<List<Integer>>() {
@Override
public void call(List<Integer> integers) {
System.out.println("onNnext:" + integers.toString());
}
});
/**
* 2.buffer(count,skip) :有这么一种情况 buffer(count) 等价于 buffer(count,count).
* 详细说一下: buffer(2)输出为 【1,2】【3,4】【5,6】 那么buffer(2,3)就要这么算:首先从第一项开始缓存2个数据,【1,2】,此时的skip=3,意思是第二个集合的第一项是第skip+1项的数据,也就是第四个,集合长度是2,结果就是【4,5】,
* 此时可用的数据是4,5,6,再计算第三个集合的时候,第三个集合的第一项就应该是第skip+1项的数据,然而没有第四个数据,就没有第三个集合了。
*/
System.out.println("- - - - - - - - - - - - - - - - - - - - - ");
System.out.println("buffer(count,skip) ");
Observable.range(1, 6).buffer(3, 2).subscribe(new Action1<List<Integer>>() {
@Override
public void call(List<Integer> integers) {
System.out.println("onNnext:" + integers.toString());
}
});

输出结果:

buffer(count)
onNnext:[1, 2]
onNnext:[3, 4]
onNnext:[5, 6]
onCompleted:完成
- - - - - - - - - - - - - - - - - - - - -
buffer(count,skip)
[1, 2, 3]
[3, 4, 5]
[5, 6]
onCompleted:完成


2.map() flatMap()

map():对Observable发射的每一项数据使用函数执行变换操作,然后在发射出去。返回的对象可以随便指定,可以实现一对一的转换
flatmap():使用指定函数对原始的Observable发射的每一项数据执行变换操作,返回一个Observable,这个Observable也可以发射数据,可以实现一对多转换,可能还会出现次序出错的问题,使用的merge,不能保证原来的顺序
相同点:都是讲传入的参数转化之后返回另一个对象,不同点:flatmap()返回的是Observable对象,
flatMap() 的原理是这样的:1. 使用传入的事件对象创建一个 Observable 对象;2. 并不发送这个 Observable, 而是将它激活,于是它开始发送事件;3. 每一个创建出来的 Observable 发送的事件,都被汇入同一个 Observable ,而这个 Observable 负责将这些事件统一交给 Subscriber 的回调方法。这三个步骤,把事件拆成了两级,通过一组新创建的 Observable 将初始的对象『铺平』之后通过统一路径分发了下去。而这个『铺平』就是 flatMap() 所谓的 flat。
RxJava使用详解--转换操作符

RxJava使用详解--转换操作符


定义一个学生类
class Student{
private String name;
private List<String> course = new ArrayList<>();
.....省略
}

RxJava使用详解--转换操作符


输出结果:

onNext:数学  所在线程:main
onNext:语文  所在线程:main
onNext:英语  所在线程:main
- - - - - - - - - - - -
小明
onNext:小刚 所在线程:main


3.concatMap()

类似于最简单版本的flatMap,唯一不同是concatMap是按次序连接而不是合并那那些生成的Observables,然后产生自己的数据序列,concatMap操作符在处理产生的Observable时,采用的是“concat”连接的方式,不是“merge”,如果需求是要保证顺序的话建议用concatMap()

RxJava使用详解--转换操作符

RxJava使用详解--转换操作符

输出结果:

onNext:1 所在线程:main
onNext:2 所在线程:main
onNext:3 所在线程:main
onNext:4 所在线程:main
onNext:5 所在线程:main
onNext:6 所在线程:main


4.switchMap():

类似于flatMap(),有一点不同,只监视当前Observable,或者是最后发射的数据。需要延时执行,当是延时是0 的时候回发射第一个数据,延时是大于0的任何值都是发射最后一个值

有一个使用场景:搜索的时候,在输入完毕之后再去请求数据,大可不必没每输入一个都要发送请求。
RxJava使用详解--转换操作符

Observable.just(10, 20, 30).switchMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer integer) {

return Observable.just(integer).delay(0, TimeUnit.MILLISECONDS);
}
}).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("switchMap onNext:" + integer);
}
});

输出结果:

12-18 15:23:08.180 27918-27918/? I/System.out: switchMap onNext:10
12-18 15:23:08.181 27918-27918/? I/System.out: switchMap onNext:30


5.cast():

类似于map操作符,map()操作符可以自定义规则,将值A1编程A2,A1和A2的类型可以一样也可以不一样;而cast()操作符主要是做类型转换的,

传入参数的类型class,如果Observable发射的结果不能转成指定的class,就会抛出ClassCastException运行异常
RxJava使用详解--转换操作符

Observable.just("苹果","香蕉").cast(String.class).subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println("cast onNext " + s);
}
});

输出结果:

cast onNext 苹果
cast onNext 香蕉

6.groupBy():

对原来Observable发射的数据进行分组,形成一个GroupedObservable的结果集,GroupedObservable.getKey()可以获取键,

注意:由于结果集中的GroupedObservable是把分组结果缓存起来,如果对每一个GroupedObservable不进行处理(既不订阅也不对其进行别的操作符运算)就有可能出现内存泄漏,所以如果不对GroupedObservable进行处理,最好对其使用操作符take(0)处理

RxJava使用详解--转换操作符


Observable.just(1,2,3,4).take(4).groupBy(new Func1<Integer, String>() {
@Override
public String call(Integer value) {
return value < 3 ? "第一组":"第二组"; //这里执行分组的函数
}
}).subscribe(new Action1<GroupedObservable<String, Integer>>() {
@Override
public void call(final GroupedObservable<String, Integer> result) {
result.subscribe(new Action1<Integer>() {
@Override
public void call(Integer value) {
System.out.println("GroupBy onNext " + result.getKey() +":"+ value);
}
});
}
});

输出结果:

12-18 15:28:21.825 1776-1776/? I/System.out: GroupBy  onNext 第一组:1
12-18 15:28:21.825 1776-1776/? I/System.out: GroupBy  onNext 第一组:2
12-18 15:28:21.825 1776-1776/? I/System.out: GroupBy  onNext 第二组:3
12-18 15:28:21.825 1776-1776/? I/System.out: GroupBy  onNext 第二组:4


7.scan():

通过遍历原来Observable产生的结果,每一次对每一个结果按照指定规则进行运算,

计算的结果作为下一个迭代项参数,每一次都会把迭代项数据发射给订阅者,这里result就是那个迭代项,scan()操作符有个变体,可以设置初始值,下方就是讲初始值设置成了6,发射的第一个值就是6而不是1
RxJava使用详解--转换操作符

RxJava使用详解--转换操作符

Observable.just(1,2,3,4,5).scan(6,new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer result, Integer item2) {
return result + item2 ;//result是上一次计算的结果
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("scan onNext " + integer);
}
});

输出结果:

scan onNext 6
scan onNext 7
scan onNext 9
scan onNext 12
scan onNext 16
scan onNext 21


8.window():

类似于buffer()操作符,区别在于buffer操作符产生的结果是List缓存,而window()操作符产生的是一个Observable对象,订阅者可以对这个产生的Observable对象重新进行订阅处理,window操作符有很多重载方法

RxJava使用详解--转换操作符

Observable.interval(1, TimeUnit.SECONDS).take(6)
.window(3,TimeUnit.SECONDS).subscribe(new Action1<Observable<Long>>() {
@Override
public void call(Observable<Long> observable) {
observable.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
System.out.println("window onNext " + aLong);
}
});
}
});

输出结果:

12-18 15:38:24.100 12515-12553/com.dingmouren.rxjavademo I/System.out: window onNext 0
12-18 15:38:25.100 12515-12553/com.dingmouren.rxjavademo I/System.out: window onNext 1
12-18 15:38:26.099 12515-12553/com.dingmouren.rxjavademo I/System.out: window onNext 2
12-18 15:38:27.099 12515-12553/com.dingmouren.rxjavademo I/System.out: window onNext 3
12-18 15:38:28.099 12515-12553/com.dingmouren.rxjavademo I/System.out: window onNext 4
12-18 15:38:29.099 12515-12553/com.dingmouren.rxjavademo I/System.out: window onNext 5


更多详细内容,可以查看源码