RxJava操作符(四)——组合操作符

时间:2021-05-23 17:48:09

 1、简介:

 之前几篇讲解的操作符多是单个被观察者对象发送事件,本篇来介绍下组合操作符的使用,组合操作符的作用:

组合 多个被观察者(Observable) & 合并需要发送的事件

 2、类型:

RxJava操作符(四)——组合操作符

3、操作符介绍

  • concat() / concatArray()
  1. 作用:合并多个被观察者 ,发送的顺序与产生的顺序相同(串行发送)
  2. 二者联系:concat 使用时最多只能发送4个被观察者对象,concatArray可发送大于4个
  3. 使用场景:多个被观察者发送事件
  4. 使用实例:
Observable.concatArray(
                Observable.just("1"),
                Observable.just("2", "3"),
                Observable.just("4", "5", "6"),
                Observable.just("7", "8", "9","10"),
                Observable.just("A", "B", "C")

        ).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.e("concat=====", s);
            }
        });

输出的结果

05-03 20:04:52.379 6302-6302/? E/concat=====: 1
05-03 20:04:52.379 6302-6302/? E/concat=====: 2
05-03 20:04:52.379 6302-6302/? E/concat=====: 3
05-03 20:04:52.379 6302-6302/? E/concat=====: 4
05-03 20:04:52.379 6302-6302/? E/concat=====: 5
05-03 20:04:52.379 6302-6302/? E/concat=====: 6
05-03 20:04:52.379 6302-6302/? E/concat=====: 7
05-03 20:04:52.379 6302-6302/? E/concat=====: 8
05-03 20:04:52.379 6302-6302/? E/concat=====: 9
05-03 20:04:52.379 6302-6302/? E/concat=====: 10
05-03 20:04:52.379 6302-6302/? E/concat=====: A
05-03 20:04:52.379 6302-6302/? E/concat=====: B
05-03 20:04:52.379 6302-6302/? E/concat=====: C
  • merge() / mergeArray()
  1. 作用:合并多个被观察者(并行发送)
  2. 二者联系:merge使用时最多只能发送4个被观察者对象,mergeArray可发送大于4个
  3. 使用场景:并行发送多个被观察者发送事件
  4. 使用实例:
Observable.mergeArray(
                Observable.intervalRange(2,3,1000,1000, TimeUnit.MILLISECONDS),
                Observable.intervalRange(5,3,1000,1000, TimeUnit.MILLISECONDS),
                Observable.intervalRange(8,3,1000,1000, TimeUnit.MILLISECONDS)
        ).subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long s) throws Exception {
                Log.e("concat=====", s+"");
            }
        });
输出结果:
05-03 20:14:55.558 6948-6974/? E/concat=====: 2
05-03 20:14:55.560 6948-6976/? E/concat=====: 8
05-03 20:14:55.562 6948-6975/? E/concat=====: 5
05-03 20:14:56.558 6948-6974/? E/concat=====: 3
05-03 20:14:56.559 6948-6975/? E/concat=====: 6
05-03 20:14:56.560 6948-6976/? E/concat=====: 9
05-03 20:14:57.558 6948-6974/? E/concat=====: 4
05-03 20:14:57.559 6948-6975/? E/concat=====: 7
05-03 20:14:57.560 6948-6976/? E/concat=====: 10
上面输出的顺序为 2,5,8 ——3,6,9——4,7,10 
  • concatDelayError() / mergeDelayError()
 我们来看一种情况,在使用  concat() / concatArray()、   merge() / mergeArray()时,当其中一个 出现Exception时后面的Observale如何处理呢?下面我们尝试发送异常:
 
Observable.mergeArray(
                Observable.create(new ObservableOnSubscribe<Long>() {
                    @Override
                    public void subscribe(ObservableEmitter<Long> e) throws Exception {
                        e.onNext(1L);
                        e.onNext(2L);
                        e.onNext(3L);
                        e.onError(new Exception("Error"));
                        e.onComplete();
                    }
                }),
                Observable.just(4L,5L,6L)
        ).subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long s) throws Exception {
                Log.e("concat=====", s + "");
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.e("throwable=====", throwable.getMessage());
            }
        });
输出结果:
05-03 20:52:22.003 7661-7661/? E/concat=====: 1
05-03 20:52:22.003 7661-7661/? E/concat=====: 2
05-03 20:52:22.003 7661-7661/? E/concat=====: 3
05-03 20:52:22.005 7661-7661/? E/throwable=====: Error
由此可见当其中一个发送异常时,后面的被观察者将不会再发送数据,那么我们如果想让异常延后,是所有的时间都发送完成后在发送异常的话,这时就要用到concatDelayError() / mergeDelayError()方法,将上述代码中使用的mergeArray变换为对应的
mergeDelayError(),运行程序输出结果为:
05-03 20:56:37.450 7954-7954/? E/concat=====: 1
05-03 20:56:37.450 7954-7954/? E/concat=====: 2
05-03 20:56:37.450 7954-7954/? E/concat=====: 3
05-03 20:56:37.451 7954-7954/? E/concat=====: 4
05-03 20:56:37.451 7954-7954/? E/concat=====: 5
05-03 20:56:37.451 7954-7954/? E/concat=====: 6
05-03 20:56:37.451 7954-7954/? E/throwable=====: Error
在1——6输出完成后才输出异常信息。
  • zip() 

  1. 作用:对多个被观察进行某个操作后生成一个新的被观察者序列,然后发送数据,(一一对应操作)
  2. 原理:对每个观察者之间的数据按照发送的顺序进行合并,最后生成新的被观察者对象进行发送
  3. 使用场景——需要对个被观察者队形中的数据进行组合
  4. 使用实例:
  • 创建两个发送数据 的被观察者对象                                                                   
  Observable observable = Observable.create(new ObservableOnSubscribe<Long>() {
            @Override
            public void subscribe(ObservableEmitter<Long> e) throws Exception {
                e.onNext(1L);
                Log.e("observable=====","发送"+1);
                e.onNext(2L);
                Log.e("observable=====","发送"+2);
                e.onNext(3L);
                Log.e("observable=====","发送"+3);
                e.onComplete();
            }
        });
        Observable observable2 = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("=A");
                Log.e("observable2=====","发送 =A");
                e.onNext("=B");
                Log.e("observable2=====","发送 =B");
                e.onNext("=C");
                Log.e("observable2=====","发送 =C");
                e.onComplete();
            }
        });
  • 使用Zip()操作符合并发送数据:
Observable.zip(observable, observable2, new BiFunction<Long, String, String>() {
            @Override
            public String apply(Long aLong, String s) throws Exception {
                return aLong +"="+s;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.e("zip=====",s);
            }
        });

输出结果:

05-03 21:09:14.206 8411-8411/? E/observable=====: 发送1
05-03 21:09:14.206 8411-8411/? E/observable=====: 发送2
05-03 21:09:14.206 8411-8411/? E/observable=====: 发送3
05-03 21:09:14.207 8411-8411/? E/zip=====: 1==A
05-03 21:09:14.207 8411-8411/? E/observable2=====: 发送 =A
05-03 21:09:14.207 8411-8411/? E/zip=====: 2==B
05-03 21:09:14.208 8411-8411/? E/observable2=====: 发送 =B
05-03 21:09:14.208 8411-8411/? E/zip=====: 3==C
05-03 21:09:14.208 8411-8411/? E/observable2=====: 发送 =C

第一个Observable发送的数据为1,2,3,第二个发送的为A,B,C ,组合后生成成对的数据 1==A、2==B、3==C

注意:

  • 事件组合方式 = 严格按照原先事件序列 进行对位合并
  • 最终合并的事件数量 = 多个被观察者(Observable)中数量最少的数量
  • 没有配对的数据仍然会发送,只是没有组合输出

我们将上面的1,2,3换成1,2,3,4,看看发送的数据是否会改变

05-03 21:17:42.746 8860-8860/? E/observable=====: 发送1
05-03 21:17:42.746 8860-8860/? E/observable=====: 发送2
05-03 21:17:42.746 8860-8860/? E/observable=====: 发送3
05-03 21:17:42.746 8860-8860/? E/observable=====: 发送4
05-03 21:17:42.747 8860-8860/? E/zip=====: 1==A
05-03 21:17:42.747 8860-8860/? E/observable2=====: 发送 =A
05-03 21:17:42.748 8860-8860/? E/zip=====: 2==B
05-03 21:17:42.748 8860-8860/? E/observable2=====: 发送 =B
05-03 21:17:42.749 8860-8860/? E/zip=====: 3==C
05-03 21:17:42.749 8860-8860/? E/observable2=====: 发送 =C

上述的4虽然没有与之相应的字母配对,但数据仍是会发送,只是没有被打印

  • combineLatest()

1、作用:当两个Observables中的任何一个发送了数据后,将先发送了数据的Observables 的最新(最后)一个数据 与 另外一个Observable发送的每个数据结合,最终基于该函数的结果发送数据

2、原理:第一个Observables 最后发送的数据将被保存下来,与后面Observables 发送的数据进行组合

3、与zip的比较:zip是针对发送事件进行1对1组合,而combineLatest是将最后的数据和之后的事件组合,从发送的顺序和时间上组合

4、使用实例:

 Observable observable = Observable.create(new ObservableOnSubscribe<Long>() {
            @Override
            public void subscribe(ObservableEmitter<Long> e) throws Exception {
                e.onNext(1L);
                Log.e("observable=====", "发送" + 1);
                e.onNext(2L);
                Log.e("observable=====", "发送" + 2);
                e.onNext(3L);
                Log.e("observable=====", "发送" + 3);
                e.onNext(4L);
                Log.e("observable=====", "发送" + 4);
                e.onComplete();
            }
        });

        Observable.combineLatest(observable,
                Observable.just(6, 7, 8)
                , new BiFunction<Long, Integer, Long>() {
                    @Override
                    public Long apply(Long aLong, Integer integer) throws Exception {
                        return aLong + integer;
                    }
                }).subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long o) throws Exception {
                Log.e("combineLatest=====", o + "");
            }
        });

输出结果:

05-03 21:30:32.338 9218-9218/? E/observable=====: 发送1
05-03 21:30:32.338 9218-9218/? E/observable=====: 发送2
05-03 21:30:32.338 9218-9218/? E/observable=====: 发送3
05-03 21:30:32.338 9218-9218/? E/observable=====: 发送4
05-03 21:30:32.338 9218-9218/? E/combineLatest=====: 10
05-03 21:30:32.339 9218-9218/? E/combineLatest=====: 11
05-03 21:30:32.339 9218-9218/? E/combineLatest=====: 12

在第一个Observable发送的最后数字为4,然后4依次和后面发送的6,7,8进行相加,最终输出。

  • combineLatestDelayError()
    作用类似于concatDelayError() / mergeDelayError() ,即错误处理,此处不作过多描述
  • reduce()
  1. 作用:将要发送的一组数据按照相应的规则聚合为一个数据,一起发送
  2. 原理:将前两个数据组合后依次和后面的数据进行组合
  3. 使用场景——需要对数据进行整合
  4. 使用实例:数据的累加
Observable.just(1,2,3,4)
                .reduce(new BiFunction<Integer, Integer, Integer>() {
                    @Override
                    public Integer apply(Integer integer, Integer integer2) throws Exception {
                        return integer + integer2;
                    }
                }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e("reduce=====", integer+"");
            }
        });

输出结果:

05-03 21:39:05.823 9565-9565/? E/reduce=====: 10
  • collect

  1. 作用:将发送的数据装换为一个集合,最后发送
  2. 使用示例:
Observable.just(1, 2, 3 ,4, 5, 6)
                .collect(
                        // 1. 创建数据结构(容器),用于收集被观察者发送的数据
                        new Callable<ArrayList<Integer>>() {
                            @Override
                            public ArrayList<Integer> call() throws Exception {
                                return new ArrayList<>();
                            }
                            // 2. 对发送的数据进行收集
                        }, new BiConsumer<ArrayList<Integer>, Integer>() {
                            @Override
                            public void accept(ArrayList<Integer> list, Integer integer)
                                    throws Exception {
                                // 参数说明:list = 容器,integer = 后者数据
                                list.add(integer);
                                // 对发送的数据进行收集
                            }
                        }).subscribe(new Consumer<ArrayList<Integer>>() {
            @Override
            public void accept(@NonNull ArrayList<Integer> s) throws Exception {
                Log.e(TAG, "本次发送的数据是: "+s);

            }
        });
  • startWith /starWithArray()

  1. 作用:在发送之前追加发送事件
  2. 二者比较:startWith 追加单个数据 或 被观察者对象,starWithArray 追加多个数据
  3. 使用示例:
 Observable.just(4, 5, 6)
                  .startWith(0)  // 追加单个数据 = startWith()
                  .startWithArray(1, 2, 3) // 追加多个数据 = startWithArray()
                  .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件作出响应");
                    }
                });

注意:输出的顺序是按照 后调用先追加 ,按照追加的顺序 倒序输出,最后才输出原始发送的事件,例如上面会先发送

startWithArray(1, 2, 3) 的数据,然后是.startWith(0) 最后才是Observable发送的4,5,6.

要实现追加观察者对象,只需把上面发送的数据换成 被观察者对象,

 Observable.just(4, 5, 6)
                .startWith(Observable.just(1, 2, 3))
  • count() 

  1. 作用:获取发送事件的个数
  2. 使用场景:需要统计发送事件的数量
  3. 使用示例:
        Observable.just(1, 2, 3, 4)
                  .count()
                  .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Log.e(TAG, "发送的事件数量 =  "+aLong);
                    }
                });
本篇的介绍就到这里,本篇介绍的操作符比较多,但组合使用可以解决我们实际开发的很多问题,关于本篇操作符的使用将在下一篇单独介绍。