RxJava系列5:RxJava操作符-过滤、组合、功能、布尔操作符

时间:2021-08-02 17:50:35

过滤、组合、功能、布尔操作符

根据指定条件过滤事件

通过设置指定的过滤条件,当且仅当该事件满足条件,就将该事件过滤(不发送)

filter

过滤特定条件的事件

Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
                subscriber.onNext(4);
                subscriber.onNext(5);
            }
        }).filter(new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer integer) {
                return integer > 3;
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Integer integer) {
                Log.d(":..nht", integer + "");
            }
        });

12-20 22:30:00.137 16244-16244/com.sankuai.moviepro D/:..nht: 4
12-20 22:30:00.137 16244-16244/com.sankuai.moviepro D/:..nht: 5

解释:
过滤掉小于等于3的事件

oftype

过滤 特定数据类型的数据

Observable.just(1, "carson", 3, "ho", 5)
                .ofType(Integer.class)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(":..nht", integer + "");
                    }
                });

12-20 22:33:15.354 17503-17503/? D/:..nht: 1
12-20 22:33:15.355 17503-17503/? D/:..nht: 3
12-20 22:33:15.355 17503-17503/? D/:..nht: 5

skip/skiplast

跳过某个事件

// 使用1:根据顺序跳过数据项
        Observable.just(1, 2, 3, 4, 5)
                .skip(1) // 跳过正序的前1项
                .skipLast(2) // 跳过正序的后2项
                  .subscribe(new Consumer<Integer>() {
                      @Override
                      public void accept( Integer integer) throws Exception {
                          Log.d(TAG,"获取到的整型事件元素是: "+ integer);
                      }
        });

// 使用2:根据时间跳过数据项
        // 发送事件特点:发送数据0-5,每隔1s发送一次,每次递增1;第1次发送延迟0s
        Observable.intervalRange(0, 5, 0, 1, TimeUnit.SECONDS)
                .skip(1, TimeUnit.SECONDS) // 跳过第1s发送的数据
                .skipLast(1, TimeUnit.SECONDS) // 跳过最后1s发送的数据
                .subscribe(new Consumer<Long>() {

                    @Override
                    public void accept( Long along ) throws Exception {
                        Log.d(TAG,"获取到的整型事件元素是: "+ along);
                    }
                });

distinct/distinctUntilChanged

过滤事件序列中重复的事件 / 连续重复的事件

// 使用1:过滤事件序列中重复的事件
        Observable.just(1, 2, 3, 1 , 2 )
                .distinct()
                .subscribe(new Consumer<Integer>() {
                      @Override
                      public void accept( Integer integer) throws Exception {
                          Log.d(TAG,"不重复的整型事件元素是: "+ integer);
                      }
        });

        // 使用2:过滤事件序列中 连续重复的事件
        // 下面序列中,连续重复的事件 = 3、4
        Observable.just(1,2,3,1,2,3,3,4,4 )
                .distinctUntilChanged()
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept( Integer integer) throws Exception {
                        Log.d(TAG,"不连续重复的整型事件元素是: "+ integer);
                    }
                });

根据指定事件数量过滤事件

通过设置指定的事件数量,仅发送特定数量的事件

take

指定观察者最多能接收到的事件数量

takeLast

指定观察者只能接收到被观察者发送的最后几个事件

根据指定时间过滤事件

通过设置指定的时间,仅发送在该时间内的事件

throttleFirst()

在某段时间内,只发送该段时间内第1次事件 / 最后1次事件
如,1段时间内连续点击按钮,但只执行第1次的点击操作

Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber)  {
                try {
                    //1000milliseconds = 1s
                    subscriber.onNext(1);

                    Thread.sleep(500);

                    subscriber.onNext(2);

                    Thread.sleep(400);

                    subscriber.onNext(3);

                    Thread.sleep(300);

                    subscriber.onNext(4);


                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).throttleFirst(1, TimeUnit.SECONDS)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(":..nht", integer + "");
                    }
                });

12-21 18:03:08.443 17948-17948/com.sankuai.moviepro D/:..nht: 1
12-21 18:03:09.643 17948-17948/com.sankuai.moviepro D/:..nht: 4

throttleLast()

在某段时间内,只发送该段时间内最后1次事件

        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber)  {
                try {
                    //1000milliseconds = 1s
                    subscriber.onNext(1);

                    Thread.sleep(500);

                    subscriber.onNext(2);

                    Thread.sleep(400);

                    subscriber.onNext(3);

                    Thread.sleep(300);

                    subscriber.onNext(4);


                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }//每一秒采用的数据
        }).throttleLast(1, TimeUnit.SECONDS)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(":..nht", integer + "");
                    }
                });

12-21 18:05:51.848 18838-19161/com.sankuai.moviepro D/:..nht: 3
12-21 18:05:52.847 18838-19161/com.sankuai.moviepro D/:..nht: 4

sample

在某段时间内,只发送该段时间内最新(最后)1次事件,与 throttleLast() 操作符类似

throttleWithTimeout () / debounce()

作用
发送数据事件时,若2次发送事件的间隔<指定时间,就会丢弃前一次的数据,直到指定时间内都没有新数据发射时才会发送后一次的数据

根据指定事件位置过滤事件

需求场景
通过设置指定的位置,过滤在该位置的事件

firstElement/lastElement

作用
仅选取第1个元素 / 最后一个元素

elementAt

作用
指定接收某个元素(通过 索引值 确定)
注:允许越界,即获取的位置索引 > 发送事件序列长度

elementAtOrError

作用
在elementAt()的基础上,当出现越界情况(即获取的位置索引 > 发送事件序列长度)时,即抛出异常

参考资料

Android RxJava:过滤操作符 全面讲解
https://www.jianshu.com/p/c3a930a03855

RxJava:功能性操作符

Android RxJava:功能性操作符 全面讲解
https://www.jianshu.com/p/b0c3669affdb

错误处理

发送事件过程中,遇到错误时的处理机制

onErrorReturn

作用
遇到错误时,发送1个特殊事件 & 正常终止(所有不会走调用error事件了)
可捕获在它之前发生的异常

rx.Observable.create(new rx.Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
                subscriber.onError(new Throwable("发生错误了"));
            }
        }).onErrorReturn(new Func1<Throwable, Integer>() {
            @Override
            public Integer call(Throwable throwable) {
                // 捕捉错误异常
                Log.e(":nht...", "在onErrorReturn处理了错误: "+throwable.toString() );
                // 发生错误事件后,发送一个"666"事件,最终正常结束
                return 666;
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {
                Log.d(":nht...", "对Error事件作出响应");
            }

            @Override
            public void onNext(Integer integer) {
                Log.e(":nht...", "接收到了事件: "+integer );
            }
        });

12-25 07:01:20.977 1259-1259/com.sankuai.moviepro E/:nht...: 接收到了事件: 1
12-25 07:01:20.977 1259-1259/com.sankuai.moviepro E/:nht...: 接收到了事件: 2
12-25 07:01:20.977 1259-1259/com.sankuai.moviepro E/:nht...: 接收到了事件: 3
12-25 07:01:20.977 1259-1259/com.sankuai.moviepro E/:nht...: 在onErrorReturn处理了错误: java.lang.Throwable: 发生错误了
12-25 07:01:20.977 1259-1259/com.sankuai.moviepro E/:nht...: 接收到了事件: 666

onErrorResumeNext

作用
遇到错误时,发送1个新的Observable

注:
onErrorResumeNext()拦截的错误 = Throwable;若需拦截Exception请用onExceptionResumeNext()
若onErrorResumeNext()拦截的错误 = Exception,则会将错误传递给观察者的onError方法
这里就看你onError方法里传递的是throwable还是exception了。

        rx.Observable.create(new rx.Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onError(new Throwable("发生错误了"));
            }
        }).onErrorResumeNext(new Func1<Throwable, rx.Observable<? extends Integer>>() {
            @Override
            public rx.Observable<? extends Integer> call(Throwable throwable) {
                Log.e(":nht...", "在onErrorResumeNext处理了错误: " + throwable.toString());
                return rx.Observable.just(4, 5, 5);
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onCompleted() {
                Log.d(":nht...", "对onCompleted事件作出响应");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(":nht...", "对Error事件作出响应");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(":nht...", "接收到事件:" + integer);
            }
        });

12-25 07:17:03.953 1909-1909/com.sankuai.moviepro D/:nht...: 接收到事件:1
12-25 07:17:03.953 1909-1909/com.sankuai.moviepro D/:nht...: 接收到事件:2
12-25 07:17:03.953 1909-1909/com.sankuai.moviepro E/:nht...: 在onErrorResumeNext处理了错误: java.lang.Throwable: 发生错误了
12-25 07:17:03.953 1909-1909/com.sankuai.moviepro D/:nht...: 接收到事件:4
12-25 07:17:03.953 1909-1909/com.sankuai.moviepro D/:nht...: 接收到事件:5
12-25 07:17:03.953 1909-1909/com.sankuai.moviepro D/:nht...: 接收到事件:5
12-25 07:17:03.953 1909-1909/com.sankuai.moviepro D/:nht...: 对onCompleted事件作出响应

onExceptionResumeNext

作用
遇到错误时,发送1个新的Observable

注:
onExceptionResumeNext()拦截的错误 = Exception;若需拦截Throwable请用onErrorResumeNext()
若onExceptionResumeNext()拦截的错误 = Throwable,则会将错误传递给观察者的onError方法

retry

作用
重试,即当出现错误时,让被观察者(Observable)重新发射数据
接收到 onError()时,重新订阅 & 发送事件
Throwable 和 Exception都可拦截

<-- 1. retry() --> // 作用:出现错误时,让被观察者重新发送数据 // 注:若一直错误,则一直重新发送

<-- 2. retry(long time) --> // 作用:出现错误时,让被观察者重新发送数据(具备重试次数限制 // 参数 = 重试次数

<-- 3. retry(Predicate predicate) --> // 作用:出现错误后,判断是否需要重新发送数据(若需要重新发送& 持续遇到错误,则持续重试) // 参数 = 判断逻辑

<-- 4. retry(new BiPredicate<Integer, Throwable>) --> // 作用:出现错误后,判断是否需要重新发送数据(若需要重新发送 & 持续遇到错误,则持续重试 // 参数 = 判断逻辑(传入当前重试次数 & 异常错误信息)

<-- 5. retry(long time,Predicate predicate) --> // 作用:出现错误后,判断是否需要重新发送数据(具备重试次数限制 // 参数 = 设置重试次数 & 判断逻辑

retryUntil

作用
出现错误后,判断是否需要重新发送数据
若需要重新发送 & 持续遇到错误,则持续重试
作用类似于retry(Predicate predicate)

具体使用
具体使用类似于retry(Predicate predicate),唯一区别:返回 true 则不重新发送数据事件。此处不作过多描述

retryWhen

作用
遇到错误时,将发生的错误传递给一个新的被观察者(Observable),并决定是否需要重新订阅原始被观察者(Observable)& 发送事件
具体使用

rx.Observable.create(new rx.Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onError(new Exception("发生错误了"));
                subscriber.onNext(3);
            }
        }).retryWhen(new Func1<rx.Observable<? extends Throwable>, rx.Observable<?>>() {
            @Override
            public rx.Observable<?> call(rx.Observable<? extends Throwable> observable) {
                return observable.flatMap(new Func1<Throwable, rx.Observable<?>>() {
                    @Override
                    public rx.Observable<?> call(Throwable throwable) {
                        //会触发之前Observable的重新订阅,但是自己的这个事件是不会被接受者接受的
                        //仅仅是作为1个触发重新订阅被观察者的通知,发送的是什么数据并不重要,只要不是Complete() /  Error()事件
                        return rx.Observable.just(10);
                    }
                });
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onCompleted() {
                Log.d(":nht...", "对onCompleted事件作出响应");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(":nht...", "对onError事件作出响应");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(":nht...", "接受到事件" + integer + "");
            }
        });

12-25 08:31:25.981 3085-3085/com.sankuai.moviepro D/:nht...: 接受到事件2
12-25 08:31:25.981 3085-3085/com.sankuai.moviepro D/:nht...: 接受到事件1
12-25 08:31:25.981 3085-3085/com.sankuai.moviepro D/:nht...: 接受到事件2
12-25 08:31:25.981 3085-3085/com.sankuai.moviepro D/:nht...: 接受到事件1
12-25 08:31:25.981 3085-3085/com.sankuai.moviepro D/:nht...: 接受到事件2
12-25 08:31:25.981 3085-3085/com.sankuai.moviepro D/:nht...: 接受到事件1

与repeatWhen比较类似

RxJava:组合操作符

Android RxJava:组合 / 合并操作符 详细教程
https://www.jianshu.com/p/c2a7c03da16d

RxJava:条件操作符

Android RxJava:详解 条件 / 布尔操作符
https://www.jianshu.com/p/954426f90325