RxJava 1.x 笔记:变换型操作符

时间:2022-08-29 17:48:50

在写这几篇 RxJava 笔记时,发现官方文档很久都没有更新啊。

一些前辈两年前写的学习笔记内容跟现在也基本一致,RxJava 2.x 的文档也基本没有,不知道是不是缺实习生。

本文内容为 RxJava 官方文档 学习笔记
作者:shixinzhang

读完本文你将了解:

变换型操作符

变换型操作符可以将 Observable 发射的数据进行变换。

Buffer

Buffer 可以周期性地将 Observable 发射的数据聚成一堆发出去,而不是一个个发射。

RxJava 1.x 笔记:变换型操作符

Buffer 即“缓存”,它可以将一个个发射数据的 Observable A 转换成周期性地发射元素缓存集合的 Observable B。

不同语言 Buffer 的实现有很多种,它们在选择缓存的方式上有所不同。

注意,如果源 Observable 发射了 onError 事件,转换后的 Observable 会直接发射 onError 事件,即使前面还有缓存事件没有发射。

Window 操作符和 Buffer 很相似,不同之处在于,Window 会将每波收集的缓存数据在发射前保存到独立的 Observable 中,而不是以一个数据结构的方式发射出去。

RxJava 中有多种 Buffer 实现。

buffer(count)

buffer(count)List 的形式发射非重叠的缓存,每次发射至多 count 个数据。

RxJava 1.x 笔记:变换型操作符

public final Observable<List<T>> buffer(int count) {
return buffer(count, count);
}

使用例子:

private void transformingWithBuffer() {
Observable.range(2, 10)
.buffer(3)
.subscribe(getPrintSubscriber());
}

运行结果:

RxJava 1.x 笔记:变换型操作符

可以看到,经过 buffer() 后,源 Observable 发射的数据会以 3 个为缓存,缓存满了会以数组的形式发射出去。

buffer(count, skip)

前面看到, buffer(count) 的实现也是调用 buffer(count, skip),只不过它的 skip 就等于 count:

public final Observable<List<T>> buffer(int count) {
return buffer(count, count);
}

buffer(count, skip) 的作用是:以 List 的形式发射可能重叠的缓存(当 skip < count 时就会重叠;skip > count 时会有遗漏),从源 Observable 的第一个数据开始,每次至多缓存 count 个数据,然后就发射,下一次缓存时,跳过 skip 个数据,依次重复:

RxJava 1.x 笔记:变换型操作符

public final Observable<List<T>> buffer(int count, int skip) {
return lift(new OperatorBufferWithSize<T>(count, skip));
}

关于 lift 我们后续介绍。

使用例子:

private void transformingWithBufferSkip() {
Observable.range(2, 10)
.buffer(3, 4)
.subscribe(this.<List<Integer>>getPrintSubscriber());
}

运行结果:

RxJava 1.x 笔记:变换型操作符

可以看到,其实就是缓存 count 个数据然后发射出去,然后从后面 skip - count 个数据开始缓存、发射。

buffer(bufferClosingSelector)

RxJava 1.x 笔记:变换型操作符

当订阅到源 Observable 后,buffer(bufferClosingSelector) 会收集源发射的数据到 List 中,同时调用 bufferClosingSelector 生成一个新的 Observable。

当新 Observable 发射一个 TClosing 对象后,buffer 会把缓存的 List 发射出去,然后重复这个过程,直到源 Observable 结束。

public final <TClosing> Observable<List<T>> buffer(Func0<? extends Observable<? extends TClosing>> bufferClosingSelector) {
return lift(new OperatorBufferWithSingleObservable<T, TClosing>(bufferClosingSelector, 16));
}

使用例子:

private int emitCount;
private void transformingWithBufferClosingSelector() {
Observable.range(2, 10)
.buffer(new Func0<Observable<?>>() {
@Override
public Observable<?> call() {
emitCount ++;
Log.d(TAG, "emitCount:" + emitCount);
return Observable.timer(3, TimeUnit.MILLISECONDS);
}
})
.subscribe(this.<List<Integer>>getPrintSubscriber());
}

运行结果:

RxJava 1.x 笔记:变换型操作符

可以看到,我们创建了一个 3 秒后发射数据的 Observable,3 秒后所有数据已经缓存完毕,因此参数里的 call() 只调用了一次。

buffer(boundary)

RxJava 1.x 笔记:变换型操作符

buffer(boundary) 的作用是,使用参数 boundary Observable 作为源 Observable 的监视器,发射不重叠的数据。

每次源 Observable 发射一个数据,它就把数据缓存到 List 中,等到 boundary Observable 发射数据时,buffer 就会把之前缓存的数据发射出去,以此重复。

这里的 boundary 就相当于一个提示发射的边界。

public final <B> Observable<List<T>> buffer(Observable<B> boundary) {
return buffer(boundary, 16);
}
public final <B> Observable<List<T>> buffer(Observable<B> boundary, int initialCapacity) {
return lift(new OperatorBufferWithSingleObservable<T, B>(boundary, initialCapacity));
}

使用例子:

private void transformingWithBufferBoundary() {
Observable.interval(1, TimeUnit.SECONDS)
.buffer(Observable.interval(3, TimeUnit.SECONDS))
.subscribe(this.<List<Long>>getPrintSubscriber());
}

我们使用 interval() 创建一个每隔一秒发射递增整数序列的源 Observable,监视器是每隔 3 秒发射的 Observable,因此正常情况下,buffer 会收集源发射的整数到 List 中,每隔 3 秒发射一次。

运行结果:

RxJava 1.x 笔记:变换型操作符

可以看到,的确跟我们想的一样。

FlatMap

FlatMap 可以把源 Observable 发射的数据转换成多个 Observable,然后把这些 Observable 发射的数据放到一个 Observable 中。

RxJava 1.x 笔记:变换型操作符

FlatMap 操作符使用一个指定的函数对源 Observable 发射的每一项数据执行变换操作、返回一个新的 Observable,然后合并这些 Observables 发射的数据。

这个操作符的使用场景还是很多的,比如服务端返回的数据太复杂,我们只用到其中一部分数据,就可以使用 FlatMap 将数据取出来。

flatMap

注意:FlatMap 会将最后的数据混合,因此顺序可能会改变。

RxJava 中对应的实现是 flatMap():

public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
if (getClass() == ScalarSynchronousObservable.class) {
return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func);
}
return merge(map(func));
}

flatMap() 的输入是发射 T 类型的 Observable,输出是发射 R 类型的 Observable。

可以看到最后调用了 merge,我们后续介绍它。

使用例子:

假设现在有嵌套的几种数据类型:年级、班级、学生名称,每个班级有多个学生、每个年级有多个班级,他们的结构是这样的:

//班级
public class Clazz {
private String name;
private List<String> studentNameList;
}
//年级
public class Grade {
private String name;
private List<Clazz> classList;
}

现在我们有一个年级的数据,想要拿到这个年级里所有学生的名称,以前的做法是两个 for 循环(遍历每个班、再遍历每个同学)把学生名称数据取出来放到一个单独的 List 里,现在用 RxJava 可以这样写:

private void transformingWithFlatMap() {
//数据源
Clazz secondClass = new Clazz("四年级二班", Arrays.asList("张三", "李四", "王五"));
Clazz thirdClass = new Clazz("四年级三班", Arrays.asList("赵六", "喜洋洋", "灰太狼"));
Grade forthGrade = new Grade("四年级", Arrays.asList(secondClass, thirdClass));

Observable.just(forthGrade)
.flatMap(new Func1<Grade, Observable<Clazz>>() {
@Override
public Observable<Clazz> call(final Grade grade) {
return Observable.from(grade.getClassList()); //先拿到年级里的班级数据,合并成一个班级 List
}
})
.flatMap(new Func1<Clazz, Observable<String>>() {
@Override
public Observable<String> call(final Clazz clazz) {
return Observable.from(clazz.getStudentNameList()); //再从每个班级里拿出所有学生名称数据,合并成一个 List
}
})
.subscribe(this.getPrintSubscriber());
}

两个 flatMap 搞定,逻辑上比循环套循环清晰多了。

运行结果:

RxJava 1.x 笔记:变换型操作符

注意:如果 flatMap 产生的任何一个 Observable 调用 onError 异常终止了,最终合并的 Observable 会立即调用 onError 并终止。

concatMap

在一些实现里,有另外一种类似的操作符 ConcatMap,功能和 FlatMap 类似,但是会按严格的顺序将数据拼接在一起,不会改变顺序。

RxJava 1.x 笔记:变换型操作符

concatMap 类似于简单版本的 flatMap,但是它按次序连接而不是合并那些生成的 Observables。

public final <R> Observable<R> concatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
if (this instanceof ScalarSynchronousObservable) {
ScalarSynchronousObservable<T> scalar = (ScalarSynchronousObservable<T>) this;
return scalar.scalarFlatMap(func);
}
return unsafeCreate(new OnSubscribeConcatMap<T, R>(this, func, 2, OnSubscribeConcatMap.IMMEDIATE));
}

使用和 flatMap 差不多:

public static Grade getGradeData() {
//数据源
Clazz secondClass = new Clazz("四年级二班", Arrays.asList("张三", "李四", "王五"));
Clazz thirdClass = new Clazz("四年级三班", Arrays.asList("赵六", "喜洋洋", "灰太狼"));
Grade forthGrade = new Grade("四年级", Arrays.asList(secondClass, thirdClass));
return forthGrade;
}
private void transformingWithConcatMap() {
Observable.just(DataCreator.getGradeData())
.concatMap(new Func1<Grade, Observable<Clazz>>() {
@Override
public Observable<Clazz> call(final Grade grade) {
return Observable.from(grade.getClassList());
}
})
.concatMap(new Func1<Clazz, Observable<?>>() {
@Override
public Observable<?> call(final Clazz clazz) {
return Observable.from(clazz.getStudentNameList());
}
})
.subscribe(this.getPrintSubscriber());
}

运行结果:

RxJava 1.x 笔记:变换型操作符

switchMap

RxJava 1.x 笔记:变换型操作符

switchMap 也可以像 flatMap 一样处理 Observable,将处理后的数据合并成一个 Observable。

不同之处在于它的 “喜新厌旧”:每次源 Observable 发射一个新的数据时,它会解除订阅之前发射的数据的 Observable,转而订阅新的数据。

就像上面的图一样,如果源 Observable 发射多个定时任务,不管前一个定时任务执行了多少,只要后一个定时任务开始执行,就不再接收前面的任务的结果了。举个例子:

private void transformingWithSwitchMap() {
Observable.just(Observable.timer(4, TimeUnit.SECONDS), Observable.range(2, 10))
.switchMap(new Func1<Observable<? extends Number>, Observable<?>>() {
@Override
public Observable<?> call(final Observable<? extends Number> observable) {
return observable;
}
})
.subscribe(this.getPrintSubscriber());
}

上面代码中,我们的源 Observable 的数据是两个 Observable,第一个会在 4 秒后发射一个 0,第二个会立即发射从 2 往后的 10 个整数。

根据 switchMap 的特性,第一个 Observable 还没发射时第二个已经发射了,于是下游的订阅者解除对第一 Observable 的订阅,也就收不到 4 秒后发射的 0 了。

运行结果:

RxJava 1.x 笔记:变换型操作符

可以看到,的确没有收到 0 。

GroupBy

GroupBy 会将源 Observable 转换成多个 Observable,每个 Observable 发射源 Observable 的一部分数据。

RxJava 1.x 笔记:变换型操作符

数据项由哪一个 Observable 发射是由一个判定函数决定的,这个函数会给每一项数据指定一个 Key,Key相同的数据会被同一个 Observable 发射。

RxJava 中对应的实现是 groupBy():

public final <K> Observable<GroupedObservable<K, T>> groupBy(final Func1<? super T, ? extends K> keySelector) {
return lift(new OperatorGroupBy<T, K, T>(keySelector));
}

groupBy() 返回的是一个 GroupedObservable,Observable 的子类,它有一个额外的方法 getKey(),这个 Key 就是经过计算、用于将数据分组到指定的 Observable 的值。

使用例子:

public static List<People> getPeopleData() {
return Arrays.asList(new People(15, "大熊"), new People(13, "静安"), new People(15, "胖虎"), new People(14, "多来A梦"), new People(13, "拭心"));
}
private void transformingWithGroupBy() {
Observable.from(DataCreator.getPeopleData())
.groupBy(new Func1<People, Integer>() {
@Override
public Integer call(final People people) {
return people.getAge();
}
})
.subscribe(new Action1<GroupedObservable<Integer, People>>() {
@Override
public void call(final GroupedObservable<Integer, People> integerPeopleGroupedObservable) {
integerPeopleGroupedObservable.buffer(2)
.subscribe(getPrintSubscriber());
}
});
}

我们创建了 5 个 People 对象,这个类有两个属性:age 和 name,然后在 groupBy() 中根据 age 分组,这样就可以得到一组发射 GroupedObservable 的 Observable,然后我们把它们两两一组,打印出来。

运行结果:

RxJava 1.x 笔记:变换型操作符

可以看到,的确是按 age 分了组。

注意:groupBy() 将源 Observable 分解为多个发射 GroupedObservable 的 Observable ,一旦有订阅,每个 GroupedObservable 就开始缓存发射的数据。
如果你对某个 GroupedObservable 没有兴趣却不进行处理,这个缓存可能形成一个潜在的内存泄露。
因此,你应该使用像 take(0) 这样会丢弃自己的缓存的操作符。

Map

Map 操作符的作用是:对源 Observable 发射的每个数据都进行一个函数处理。

RxJava 1.x 笔记:变换型操作符

map

RxJava 中对应的实现有 map():

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return unsafeCreate(new OnSubscribeMap<T, R>(this, func));
}

使用起来也很方便:

private void transformingWithMap() {
Observable.range(1, 5)
.map(new Func1<Integer, Integer>() {
@Override
public Integer call(final Integer integer) {
return integer * 3;
}
})
.subscribe(this.<Integer>getPrintSubscriber());
}![这里写图片描述](http://img.blog.csdn.net/20170717224345469?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvdTAxMTI0MDg3Nw==/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/SouthEast)

运行结果:

RxJava 1.x 笔记:变换型操作符

cast

RxJava 1.x 笔记:变换型操作符

cast()map() 的特殊版本,它的作用是将发射的数据强转成指定的类型。

public final <R> Observable<R> cast(final Class<R> klass) {
return lift(new OperatorCast<T, R>(klass));
}

使用也很简单:

private void transformingWithCast() {
Observable.range(1, 5)
.cast(String.class)
.subscribe(this.<String>getPrintSubscriber());
}

运行结果:

RxJava 1.x 笔记:变换型操作符

其实就跟强转一样,类型不一致会报错。

Scan

Scan 的作用是扫描、累积。

它可以将每次发射的数据都进行指定的函数计算,计算的结果作为参数参与下一次计算。

RxJava 1.x 笔记:变换型操作符

RxJava 中有两种实现。

scan(accumulator)

第一种是接收一个 Func2 作为参数:

public final Observable<T> scan(Func2<T, T, T> accumulator) {
return lift(new OperatorScan<T, T>(accumulator));
}

使用例子:

private void transformingWithScan() {
Observable.from(Arrays.asList(6, 4, 1, 5, 7))
.scan(new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(final Integer lastResult, final Integer newItem) {
return lastResult + newItem;
}
})
.subscribe(this.<Integer>getPrintSubscriber());
}

运行结果:

RxJava 1.x 笔记:变换型操作符

scan(initialValue, accumulator)

第二种是多了一个初始值 initialValue,它会参与第一次运算。

RxJava 1.x 笔记:变换型操作符

public final <R> Observable<R> scan(R initialValue, Func2<R, ? super T, R> accumulator) {
return lift(new OperatorScan<R, T>(initialValue, accumulator));
}

使用例子:

private void transformingWithScan2() {
Observable.from(Arrays.asList(6, 4, 1, 5, 7))
.scan(1, new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(final Integer lastResult, final Integer newItem) {
return lastResult + newItem;
}
})
.subscribe(this.<Integer>getPrintSubscriber());
}

运行结果:

RxJava 1.x 笔记:变换型操作符

可以看到,和前面的区别就是多发射了个初始值,结果多了 1 。

Window

Window 的作用是定期将源 Observable 发射的一部分数据切分为 一个 Observable 窗口,然后发射这个窗口。

RxJava 1.x 笔记:变换型操作符

WindowBuffer 非常相似,但它发射的不是源 Observable 数据的缓存包,而是一系列 Observable。

Buffer 一样,Window 也有很多变体,每一种都有自己分割源数据的方法:

RxJava 1.x 笔记:变换型操作符

使用方式和 Buffer 基本一致,这里我们只举一个例子:

private void transformingWithWindow() {
Observable.range(1, 10)
.window(3)
.subscribe(new Action1<Observable<Integer>>() {
@Override
public void call(final Observable<Integer> integerObservable) {
integerObservable.subscribe(getPrintSubscriber());
}
});
}

运行结果:

RxJava 1.x 笔记:变换型操作符

至此变换型操作符我们基本了解完了,已经成功了一小半,加油!

代码地址

发表自 张拭心的博客

Thanks

http://reactivex.io/documentation/operators.html
https://github.com/mcxiaoke/RxDocs/blob/master/Operators.md
https://github.com/mgp/effective-rxjava/blob/master/items/understand-switch-map.md