Android RxJava(三)组合操作符

时间:2022-05-16 17:51:44

Rxjava由于其基于事件流的链式调用、逻辑简洁 & 使用简单的特点,深受各大 Android开发者的欢迎。因此在学习过程中全面的了解了下RxJava的组合操作符。

merge()

原理图:
Android RxJava(三)组合操作符
方法:

public static <T> Observable<T> merge(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2)

作用:
将两个Observable发射的事件序列组合并成一个时间序列,就想是一个Observable发射的一样,合并后数据是无序的。
代码:

        //这里的ob1和ob2将在下述所有示例中使用,将不再特殊描述
        final String[] str = new String[]{"a","b","c"};
        final int[] ints = new int[]{1,2,3,4,5};
        Observable ob1 = Observable.interval(500, TimeUnit.MILLISECONDS).map(new Function<Long,String>() {

            @Override
            public String apply(Long aLong) throws Exception {
                return str[aLong.intValue()];
            }
        }).take(str.length);
        Observable ob2 = Observable.interval(300,TimeUnit.MILLISECONDS)
                .map(new Function<Long,Integer>() {

                    @Override
                    public Integer apply(Long aLong) throws Exception {
                        return ints[aLong.intValue()];
                    }
                }).take(ints.length);
       Observable.merge(ob1,ob2).subscribe(new Observer() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(Object value) {
                Log.e("---", String.valueOf(value));
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onComplete() {
            }
        });

上述代码中第一个Observable作用是每500毫秒从字符串数组中取一个元素,第二个Observable的作用是每300毫秒从整型数组中取一个元素,打印结果为:

06-06 22:48:53.410 17668-17707/ E/—: 1
06-06 22:48:53.610 17668-17706/ E/—: a
06-06 22:48:53.710 17668-17707/ E/—: 2
06-06 22:48:54.010 17668-17707/ E/—: 3
06-06 22:48:54.110 17668-17706/ E/—: b
06-06 22:48:54.310 17668-17707/ E/—: 4
06-06 22:48:54.610 17668-17706/ E/—: c
06-06 22:48:54.610 17668-17707/ E/—: 5

mergeArray()

原理图:
Android RxJava(三)组合操作符
方法:

public static <T> Observable<T> mergeArray(ObservableSource<? extends T>... sources) 

作用:
作用和merge类似,只不过是组合多个Observable

concat()

原理图:
Android RxJava(三)组合操作符
方法:

public static <T> Observable<T> concat(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2)
...
 public static <T> Observable<T> concat(
            ObservableSource<? extends T> source1, ObservableSource<? extends T> source2,
            ObservableSource<? extends T> source3, ObservableSource<? extends T> source4)

作用:
功能和merge类似,也是用于将多个Observable合并,最多支持4个,但是concat是有序的,也就是说前一个Observable没发射完是不会发射后一个Observable的数据的。
代码:
将上述代码里面的merge直接换成concat

       Observable.concat(ob1,ob2).subscribe(new Observer() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(Object value) {
                Log.e("---", String.valueOf(value));
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onComplete() {
            }
        });

代码执行结果如下:

06-06 23:01:16.189 27785-27822/ E/---: a
06-06 23:01:16.689 27785-27822/ E/---: b
06-06 23:01:17.189 27785-27822/ E/---: c
06-06 23:01:17.490 27785-27872/ E/---: 1
06-06 23:01:17.790 27785-27872/ E/---: 2
06-06 23:01:18.090 27785-27872/ E/---: 3
06-06 23:01:18.390 27785-27872/ E/---: 4
06-06 23:01:18.690 27785-27872/ E/---: 5

concatArray()

方法:

 public static <T> Observable<T> concatArray(ObservableSource<? extends T>... sources)

作用:
作用同concat类似,将多个Observable进行数据合并

mergeArrayDelayError() & concatArrayDelayError()

方法:

public static <T> Observable<T> concatArrayDelayError(ObservableSource<? extends T>... sources)
public static <T> Observable<T> mergeArrayDelayError(ObservableSource<? extends T>... sources)

作用:
在mergeArray()和concatArray()两个方法中,如果其中一个Observable发送了一个Error事件,那么就会停止发送事件,如果想onError()事件延迟到所有Observable都发送完事件后再执行,就可以使用mergeArrayDelayError()和concatArrayDelayError()
代码:
下面通过代码测试下如果中途发送onError,Observable是否会中断发送

       Observable.mergeArray(ob1,Observable.create(new ObservableOnSubscribe() {
           @Override
           public void subscribe(ObservableEmitter e) throws Exception {
               e.onNext(1);
               e.onError(new NumberFormatException());
           }
       })).subscribe(new Observer() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(Object value) {
                Log.e("---", String.valueOf(value));
            }

            @Override
            public void onError(Throwable e) {
                Log.e("---","--onError");
            }

            @Override
            public void onComplete() {
            }
        });

上述执行结果为如下:

06-06 23:18:58.930 8575-8575/ E/---: 1
06-06 23:18:58.930 8575-8575/ E/---: --onError

可以发现使用mergeArray()时如果中途发送onError()会中断数据的发送,下面将mergeArray改成mergeArrayDelayError

       Observable.mergeArrayDelayError(ob1,Observable.create(new ObservableOnSubscribe() {
           @Override
           public void subscribe(ObservableEmitter e) throws Exception {
               e.onNext(1);
               e.onError(new NumberFormatException());
           }
       })).subscribe(new Observer() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(Object value) {
                Log.e("---", String.valueOf(value));
            }

            @Override
            public void onError(Throwable e) {
                Log.e("---","onError");
            }

            @Override
            public void onComplete() {
            }
        });

更改之后的执行结果如下:

06-06 23:14:18.191 5668-5668/ E/---: 1
06-06 23:14:18.691 5668-5700/ E/---: a
06-06 23:14:19.191 5668-5700/ E/---: b
06-06 23:14:19.691 5668-5700/ E/---: c
06-06 23:14:19.691 5668-5700/ E/---: --onError

startWith() & startWithArray()

原理图:
Android RxJava(三)组合操作符
方法:

public final Observable<T> startWith(ObservableSource<? extends T> other)
public final Observable<T> startWithArray(T... items)

作用:
用于在源Observable发射的数据前插入另一个Observable发射的数据
代码:

       ob1.startWith(ob2).subscribe(new Observer() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(Object value) {
                Log.e("---", String.valueOf(value));
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onComplete() {
            }
        });

代码执行结果如下:

06-06 23:07:16.014 32379-32405/ E/---: 1
06-06 23:07:16.314 32379-32405/ E/---: 2
06-06 23:07:16.614 32379-32405/ E/---: 3
06-06 23:07:16.913 32379-32405/ E/---: 4
06-06 23:07:17.214 32379-32405/ E/---: 5
06-06 23:07:17.716 32379-32444/ E/---: a
06-06 23:07:18.215 32379-32444/ E/---: b
06-06 23:07:18.714 32379-32444/ E/---: c

zip()

原理图:
Android RxJava(三)组合操作符
方法:

public static <T1, T2, R> Observable<R> zip(
            ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2,
            BiFunction<? super T1, ? super T2, ? extends R> zipper)

作用:
用来合并两个Observable发射的事件,根据BiFunction函数生成一个新的值发射出去。当其中一个Observable发送数据结束或者出现异常后,另一个Observable也将停止发送数据。也就是说正常的情况下数据长度会与两个Observable中最少事件的数量一样。
代码:
简单的将两个Observable的数据进行拼接

       Observable.zip(ob1, ob2, new BiFunction<String,Integer,String>() {

           @Override
           public String apply(String s, Integer integer) throws Exception {
               return s+integer;
           }
       }).subscribe(new Observer() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(Object value) {
                Log.e("---", String.valueOf(value));
            }

            @Override
            public void onError(Throwable e) {
                Log.e("---","--onError");
            }

            @Override
            public void onComplete() {
            }
        });

运行结果如下:

06-06 23:29:56.547 14888-14926/ E/---: a1
06-06 23:29:57.049 14888-14926/ E/---: b2
06-06 23:29:57.547 14888-14926/ E/---: c3

combineLatest() & combineLatestDelayError()

原理图:
Android RxJava(三)组合操作符
可能上面这张图不是太好理解,可以看下面这张图
Android RxJava(三)组合操作符
方法:

public static <T1, T2, R> Observable<R> combineLatest(ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2, BiFunction<? super T1, ? super T2, ? extends R> combiner)

作用:
用于将两个Observable最近发射的数据经BiFunction函数的规则进行组合,combineLatest()发送事件的序列是与发送的时间线有关的。拿上图解释当发送A之后会从上一个Observ拿最近发送的1进行组合生成‘1A’,当发送2时拿第二个Observable最近发送的数据B组合成‘2B’,接下来到事件C时还是取第一个Observable最近发送的时间2进行组合成‘2C’,以此类推。
代码:

       Observable.combineLatest(ob1, ob2, new BiFunction<String,Integer,String>() {

           @Override
           public String apply(String s, Integer integer) throws Exception {
               return s + integer;
           }
       }).subscribe(new Observer() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(Object value) {
            }

            @Override
            public void onError(Throwable e) {
                Log.e("---","--onError");
            }

            @Override
            public void onComplete() {
            }
        });

执行结果如下:

06-06 23:39:58.087 22418-22447/ E/---: a1
06-06 23:39:58.190 22418-22448/ E/---: a2
06-06 23:39:58.488 22418-22448/ E/---: a3
06-06 23:39:58.589 22418-22447/ E/---: b3
06-06 23:39:58.788 22418-22448/ E/---: b4
06-06 23:39:59.088 22418-22447/ E/---: c4
06-06 23:39:59.088 22418-22448/ E/---: c5

reduce()

方法:

public final Maybe<T> reduce(BiFunction<T, T, T> reducer)

作用:
与scan()操作符类似,作用是将数据以一定的逻辑聚合起来,这两个的区别在于scan()没处理一次数据将会发送一个事件给观察者,但是reduce()会将所有数据聚合在一起之后才会发送给观察者,还有一点区别就是scan的返回值是Observable,而reduce的返回值是Maybe
代码:

        Observable.just(1,2,3,4,5).reduce(new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e("---",integer+"");
            }
        });

上述代码的作用就是对数组数据进行相加处理,最终输出数据为15

count()

方法:

public final Single<Long> count() 

作用:
统计要发送事件的总数
代码:

        Observable.just(1,1,2,2).count().subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                Log.e("---",aLong+"");
            }
        });

运行结果为:

E/---: 4

collect()

方法:

public final <U> Single<U> collect(Callable<? extends U> initialValueSupplier, BiConsumer<? super U, ? super T> collector)

作用:
收集数据到一个可变的数据结构中
代码:

        Observable.just("1","2","3","2")
                .collect(new Callable<List<Integer>>() { //创建数据结构
                    @Override
                    public List<Integer> call() throws Exception {
                        return new ArrayList<Integer>();
                    }
                }, new BiConsumer<List<Integer>, String>() {//收集器
                    @Override
                    public void accept(List<Integer> integers, String s) throws Exception {
                        integers.add(Integer.valueOf(s));
                    }
                }).subscribe(new Consumer<List<Integer>>() {
            @Override
            public void accept(List<Integer> integers) throws Exception {
                Log.e("---",integers+"");
            }
        });

打印结果为:

E/---: [1, 2, 3, 2]