1、简介:
之前几篇讲解的操作符多是单个被观察者对象发送事件,本篇来介绍下组合操作符的使用,组合操作符的作用:
组合 多个被观察者(Observable) & 合并需要发送的事件
2、类型:
3、操作符介绍
- concat() / concatArray():
- 作用:合并多个被观察者 ,发送的顺序与产生的顺序相同(串行发送)
- 二者联系:concat 使用时最多只能发送4个被观察者对象,concatArray可发送大于4个
- 使用场景:多个被观察者发送事件
- 使用实例:
Observable.concatArray( Observable.just("1"), Observable.just("2", "3"), Observable.just("4", "5", "6"), Observable.just("7", "8", "9","10"), Observable.just("A", "B", "C") ).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.e("concat=====", s); } });
输出的结果
05-03 20:04:52.379 6302-6302/? E/concat=====: 1 05-03 20:04:52.379 6302-6302/? E/concat=====: 2 05-03 20:04:52.379 6302-6302/? E/concat=====: 3 05-03 20:04:52.379 6302-6302/? E/concat=====: 4 05-03 20:04:52.379 6302-6302/? E/concat=====: 5 05-03 20:04:52.379 6302-6302/? E/concat=====: 6 05-03 20:04:52.379 6302-6302/? E/concat=====: 7 05-03 20:04:52.379 6302-6302/? E/concat=====: 8 05-03 20:04:52.379 6302-6302/? E/concat=====: 9 05-03 20:04:52.379 6302-6302/? E/concat=====: 10 05-03 20:04:52.379 6302-6302/? E/concat=====: A 05-03 20:04:52.379 6302-6302/? E/concat=====: B 05-03 20:04:52.379 6302-6302/? E/concat=====: C
- merge() / mergeArray()
- 作用:合并多个被观察者(并行发送)
- 二者联系:merge使用时最多只能发送4个被观察者对象,mergeArray可发送大于4个
- 使用场景:并行发送多个被观察者发送事件
- 使用实例:
Observable.mergeArray( Observable.intervalRange(2,3,1000,1000, TimeUnit.MILLISECONDS), Observable.intervalRange(5,3,1000,1000, TimeUnit.MILLISECONDS), Observable.intervalRange(8,3,1000,1000, TimeUnit.MILLISECONDS) ).subscribe(new Consumer<Long>() { @Override public void accept(Long s) throws Exception { Log.e("concat=====", s+""); } });输出结果:
05-03 20:14:55.558 6948-6974/? E/concat=====: 2 05-03 20:14:55.560 6948-6976/? E/concat=====: 8 05-03 20:14:55.562 6948-6975/? E/concat=====: 5 05-03 20:14:56.558 6948-6974/? E/concat=====: 3 05-03 20:14:56.559 6948-6975/? E/concat=====: 6 05-03 20:14:56.560 6948-6976/? E/concat=====: 9 05-03 20:14:57.558 6948-6974/? E/concat=====: 4 05-03 20:14:57.559 6948-6975/? E/concat=====: 7 05-03 20:14:57.560 6948-6976/? E/concat=====: 10上面输出的顺序为 2,5,8 ——3,6,9——4,7,10
-
concatDelayError() / mergeDelayError()
Observable.mergeArray( Observable.create(new ObservableOnSubscribe<Long>() { @Override public void subscribe(ObservableEmitter<Long> e) throws Exception { e.onNext(1L); e.onNext(2L); e.onNext(3L); e.onError(new Exception("Error")); e.onComplete(); } }), Observable.just(4L,5L,6L) ).subscribe(new Consumer<Long>() { @Override public void accept(Long s) throws Exception { Log.e("concat=====", s + ""); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.e("throwable=====", throwable.getMessage()); } });输出结果:
05-03 20:52:22.003 7661-7661/? E/concat=====: 1 05-03 20:52:22.003 7661-7661/? E/concat=====: 2 05-03 20:52:22.003 7661-7661/? E/concat=====: 3 05-03 20:52:22.005 7661-7661/? E/throwable=====: Error由此可见当其中一个发送异常时,后面的被观察者将不会再发送数据,那么我们如果想让异常延后,是所有的时间都发送完成后在发送异常的话,这时就要用到concatDelayError() / mergeDelayError()方法,将上述代码中使用的mergeArray变换为对应的
mergeDelayError(),运行程序输出结果为:
05-03 20:56:37.450 7954-7954/? E/concat=====: 1 05-03 20:56:37.450 7954-7954/? E/concat=====: 2 05-03 20:56:37.450 7954-7954/? E/concat=====: 3 05-03 20:56:37.451 7954-7954/? E/concat=====: 4 05-03 20:56:37.451 7954-7954/? E/concat=====: 5 05-03 20:56:37.451 7954-7954/? E/concat=====: 6 05-03 20:56:37.451 7954-7954/? E/throwable=====: Error在1——6输出完成后才输出异常信息。
- zip()
- 作用:对多个被观察进行某个操作后生成一个新的被观察者序列,然后发送数据,(一一对应操作)
- 原理:对每个观察者之间的数据按照发送的顺序进行合并,最后生成新的被观察者对象进行发送
- 使用场景——需要对个被观察者队形中的数据进行组合
- 使用实例:
- 创建两个发送数据 的被观察者对象
Observable observable = Observable.create(new ObservableOnSubscribe<Long>() { @Override public void subscribe(ObservableEmitter<Long> e) throws Exception { e.onNext(1L); Log.e("observable=====","发送"+1); e.onNext(2L); Log.e("observable=====","发送"+2); e.onNext(3L); Log.e("observable=====","发送"+3); e.onComplete(); } }); Observable observable2 = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> e) throws Exception { e.onNext("=A"); Log.e("observable2=====","发送 =A"); e.onNext("=B"); Log.e("observable2=====","发送 =B"); e.onNext("=C"); Log.e("observable2=====","发送 =C"); e.onComplete(); } });
- 使用Zip()操作符合并发送数据:
Observable.zip(observable, observable2, new BiFunction<Long, String, String>() { @Override public String apply(Long aLong, String s) throws Exception { return aLong +"="+s; } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.e("zip=====",s); } });
输出结果:
05-03 21:09:14.206 8411-8411/? E/observable=====: 发送1 05-03 21:09:14.206 8411-8411/? E/observable=====: 发送2 05-03 21:09:14.206 8411-8411/? E/observable=====: 发送3 05-03 21:09:14.207 8411-8411/? E/zip=====: 1==A 05-03 21:09:14.207 8411-8411/? E/observable2=====: 发送 =A 05-03 21:09:14.207 8411-8411/? E/zip=====: 2==B 05-03 21:09:14.208 8411-8411/? E/observable2=====: 发送 =B 05-03 21:09:14.208 8411-8411/? E/zip=====: 3==C 05-03 21:09:14.208 8411-8411/? E/observable2=====: 发送 =C
第一个Observable发送的数据为1,2,3,第二个发送的为A,B,C ,组合后生成成对的数据 1==A、2==B、3==C
注意:
- 事件组合方式 = 严格按照原先事件序列 进行对位合并
- 最终合并的事件数量 = 多个被观察者(Observable)中数量最少的数量
- 没有配对的数据仍然会发送,只是没有组合输出
我们将上面的1,2,3换成1,2,3,4,看看发送的数据是否会改变
05-03 21:17:42.746 8860-8860/? E/observable=====: 发送1
05-03 21:17:42.746 8860-8860/? E/observable=====: 发送2
05-03 21:17:42.746 8860-8860/? E/observable=====: 发送3
05-03 21:17:42.746 8860-8860/? E/observable=====: 发送4
05-03 21:17:42.747 8860-8860/? E/zip=====: 1==A
05-03 21:17:42.747 8860-8860/? E/observable2=====: 发送 =A
05-03 21:17:42.748 8860-8860/? E/zip=====: 2==B
05-03 21:17:42.748 8860-8860/? E/observable2=====: 发送 =B
05-03 21:17:42.749 8860-8860/? E/zip=====: 3==C
05-03 21:17:42.749 8860-8860/? E/observable2=====: 发送 =C
上述的4虽然没有与之相应的字母配对,但数据仍是会发送,只是没有被打印
-
combineLatest()
1、作用:当两个Observables中的任何一个发送了数据后,将先发送了数据的Observables 的最新(最后)一个数据 与 另外一个Observable发送的每个数据结合,最终基于该函数的结果发送数据
2、原理:第一个Observables 最后发送的数据将被保存下来,与后面Observables 发送的数据进行组合
3、与zip的比较:zip是针对发送事件进行1对1组合,而combineLatest是将最后的数据和之后的事件组合,从发送的顺序和时间上组合
4、使用实例:
Observable observable = Observable.create(new ObservableOnSubscribe<Long>() { @Override public void subscribe(ObservableEmitter<Long> e) throws Exception { e.onNext(1L); Log.e("observable=====", "发送" + 1); e.onNext(2L); Log.e("observable=====", "发送" + 2); e.onNext(3L); Log.e("observable=====", "发送" + 3); e.onNext(4L); Log.e("observable=====", "发送" + 4); e.onComplete(); } }); Observable.combineLatest(observable, Observable.just(6, 7, 8) , new BiFunction<Long, Integer, Long>() { @Override public Long apply(Long aLong, Integer integer) throws Exception { return aLong + integer; } }).subscribe(new Consumer<Long>() { @Override public void accept(Long o) throws Exception { Log.e("combineLatest=====", o + ""); } });
输出结果:
05-03 21:30:32.338 9218-9218/? E/observable=====: 发送1 05-03 21:30:32.338 9218-9218/? E/observable=====: 发送2 05-03 21:30:32.338 9218-9218/? E/observable=====: 发送3 05-03 21:30:32.338 9218-9218/? E/observable=====: 发送4 05-03 21:30:32.338 9218-9218/? E/combineLatest=====: 10 05-03 21:30:32.339 9218-9218/? E/combineLatest=====: 11 05-03 21:30:32.339 9218-9218/? E/combineLatest=====: 12
在第一个Observable发送的最后数字为4,然后4依次和后面发送的6,7,8进行相加,最终输出。
-
combineLatestDelayError()
作用类似于concatDelayError() / mergeDelayError() ,即错误处理,此处不作过多描述
- reduce()
- 作用:将要发送的一组数据按照相应的规则聚合为一个数据,一起发送
- 原理:将前两个数据组合后依次和后面的数据进行组合
- 使用场景——需要对数据进行整合
- 使用实例:数据的累加
Observable.just(1,2,3,4) .reduce(new BiFunction<Integer, Integer, Integer>() { @Override public Integer apply(Integer integer, Integer integer2) throws Exception { return integer + integer2; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.e("reduce=====", integer+""); } });
输出结果:
05-03 21:39:05.823 9565-9565/? E/reduce=====: 10
-
collect
- 作用:将发送的数据装换为一个集合,最后发送
- 使用示例:
Observable.just(1, 2, 3 ,4, 5, 6) .collect( // 1. 创建数据结构(容器),用于收集被观察者发送的数据 new Callable<ArrayList<Integer>>() { @Override public ArrayList<Integer> call() throws Exception { return new ArrayList<>(); } // 2. 对发送的数据进行收集 }, new BiConsumer<ArrayList<Integer>, Integer>() { @Override public void accept(ArrayList<Integer> list, Integer integer) throws Exception { // 参数说明:list = 容器,integer = 后者数据 list.add(integer); // 对发送的数据进行收集 } }).subscribe(new Consumer<ArrayList<Integer>>() { @Override public void accept(@NonNull ArrayList<Integer> s) throws Exception { Log.e(TAG, "本次发送的数据是: "+s); } });
- startWith /starWithArray()
- 作用:在发送之前追加发送事件
- 二者比较:startWith 追加单个数据 或 被观察者对象,starWithArray 追加多个数据
- 使用示例:
Observable.just(4, 5, 6) .startWith(0) // 追加单个数据 = startWith() .startWithArray(1, 2, 3) // 追加多个数据 = startWithArray() .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer value) { Log.d(TAG, "接收到了事件"+ value ); } @Override public void onError(Throwable e) { Log.d(TAG, "对Error事件作出响应"); } @Override public void onComplete() { Log.d(TAG, "对Complete事件作出响应"); } });
注意:输出的顺序是按照 后调用先追加 ,按照追加的顺序 倒序输出,最后才输出原始发送的事件,例如上面会先发送
startWithArray(1, 2, 3) 的数据,然后是.startWith(0) 最后才是Observable发送的4,5,6.
要实现追加观察者对象,只需把上面发送的数据换成 被观察者对象,
Observable.just(4, 5, 6) .startWith(Observable.just(1, 2, 3))
- count()
- 作用:获取发送事件的个数
- 使用场景:需要统计发送事件的数量
- 使用示例:
Observable.just(1, 2, 3, 4) .count() .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Log.e(TAG, "发送的事件数量 = "+aLong); } });本篇的介绍就到这里,本篇介绍的操作符比较多,但组合使用可以解决我们实际开发的很多问题,关于本篇操作符的使用将在下一篇单独介绍。