RxJava2.0功能性操作符(五)

时间:2021-08-02 17:50:29

1、subscribe()

作用:订阅,连接被观察者和观察者

//创建观察者
        Observable<Integer> observable=Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                //被观察者发射事件
                e.onNext(0);
                e.onNext(1);
                e.onError(new NullPointerException("空指针异常"));
                e.onComplete();
            }
        });
        //创建观察者
        Observer<Integer> observer=new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull Integer integer) {
                System.out.println("接收到的数据:"+integer);

            }

            @Override
            public void onError(@NonNull Throwable e) {
                System.out.println("对Error事件作出响应:"+e);

            }

            @Override
            public void onComplete() {

            }
        };
        //订阅,通过订阅连接被观察者和观察者
        observable.subscribe(observer);
运行结果:

RxJava2.0功能性操作符(五)


2、线程调度

作用:指定 被观察者 和 观察者 的工作线程的类型。

观察者和被观察者默认所在的线程:

        //被观察者
        Observable<Integer> observable=Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                System.out.println("被观察者所在的线程名称:"+Thread.currentThread().getName()+",线程ID:"+Thread.currentThread().getId());
            }
        });
        //观察者
        Observer observer=new Observer() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                System.out.println("观察者所在的线程名称:"+Thread.currentThread().getName()+",线程ID:"+Thread.currentThread().getId());
            }

            @Override
            public void onNext(@NonNull Object o) {

            }

            @Override
            public void onError(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        };
        //订阅
        observable.subscribe(observer);

运行结果:

RxJava2.0功能性操作符(五)

从运行的结果可以看出:默认被观察者和观察者都是在主线程工作的,但是实际的开发应用中我们需要的是:需要被观察者在子线程中做耗时操作(连接网络),而观察者需要在主线程中更新UI(呈现数据),这是就需要用到了线程调度。

在RxJava中有多种的调度线程类型:

RxJava2.0功能性操作符(五)

注意:若是被观察者指定多次所在线程,那只对第一次指定的有效。而观察者指定多次,则每一次指定的都有效,也就是说每一次都会进行线程的切换。


3、在事件生命周期中的操作

RxJava2.0功能性操作符(五)

 Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                e.onNext(0);
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
            }
        })
                //每发射一个事件都会执行一次
        .doOnEach(new Consumer<Notification<Integer>>() {
            @Override
            public void accept(@NonNull Notification<Integer> integerNotification) throws Exception {
                System.out.println("doOnEach:"+"发射一次事件执行一次");
            }
        })
                //执行onNext事件之前调用
        .doOnNext(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                System.out.println("doOnNext:"+"在执行onNext事件之前调用");
            }
        })
                //执行onNext事件之后调用
        .doAfterNext(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                System.out.println("doAfterNext:"+"在执行doAfterNext之后调用");
            }
        })
                //被观察者正常发送事件完毕后调用
        .doOnComplete(new Action() {
            @Override
            public void run() throws Exception {
                System.out.println("doOnComplete:"+"被观察者发送事件完成之后调用");
            }
        })
                //观察者订阅时调用
        .doOnSubscribe(new Consumer<Disposable>() {
            @Override
            public void accept(@NonNull Disposable disposable) throws Exception {
                System.out.println("doOnSubcriber:"+"观察者订阅时调用");
            }
        })
                //最后执行
        .doFinally(new Action() {
            @Override
            public void run() throws Exception {
                System.out.println("doFinally:"+"最后调用");
            }
        })
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        System.out.println("接收到的事件:"+integer);
                    }
                });
RxJava2.0功能性操作符(五)


4、错误处理

总体结构图:

RxJava2.0功能性操作符(五)

onErrorReturn():

作用:遇到错误时,发送一个特殊事件  并且 正常终止

 Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                e.onNext(0);
                e.onNext(1);
                e.onNext(2);
                e.onError(new NullPointerException("空指针异常"));
            }
        })
                //捕获异常
        .onErrorReturn(new Function<Throwable, Integer>() {
            @Override
            public Integer apply(@NonNull Throwable throwable) throws Exception {
                //获取到错误异常
                System.out.println("捕获到异常:" + throwable.toString());

                //发生错误以后发射一个事件然后正常的终止
                return 505;
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull Integer integer) {
                System.out.println("接收到的事件:"+integer);

            }

            @Override
            public void onError(@NonNull Throwable e) {
                System.out.println("对Error事件作出处理:"+e);

            }

            @Override
            public void onComplete() {
                System.out.println("对onComplete作出响应");

            }
        });
运行结果:

RxJava2.0功能性操作符(五)


onErrorResumeNext():

作用:遇到错误时,发送1个新的Observable

 Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                e.onNext(0);
                e.onNext(1);
                e.onError(new NullPointerException("空指针异常"));
            }
        })
                .onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {
                    @Override
                    public ObservableSource<? extends Integer> apply(@NonNull Throwable throwable) throws Exception {
                        //1、捕获异常
                        System.out.println("捕获到异常,进行处理:"+throwable);
                        //2、发生错误事件后,发送一个新的被观察者 并且 发送事件
                        return Observable.just(505,404);
                    }
                }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull Integer integer) {
                System.out.println("接收到的事件:"+integer);

            }

            @Override
            public void onError(@NonNull Throwable e) {
                System.out.println("对Error事件进行处理:"+e);

            }

            @Override
            public void onComplete() {
                System.out.println("对onComplete事件进行处理");

            }
        });
运行结果:

RxJava2.0功能性操作符(五)


onExceptionResumeNext():

作用:遇到错误时,发送一个新的Observable

 Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                e.onNext(0);
                e.onNext(1);
                e.onNext(2);
                e.onError(new NullPointerException("空指针异常"));
            }
        })
                //发送一个新的Observable,而onErrorResumeNext是由对Error作出拦截在发送新的被观察者的
                .onExceptionResumeNext(Observable.just(505,404))
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {

                    }

                    @Override
                    public void onNext(@NonNull Integer integer) {
                        System.out.println("接收到的事件:"+integer);

                    }

                    @Override
                    public void onError(@NonNull Throwable e) {
                        System.out.println("对Error事件进行相应:"+e);

                    }

                    @Override
                    public void onComplete() {
                        System.out.println("对onComplete事件作出响应");

                    }
                });
运行结果: 发送一个新的Observable,而onErrorResumeNext是由对Error作出拦截在发送新的被观察者的

RxJava2.0功能性操作符(五)


retry():

作用:重试,当发生错误时,让被观察者重新发送数据。

retry()的重载:

1)retry():

作用:发生错误的时候,让被观察者重新发送数据。如若一直发生错误,则就一直重新发送


2)retry(long time)

作用:出现错误时,让被观察者重新发送数据。参数:重试的次数。


3)retry(Predicate predicate)

作用:出现错误后,判断是否需要重新发送数据(若是需要重新发送,若是一直发生错误就会一直重试)。参数:判断逻辑。


4)retry(new BiPredicate<Integer,Throwable>)

作用:出现错误以后,判断是否需要重新发送数据,若是重新发送 并且 持续发生错误,就会一直重试。参数:重试次数、异常错误信息


5)retry(long time,Predicate predicate)作用:出现错误后,判断是否重新发送数据。参数:重试次数、逻辑判断


1、retry():

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                e.onNext(0);
                e.onNext(1);
                e.onNext(2);
                e.onError(new NullPointerException("空指针异常"));
                e.onNext(3);
            }
        })
                .retry()
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {

                    }

                    @Override
                    public void onNext(@NonNull Integer integer) {
                        System.out.println("接收到的事件:"+integer);

                    }

                    @Override
                    public void onError(@NonNull Throwable e) {
                        System.out.println("对Error事件作出响应");

                    }

                    @Override
                    public void onComplete() {
                        System.out.println("对onComplete事件作出响应");

                    }
                });

运行结果:

RxJava2.0功能性操作符(五)


2、retry(long time)示例

 Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                e.onNext(0);
                e.onNext(1);
                e.onNext(2);
                e.onError(new NullPointerException("空指针异常"));
                e.onNext(3);
            }
        })
                .retry(3)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {

                    }

                    @Override
                    public void onNext(@NonNull Integer integer) {
                        System.out.println("接收到的事件:"+integer);

                    }

                    @Override
                    public void onError(@NonNull Throwable e) {
                        System.out.println("对Error事件作出响应"+e);

                    }

                    @Override
                    public void onComplete() {
                        System.out.println("对onComplete事件作出响应");

                    }
                });
运行结果:

RxJava2.0功能性操作符(五)

3、retry(Predicate predicate)

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                e.onNext(0);
                e.onNext(1);
                e.onNext(2);
                e.onError(new NullPointerException("空指针异常"));
                e.onNext(3);
            }
        })
                .retry(new Predicate<Throwable>() {
                    @Override
                    public boolean test(@NonNull Throwable throwable) throws Exception {
                        //拦截错误后,判断是否重新请求数据
                        if (throwable instanceof NullPointerException){
                            //表示空指针,则重新发送数据
                            return true;
                        }
                        //不是空指针,则不重新发送数据 并且 调用 观察者的 onError结束
                        return false;
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {

                    }

                    @Override
                    public void onNext(@NonNull Integer integer) {
                        System.out.println("接收到的事件:"+integer);

                    }

                    @Override
                    public void onError(@NonNull Throwable e) {
                        System.out.println("对Error事件作出响应"+e);

                    }

                    @Override
                    public void onComplete() {
                        System.out.println("对onComplete事件作出响应");

                    }
                });

运行结果:会重复的发送数据。



4、retry(new BiPredicate<Integer,Throwable>)

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                e.onNext(0);
                e.onNext(1);
                e.onNext(2);
                e.onError(new NullPointerException("空指针异常"));
                e.onNext(3);
            }
        })
                .retry(new BiPredicate<Integer, Throwable>() {
                    @Override
                    public boolean test(@NonNull Integer integer, @NonNull Throwable throwable) throws Exception {
                        Log.e("RxJava","捕获到的异常:"+throwable.toString());
                        Log.e("RxJava","捕获到的异常次数:"+integer);

                        return true;
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {

                    }

                    @Override
                    public void onNext(@NonNull Integer integer) {
                        System.out.println("接收到的事件:"+integer);

                    }

                    @Override
                    public void onError(@NonNull Throwable e) {
                        System.out.println("对Error事件作出响应"+e);

                    }

                    @Override
                    public void onComplete() {
                        System.out.println("对onComplete事件作出响应");

                    }
                });
运行结果:

RxJava2.0功能性操作符(五)


retry(long time,Predicate predicate)

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                e.onNext(0);
                e.onNext(1);
                e.onNext(2);
                e.onError(new NullPointerException("空指针异常"));
                e.onNext(3);
            }
        })
               .retry(3, new Predicate<Throwable>() {
                   @Override
                   public boolean test(@NonNull Throwable throwable) throws Exception {
                       Log.e("RxJava","捕获到的异常:"+throwable.toString());
                       return true;
                   }
               })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {

                    }

                    @Override
                    public void onNext(@NonNull Integer integer) {
                        System.out.println("接收到的事件:"+integer);

                    }

                    @Override
                    public void onError(@NonNull Throwable e) {
                        System.out.println("对Error事件作出响应"+e);

                    }

                    @Override
                    public void onComplete() {
                        System.out.println("对onComplete事件作出响应");

                    }
                });

运行结果:

RxJava2.0功能性操作符(五)


retryUntil()

作用:判断出现错误后,是否重新发送数据。与retry的区别是:retry是,return true 重新发送。而 retryUntil是,return true 不再重新发送。


retryWhen()

作用:遇到错误时,将发生的错误传递给一个新的被观察者(Observable),并且决定是否需要重新订阅原始被观察者 并且发送事件。

RxJava2.0功能性操作符(五)

运行结果:

RxJava2.0功能性操作符(五)


RxJava2.0功能性操作符(五)

运行结果:

RxJava2.0功能性操作符(五)

5、重复发送

repeat()无限次发送,repeat(Integer int)有限次发送

作用:无条件地、重复发送事件

Observable.just(0,1,2)
                .repeat(3)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {

                    }

                    @Override
                    public void onNext(@NonNull Integer integer) {
                        System.out.println("接收到的事件:"+integer);

                    }

                    @Override
                    public void onError(@NonNull Throwable e) {
                        System.out.println("对Error事件作出响应"+e);

                    }

                    @Override
                    public void onComplete() {
                        System.out.println("对onComplete事件作出响应");

                    }
                });
运行结果:

RxJava2.0功能性操作符(五)

repeatWhen()

作用:有条件地、重复发送被观察者的事件

分为下面的两种情况:

1、若新的被观察者返回的是 onComplete 或者 onError 事件,就不在订阅和发送事件

2、若新的被观察者发送的是其他的,会重新的订阅 并且发送事件

1)发送Complete示例

Observable.just(0,1)
                .repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(@NonNull Observable<Object> objectObservable) throws Exception {
                        //返回的是Complete
                        return Observable.empty();//相当于发送了一个Complete事件
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {

                    }

                    @Override
                    public void onNext(@NonNull Integer integer) {
                        System.out.println("接收到的事件:"+integer);

                    }

                    @Override
                    public void onError(@NonNull Throwable e) {
                        System.out.println("对Error事件作出响应:"+e);

                    }

                    @Override
                    public void onComplete() {
                        System.out.println("对Complete作出响应");

                    }
                });

运行结果:

RxJava2.0功能性操作符(五)

2)发送Error示例

 Observable.just(0,1)
                .repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(@NonNull Observable<Object> objectObservable) throws Exception {
                        //返回的是Error
                        return Observable.error(new NullPointerException());
                    }
                })
                ...

运行结果:

RxJava2.0功能性操作符(五)

3)发送其他事件示例

 Observable.just(0,1)
                .repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(@NonNull Observable<Object> objectObservable) throws Exception {
                        //返回的是qita
                        return Observable.just(505);
                    }
                })
                ...


运行结果:

RxJava2.0功能性操作符(五)


ok,这样RxJava基本功能就讲解完毕。我会在下篇的文章中总结一下RxJava重要的操作符在项目中的具体使用.