在写这几篇 RxJava 笔记时,发现官方文档很久都没有更新啊。
一些前辈两年前写的学习笔记内容跟现在也基本一致,RxJava 2.x 的文档也基本没有,不知道是不是缺实习生。
本文内容为 RxJava 官方文档 学习笔记
作者:shixinzhang
读完本文你将了解:
变换型操作符
变换型操作符可以将 Observable 发射的数据进行变换。
Buffer
Buffer
可以周期性地将 Observable 发射的数据聚成一堆发出去,而不是一个个发射。
Buffer
即“缓存”,它可以将一个个发射数据的 Observable A 转换成周期性地发射元素缓存集合的 Observable B。
不同语言 Buffer
的实现有很多种,它们在选择缓存的方式上有所不同。
注意,如果源 Observable 发射了 onError 事件,转换后的 Observable 会直接发射 onError 事件,即使前面还有缓存事件没有发射。
Window
操作符和 Buffer
很相似,不同之处在于,Window
会将每波收集的缓存数据在发射前保存到独立的 Observable 中,而不是以一个数据结构的方式发射出去。
RxJava 中有多种 Buffer
实现。
buffer(count)
buffer(count)
以 List
的形式发射非重叠的缓存,每次发射至多 count 个数据。
public final Observable<List<T>> buffer(int count) {
return buffer(count, count);
}
使用例子:
private void transformingWithBuffer() {
Observable.range(2, 10)
.buffer(3)
.subscribe(getPrintSubscriber());
}
运行结果:
可以看到,经过 buffer()
后,源 Observable 发射的数据会以 3 个为缓存,缓存满了会以数组的形式发射出去。
buffer(count, skip)
前面看到, buffer(count)
的实现也是调用 buffer(count, skip)
,只不过它的 skip 就等于 count:
public final Observable<List<T>> buffer(int count) {
return buffer(count, count);
}
buffer(count, skip)
的作用是:以 List
的形式发射可能重叠的缓存(当 skip < count 时就会重叠;skip > count 时会有遗漏),从源 Observable 的第一个数据开始,每次至多缓存 count 个数据,然后就发射,下一次缓存时,跳过 skip 个数据,依次重复:
public final Observable<List<T>> buffer(int count, int skip) {
return lift(new OperatorBufferWithSize<T>(count, skip));
}
关于
lift
我们后续介绍。
使用例子:
private void transformingWithBufferSkip() {
Observable.range(2, 10)
.buffer(3, 4)
.subscribe(this.<List<Integer>>getPrintSubscriber());
}
运行结果:
可以看到,其实就是缓存 count 个数据然后发射出去,然后从后面 skip - count 个数据开始缓存、发射。
buffer(bufferClosingSelector)
当订阅到源 Observable 后,buffer(bufferClosingSelector)
会收集源发射的数据到 List 中,同时调用 bufferClosingSelector
生成一个新的 Observable。
当新 Observable 发射一个 TClosing
对象后,buffer
会把缓存的 List 发射出去,然后重复这个过程,直到源 Observable 结束。
public final <TClosing> Observable<List<T>> buffer(Func0<? extends Observable<? extends TClosing>> bufferClosingSelector) {
return lift(new OperatorBufferWithSingleObservable<T, TClosing>(bufferClosingSelector, 16));
}
使用例子:
private int emitCount;
private void transformingWithBufferClosingSelector() {
Observable.range(2, 10)
.buffer(new Func0<Observable<?>>() {
@Override
public Observable<?> call() {
emitCount ++;
Log.d(TAG, "emitCount:" + emitCount);
return Observable.timer(3, TimeUnit.MILLISECONDS);
}
})
.subscribe(this.<List<Integer>>getPrintSubscriber());
}
运行结果:
可以看到,我们创建了一个 3 秒后发射数据的 Observable,3 秒后所有数据已经缓存完毕,因此参数里的 call()
只调用了一次。
buffer(boundary)
buffer(boundary)
的作用是,使用参数 boundary Observable 作为源 Observable 的监视器,发射不重叠的数据。
每次源 Observable 发射一个数据,它就把数据缓存到 List 中,等到 boundary Observable 发射数据时,buffer
就会把之前缓存的数据发射出去,以此重复。
这里的 boundary 就相当于一个提示发射的边界。
public final <B> Observable<List<T>> buffer(Observable<B> boundary) {
return buffer(boundary, 16);
}
public final <B> Observable<List<T>> buffer(Observable<B> boundary, int initialCapacity) {
return lift(new OperatorBufferWithSingleObservable<T, B>(boundary, initialCapacity));
}
使用例子:
private void transformingWithBufferBoundary() {
Observable.interval(1, TimeUnit.SECONDS)
.buffer(Observable.interval(3, TimeUnit.SECONDS))
.subscribe(this.<List<Long>>getPrintSubscriber());
}
我们使用 interval()
创建一个每隔一秒发射递增整数序列的源 Observable,监视器是每隔 3 秒发射的 Observable,因此正常情况下,buffer 会收集源发射的整数到 List 中,每隔 3 秒发射一次。
运行结果:
可以看到,的确跟我们想的一样。
FlatMap
FlatMap
可以把源 Observable 发射的数据转换成多个 Observable,然后把这些 Observable 发射的数据放到一个 Observable 中。
FlatMap
操作符使用一个指定的函数对源 Observable 发射的每一项数据执行变换操作、返回一个新的 Observable,然后合并这些 Observables 发射的数据。
这个操作符的使用场景还是很多的,比如服务端返回的数据太复杂,我们只用到其中一部分数据,就可以使用 FlatMap
将数据取出来。
flatMap
注意:
FlatMap
会将最后的数据混合,因此顺序可能会改变。
RxJava 中对应的实现是 flatMap()
:
public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
if (getClass() == ScalarSynchronousObservable.class) {
return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func);
}
return merge(map(func));
}
flatMap()
的输入是发射 T 类型的 Observable,输出是发射 R 类型的 Observable。
可以看到最后调用了
merge
,我们后续介绍它。
使用例子:
假设现在有嵌套的几种数据类型:年级、班级、学生名称,每个班级有多个学生、每个年级有多个班级,他们的结构是这样的:
//班级
public class Clazz {
private String name;
private List<String> studentNameList;
}
//年级
public class Grade {
private String name;
private List<Clazz> classList;
}
现在我们有一个年级的数据,想要拿到这个年级里所有学生的名称,以前的做法是两个 for 循环(遍历每个班、再遍历每个同学)把学生名称数据取出来放到一个单独的 List 里,现在用 RxJava 可以这样写:
private void transformingWithFlatMap() {
//数据源
Clazz secondClass = new Clazz("四年级二班", Arrays.asList("张三", "李四", "王五"));
Clazz thirdClass = new Clazz("四年级三班", Arrays.asList("赵六", "喜洋洋", "灰太狼"));
Grade forthGrade = new Grade("四年级", Arrays.asList(secondClass, thirdClass));
Observable.just(forthGrade)
.flatMap(new Func1<Grade, Observable<Clazz>>() {
@Override
public Observable<Clazz> call(final Grade grade) {
return Observable.from(grade.getClassList()); //先拿到年级里的班级数据,合并成一个班级 List
}
})
.flatMap(new Func1<Clazz, Observable<String>>() {
@Override
public Observable<String> call(final Clazz clazz) {
return Observable.from(clazz.getStudentNameList()); //再从每个班级里拿出所有学生名称数据,合并成一个 List
}
})
.subscribe(this.getPrintSubscriber());
}
两个 flatMap
搞定,逻辑上比循环套循环清晰多了。
运行结果:
注意:如果
flatMap
产生的任何一个 Observable 调用onError
异常终止了,最终合并的 Observable 会立即调用onError
并终止。
concatMap
在一些实现里,有另外一种类似的操作符 ConcatMap
,功能和 FlatMap
类似,但是会按严格的顺序将数据拼接在一起,不会改变顺序。
concatMap
类似于简单版本的 flatMap
,但是它按次序连接而不是合并那些生成的 Observables。
public final <R> Observable<R> concatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
if (this instanceof ScalarSynchronousObservable) {
ScalarSynchronousObservable<T> scalar = (ScalarSynchronousObservable<T>) this;
return scalar.scalarFlatMap(func);
}
return unsafeCreate(new OnSubscribeConcatMap<T, R>(this, func, 2, OnSubscribeConcatMap.IMMEDIATE));
}
使用和 flatMap
差不多:
public static Grade getGradeData() {
//数据源
Clazz secondClass = new Clazz("四年级二班", Arrays.asList("张三", "李四", "王五"));
Clazz thirdClass = new Clazz("四年级三班", Arrays.asList("赵六", "喜洋洋", "灰太狼"));
Grade forthGrade = new Grade("四年级", Arrays.asList(secondClass, thirdClass));
return forthGrade;
}
private void transformingWithConcatMap() {
Observable.just(DataCreator.getGradeData())
.concatMap(new Func1<Grade, Observable<Clazz>>() {
@Override
public Observable<Clazz> call(final Grade grade) {
return Observable.from(grade.getClassList());
}
})
.concatMap(new Func1<Clazz, Observable<?>>() {
@Override
public Observable<?> call(final Clazz clazz) {
return Observable.from(clazz.getStudentNameList());
}
})
.subscribe(this.getPrintSubscriber());
}
运行结果:
switchMap
switchMap
也可以像 flatMap
一样处理 Observable,将处理后的数据合并成一个 Observable。
不同之处在于它的 “喜新厌旧”:每次源 Observable 发射一个新的数据时,它会解除订阅之前发射的数据的 Observable,转而订阅新的数据。
就像上面的图一样,如果源 Observable 发射多个定时任务,不管前一个定时任务执行了多少,只要后一个定时任务开始执行,就不再接收前面的任务的结果了。举个例子:
private void transformingWithSwitchMap() {
Observable.just(Observable.timer(4, TimeUnit.SECONDS), Observable.range(2, 10))
.switchMap(new Func1<Observable<? extends Number>, Observable<?>>() {
@Override
public Observable<?> call(final Observable<? extends Number> observable) {
return observable;
}
})
.subscribe(this.getPrintSubscriber());
}
上面代码中,我们的源 Observable 的数据是两个 Observable,第一个会在 4 秒后发射一个 0,第二个会立即发射从 2 往后的 10 个整数。
根据 switchMap
的特性,第一个 Observable 还没发射时第二个已经发射了,于是下游的订阅者解除对第一 Observable 的订阅,也就收不到 4 秒后发射的 0 了。
运行结果:
可以看到,的确没有收到 0 。
GroupBy
GroupBy
会将源 Observable 转换成多个 Observable,每个 Observable 发射源 Observable 的一部分数据。
数据项由哪一个 Observable 发射是由一个判定函数决定的,这个函数会给每一项数据指定一个 Key,Key相同的数据会被同一个 Observable 发射。
RxJava 中对应的实现是 groupBy()
:
public final <K> Observable<GroupedObservable<K, T>> groupBy(final Func1<? super T, ? extends K> keySelector) {
return lift(new OperatorGroupBy<T, K, T>(keySelector));
}
groupBy()
返回的是一个 GroupedObservable
,Observable 的子类,它有一个额外的方法 getKey()
,这个 Key 就是经过计算、用于将数据分组到指定的 Observable 的值。
使用例子:
public static List<People> getPeopleData() {
return Arrays.asList(new People(15, "大熊"), new People(13, "静安"), new People(15, "胖虎"), new People(14, "多来A梦"), new People(13, "拭心"));
}
private void transformingWithGroupBy() {
Observable.from(DataCreator.getPeopleData())
.groupBy(new Func1<People, Integer>() {
@Override
public Integer call(final People people) {
return people.getAge();
}
})
.subscribe(new Action1<GroupedObservable<Integer, People>>() {
@Override
public void call(final GroupedObservable<Integer, People> integerPeopleGroupedObservable) {
integerPeopleGroupedObservable.buffer(2)
.subscribe(getPrintSubscriber());
}
});
}
我们创建了 5 个 People 对象,这个类有两个属性:age 和 name,然后在 groupBy()
中根据 age 分组,这样就可以得到一组发射 GroupedObservable
的 Observable,然后我们把它们两两一组,打印出来。
运行结果:
可以看到,的确是按 age 分了组。
注意:
groupBy()
将源 Observable 分解为多个发射GroupedObservable
的 Observable ,一旦有订阅,每个GroupedObservable
就开始缓存发射的数据。
如果你对某个GroupedObservable
没有兴趣却不进行处理,这个缓存可能形成一个潜在的内存泄露。
因此,你应该使用像take(0)
这样会丢弃自己的缓存的操作符。
Map
Map
操作符的作用是:对源 Observable 发射的每个数据都进行一个函数处理。
map
RxJava 中对应的实现有 map()
:
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return unsafeCreate(new OnSubscribeMap<T, R>(this, func));
}
使用起来也很方便:
private void transformingWithMap() {
Observable.range(1, 5)
.map(new Func1<Integer, Integer>() {
@Override
public Integer call(final Integer integer) {
return integer * 3;
}
})
.subscribe(this.<Integer>getPrintSubscriber());
}![这里写图片描述](http://img.blog.csdn.net/20170717224345469?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvdTAxMTI0MDg3Nw==/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/SouthEast)
运行结果:
cast
cast()
是 map()
的特殊版本,它的作用是将发射的数据强转成指定的类型。
public final <R> Observable<R> cast(final Class<R> klass) {
return lift(new OperatorCast<T, R>(klass));
}
使用也很简单:
private void transformingWithCast() {
Observable.range(1, 5)
.cast(String.class)
.subscribe(this.<String>getPrintSubscriber());
}
运行结果:
其实就跟强转一样,类型不一致会报错。
Scan
Scan
的作用是扫描、累积。
它可以将每次发射的数据都进行指定的函数计算,计算的结果作为参数参与下一次计算。
RxJava 中有两种实现。
scan(accumulator)
第一种是接收一个 Func2 作为参数:
public final Observable<T> scan(Func2<T, T, T> accumulator) {
return lift(new OperatorScan<T, T>(accumulator));
}
使用例子:
private void transformingWithScan() {
Observable.from(Arrays.asList(6, 4, 1, 5, 7))
.scan(new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(final Integer lastResult, final Integer newItem) {
return lastResult + newItem;
}
})
.subscribe(this.<Integer>getPrintSubscriber());
}
运行结果:
scan(initialValue, accumulator)
第二种是多了一个初始值 initialValue
,它会参与第一次运算。
public final <R> Observable<R> scan(R initialValue, Func2<R, ? super T, R> accumulator) {
return lift(new OperatorScan<R, T>(initialValue, accumulator));
}
使用例子:
private void transformingWithScan2() {
Observable.from(Arrays.asList(6, 4, 1, 5, 7))
.scan(1, new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(final Integer lastResult, final Integer newItem) {
return lastResult + newItem;
}
})
.subscribe(this.<Integer>getPrintSubscriber());
}
运行结果:
可以看到,和前面的区别就是多发射了个初始值,结果多了 1 。
Window
Window
的作用是定期将源 Observable 发射的一部分数据切分为 一个 Observable 窗口,然后发射这个窗口。
Window
和 Buffer
非常相似,但它发射的不是源 Observable 数据的缓存包,而是一系列 Observable。
和 Buffer
一样,Window
也有很多变体,每一种都有自己分割源数据的方法:
使用方式和 Buffer
基本一致,这里我们只举一个例子:
private void transformingWithWindow() {
Observable.range(1, 10)
.window(3)
.subscribe(new Action1<Observable<Integer>>() {
@Override
public void call(final Observable<Integer> integerObservable) {
integerObservable.subscribe(getPrintSubscriber());
}
});
}
运行结果:
至此变换型操作符我们基本了解完了,已经成功了一小半,加油!
代码地址
发表自 张拭心的博客
Thanks
http://reactivex.io/documentation/operators.html
https://github.com/mcxiaoke/RxDocs/blob/master/Operators.md
https://github.com/mgp/effective-rxjava/blob/master/items/understand-switch-map.md