1、subscribe()
作用:订阅,连接被观察者和观察者
//创建观察者 Observable<Integer> observable=Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception { //被观察者发射事件 e.onNext(0); e.onNext(1); e.onError(new NullPointerException("空指针异常")); e.onComplete(); } }); //创建观察者 Observer<Integer> observer=new Observer<Integer>() { @Override public void onSubscribe(@NonNull Disposable d) { } @Override public void onNext(@NonNull Integer integer) { System.out.println("接收到的数据:"+integer); } @Override public void onError(@NonNull Throwable e) { System.out.println("对Error事件作出响应:"+e); } @Override public void onComplete() { } }; //订阅,通过订阅连接被观察者和观察者 observable.subscribe(observer);运行结果:
2、线程调度
作用:指定 被观察者 和 观察者 的工作线程的类型。
观察者和被观察者默认所在的线程:
//被观察者 Observable<Integer> observable=Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception { System.out.println("被观察者所在的线程名称:"+Thread.currentThread().getName()+",线程ID:"+Thread.currentThread().getId()); } }); //观察者 Observer observer=new Observer() { @Override public void onSubscribe(@NonNull Disposable d) { System.out.println("观察者所在的线程名称:"+Thread.currentThread().getName()+",线程ID:"+Thread.currentThread().getId()); } @Override public void onNext(@NonNull Object o) { } @Override public void onError(@NonNull Throwable e) { } @Override public void onComplete() { } }; //订阅 observable.subscribe(observer);
运行结果:
从运行的结果可以看出:默认被观察者和观察者都是在主线程工作的,但是实际的开发应用中我们需要的是:需要被观察者在子线程中做耗时操作(连接网络),而观察者需要在主线程中更新UI(呈现数据),这是就需要用到了线程调度。
在RxJava中有多种的调度线程类型:
注意:若是被观察者指定多次所在线程,那只对第一次指定的有效。而观察者指定多次,则每一次指定的都有效,也就是说每一次都会进行线程的切换。
3、在事件生命周期中的操作:
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception { e.onNext(0); e.onNext(1); e.onNext(2); e.onNext(3); } }) //每发射一个事件都会执行一次 .doOnEach(new Consumer<Notification<Integer>>() { @Override public void accept(@NonNull Notification<Integer> integerNotification) throws Exception { System.out.println("doOnEach:"+"发射一次事件执行一次"); } }) //执行onNext事件之前调用 .doOnNext(new Consumer<Integer>() { @Override public void accept(@NonNull Integer integer) throws Exception { System.out.println("doOnNext:"+"在执行onNext事件之前调用"); } }) //执行onNext事件之后调用 .doAfterNext(new Consumer<Integer>() { @Override public void accept(@NonNull Integer integer) throws Exception { System.out.println("doAfterNext:"+"在执行doAfterNext之后调用"); } }) //被观察者正常发送事件完毕后调用 .doOnComplete(new Action() { @Override public void run() throws Exception { System.out.println("doOnComplete:"+"被观察者发送事件完成之后调用"); } }) //观察者订阅时调用 .doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(@NonNull Disposable disposable) throws Exception { System.out.println("doOnSubcriber:"+"观察者订阅时调用"); } }) //最后执行 .doFinally(new Action() { @Override public void run() throws Exception { System.out.println("doFinally:"+"最后调用"); } }) .subscribe(new Consumer<Integer>() { @Override public void accept(@NonNull Integer integer) throws Exception { System.out.println("接收到的事件:"+integer); } });
4、错误处理:
总体结构图:
onErrorReturn():
作用:遇到错误时,发送一个特殊事件 并且 正常终止
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception { e.onNext(0); e.onNext(1); e.onNext(2); e.onError(new NullPointerException("空指针异常")); } }) //捕获异常 .onErrorReturn(new Function<Throwable, Integer>() { @Override public Integer apply(@NonNull Throwable throwable) throws Exception { //获取到错误异常 System.out.println("捕获到异常:" + throwable.toString()); //发生错误以后发射一个事件然后正常的终止 return 505; } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(@NonNull Disposable d) { } @Override public void onNext(@NonNull Integer integer) { System.out.println("接收到的事件:"+integer); } @Override public void onError(@NonNull Throwable e) { System.out.println("对Error事件作出处理:"+e); } @Override public void onComplete() { System.out.println("对onComplete作出响应"); } });运行结果:
onErrorResumeNext():
作用:遇到错误时,发送1个新的Observable
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception { e.onNext(0); e.onNext(1); e.onError(new NullPointerException("空指针异常")); } }) .onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() { @Override public ObservableSource<? extends Integer> apply(@NonNull Throwable throwable) throws Exception { //1、捕获异常 System.out.println("捕获到异常,进行处理:"+throwable); //2、发生错误事件后,发送一个新的被观察者 并且 发送事件 return Observable.just(505,404); } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(@NonNull Disposable d) { } @Override public void onNext(@NonNull Integer integer) { System.out.println("接收到的事件:"+integer); } @Override public void onError(@NonNull Throwable e) { System.out.println("对Error事件进行处理:"+e); } @Override public void onComplete() { System.out.println("对onComplete事件进行处理"); } });运行结果:
onExceptionResumeNext():
作用:遇到错误时,发送一个新的Observable
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception { e.onNext(0); e.onNext(1); e.onNext(2); e.onError(new NullPointerException("空指针异常")); } }) //发送一个新的Observable,而onErrorResumeNext是由对Error作出拦截在发送新的被观察者的 .onExceptionResumeNext(Observable.just(505,404)) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(@NonNull Disposable d) { } @Override public void onNext(@NonNull Integer integer) { System.out.println("接收到的事件:"+integer); } @Override public void onError(@NonNull Throwable e) { System.out.println("对Error事件进行相应:"+e); } @Override public void onComplete() { System.out.println("对onComplete事件作出响应"); } });运行结果: 发送一个新的Observable,而onErrorResumeNext是由对Error作出拦截在发送新的被观察者的
retry():
作用:重试,当发生错误时,让被观察者重新发送数据。
retry()的重载:
1)retry():
作用:发生错误的时候,让被观察者重新发送数据。如若一直发生错误,则就一直重新发送
2)retry(long time)
作用:出现错误时,让被观察者重新发送数据。参数:重试的次数。
3)retry(Predicate predicate)
作用:出现错误后,判断是否需要重新发送数据(若是需要重新发送,若是一直发生错误就会一直重试)。参数:判断逻辑。
4)retry(new BiPredicate<Integer,Throwable>)
作用:出现错误以后,判断是否需要重新发送数据,若是重新发送 并且 持续发生错误,就会一直重试。参数:重试次数、异常错误信息
5)retry(long time,Predicate predicate)作用:出现错误后,判断是否重新发送数据。参数:重试次数、逻辑判断
1、retry():
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception { e.onNext(0); e.onNext(1); e.onNext(2); e.onError(new NullPointerException("空指针异常")); e.onNext(3); } }) .retry() .subscribe(new Observer<Integer>() { @Override public void onSubscribe(@NonNull Disposable d) { } @Override public void onNext(@NonNull Integer integer) { System.out.println("接收到的事件:"+integer); } @Override public void onError(@NonNull Throwable e) { System.out.println("对Error事件作出响应"); } @Override public void onComplete() { System.out.println("对onComplete事件作出响应"); } });
运行结果:
2、retry(long time)示例
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception { e.onNext(0); e.onNext(1); e.onNext(2); e.onError(new NullPointerException("空指针异常")); e.onNext(3); } }) .retry(3) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(@NonNull Disposable d) { } @Override public void onNext(@NonNull Integer integer) { System.out.println("接收到的事件:"+integer); } @Override public void onError(@NonNull Throwable e) { System.out.println("对Error事件作出响应"+e); } @Override public void onComplete() { System.out.println("对onComplete事件作出响应"); } });运行结果:
3、retry(Predicate predicate)
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception { e.onNext(0); e.onNext(1); e.onNext(2); e.onError(new NullPointerException("空指针异常")); e.onNext(3); } }) .retry(new Predicate<Throwable>() { @Override public boolean test(@NonNull Throwable throwable) throws Exception { //拦截错误后,判断是否重新请求数据 if (throwable instanceof NullPointerException){ //表示空指针,则重新发送数据 return true; } //不是空指针,则不重新发送数据 并且 调用 观察者的 onError结束 return false; } }) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(@NonNull Disposable d) { } @Override public void onNext(@NonNull Integer integer) { System.out.println("接收到的事件:"+integer); } @Override public void onError(@NonNull Throwable e) { System.out.println("对Error事件作出响应"+e); } @Override public void onComplete() { System.out.println("对onComplete事件作出响应"); } });
运行结果:会重复的发送数据。
4、retry(new BiPredicate<Integer,Throwable>)
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception { e.onNext(0); e.onNext(1); e.onNext(2); e.onError(new NullPointerException("空指针异常")); e.onNext(3); } }) .retry(new BiPredicate<Integer, Throwable>() { @Override public boolean test(@NonNull Integer integer, @NonNull Throwable throwable) throws Exception { Log.e("RxJava","捕获到的异常:"+throwable.toString()); Log.e("RxJava","捕获到的异常次数:"+integer); return true; } }) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(@NonNull Disposable d) { } @Override public void onNext(@NonNull Integer integer) { System.out.println("接收到的事件:"+integer); } @Override public void onError(@NonNull Throwable e) { System.out.println("对Error事件作出响应"+e); } @Override public void onComplete() { System.out.println("对onComplete事件作出响应"); } });运行结果:
retry(long time,Predicate predicate)
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception { e.onNext(0); e.onNext(1); e.onNext(2); e.onError(new NullPointerException("空指针异常")); e.onNext(3); } }) .retry(3, new Predicate<Throwable>() { @Override public boolean test(@NonNull Throwable throwable) throws Exception { Log.e("RxJava","捕获到的异常:"+throwable.toString()); return true; } }) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(@NonNull Disposable d) { } @Override public void onNext(@NonNull Integer integer) { System.out.println("接收到的事件:"+integer); } @Override public void onError(@NonNull Throwable e) { System.out.println("对Error事件作出响应"+e); } @Override public void onComplete() { System.out.println("对onComplete事件作出响应"); } });
运行结果:
retryUntil()
作用:判断出现错误后,是否重新发送数据。与retry的区别是:retry是,return true 重新发送。而 retryUntil是,return true 不再重新发送。
retryWhen()
作用:遇到错误时,将发生的错误传递给一个新的被观察者(Observable),并且决定是否需要重新订阅原始被观察者 并且发送事件。
运行结果:
运行结果:
5、重复发送
repeat()无限次发送,repeat(Integer int)有限次发送
作用:无条件地、重复发送事件
Observable.just(0,1,2) .repeat(3) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(@NonNull Disposable d) { } @Override public void onNext(@NonNull Integer integer) { System.out.println("接收到的事件:"+integer); } @Override public void onError(@NonNull Throwable e) { System.out.println("对Error事件作出响应"+e); } @Override public void onComplete() { System.out.println("对onComplete事件作出响应"); } });运行结果:
repeatWhen()
作用:有条件地、重复发送被观察者的事件
分为下面的两种情况:
1、若新的被观察者返回的是 onComplete 或者 onError 事件,就不在订阅和发送事件
2、若新的被观察者发送的是其他的,会重新的订阅 并且发送事件
1)发送Complete示例
Observable.just(0,1) .repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() { @Override public ObservableSource<?> apply(@NonNull Observable<Object> objectObservable) throws Exception { //返回的是Complete return Observable.empty();//相当于发送了一个Complete事件 } }) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(@NonNull Disposable d) { } @Override public void onNext(@NonNull Integer integer) { System.out.println("接收到的事件:"+integer); } @Override public void onError(@NonNull Throwable e) { System.out.println("对Error事件作出响应:"+e); } @Override public void onComplete() { System.out.println("对Complete作出响应"); } });
运行结果:
2)发送Error示例
Observable.just(0,1) .repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() { @Override public ObservableSource<?> apply(@NonNull Observable<Object> objectObservable) throws Exception { //返回的是Error return Observable.error(new NullPointerException()); } }) ...
运行结果:
3)发送其他事件示例
Observable.just(0,1) .repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() { @Override public ObservableSource<?> apply(@NonNull Observable<Object> objectObservable) throws Exception { //返回的是qita return Observable.just(505); } }) ...
运行结果:
ok,这样RxJava基本功能就讲解完毕。我会在下篇的文章中总结一下RxJava重要的操作符在项目中的具体使用.