RxJava的操作符3

时间:2021-07-23 14:52:43

Android RxJava使用介绍(四) RxJava的操作符

标签: android操作符 8794人阅读 评论(18)收藏举报 本文章已收录于: RxJava的操作符3分类: 作者同类文章X

    目录(?)[+]

    1. Combining ObservablesObservable的组合操作符
      1. combineLatest操作符
      2. join操作符
      3. groupJoin操作符
      4. merge操作符
      5. mergeDelayError操作符
      6. startWith操作符
      7. switchOnNext操作符
      8. zip操作符
    2. Error Handling OperatorsObservable的错误处理操作符
      1. onErrorReturn操作符
      2. onErrorResumeNext操作符
      3. onExceptionResumeNext操作符
      4. retry操作符
      5. retryWhen操作符

    本篇文章继续介绍以下类型的操作符

    • Combining Observables(Observable的组合操作符)
    • Error Handling Operators(Observable的错误处理操作符)

    Combining Observables(Observable的组合操作符)

    combineLatest操作符

    combineLatest操作符把两个Observable产生的结果进行合并,合并的结果组成一个新的Observable。这两个Observable中任意一个Observable产生的结果,都和另一个Observable最后产生的结果,按照一定的规则进行合并。流程图如下:
    RxJava的操作符3
    调用例子如下:

    //产生0,5,10,15,20数列
    Observable<Long> observable1 = Observable.timer(0, 1000, TimeUnit.MILLISECONDS)
    .map(new Func1<Long, Long>() {
    @Override
    public Long call(Long aLong) {
    return aLong * 5;
    }
    }).take(5);

    //产生0,10,20,30,40数列
    Observable<Long> observable2 = Observable.timer(500, 1000, TimeUnit.MILLISECONDS)
    .map(new Func1<Long, Long>() {
    @Override
    public Long call(Long aLong) {
    return aLong * 10;
    }
    }).take(5);


    Observable.combineLatest(observable1, observable2, new Func2<Long, Long, Long>() {
    @Override
    public Long call(Long aLong, Long aLong2) {
    return aLong+aLong2;
    }
    }).subscribe(new Subscriber<Long>() {
    @Override
    public void onCompleted() {
    System.out.println("Sequence complete.");
    }

    @Override
    public void onError(Throwable e) {
    System.err.println("Error: " + e.getMessage());
    }

    @Override
    public void onNext(Long aLong) {
    System.out.println("Next: " + aLong);
    }
    });
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    RxJava的操作符3
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    运行结果如下:
    Next: 0
    Next: 5
    Next: 15
    Next: 20
    Next: 30
    Next: 35
    Next: 45
    Next: 50
    Next: 60
    Sequence complete.

    join操作符

    join操作符把类似于combineLatest操作符,也是两个Observable产生的结果进行合并,合并的结果组成一个新的Observable,但是join操作符可以控制每个Observable产生结果的生命周期,在每个结果的生命周期内,可以与另一个Observable产生的结果按照一定的规则进行合并,流程图如下:
    RxJava的操作符3

    join方法的用法如下:
    observableA.join(observableB,
    observableA产生结果生命周期控制函数,
    observableB产生结果生命周期控制函数,
    observableA产生的结果与observableB产生的结果的合并规则)

    调用例子如下:

    //产生0,5,10,15,20数列
    Observable<Long> observable1 = Observable.timer(0, 1000, TimeUnit.MILLISECONDS)
    .map(new Func1<Long, Long>() {
    @Override
    public Long call(Long aLong) {
    return aLong * 5;
    }
    }).take(5);

    //产生0,10,20,30,40数列
    Observable<Long> observable2 = Observable.timer(500, 1000, TimeUnit.MILLISECONDS)
    .map(new Func1<Long, Long>() {
    @Override
    public Long call(Long aLong) {
    return aLong * 10;
    }
    }).take(5);

    observable1.join(observable2, new Func1<Long, Observable<Long>>() {
    @Override
    public Observable<Long> call(Long aLong) {
    //使Observable延迟600毫秒执行
    return Observable.just(aLong).delay(600, TimeUnit.MILLISECONDS);
    }
    }, new Func1<Long, Observable<Long>>() {
    @Override
    public Observable<Long> call(Long aLong) {
    //使Observable延迟600毫秒执行
    return Observable.just(aLong).delay(600, TimeUnit.MILLISECONDS);
    }
    }, new Func2<Long, Long, Long>() {
    @Override
    public Long call(Long aLong, Long aLong2) {
    return aLong + aLong2;
    }
    }).subscribe(new Subscriber<Long>() {
    @Override
    public void onCompleted() {
    System.out.println("Sequence complete.");
    }

    @Override
    public void onError(Throwable e) {
    System.err.println("Error: " + e.getMessage());
    }

    @Override
    public void onNext(Long aLong) {
    System.out.println("Next: " + aLong);
    }
    });
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    RxJava的操作符3
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    运行结果如下:
    Next: 0
    Next: 5
    Next: 15
    Next: 20
    Next: 30
    Next: 35
    Next: 45
    Next: 50
    Next: 60
    Sequence complete.

    groupJoin操作符

    groupJoin操作符非常类似于join操作符,区别在于join操作符中第四个参数的传入函数不一致,其流程图如下:
    RxJava的操作符3

    调用例子如下:

    //产生0,5,10,15,20数列
    Observable<Long> observable1 = Observable.timer(0, 1000, TimeUnit.MILLISECONDS)
    .map(new Func1<Long, Long>() {
    @Override
    public Long call(Long aLong) {
    return aLong * 5;
    }
    }).take(5);

    //产生0,10,20,30,40数列
    Observable<Long> observable2 = Observable.timer(500, 1000, TimeUnit.MILLISECONDS)
    .map(new Func1<Long, Long>() {
    @Override
    public Long call(Long aLong) {
    return aLong * 10;
    }
    }).take(5);

    observable1.groupJoin(observable2, new Func1<Long, Observable<Long>>() {
    @Override
    public Observable<Long> call(Long aLong) {
    return Observable.just(aLong).delay(1600, TimeUnit.MILLISECONDS);
    }
    }, new Func1<Long, Observable<Long>>() {
    @Override
    public Observable<Long> call(Long aLong) {
    return Observable.just(aLong).delay(600, TimeUnit.MILLISECONDS);
    }
    }, new Func2<Long, Observable<Long>, Observable<Long>>() {
    @Override
    public Observable<Long> call(Long aLong, Observable<Long> observable) {
    return observable.map(new Func1<Long, Long>() {
    @Override
    public Long call(Long aLong2) {
    return aLong + aLong2;
    }
    });
    }
    }).subscribe(new Subscriber<Observable<Long>>() {
    @Override
    public void onCompleted() {
    System.out.println("Sequence complete.");
    }

    @Override
    public void onError(Throwable e) {
    System.err.println("Error: " + e.getMessage());
    }

    @Override
    public void onNext(Observable<Long> observable) {
    observable.subscribe(new Subscriber<Long>() {
    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onNext(Long aLong) {
    System.out.println("Next: " + aLong);
    }
    });
    }
    });
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    RxJava的操作符3
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69

    运行结果如下:
    Next: 0
    Next: 5
    Next: 10
    Next: 15
    Next: 20
    Next: 25
    Next: 30
    Next: 35
    Next: 40
    Next: 45
    Next: 50
    Next: 60
    Next: 55
    Sequence complete.

    merge操作符

    merge操作符是按照两个Observable提交结果的时间顺序,对Observable进行合并,如ObservableA每隔500毫秒产生数据为0,5,10,15,20;而ObservableB每隔500毫秒产生数据0,10,20,30,40,其中第一个数据延迟500毫秒产生,最后合并结果为:0,0,5,10,10,20,15,30,20,40;其流程图如下:
    RxJava的操作符3

    调用例子如下:

    //产生0,5,10,15,20数列
    Observable<Long> observable1 = Observable.timer(0, 1000, TimeUnit.MILLISECONDS)
    .map(new Func1<Long, Long>() {
    @Override
    public Long call(Long aLong) {
    return aLong * 5;
    }
    }).take(5);

    //产生0,10,20,30,40数列
    Observable<Long> observable2 = Observable.timer(500, 1000, TimeUnit.MILLISECONDS)
    .map(new Func1<Long, Long>() {
    @Override
    public Long call(Long aLong) {
    return aLong * 10;
    }
    }).take(5);

    Observable.merge(observable1, observable2)
    .subscribe(new Subscriber<Long>() {
    @Override
    public void onCompleted() {
    System.out.println("Sequence complete.");
    }

    @Override
    public void onError(Throwable e) {
    System.err.println("Error: " + e.getMessage());
    }

    @Override
    public void onNext(Long aLong) {
    System.out.println("Next:" + aLong);
    }
    });
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    RxJava的操作符3
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    运行结果如下:
    Next:0
    Next:0
    Next:5
    Next:10
    Next:10
    Next:20
    Next:15
    Next:30
    Next:20
    Next:40
    Sequence complete.

    mergeDelayError操作符

    从merge操作符的流程图可以看出,一旦合并的某一个Observable中出现错误,就会马上停止合并,并对订阅者回调执行onError方法,而mergeDelayError操作符会把错误放到所有结果都合并完成之后才执行,其流程图如下:
    RxJava的操作符3

    调用例子如下:

    //产生0,5,10数列,最后会产生一个错误
    Observable<Long> errorObservable = Observable.error(new Exception("this is end!"));
    Observable < Long > observable1 = Observable.timer(0, 1000, TimeUnit.MILLISECONDS)
    .map(new Func1<Long, Long>() {
    @Override
    public Long call(Long aLong) {
    return aLong * 5;
    }
    }).take(3).mergeWith(errorObservable.delay(3500, TimeUnit.MILLISECONDS));

    //产生0,10,20,30,40数列
    Observable<Long> observable2 = Observable.timer(500, 1000, TimeUnit.MILLISECONDS)
    .map(new Func1<Long, Long>() {
    @Override
    public Long call(Long aLong) {
    return aLong * 10;
    }
    }).take(5);

    Observable.mergeDelayError(observable1, observable2)
    .subscribe(new Subscriber<Long>() {
    @Override
    public void onCompleted() {
    System.out.println("Sequence complete.");
    }

    @Override
    public void onError(Throwable e) {
    System.err.println("Error: " + e.getMessage());
    }

    @Override
    public void onNext(Long aLong) {
    System.out.println("Next:" + aLong);
    }
    });
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    RxJava的操作符3
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    运行结果如下:
    Next:0
    Next:0
    Next:5
    Next:10
    Next:10
    Next:20
    Next:30
    Next:40
    Error: this is end!

    startWith操作符

    startWith操作符是在源Observable提交结果之前,插入指定的某些数据,其流程图如下:
    RxJava的操作符3

    调用例子如下:

    Observable.just(10,20,30).startWith(2, 3, 4).subscribe(new Subscriber<Integer>() {
    @Override
    public void onCompleted() {
    System.out.println("Sequence complete.");
    }

    @Override
    public void onError(Throwable e) {
    System.err.println("Error: " + e.getMessage());
    }

    @Override
    public void onNext(Integer value) {
    System.out.println("Next:" + value);
    }
    });
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    RxJava的操作符3
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    运行结果如下:
    Next:2
    Next:3
    Next:4
    Next:10
    Next:20
    Next:30
    Sequence complete.

    switchOnNext操作符

    switchOnNext操作符是把一组Observable转换成一个Observable,转换规则为:对于这组Observable中的每一个Observable所产生的结果,如果在同一个时间内存在两个或多个Observable提交的结果,只取最后一个Observable提交的结果给订阅者,其流程图如下:
    RxJava的操作符3

    调用例子如下:

    //每隔500毫秒产生一个observable
    Observable<Observable<Long>> observable = Observable.timer(0, 500, TimeUnit.MILLISECONDS).map(new Func1<Long, Observable<Long>>() {
    @Override
    public Observable<Long> call(Long aLong) {
    //每隔200毫秒产生一组数据(0,10,20,30,40)
    return Observable.timer(0, 200, TimeUnit.MILLISECONDS).map(new Func1<Long, Long>() {
    @Override
    public Long call(Long aLong) {
    return aLong * 10;
    }
    }).take(5);
    }
    }).take(2);

    Observable.switchOnNext(observable).subscribe(new Subscriber<Long>() {
    @Override
    public void onCompleted() {
    System.out.println("Sequence complete.");
    }

    @Override
    public void onError(Throwable e) {
    System.err.println("Error: " + e.getMessage());
    }

    @Override
    public void onNext(Long aLong) {
    System.out.println("Next:" + aLong);
    }
    });
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    RxJava的操作符3
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    运行结果如下:
    Next:0
    Next:10
    Next:20
    Next:0
    Next:10
    Next:20
    Next:30
    Next:40
    Sequence complete.

    zip操作符

    zip操作符是把两个observable提交的结果,严格按照顺序进行合并,其流程图如下:
    RxJava的操作符3

    调用例子如下:

    Observable<Integer> observable1 = Observable.just(10,20,30);
    Observable<Integer> observable2 = Observable.just(4, 8, 12, 16);
    Observable.zip(observable1, observable2, new Func2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer integer, Integer integer2) {
    return integer + integer2;
    }
    }).subscribe(new Subscriber<Integer>() {
    @Override
    public void onCompleted() {
    System.out.println("Sequence complete.");
    }

    @Override
    public void onError(Throwable e) {
    System.err.println("Error: " + e.getMessage());
    }

    @Override
    public void onNext(Integer value) {
    System.out.println("Next:" + value);
    }
    });
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    RxJava的操作符3
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    运行结果如下:
    Next:14
    Next:28
    Next:42
    Sequence complete.

    Error Handling Operators(Observable的错误处理操作符)

    onErrorReturn操作符

    onErrorReturn操作符是在Observable发生错误或异常的时候(即将回调oError方法时),拦截错误并执行指定的逻辑,返回一个跟源Observable相同类型的结果,最后回调订阅者的onComplete方法,其流程图如下:
    RxJava的操作符3
    调用例子如下:

    Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
    if (subscriber.isUnsubscribed()) return;
    //循环输出数字
    try {
    for (int i = 0; i < 10; i++) {
    if (i == 4) {
    throw new Exception("this is number 4 error!");
    }
    subscriber.onNext(i);
    }
    subscriber.onCompleted();
    } catch (Exception e) {
    subscriber.onError(e);
    }
    }
    });

    observable.onErrorReturn(new Func1<Throwable, Integer>() {
    @Override
    public Integer call(Throwable throwable) {
    return 1004;
    }
    }).subscribe(new Subscriber<Integer>() {
    @Override
    public void onCompleted() {
    System.out.println("Sequence complete.");
    }

    @Override
    public void onError(Throwable e) {
    System.err.println("Error: " + e.getMessage());
    }

    @Override
    public void onNext(Integer value) {
    System.out.println("Next:" + value);
    }
    });
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    RxJava的操作符3
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    运行结果如下:
    Next:0
    Next:1
    Next:2
    Next:3
    Next:1004
    Sequence complete.

    onErrorResumeNext操作符

    onErrorResumeNext操作符跟onErrorReturn类似,只不过onErrorReturn只能在错误或异常发生时只返回一个和源Observable相同类型的结果,而onErrorResumeNext操作符是在错误或异常发生时返回一个Observable,也就是说可以返回多个和源Observable相同类型的结果,其流程图如下:
    RxJava的操作符3
    调用例子如下:

    Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
    if (subscriber.isUnsubscribed()) return;
    //循环输出数字
    try {
    for (int i = 0; i < 10; i++) {
    if (i == 4) {
    throw new Exception("this is number 4 error!");
    }
    subscriber.onNext(i);
    }
    subscriber.onCompleted();
    } catch (Exception e) {
    subscriber.onError(e);
    }
    }
    });

    observable.onErrorResumeNext(new Func1<Throwable, Observable<? extends Integer>>() {
    @Override
    public Observable<? extends Integer> call(Throwable throwable) {
    return Observable.just(100,101, 102);
    }
    }).subscribe(new Subscriber<Integer>() {
    @Override
    public void onCompleted() {
    System.out.println("Sequence complete.");
    }

    @Override
    public void onError(Throwable e) {
    System.err.println("Error: " + e.getMessage());
    }

    @Override
    public void onNext(Integer value) {
    System.out.println("Next:" + value);
    }
    });
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    RxJava的操作符3
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    运行结果如下:
    Next:0
    Next:1
    Next:2
    Next:3
    Next:100
    Next:101
    Next:102
    Sequence complete.

    onExceptionResumeNext操作符

    onExceptionResumeNext操作符和onErrorResumeNext操作符类似,不同的地方在于onErrorResumeNext操作符是当Observable发生错误或异常时触发,而onExceptionResumeNext是当Observable发生异常时才触发。

    这里要普及一个概念就是,Java的异常分为错误(error)和异常(exception)两种,它们都是继承于Throwable类。

    错误(error)一般是比较严重的系统问题,比如我们经常遇到的OutOfMemoryError、*Error等都是错误。错误一般继承于Error类,而Error类又继承于Throwable类,如果需要捕获错误,需要使用try..catch(Error e)或者try..catch(Throwable e)句式。使用try..catch(Exception e)句式无法捕获错误

    异常(Exception)也是继承于Throwable类,一般是根据实际处理业务抛出的异常,分为运行时异常(RuntimeException)和普通异常。普通异常直接继承于Exception类,如果方法内部没有通过try..catch句式进行处理,必须通过throws关键字把异常抛出外部进行处理(即checked异常);而运行时异常继承于RuntimeException类,如果方法内部没有通过try..catch句式进行处理,不需要显式通过throws关键字抛出外部,如IndexOutOfBoundsException、NullPointerException、ClassCastException等都是运行时异常,当然RuntimeException也是继承于Exception类,因此是可以通过try..catch(Exception e)句式进行捕获处理的。
    onExceptionResumeNext流程图如下:
    RxJava的操作符3

    调用例子如下:

     Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
    if (subscriber.isUnsubscribed()) return;
    //循环输出数字
    try {
    for (int i = 0; i < 10; i++) {
    if (i == 4) {
    throw new Exception("this is number 4 error!");
    }
    subscriber.onNext(i);
    }
    subscriber.onCompleted();
    } catch (Throwable e) {
    subscriber.onError(e);
    }
    }
    });

    observable.onExceptionResumeNext(Observable.just(100, 101, 102)).subscribe(new Subscriber<Integer>() {
    @Override
    public void onCompleted() {
    System.out.println("Sequence complete.");
    }

    @Override
    public void onError(Throwable e) {
    System.err.println("Error: " + e.getMessage());
    }

    @Override
    public void onNext(Integer value) {
    System.out.println("Next:" + value);
    }
    });
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    RxJava的操作符3
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    运行结果如下:
    Next:0
    Next:1
    Next:2
    Next:3
    Next:100
    Next:101
    Next:102
    Sequence complete.

    retry操作符

    retry操作符是当Observable发生错误或者异常时,重新尝试执行Observable的逻辑,如果经过n次重新尝试执行后仍然出现错误或者异常,则最后回调执行onError方法;当然如果源Observable没有错误或者异常出现,则按照正常流程执行。其流程图如下:
    RxJava的操作符3

    调用例子如下:

    Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
    if (subscriber.isUnsubscribed()) return;
    //循环输出数字
    try {
    for (int i = 0; i < 10; i++) {
    if (i == 4) {
    throw new Exception("this is number 4 error!");
    }
    subscriber.onNext(i);
    }
    subscriber.onCompleted();
    } catch (Throwable e) {
    subscriber.onError(e);
    }
    }
    });

    observable.retry(2).subscribe(new Subscriber<Integer>() {
    @Override
    public void onCompleted() {
    System.out.println("Sequence complete.");
    }

    @Override
    public void onError(Throwable e) {
    System.err.println("Error: " + e.getMessage());
    }

    @Override
    public void onNext(Integer value) {
    System.out.println("Next:" + value);
    }
    });
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    RxJava的操作符3
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    运行结果如下:
    Next:0
    Next:1
    Next:2
    Next:3

    Next:0
    Next:1
    Next:2
    Next:3

    Next:0
    Next:1
    Next:2
    Next:3
    Error: this is number 4 error!

    retryWhen操作符

    retryWhen操作符类似于retry操作符,都是在源observable出现错误或者异常时,重新尝试执行源observable的逻辑,不同在于retryWhen操作符是在源Observable出现错误或者异常时,通过回调第二个Observable来判断是否重新尝试执行源Observable的逻辑,如果第二个Observable没有错误或者异常出现,则就会重新尝试执行源Observable的逻辑,否则就会直接回调执行订阅者的onError方法。其流程图如下:
    RxJava的操作符3

    调用例子如下:

    Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
    System.out.println("subscribing");
    subscriber.onError(new RuntimeException("always fails"));
    }
    });

    observable.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
    @Override
    public Observable<?> call(Observable<? extends Throwable> observable) {
    return observable.zipWith(Observable.range(1, 3), new Func2<Throwable, Integer, Integer>() {
    @Override
    public Integer call(Throwable throwable, Integer integer) {
    return integer;
    }
    }).flatMap(new Func1<Integer, Observable<?>>() {
    @Override
    public Observable<?> call(Integer integer) {
    System.out.println("delay retry by " + integer + " second(s)");
    //每一秒中执行一次
    return Observable.timer(integer, TimeUnit.SECONDS);
    }
    });
    }
    }).subscribe(new Subscriber<Integer>() {
    @Override
    public void onCompleted() {
    System.out.println("Sequence complete.");
    }

    @Override
    public void onError(Throwable e) {
    System.err.println("Error: " + e.getMessage());
    }

    @Override
    public void onNext(Integer value) {
    System.out.println("Next:" + value);
    }
    });
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    RxJava的操作符3
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    运行结果如下:
    subscribing
    delay retry by 1 second(s)
    subscribing
    delay retry by 2 second(s)
    subscribing
    delay retry by 3 second(s)
    subscribing
    Sequence complete.

    好了,先介绍这么多,下回继续介绍其他的操作符,敬请期待!

    13
    0
       

    我的同类文章

    http://blog.csdn.net