最近去检查眼睛,发现度数又涨了,唉,各位猿多注意保护自己的眼睛吧!
前面学了 RxJava 的三种关键操作符:
读完本文你将了解第四种(组合型操作符):
组合型操作符
组合型操作符即处理多个 Observable 数据源,在处理后将它们合并成一个 Observable。
Zip
Zip
操作符的作用是:通过一个特定的函数组合多个 Observable 发射的数据,然后发射每个组合的结果。
Zip
操作符对你选中的多个 Observable 发射的数据按顺序应用一个函数,然后返回一个 Observable,这个 Observable 发射函数的返回结果。
Zip
操作符对发射的数据的顺序很严格,如上图所示,它发射的第一个数据一定是 Observable A 发射的第一个数据和 Observable B 发射的第一个数据经过组合的结果;发射的第二个数据也一定是 A 发射的第二个数据和 B 发个的第二个数据的组合结果;一次类推,直到元素最少的 Observable 发射完元素。
RxJava 中对应的实现是 zip
和 zipWith
。
zip
RxJava 中,zip()
的重载方法有 11 种:
前 2 个支持以 Iterable
或者数组的形式传入多个 Observable,后面 9 个分别支持从 1 到 9 个 Observable 作为参数,所有方法的最后一个参数是一个函数,它接收各个 Observable 按顺序发出的数据,然后对它们进行一个操作,将操作的结果发射出去。
public static <T1, T2, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, final Func2<? super T1, ? super T2, ? extends R> zipFunction) {
return just(new Observable<?>[] { o1, o2 }).lift(new OperatorZip<R>(zipFunction));
}
使用例子:
private void zip() {
Observable<String> observableA = Observable.just("A", "B", "C", "d", "E");
Observable<Integer> observableB = Observable.just(1, 2, 3, 4);
Observable
.zip(observableA, observableB, new Func2<String, Integer, String>() {
@Override
public String call(final String s, final Integer integer) {
return s + "_" + integer;
}
})
.subscribe(this.<String>getPrintSubscriber());
}
运行结果:
07-19 14:59:11.219 12254-12254/top.shixinzhang.rxjavademo I/System.out: onNext: A_1
07-19 14:59:11.219 12254-12254/top.shixinzhang.rxjavademo I/System.out: onNext: B_2
07-19 14:59:11.219 12254-12254/top.shixinzhang.rxjavademo I/System.out: onNext: C_3
07-19 14:59:11.219 12254-12254/top.shixinzhang.rxjavademo I/System.out: onNext: d_4
07-19 14:59:11.219 12254-12254/top.shixinzhang.rxjavademo I/System.out: onCompleted
可以看到,zip()
的确按顺序将 observableA 和 observableB 发射的数据组合了起来,然后发射了出去。当元素较少的一个 Observable 发射完后,zip 也就停止发射了。
zipWith
zipWith
也可以组合多个 Observable,不过和 zip
不同的是,zipWith
是非静态方法,它需要一个 Observable 来调用。
zipWith
两种重载:
public final <T2, R> Observable<R> zipWith(Observable<? extends T2> other, Func2<? super T, ? super T2, ? extends R> zipFunction) {
return (Observable<R>)zip(this, other, zipFunction);
}
public final <T2, R> Observable<R> zipWith(Iterable<? extends T2> other, Func2<? super T, ? super T2, ? extends R> zipFunction) {
return lift(new OperatorZipIterable<T, T2, R>(other, zipFunction));
}
第一个方法的参数是一个 Observable,它的作用是将当前 Observable 和参数 Observable 组合;第二个方法的参数是一个 Iterable,它的作用是将当前 Observable 和许多 Observable 组合。
使用例子:
private void zipWith() {
Observable<String> observableA = Observable.just("A", "B", "C", "d", "E");
Observable
.just(1, 2, 3, 4)
.zipWith(observableA, new Func2<Integer, String, String>() {
@Override
public String call(final Integer integer, final String s) {
return integer + ", " + s;
}
})
.subscribe(this.<String>getPrintSubscriber());
}
运行结果:
07-19 16:28:50.969 27717-27717/top.shixinzhang.rxjavademo I/System.out: onNext: 1, A
07-19 16:28:50.969 27717-27717/top.shixinzhang.rxjavademo I/System.out: onNext: 2, B
07-19 16:28:50.969 27717-27717/top.shixinzhang.rxjavademo I/System.out: onNext: 3, C
07-19 16:28:50.969 27717-27717/top.shixinzhang.rxjavademo I/System.out: onNext: 4, d
07-19 16:28:50.969 27717-27717/top.shixinzhang.rxjavademo I/System.out: onCompleted
和 zip
很相似是吧。
CombineLatest
CombineLatest
操作符的作用是:当两个 Observable 中任意一个发射数据时,会结合另外一个 Observable 最近发射的数据进行一些函数操作,然后将操作的结果发射出去。
CombineLatest
和 Zip
有点相似,都是将两个 Observable 发射的数据结合起来,不同的是,每个 Observable 都发射了新元素后, Zip
才进行操作然后发射操作结果;而 CombineLatest
在每个 Observable 都发射一个数据后,只要有一个 Observable 发射数据,CombineLatest
就会进行操作然后发射操作结果。
当任何一个 Observable 发射了新数据,CombineLatest
会将这个新数据与另外的 Observable 之前发射的最新数据进行一个函数操作。
RxJava 中有两种实现:combineLatest
, withLatestFrom
。
combineLatest
RxJava 中的 combineLatest()
有 10 种重载:
不同的地方基本就是接收 Observable 的个数不同。
public static <T1, T2, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Func2<? super T1, ? super T2, ? extends R> combineFunction) {
return (Observable<R>)combineLatest(Arrays.asList(o1, o2), Functions.fromFunc(combineFunction));
}
public static <T, R> Observable<R> combineLatest(List<? extends Observable<? extends T>> sources, FuncN<? extends R> combineFunction) {
return unsafeCreate(new OnSubscribeCombineLatest<T, R>(sources, combineFunction));
}
使用例子:
/***
* 将 A 发射的数据与 B 之前发射最新的数据结合,进行函数操作
*/
private void combineLatest() {
Observable<Long> observableA = Observable.interval(3, TimeUnit.SECONDS);
Observable<Long> observableB = Observable.interval(2, TimeUnit.SECONDS);
Observable
.combineLatest(observableA, observableB, new Func2<Long, Long, String>() {
@Override
public String call(final Long itemA, final Long itemB) {
return "combine result: " + itemA + "/" + itemB;
}
}).subscribe(this.<String>getPrintSubscriber());
}
在上面的代码中我们创建了两个 Observable,发射速率分别为 3 秒和 2 秒。运行结果:
07-24 15:38:53.505 27111-27161/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 0/0
07-24 15:38:54.505 27111-27162/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 0/1
07-24 15:38:56.505 27111-27161/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 1/1
07-24 15:38:56.505 27111-27162/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 1/2
07-24 15:38:58.505 27111-27162/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 1/3
07-24 15:38:59.505 27111-27161/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 2/3
07-24 15:39:00.505 27111-27162/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 2/4
07-24 15:39:02.505 27111-27161/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 3/4
07-24 15:39:02.505 27111-27162/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 3/5
07-24 15:39:04.505 27111-27162/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 3/6
07-24 15:39:05.505 27111-27161/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 4/6
07-24 15:39:06.505 27111-27162/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 4/7
07-24 15:39:08.505 27111-27161/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 5/7
07-24 15:39:08.505 27111-27162/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 5/8
07-24 15:39:10.505 27111-27162/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 5/9
07-24 15:39:11.505 27111-27161/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 6/9
07-24 15:39:12.505 27111-27162/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 6/10
07-24 15:39:14.505 27111-27161/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 7/10
07-24 15:39:14.505 27111-27162/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 7/11
07-24 15:39:16.505 27111-27162/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 7/12
07-24 15:39:17.505 27111-27161/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 8/12
07-24 15:39:18.505 27111-27162/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 8/13
07-24 15:39:20.505 27111-27161/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 9/13
07-24 15:39:20.505 27111-27162/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 9/14
07-24 15:39:22.505 27111-27162/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 9/15
07-24 15:39:23.505 27111-27161/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: combine result: 10/15
可以看到,有元素重复多次,说明一个 Observable 还没发射数据,另一个 Observable 发射数据就会出发 combineLatest
。
默认不在任何调度器上。
withLatestFrom
withLatestFrom
和 combineLatest
很相似,不同之处在于,它不是静态方法,必须通过一个 Observable 对象进行调用。而他的作用就是:只有在这个 Observable 对象发射数据时,才结合其他 Observable 发射的最新数据进行相关的函数操作。
也就是说把组合的主动权都交给了调用对象。
使用例子:
private void withLatestFrom() {
Observable<Long> observableA = Observable.interval(3, TimeUnit.SECONDS);
Observable<Long> observableB = Observable.interval(2, TimeUnit.SECONDS);
observableB.withLatestFrom(observableA, new Func2<Long, Long, String>() {
@Override
public String call(final Long itemA, final Long itemB) {
return "withLatestFrom: " + itemA + "/" + itemB;
}
}).subscribe(this.<String>getPrintSubscriber());
}
运行结果:
07-24 15:56:13.775 28434-28450/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: withLatestFrom: 1/0
07-24 15:56:15.775 28434-28450/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: withLatestFrom: 2/1
07-24 15:56:17.775 28434-28450/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: withLatestFrom: 3/1
07-24 15:56:19.775 28434-28450/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: withLatestFrom: 4/2
07-24 15:56:21.775 28434-28450/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: withLatestFrom: 5/3
07-24 15:56:23.775 28434-28450/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: withLatestFrom: 6/3
07-24 15:56:25.775 28434-28450/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: withLatestFrom: 7/4
07-24 15:56:27.775 28434-28450/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: withLatestFrom: 8/5
07-24 15:56:29.775 28434-28450/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: withLatestFrom: 9/5
07-24 15:56:31.775 28434-28450/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: withLatestFrom: 10/6
07-24 15:56:33.775 28434-28450/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: withLatestFrom: 11/7
07-24 15:56:35.775 28434-28450/top.shixinzhang.rxjavademo D/top.shixinzhang.rxjavademo: onNext: withLatestFrom: 12/7
的确如我们所想,只有 observableB 发射数据时,才进行组合操作。
Join
Join
操作符的作用是:在一个 Observable 发射的一条数据的时间窗口内,另外一个 Observable 发射了一条数据,就组合这两条数据。
如上图所示,我们可以给两个 Observable 各自定义它们发射的数据的时间窗口(可以理解为生命周期),在 Observable A 发射一个元素 a 后,在 a 的生命周期内,Observable B 只要发射了数据,就会和 a 组合。如果 A 定义的时间窗口比发射速率久,就会出现 B 发射的数据跟 A 的多个数据组合;反过来也一样,在 B 发射的元素时间窗口内,A 发射数据也会和 B 的元素组合。
Join 的概念不是很容易理解,这个操作符需要多费点心。
RxJava 中的实现有两种:join()
和 groupJoin()
。
join
API 对 join()
的介绍是:
Correlates the items emitted by two Observables based on overlapping durations.
即:根据重叠持续时间将两个 Observable 发出的项关联起来。
join()
方法如下:
public final <TRight, TLeftDuration, TRightDuration, R> Observable<R> join(Observable<TRight> right, Func1<T, Observable<TLeftDuration>> leftDurationSelector,
Func1<TRight, Observable<TRightDuration>> rightDurationSelector,
Func2<T, TRight, R> resultSelector) {
return unsafeCreate(new OnSubscribeJoin<T, TRight, TLeftDuration, TRightDuration, R>(this, right, leftDurationSelector, rightDurationSelector, resultSelector));
}
它接收 4 个参数:
- right:将要和当前 Observable 元素组合的另一个 Observable
- leftDurationSelector:定义当前 Observable 发射元素的时间窗口函数,返回一个 Observable
- rightDurationSelector:定义 right Observable 发射元素的时间窗口函数
- resultSelector:在这个函数中做两个 Observable 元素的组合操作
使用例子:
private void join() {
//产生 0 2 4 6 8
Observable<Long> observableA = Observable.interval(1, TimeUnit.SECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(final Long aLong) {
return aLong * 2;
}
})
.take(5);
//产生 0 3 6 9 12
Observable<Long> observableB = Observable.interval(2, TimeUnit.SECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(final Long aLong) {
return aLong * 3;
}
})
.take(5);
observableA.join(observableB,
new Func1<Long, Observable<Long>>() { //定义源 Observable 发射数据的时间窗口
@Override
public Observable<Long> call(final Long aLong) {
System.out.println("A:" + aLong);
return Observable.just(aLong).delay(2000 , TimeUnit.MILLISECONDS); //延迟 500 毫秒后发射,即声明周期为 1000毫秒
}
}, new Func1<Long, Observable<Long>>() { //定义第二个 Observable 发射数据的时间窗口
@Override
public Observable<Long> call(final Long aLong) {
System.out.println("B:" + aLong);
return Observable.just(aLong).delay(1000, TimeUnit.MILLISECONDS);
}
}, new Func2<Long, Long, String>() { //组合两个 Observable 发射的数据的函数
@Override
public String call(final Long aLong, final Long aLong2) {
return "join result:" + aLong + "/" + aLong2;
}
})
.subscribe(this.<String>getPrintSubscriber());
}
在上面的代码中,我们创建了 2 个 Observable,同时调用了 join()
方法,传入的参数中,第一个函数中定义 observableA
发射元素的时间窗口,这里定义为 2 s;第二个函数中定义了 observableB
的时间创建,为 1 s。
我们先根据下面的图理解一下:
observableA
中每隔一秒发射一个元素,时间窗口为 2s,在图上表现为两个大格子; observableB
每隔两秒发射一个元素,时间窗口为 1s,在图上表现为一个大格子。
我们可以看到,A 先发出 0 ,这时 B 还没有发射元素,所以无法结合。第二秒时 A 发出 2,B 发出 0,这时 A 发出的 0 时间窗口还没关闭,因此 A 的 0 和 2 都和 B 的 0 结合。以此类推,直到一方(这里是 A) 发射完元素后,停止结合。
运行结果:
07-24 18:14:17.735 715-769/top.shixinzhang.rxjavademo I/System.out: A:0
07-24 18:14:18.735 715-769/top.shixinzhang.rxjavademo I/System.out: A:2
07-24 18:14:18.745 715-770/top.shixinzhang.rxjavademo I/System.out: B:0
07-24 18:14:18.745 715-770/top.shixinzhang.rxjavademo D/SubscriberCreator: onNext: join result:0/0
07-24 18:14:18.745 715-770/top.shixinzhang.rxjavademo D/SubscriberCreator: onNext: join result:2/0
07-24 18:14:19.735 715-769/top.shixinzhang.rxjavademo I/System.out: A:4
07-24 18:14:19.735 715-769/top.shixinzhang.rxjavademo D/SubscriberCreator: onNext: join result:4/0
07-24 18:14:20.745 715-769/top.shixinzhang.rxjavademo I/System.out: A:6
07-24 18:14:20.745 715-770/top.shixinzhang.rxjavademo I/System.out: B:3
07-24 18:14:20.745 715-770/top.shixinzhang.rxjavademo D/SubscriberCreator: onNext: join result:4/3
07-24 18:14:20.745 715-770/top.shixinzhang.rxjavademo D/SubscriberCreator: onNext: join result:6/3
07-24 18:14:21.745 715-769/top.shixinzhang.rxjavademo I/System.out: A:8
07-24 18:14:21.745 715-769/top.shixinzhang.rxjavademo D/SubscriberCreator: onNext: join result:8/3
07-24 18:14:22.745 715-770/top.shixinzhang.rxjavademo I/System.out: B:6
07-24 18:14:22.745 715-770/top.shixinzhang.rxjavademo D/SubscriberCreator: onNext: join result:8/6
07-24 18:14:23.745 715-769/top.shixinzhang.rxjavademo D/SubscriberCreator: onCompleted
groupJoin
API 对 groupJoin()
的介绍是:
Returns an Observable that correlates two Observables when they overlap in time and groups the results.
即:将两个 Observable 发射的、时间上重叠的数据关联起来,然后分组结果。
groupJoin()
方法如下:
public final <T2, D1, D2, R> Observable<R> groupJoin(Observable<T2> right, Func1<? super T, ? extends Observable<D1>> leftDuration,
Func1<? super T2, ? extends Observable<D2>> rightDuration,
Func2<? super T, ? super Observable<T2>, ? extends R> resultSelector) {
return unsafeCreate(new OnSubscribeGroupJoin<T, T2, D1, D2, R>(this, right, leftDuration, rightDuration, resultSelector));
}
前三个参数和 join()
一致,不同之处在与第四个参数:
new Func2<Long, Observable<Long>, Observable<String>>() {
@Override
public Observable<String> call(final Long itemA, final Observable<Long> longObservable) {
return longObservable.map(new Func1<Long, String>() {
@Override
public String call(final Long itemB) {
return "groupJoin result:" + itemA + "/" + itemB;
}
});
}
}
它返回的是一个 Observable,而不是一个 Object。
使用例子:
private void groupJoin() {
//产生 0 2 4 6 8
Observable<Long> observableA = Observable.interval(1, TimeUnit.SECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(final Long aLong) {
return aLong * 2;
}
})
.take(5);
//产生 0 3 6 9 12
Observable<Long> observableB = Observable.interval(2, TimeUnit.SECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(final Long aLong) {
return aLong * 3;
}
})
.take(5);
observableA.groupJoin(observableB,
new Func1<Long, Observable<Long>>() { //定义源 Observable 发射数据的时间窗口
@Override
public Observable<Long> call(final Long aLong) {
System.out.println("A:" + aLong);
return Observable.just(aLong).delay(2000, TimeUnit.MILLISECONDS); //延迟 500 毫秒后发射,即声明周期为 1000毫秒
}
}, new Func1<Long, Observable<Long>>() { //定义第二个 Observable 发射数据的时间窗口
@Override
public Observable<Long> call(final Long aLong) {
System.out.println("B:" + aLong);
return Observable.just(aLong).delay(1000, TimeUnit.MILLISECONDS);
}
}, new Func2<Long, Observable<Long>, Observable<String>>() {
@Override
public Observable<String> call(final Long itemA, final Observable<Long> longObservable) {
return longObservable.map(new Func1<Long, String>() {
@Override
public String call(final Long itemB) {
return "groupJoin result:" + itemA + "/" + itemB;
}
});
}
})
.subscribe(new Action1<Observable<String>>() {
@Override
public void call(final Observable<String> observable) {
observable.subscribe(new Action1<String>() {
@Override
public void call(final String s) {
System.out.println("onNext:" + s);
}
});
}
});
}
运行结果:
07-26 10:46:35.715 25244-25259/top.shixinzhang.rxjavademo I/System.out: A:0
07-26 10:46:36.715 25244-25260/top.shixinzhang.rxjavademo I/System.out: B:0
07-26 10:46:36.715 25244-25260/top.shixinzhang.rxjavademo I/System.out: onNext:groupJoin result:0/0
07-26 10:46:36.715 25244-25259/top.shixinzhang.rxjavademo I/System.out: A:2
07-26 10:46:36.715 25244-25259/top.shixinzhang.rxjavademo I/System.out: onNext:groupJoin result:2/0
07-26 10:46:37.715 25244-25259/top.shixinzhang.rxjavademo I/System.out: A:4
07-26 10:46:37.715 25244-25259/top.shixinzhang.rxjavademo I/System.out: onNext:groupJoin result:4/0
07-26 10:46:38.715 25244-25259/top.shixinzhang.rxjavademo I/System.out: A:6
07-26 10:46:38.715 25244-25260/top.shixinzhang.rxjavademo I/System.out: B:3
07-26 10:46:38.715 25244-25260/top.shixinzhang.rxjavademo I/System.out: onNext:groupJoin result:4/3
07-26 10:46:38.715 25244-25260/top.shixinzhang.rxjavademo I/System.out: onNext:groupJoin result:6/3
07-26 10:46:39.725 25244-25259/top.shixinzhang.rxjavademo I/System.out: A:8
07-26 10:46:39.725 25244-25259/top.shixinzhang.rxjavademo I/System.out: onNext:groupJoin result:8/3
07-26 10:46:40.715 25244-25260/top.shixinzhang.rxjavademo I/System.out: B:6
07-26 10:46:40.715 25244-25260/top.shixinzhang.rxjavademo I/System.out: onNext:groupJoin result:8/6
07-26 10:46:40.715 25244-25260/top.shixinzhang.rxjavademo I/System.out: onNext:groupJoin result:6/6
07-26 10:46:42.715 25244-25260/top.shixinzhang.rxjavademo I/System.out: B:9
07-26 10:46:44.715 25244-25260/top.shixinzhang.rxjavademo I/System.out: B:12
可以看到,groupJoin()
用起来比较费劲啊,目前我还没发现具体的用途。
Merge
Merge
操作符的作用正如它的名字一样,将多个 Observable 发射的数据组合到一个 Observable 中。
Merge
不保证元素发射的顺序,可能会导致顺序错乱(与之对应的是 Concat
操作符,它可以先按顺序发射一个 Observable 发射的数据,然后再按顺序发射下一个的 )。
在上面的图中,一旦有一个 Observable 发出 onError
事件,整个 merge 的过程也就结束了。
为了处理这种问题,在许多 ReactiveX 实现中提供了 MergeDelayError
操作符,它收到 onError
事件后会保留,直到所有 Observable 都发射完数据才传递给观察者。
RxJava 中这两个操作符的实现分别为 :merge
和 mergeDelayError
。
merge
这个图可以比较清晰地表现出 merge
处理 onError
事件的方式。
merge
有 14 种重载,其中主要是接受参数个数的不同,结合前面的操作符可以看到,组合类的操作符很多都这样。
merge
源码:
public static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2) {
return merge(new Observable[] { t1, t2 });
}
public static <T> Observable<T> merge(Observable<? extends T>[] sequences) {
return merge(from(sequences));
}
public static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) {
if (source.getClass() == ScalarSynchronousObservable.class) {
return ((ScalarSynchronousObservable<T>)source).scalarFlatMap((Func1)UtilityFunctions.identity());
}
return source.lift(OperatorMerge.<T>instance(false));
}
使用例子:
/**
* 组合两个 Observable 发出的数据,不保证顺序
*/
private void merge() {
Observable<Integer> observableA = Observable.range(0 , 5) //在另外一个线程
.subscribeOn(Schedulers.io());
Observable<Integer> observableB = Observable.range(10, 5);
Observable.merge(observableA, observableB)
.subscribe(this.<Integer>getPrintSubscriber());
}
运行结果:
merge
是静态的类方法,RxJava 还提供了实例方法 mergeWith()
,作用和 merge
一样:
public final Observable<T> mergeWith(Observable<? extends T> t1) {
return merge(this, t1);
}
mergeDelayError
mergeDelayError
和 merge
非常相似,不同之处就在于前面介绍的,对 onError
事件的处理。
Concat
在官方文档中,Concat
属于算数聚合运算符,不输入组合型,但是为了方便介绍相关的操作符,我们在这篇一起了解了吧。
和 Merge
相似但又不同的是,Concat
会按顺序发射多个 Observable 发射的数据,重点就是 按顺序。
Concat
操作符会将多个 Observable 发射的数据组合到一个 Observable 然后发射出去。第一个 Observable 发射的所有数据在第二个 Observable 发射数据之前发射,以此类推。
直到前面一个 Observable 终止,Concat
才会订阅后一个 Observable 。
注意:
如果你想连接一个”热的” Observable(即在创建后立即开始发射数据的 Observable,即使没有订阅者),Concat
将不会看到订阅前“热“ Observable 发射的任何数据。
在一些 ReactiveX 的实现中,还有一个 ConcatMap
操作符 (之前写的 concatMap 的链接),它会将源 Observable 发射的数据进行变换处理,拆分成多个 Observable,然后按顺序链接起来。
StartWith
操作符可以说是位置相反的 Concat
。
RxJava 中对应的实现是 concat
。
RxJava 中 concat()
是一个静态方法,有多种重载,区别就是拼接的 Observable 个数,concat()
会将参数中的 Observable 按在参数中的位置发射出去。
使用起来也很简单,没什么好说的了。
/**
* 按顺序拼接
*/
private void concat() {
Observable<Integer> observableA = Observable.range(0 , 5);
Observable<Integer> observableB = Observable.range(10, 5);
Observable.concat(observableB, observableA)
.subscribe(this.<Integer>getPrintSubscriber());
}
还有一个 concatWith()
方法, Observable.concat(a,b)
等价于 a.concatWith(b)
.
StartWith
StartWith
操作符的作用和名字一样,在源 Observable 发射数据之前插入指定的数据。
如果你想要一个 Observable 在发射数据时先发射一些特定的数据,可以使用 StartWith
;
如果你想要一个 Observable 在发射数据后再发射一些特定的数据,可以使用 Concat
。
RxJava 对应的实现是 startWith()
:
有很多重载,表示可以在前面插入的数据类型可以是 Observable, Iterable 或者直接是几个数据。
startWith()
的实现也是调用的 concat()
:
public final Observable<T> startWith(Observable<T> values) {
return concat(values, this);
}
使用例子:
/**
* 先发射前面的
*/
private void startWith() {
Observable<Integer> observableA = Observable.range(0 , 5);
Observable<Integer> observableB = Observable.range(10, 5);
observableB.startWith(observableA)
.subscribe(this.<Integer>getPrintSubscriber());
}
运行结果:
Switch
Switch
操作符的作用是:将一个发射多个 Observable 的 Observable 变成一个单独的 Observable,它发射那些中间 Observables 最近发射的数据项。
翻译的不太好理解,先上个图:
Switch
订阅一个发射多个 Observable 的 Observable。
每次源 Observable 发射新的 Observable,Switch
会解除对前一个 Observable 的订阅,转向订阅新的 Observable(注意,这个切换的过程发生在源 Observable 发射新的 Observable 时,而不是新 Observable 发射元素时)。
这意味着,在新 Observable 产生到它开始发射数据之前的这段时间里,前一个 Observable 发射的数据将被丢弃(就像上图里的那个黄色圆圈一样)。
RxJava 中对应的实现是 switchOnNext
。
switchOnNext()
是一个静态方法,参数是一个发射 Observable 的 Observable:
public static <T> Observable<T> switchOnNext(Observable<? extends Observable<? extends T>> sequenceOfSequences) {
return sequenceOfSequences.lift(OperatorSwitch.<T>instance(false));
}
使用例子:
/**
* 喜新厌旧,一发射新的 Observable,就取消订阅之前的
*/
private void switchOnNext() {
Observable<Observable<Integer>> observableObservable = Observable.unsafeCreate(new Observable.OnSubscribe<Observable<Integer>>() {
@Override
public void call(final Subscriber<? super Observable<Integer>> subscriber) {
for (int i = 0; i < 5; i++) {
subscriber.onNext(Observable.range(1, 10).delay(i, TimeUnit.SECONDS));
}
subscriber.onCompleted();
}
});
Observable.switchOnNext(observableObservable)
.subscribe(this.<Integer>getPrintSubscriber());
}
运行结果:
还有一个类似的实现是 switchOnNextDelayError
,和 switchOnNext()
的不同之处也是对 onError
事件的处理。
public static <T> Observable<T> switchOnNextDelayError(Observable<? extends Observable<? extends T>> sequenceOfSequences) {
return sequenceOfSequences.lift(OperatorSwitch.<T>instance(true));
}
Thanks
http://reactivex.io/documentation/operators.html
https://github.com/mcxiaoke/RxDocs/blob/master/Operators.md
http://blog.csdn.net/job_hesc/article/details/46612015
http://avenwu.net/2016/05/10/understand-the-join-operation-in-rx/