RxJava 1.x 笔记:组合型操作符

时间:2021-01-23 17:50:21

最近去检查眼睛,发现度数又涨了,唉,各位猿多注意保护自己的眼睛吧!

前面学了 RxJava 的三种关键操作符:

  1. 创建型操作符
  2. 过滤型操作符
  3. 变换型操作符

读完本文你将了解第四种(组合型操作符):

组合型操作符

组合型操作符即处理多个 Observable 数据源,在处理后将它们合并成一个 Observable。

Zip

Zip 操作符的作用是:通过一个特定的函数组合多个 Observable 发射的数据,然后发射每个组合的结果。

RxJava 1.x 笔记:组合型操作符

Zip 操作符对你选中的多个 Observable 发射的数据按顺序应用一个函数,然后返回一个 Observable,这个 Observable 发射函数的返回结果。

Zip 操作符对发射的数据的顺序很严格,如上图所示,它发射的第一个数据一定是 Observable A 发射的第一个数据和 Observable B 发射的第一个数据经过组合的结果;发射的第二个数据也一定是 A 发射的第二个数据和 B 发个的第二个数据的组合结果;一次类推,直到元素最少的 Observable 发射完元素。

RxJava 中对应的实现是 zipzipWith

zip

RxJava 1.x 笔记:组合型操作符

RxJava 中,zip() 的重载方法有 11 种:

RxJava 1.x 笔记:组合型操作符

前 2 个支持以 Iterable 或者数组的形式传入多个 Observable,后面 9 个分别支持从 1 到 9 个 Observable 作为参数,所有方法的最后一个参数是一个函数,它接收各个 Observable 按顺序发出的数据,然后对它们进行一个操作,将操作的结果发射出去。

public static <T1, T2, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, final Func2<? super T1, ? super T2, ? extends R> zipFunction) {
return just(new Observable<?>
[] { o1, o2 }).lift(new OperatorZip<R>(zipFunction));
}

使用例子:

private void zip() {
Observable<String> observableA = Observable.just("A", "B", "C", "d", "E");
Observable<Integer> observableB = Observable.just(1, 2, 3, 4);

Observable
.zip(observableA, observableB, new Func2<String, Integer, String>() {
@Override
public String call(final String s, final Integer integer) {
return s + "_" + integer;
}
})
.subscribe(this.<String>getPrintSubscriber());
}

运行结果:

07-19 14:59:11.219 12254-12254/top.shixinzhang.rxjavademo I/System.out: onNext: A_1
07-19 14:59:11.219 12254-12254/top.shixinzhang.rxjavademo I/System.out: onNext: B_2
07-19 14:59:11.219 12254-12254/top.shixinzhang.rxjavademo I/System.out: onNext: C_3
07-19 14:59:11.219 12254-12254/top.shixinzhang.rxjavademo I/System.out: onNext: d_4
07-19 14:59:11.219 12254-12254/top.shixinzhang.rxjavademo I/System.out: onCompleted

可以看到,zip() 的确按顺序将 observableA 和 observableB 发射的数据组合了起来,然后发射了出去。当元素较少的一个 Observable 发射完后,zip 也就停止发射了。

zipWith

RxJava 1.x 笔记:组合型操作符

zipWith 也可以组合多个 Observable,不过和 zip 不同的是,zipWith 是非静态方法,它需要一个 Observable 来调用。

zipWith 两种重载:

public final <T2, R> Observable<R> zipWith(Observable<? extends T2> other, Func2<? super T, ? super T2, ? extends R> zipFunction) {
return (Observable<R>)zip(this, other, zipFunction);
}
public final <T2, R> Observable<R> zipWith(Iterable<? extends T2> other, Func2<? super T, ? super T2, ? extends R> zipFunction) {
return lift(new OperatorZipIterable<T, T2, R>(other, zipFunction));
}

第一个方法的参数是一个 Observable,它的作用是将当前 Observable 和参数 Observable 组合;第二个方法的参数是一个 Iterable,它的作用是将当前 Observable 和许多 Observable 组合。

使用例子:

private void zipWith() {
Observable<String> observableA = Observable.just("A", "B", "C", "d", "E");
Observable
.just(1, 2, 3, 4)
.zipWith(observableA, new Func2<Integer, String, String>() {
@Override
public String call(final Integer integer, final String s) {
return integer + ", " + s;
}
})
.subscribe(this.<String>getPrintSubscriber());
}

运行结果:

07-19 16:28:50.969 27717-27717/top.shixinzhang.rxjavademo I/System.out: onNext: 1, A
07-19 16:28:50.969 27717-27717/top.shixinzhang.rxjavademo I/System.out: onNext: 2, B
07-19 16:28:50.969 27717-27717/top.shixinzhang.rxjavademo I/System.out: onNext: 3, C
07-19 16:28:50.969 27717-27717/top.shixinzhang.rxjavademo I/System.out: onNext: 4, d
07-19 16:28:50.969 27717-27717/top.shixinzhang.rxjavademo I/System.out: onCompleted

zip 很相似是吧。

CombineLatest

CombineLatest 操作符的作用是:当两个 Observable 中任意一个发射数据时,会结合另外一个 Observable 最近发射的数据进行一些函数操作,然后将操作的结果发射出去。

RxJava 1.x 笔记:组合型操作符

CombineLatestZip 有点相似,都是将两个 Observable 发射的数据结合起来,不同的是,每个 Observable 都发射了新元素后, Zip 才进行操作然后发射操作结果;而 CombineLatest 在每个 Observable 都发射一个数据后,只要有一个 Observable 发射数据,CombineLatest 就会进行操作然后发射操作结果。

当任何一个 Observable 发射了新数据,CombineLatest 会将这个新数据与另外的 Observable 之前发射的最新数据进行一个函数操作。

RxJava 中有两种实现:combineLatest, withLatestFrom

combineLatest

RxJava 1.x 笔记:组合型操作符

RxJava 中的 combineLatest() 有 10 种重载:

RxJava 1.x 笔记:组合型操作符

不同的地方基本就是接收 Observable 的个数不同。

public static <T1, T2, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Func2<? super T1, ? super T2, ? extends R> combineFunction) {
return (Observable<R>)combineLatest(Arrays.asList(o1, o2), Functions.fromFunc(combineFunction));
}
public static <T, R> Observable<R> combineLatest(List<? extends Observable<? extends T>> sources, FuncN<? extends R> combineFunction) {
return unsafeCreate(new OnSubscribeCombineLatest<T, R>(sources, combineFunction));
}

使用例子:

/***
* 将 A 发射的数据与 B 之前发射最新的数据结合,进行函数操作
*/

private void combineLatest() {
Observable<Long> observableA = Observable.interval(3, TimeUnit.SECONDS);
Observable<Long> observableB = Observable.interval(2, TimeUnit.SECONDS);

Observable
.combineLatest(observableA, observableB, new Func2<Long, Long, String>() {
@Override
public String call(final Long itemA, final Long itemB) {
return "combine result: " + itemA + "/" + itemB;
}
}).subscribe(this.<String>getPrintSubscriber());
}

在上面的代码中我们创建了两个 Observable,发射速率分别为 3 秒和 2 秒。运行结果:

07-24 15:38:53.505 27111-27161/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 0/0
07-24 15:38:54.505 27111-27162/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 0/1
07-24 15:38:56.505 27111-27161/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 1/1
07-24 15:38:56.505 27111-27162/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 1/2
07-24 15:38:58.505 27111-27162/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 1/3
07-24 15:38:59.505 27111-27161/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 2/3
07-24 15:39:00.505 27111-27162/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 2/4
07-24 15:39:02.505 27111-27161/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 3/4
07-24 15:39:02.505 27111-27162/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 3/5
07-24 15:39:04.505 27111-27162/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 3/6
07-24 15:39:05.505 27111-27161/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 4/6
07-24 15:39:06.505 27111-27162/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 4/7
07-24 15:39:08.505 27111-27161/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 5/7
07-24 15:39:08.505 27111-27162/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 5/8
07-24 15:39:10.505 27111-27162/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 5/9
07-24 15:39:11.505 27111-27161/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 6/9
07-24 15:39:12.505 27111-27162/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 6/10
07-24 15:39:14.505 27111-27161/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 7/10
07-24 15:39:14.505 27111-27162/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 7/11
07-24 15:39:16.505 27111-27162/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 7/12
07-24 15:39:17.505 27111-27161/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 8/12
07-24 15:39:18.505 27111-27162/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 8/13
07-24 15:39:20.505 27111-27161/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 9/13
07-24 15:39:20.505 27111-27162/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 9/14
07-24 15:39:22.505 27111-27162/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 9/15
07-24 15:39:23.505 27111-27161/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 10/15

可以看到,有元素重复多次,说明一个 Observable 还没发射数据,另一个 Observable 发射数据就会出发 combineLatest

默认不在任何调度器上。

withLatestFrom

RxJava 1.x 笔记:组合型操作符

withLatestFromcombineLatest 很相似,不同之处在于,它不是静态方法,必须通过一个 Observable 对象进行调用。而他的作用就是:只有在这个 Observable 对象发射数据时,才结合其他 Observable 发射的最新数据进行相关的函数操作。

也就是说把组合的主动权都交给了调用对象。

使用例子:

private void withLatestFrom() {
Observable<Long> observableA = Observable.interval(3, TimeUnit.SECONDS);
Observable<Long> observableB = Observable.interval(2, TimeUnit.SECONDS);

observableB.withLatestFrom(observableA, new Func2<Long, Long, String>() {
@Override
public String call(final Long itemA, final Long itemB) {
return "withLatestFrom: " + itemA + "/" + itemB;
}
}).subscribe(this.<String>getPrintSubscriber());
}

运行结果:

07-24 15:56:13.775 28434-28450/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: withLatestFrom: 1/0
07-24 15:56:15.775 28434-28450/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: withLatestFrom: 2/1
07-24 15:56:17.775 28434-28450/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: withLatestFrom: 3/1
07-24 15:56:19.775 28434-28450/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: withLatestFrom: 4/2
07-24 15:56:21.775 28434-28450/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: withLatestFrom: 5/3
07-24 15:56:23.775 28434-28450/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: withLatestFrom: 6/3
07-24 15:56:25.775 28434-28450/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: withLatestFrom: 7/4
07-24 15:56:27.775 28434-28450/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: withLatestFrom: 8/5
07-24 15:56:29.775 28434-28450/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: withLatestFrom: 9/5
07-24 15:56:31.775 28434-28450/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: withLatestFrom: 10/6
07-24 15:56:33.775 28434-28450/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: withLatestFrom: 11/7
07-24 15:56:35.775 28434-28450/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: withLatestFrom: 12/7

的确如我们所想,只有 observableB 发射数据时,才进行组合操作。

Join

Join 操作符的作用是:在一个 Observable 发射的一条数据的时间窗口内,另外一个 Observable 发射了一条数据,就组合这两条数据。

RxJava 1.x 笔记:组合型操作符

如上图所示,我们可以给两个 Observable 各自定义它们发射的数据的时间窗口(可以理解为生命周期),在 Observable A 发射一个元素 a 后,在 a 的生命周期内,Observable B 只要发射了数据,就会和 a 组合。如果 A 定义的时间窗口比发射速率久,就会出现 B 发射的数据跟 A 的多个数据组合;反过来也一样,在 B 发射的元素时间窗口内,A 发射数据也会和 B 的元素组合。

Join 的概念不是很容易理解,这个操作符需要多费点心。

RxJava 中的实现有两种:join()groupJoin()

join

RxJava 1.x 笔记:组合型操作符

API 对 join()的介绍是:

Correlates the items emitted by two Observables based on overlapping durations.

即:根据重叠持续时间将两个 Observable 发出的项关联起来。

join() 方法如下:

public final <TRight, TLeftDuration, TRightDuration, R> Observable<R> join(Observable<TRight> right, Func1<T, Observable<TLeftDuration>> leftDurationSelector,
Func1<TRight, Observable<TRightDuration>> rightDurationSelector,
Func2<T, TRight, R> resultSelector) {
return unsafeCreate(new OnSubscribeJoin<T, TRight, TLeftDuration, TRightDuration, R>(this, right, leftDurationSelector, rightDurationSelector, resultSelector));
}

它接收 4 个参数:

  1. right:将要和当前 Observable 元素组合的另一个 Observable
  2. leftDurationSelector:定义当前 Observable 发射元素的时间窗口函数,返回一个 Observable
  3. rightDurationSelector:定义 right Observable 发射元素的时间窗口函数
  4. resultSelector:在这个函数中做两个 Observable 元素的组合操作

使用例子:

private void join() {
//产生 0 2 4 6 8
Observable<Long> observableA = Observable.interval(1, TimeUnit.SECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(final Long aLong) {
return aLong * 2;
}
})
.take(5);

//产生 0 3 6 9 12
Observable<Long> observableB = Observable.interval(2, TimeUnit.SECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(final Long aLong) {
return aLong * 3;
}
})
.take(5);

observableA.join(observableB,
new Func1<Long, Observable<Long>>() { //定义源 Observable 发射数据的时间窗口
@Override
public Observable<Long> call(final Long aLong) {
System.out.println("A:" + aLong);
return Observable.just(aLong).delay(2000 , TimeUnit.MILLISECONDS); //延迟 500 毫秒后发射,即声明周期为 1000毫秒
}
}, new Func1<Long, Observable<Long>>() { //定义第二个 Observable 发射数据的时间窗口
@Override
public Observable<Long> call(final Long aLong) {
System.out.println("B:" + aLong);
return Observable.just(aLong).delay(1000, TimeUnit.MILLISECONDS);
}
}, new Func2<Long, Long, String>() { //组合两个 Observable 发射的数据的函数
@Override
public String call(final Long aLong, final Long aLong2) {
return "join result:" + aLong + "/" + aLong2;
}
})
.subscribe(this.<String>getPrintSubscriber());
}

在上面的代码中,我们创建了 2 个 Observable,同时调用了 join() 方法,传入的参数中,第一个函数中定义 observableA 发射元素的时间窗口,这里定义为 2 s;第二个函数中定义了 observableB 的时间创建,为 1 s。

我们先根据下面的图理解一下:

RxJava 1.x 笔记:组合型操作符

observableA 中每隔一秒发射一个元素,时间窗口为 2s,在图上表现为两个大格子;
observableB 每隔两秒发射一个元素,时间窗口为 1s,在图上表现为一个大格子。

我们可以看到,A 先发出 0 ,这时 B 还没有发射元素,所以无法结合。第二秒时 A 发出 2,B 发出 0,这时 A 发出的 0 时间窗口还没关闭,因此 A 的 0 和 2 都和 B 的 0 结合。以此类推,直到一方(这里是 A) 发射完元素后,停止结合。

运行结果:

07-24 18:14:17.735 715-769/top.shixinzhang.rxjavademo I/System.out: A:0
07-24 18:14:18.735 715-769/top.shixinzhang.rxjavademo I/System.out: A:2
07-24 18:14:18.745 715-770/top.shixinzhang.rxjavademo I/System.out: B:0
07-24 18:14:18.745 715-770/top.shixinzhang.rxjavademo D/SubscriberCreator: onNext: join result:0/0
07-24 18:14:18.745 715-770/top.shixinzhang.rxjavademo D/SubscriberCreator: onNext: join result:2/0
07-24 18:14:19.735 715-769/top.shixinzhang.rxjavademo I/System.out: A:4
07-24 18:14:19.735 715-769/top.shixinzhang.rxjavademo D/SubscriberCreator: onNext: join result:4/0
07-24 18:14:20.745 715-769/top.shixinzhang.rxjavademo I/System.out: A:6
07-24 18:14:20.745 715-770/top.shixinzhang.rxjavademo I/System.out: B:3
07-24 18:14:20.745 715-770/top.shixinzhang.rxjavademo D/SubscriberCreator: onNext: join result:4/3
07-24 18:14:20.745 715-770/top.shixinzhang.rxjavademo D/SubscriberCreator: onNext: join result:6/3
07-24 18:14:21.745 715-769/top.shixinzhang.rxjavademo I/System.out: A:8
07-24 18:14:21.745 715-769/top.shixinzhang.rxjavademo D/SubscriberCreator: onNext: join result:8/3
07-24 18:14:22.745 715-770/top.shixinzhang.rxjavademo I/System.out: B:6
07-24 18:14:22.745 715-770/top.shixinzhang.rxjavademo D/SubscriberCreator: onNext: join result:8/6
07-24 18:14:23.745 715-769/top.shixinzhang.rxjavademo D/SubscriberCreator: onCompleted

groupJoin

RxJava 1.x 笔记:组合型操作符

API 对 groupJoin() 的介绍是:

Returns an Observable that correlates two Observables when they overlap in time and groups the results.

即:将两个 Observable 发射的、时间上重叠的数据关联起来,然后分组结果。

groupJoin() 方法如下:

public final <T2, D1, D2, R> Observable<R> groupJoin(Observable<T2> right, Func1<? super T, ? extends Observable<D1>> leftDuration,
Func1<? super T2, ? extends Observable<D2>> rightDuration,
Func2<? super T, ? super Observable<T2>, ? extends R> resultSelector) {
return unsafeCreate(new OnSubscribeGroupJoin<T, T2, D1, D2, R>(this, right, leftDuration, rightDuration, resultSelector));
}

前三个参数和 join() 一致,不同之处在与第四个参数:

new Func2<Long, Observable<Long>, Observable<String>>() {
@Override
public Observable<String> call(final Long itemA, final Observable<Long> longObservable) {
return longObservable.map(new Func1<Long, String>() {
@Override
public String call(final Long itemB) {
return "groupJoin result:" + itemA + "/" + itemB;
}
});
}
}

它返回的是一个 Observable,而不是一个 Object。

使用例子:

private void groupJoin() {

//产生 0 2 4 6 8
Observable<Long> observableA = Observable.interval(1, TimeUnit.SECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(final Long aLong) {
return aLong * 2;
}
})
.take(5);

//产生 0 3 6 9 12
Observable<Long> observableB = Observable.interval(2, TimeUnit.SECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(final Long aLong) {
return aLong * 3;
}
})
.take(5);

observableA.groupJoin(observableB,
new Func1<Long, Observable<Long>>() { //定义源 Observable 发射数据的时间窗口
@Override
public Observable<Long> call(final Long aLong) {
System.out.println("A:" + aLong);
return Observable.just(aLong).delay(2000, TimeUnit.MILLISECONDS); //延迟 500 毫秒后发射,即声明周期为 1000毫秒
}
}, new Func1<Long, Observable<Long>>() { //定义第二个 Observable 发射数据的时间窗口
@Override
public Observable<Long> call(final Long aLong) {
System.out.println("B:" + aLong);
return Observable.just(aLong).delay(1000, TimeUnit.MILLISECONDS);
}
}, new Func2<Long, Observable<Long>, Observable<String>>() {
@Override
public Observable<String> call(final Long itemA, final Observable<Long> longObservable) {
return longObservable.map(new Func1<Long, String>() {
@Override
public String call(final Long itemB) {
return "groupJoin result:" + itemA + "/" + itemB;
}
});
}
})
.subscribe(new Action1<Observable<String>>() {
@Override
public void call(final Observable<String> observable) {
observable.subscribe(new Action1<String>() {
@Override
public void call(final String s) {
System.out.println("onNext:" + s);
}
});
}
});
}

运行结果:

07-26 10:46:35.715 25244-25259/top.shixinzhang.rxjavademo I/System.out: A:0
07-26 10:46:36.715 25244-25260/top.shixinzhang.rxjavademo I/System.out: B:0
07-26 10:46:36.715 25244-25260/top.shixinzhang.rxjavademo I/System.out: onNext:groupJoin result:0/0
07-26 10:46:36.715 25244-25259/top.shixinzhang.rxjavademo I/System.out: A:2
07-26 10:46:36.715 25244-25259/top.shixinzhang.rxjavademo I/System.out: onNext:groupJoin result:2/0
07-26 10:46:37.715 25244-25259/top.shixinzhang.rxjavademo I/System.out: A:4
07-26 10:46:37.715 25244-25259/top.shixinzhang.rxjavademo I/System.out: onNext:groupJoin result:4/0
07-26 10:46:38.715 25244-25259/top.shixinzhang.rxjavademo I/System.out: A:6
07-26 10:46:38.715 25244-25260/top.shixinzhang.rxjavademo I/System.out: B:3
07-26 10:46:38.715 25244-25260/top.shixinzhang.rxjavademo I/System.out: onNext:groupJoin result:4/3
07-26 10:46:38.715 25244-25260/top.shixinzhang.rxjavademo I/System.out: onNext:groupJoin result:6/3
07-26 10:46:39.725 25244-25259/top.shixinzhang.rxjavademo I/System.out: A:8
07-26 10:46:39.725 25244-25259/top.shixinzhang.rxjavademo I/System.out: onNext:groupJoin result:8/3
07-26 10:46:40.715 25244-25260/top.shixinzhang.rxjavademo I/System.out: B:6
07-26 10:46:40.715 25244-25260/top.shixinzhang.rxjavademo I/System.out: onNext:groupJoin result:8/6
07-26 10:46:40.715 25244-25260/top.shixinzhang.rxjavademo I/System.out: onNext:groupJoin result:6/6
07-26 10:46:42.715 25244-25260/top.shixinzhang.rxjavademo I/System.out: B:9
07-26 10:46:44.715 25244-25260/top.shixinzhang.rxjavademo I/System.out: B:12

可以看到,groupJoin() 用起来比较费劲啊,目前我还没发现具体的用途。

Merge

Merge 操作符的作用正如它的名字一样,将多个 Observable 发射的数据组合到一个 Observable 中。

RxJava 1.x 笔记:组合型操作符

Merge 不保证元素发射的顺序,可能会导致顺序错乱(与之对应的是 Concat 操作符,它可以先按顺序发射一个 Observable 发射的数据,然后再按顺序发射下一个的 )。

在上面的图中,一旦有一个 Observable 发出 onError 事件,整个 merge 的过程也就结束了。

RxJava 1.x 笔记:组合型操作符

为了处理这种问题,在许多 ReactiveX 实现中提供了 MergeDelayError 操作符,它收到 onError 事件后会保留,直到所有 Observable 都发射完数据才传递给观察者。

RxJava 中这两个操作符的实现分别为 :mergemergeDelayError

merge

RxJava 1.x 笔记:组合型操作符

这个图可以比较清晰地表现出 merge 处理 onError 事件的方式。

merge 有 14 种重载,其中主要是接受参数个数的不同,结合前面的操作符可以看到,组合类的操作符很多都这样。

RxJava 1.x 笔记:组合型操作符

merge 源码:

public static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2) {
return merge(new Observable[] { t1, t2 });
}
public static <T> Observable<T> merge(Observable<? extends T>[] sequences) {
return merge(from(sequences));
}
public static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) {
if (source.getClass() == ScalarSynchronousObservable.class) {
return ((ScalarSynchronousObservable<T>)source).scalarFlatMap((Func1)UtilityFunctions.identity());
}
return source.lift(OperatorMerge.<T>instance(false));
}

使用例子:

/**
* 组合两个 Observable 发出的数据,不保证顺序
*/

private void merge() {
Observable<Integer> observableA = Observable.range(0 , 5) //在另外一个线程
.subscribeOn(Schedulers.io());
Observable<Integer> observableB = Observable.range(10, 5);

Observable.merge(observableA, observableB)
.subscribe(this.<Integer>getPrintSubscriber());
}

运行结果:

merge 是静态的类方法,RxJava 还提供了实例方法 mergeWith(),作用和 merge 一样:

public final Observable<T> mergeWith(Observable<? extends T> t1) {
return merge(this, t1);
}

mergeDelayError

RxJava 1.x 笔记:组合型操作符

mergeDelayErrormerge 非常相似,不同之处就在于前面介绍的,对 onError 事件的处理。

RxJava 1.x 笔记:组合型操作符

Concat

官方文档中,Concat 属于算数聚合运算符,不输入组合型,但是为了方便介绍相关的操作符,我们在这篇一起了解了吧。

Merge 相似但又不同的是,Concat 会按顺序发射多个 Observable 发射的数据,重点就是 按顺序

RxJava 1.x 笔记:组合型操作符

Concat 操作符会将多个 Observable 发射的数据组合到一个 Observable 然后发射出去。第一个 Observable 发射的所有数据在第二个 Observable 发射数据之前发射,以此类推。

直到前面一个 Observable 终止,Concat 才会订阅后一个 Observable 。

注意:
如果你想连接一个”热的” Observable(即在创建后立即开始发射数据的 Observable,即使没有订阅者),Concat 将不会看到订阅前“热“ Observable 发射的任何数据。

在一些 ReactiveX 的实现中,还有一个 ConcatMap 操作符 (之前写的 concatMap 的链接),它会将源 Observable 发射的数据进行变换处理,拆分成多个 Observable,然后按顺序链接起来。

StartWith 操作符可以说是位置相反的 Concat

RxJava 中对应的实现是 concat

RxJava 1.x 笔记:组合型操作符

RxJava 中 concat() 是一个静态方法,有多种重载,区别就是拼接的 Observable 个数,concat() 会将参数中的 Observable 按在参数中的位置发射出去。

RxJava 1.x 笔记:组合型操作符

使用起来也很简单,没什么好说的了。

/**
* 按顺序拼接
*/

private void concat() {

Observable<Integer> observableA = Observable.range(0 , 5);
Observable<Integer> observableB = Observable.range(10, 5);

Observable.concat(observableB, observableA)
.subscribe(this.<Integer>getPrintSubscriber());
}

还有一个 concatWith() 方法, Observable.concat(a,b) 等价于 a.concatWith(b).

StartWith

StartWith 操作符的作用和名字一样,在源 Observable 发射数据之前插入指定的数据。

RxJava 1.x 笔记:组合型操作符

如果你想要一个 Observable 在发射数据时先发射一些特定的数据,可以使用 StartWith
如果你想要一个 Observable 在发射数据后再发射一些特定的数据,可以使用 Concat

RxJava 对应的实现是 startWith():

RxJava 1.x 笔记:组合型操作符

有很多重载,表示可以在前面插入的数据类型可以是 Observable, Iterable 或者直接是几个数据。

startWith() 的实现也是调用的 concat():

public final Observable<T> startWith(Observable<T> values) {
return concat(values, this);
}

使用例子:

/**
* 先发射前面的
*/

private void startWith() {
Observable<Integer> observableA = Observable.range(0 , 5);
Observable<Integer> observableB = Observable.range(10, 5);

observableB.startWith(observableA)
.subscribe(this.<Integer>getPrintSubscriber());
}

运行结果:

Switch

Switch 操作符的作用是:将一个发射多个 Observable 的 Observable 变成一个单独的 Observable,它发射那些中间 Observables 最近发射的数据项。

翻译的不太好理解,先上个图:

RxJava 1.x 笔记:组合型操作符

Switch 订阅一个发射多个 Observable 的 Observable。

每次源 Observable 发射新的 Observable,Switch 会解除对前一个 Observable 的订阅,转向订阅新的 Observable(注意,这个切换的过程发生在源 Observable 发射新的 Observable 时,而不是新 Observable 发射元素时)。

这意味着,在新 Observable 产生到它开始发射数据之前的这段时间里,前一个 Observable 发射的数据将被丢弃(就像上图里的那个黄色圆圈一样)。

RxJava 中对应的实现是 switchOnNext

RxJava 1.x 笔记:组合型操作符

switchOnNext() 是一个静态方法,参数是一个发射 Observable 的 Observable:

public static <T> Observable<T> switchOnNext(Observable<? extends Observable<? extends T>> sequenceOfSequences) {
return sequenceOfSequences.lift(OperatorSwitch.<T>instance(false));
}

使用例子:

/**
* 喜新厌旧,一发射新的 Observable,就取消订阅之前的
*/

private void switchOnNext() {
Observable<Observable<Integer>> observableObservable = Observable.unsafeCreate(new Observable.OnSubscribe<Observable<Integer>>() {
@Override
public void call(final Subscriber<? super Observable<Integer>> subscriber) {
for (int i = 0; i < 5; i++) {
subscriber.onNext(Observable.range(1, 10).delay(i, TimeUnit.SECONDS));
}
subscriber.onCompleted();
}
});

Observable.switchOnNext(observableObservable)
.subscribe(this.<Integer>getPrintSubscriber());
}

运行结果:

还有一个类似的实现是 switchOnNextDelayError,和 switchOnNext() 的不同之处也是对 onError 事件的处理。

public static <T> Observable<T> switchOnNextDelayError(Observable<? extends Observable<? extends T>> sequenceOfSequences) {
return sequenceOfSequences.lift(OperatorSwitch.<T>instance(true));
}

Thanks

http://reactivex.io/documentation/operators.html
https://github.com/mcxiaoke/RxDocs/blob/master/Operators.md
http://blog.csdn.net/job_hesc/article/details/46612015
http://avenwu.net/2016/05/10/understand-the-join-operation-in-rx/