RxJava的操作符(2)

时间:2022-05-25 22:48:17

Android RxJava使用介绍(三) RxJava的操作符

标签: android操作符 14202人阅读 评论(13)收藏举报 本文章已收录于: RxJava的操作符(2)分类: 作者同类文章X

    目录(?)[+]

    1. Transforming ObservablesObservable的转换操作符
      1. buffer操作符
      2. flatMap操作符
      3. concatMap操作符
      4. switchMap操作符
      5. groupBy操作符
      6. map操作符
      7. cast操作符
      8. scan操作符
      9. window操作符
    2. Filtering ObservablesObservable的过滤操作符
      1. debounce操作符
      2. distinct操作符
      3. elementAt操作符
      4. filter操作符
      5. ofType操作符
      6. first操作符
      7. single操作符
      8. last操作符
      9. ignoreElements操作符
      10. sample操作符
      11. skip操作符
      12. skipLast操作符
      13. take操作符
      14. takeFirst操作符
      15. takeLast操作符

    上一篇文章已经详细讲解了RxJava的创建型操作符,本片文章将继续讲解RxJava操作符,包括:

    • Transforming Observables(Observable的转换操作符)
    • Filtering Observables(Observable的过滤操作符)

    Transforming Observables(Observable的转换操作符)

    buffer操作符

    buffer操作符周期性地收集源Observable产生的结果到列表中,并把这个列表提交给订阅者,订阅者处理后,清空buffer列表,同时接收下一次收集的结果并提交给订阅者,周而复始。

    需要注意的是,一旦源Observable在产生结果的过程中出现异常,即使buffer已经存在收集到的结果,订阅者也会马上收到这个异常,并结束整个过程。

    buffer的名字很怪,但是原理很简单,流程图如下:
    RxJava的操作符(2)

    调用例子如下:

    //定义邮件内容
    final String[] mails = new String[]{"Here is an email!", "Another email!", "Yet another email!"};
    //每隔1秒就随机发布一封邮件
    Observable<String> endlessMail = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
    try {
    if (subscriber.isUnsubscribed()) return;
    Random random = new Random();
    while (true) {
    String mail = mails[random.nextInt(mails.length)];
    subscriber.onNext(mail);
    Thread.sleep(1000);
    }
    } catch (Exception ex) {
    subscriber.onError(ex);
    }
    }
    }).subscribeOn(Schedulers.io());
    //把上面产生的邮件内容缓存到列表中,并每隔3秒通知订阅者
    endlessMail.buffer(3, TimeUnit.SECONDS).subscribe(new Action1<List<String>>() {
    @Override
    public void call(List<String> list) {

    System.out.println(String.format("You've got %d new messages! Here they are!", list.size()));
    for (int i = 0; i < list.size(); i++)
    System.out.println("**" + list.get(i).toString());
    }
    });
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    运行结果如下:
    You’ve got 3 new messages! Here they are!(after 3s)
    **Here is an email!
    **Another email!
    **Another email!
    You’ve got 3 new messages! Here they are!(after 6s)
    **Here is an email!
    **Another email!
    **Here is an email!
    ……

    flatMap操作符

    flatMap操作符是把Observable产生的结果转换成多个Observable,然后把这多个Observable“扁平化”成一个Observable,并依次提交产生的结果给订阅者。

    flatMap操作符通过传入一个函数作为参数转换源Observable,在这个函数中,你可以自定义转换规则,最后在这个函数中返回一个新的Observable,然后flatMap操作符通过合并这些Observable结果成一个Observable,并依次提交结果给订阅者。

    值得注意的是,flatMap操作符在合并Observable结果时,有可能存在交叉的情况,如下流程图所示:
    RxJava的操作符(2)

    调用例子如下:

    private Observable<File> listFiles(File f){
    if(f.isDirectory()){
    return Observable.from(f.listFiles()).flatMap(new Func1<File, Observable<File>>() {
    @Override
    public Observable<File> call(File file) {
    return listFiles(f);
    }
    });
    } else {
    return Observable.just(f);
    }
    }


    @Override
    public void onClick(View v) {
    Observable.just(getApplicationContext().getExternalCacheDir())
    .flatMap(new Func1<File, Observable<File>>() {
    @Override
    public Observable<File> call(File file) {
    //参数file是just操作符产生的结果,这里判断file是不是目录文件,如果是目录文件,则递归查找其子文件flatMap操作符神奇的地方在于,返回的结果还是一个Observable,而这个Observable其实是包含多个文件的Observable的,输出应该是ExternalCacheDir下的所有文件
    return listFiles(file);
    }
    })
    .subscribe(new Action1<File>() {
    @Override
    public void call(File file) {
    System.out.println(file.getAbsolutePath());
    }
    });

    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    RxJava的操作符(2)
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    concatMap操作符

    cancatMap操作符与flatMap操作符类似,都是把Observable产生的结果转换成多个Observable,然后把这多个Observable“扁平化”成一个Observable,并依次提交产生的结果给订阅者。

    与flatMap操作符不同的是,concatMap操作符在处理产生的Observable时,采用的是“连接(concat)”的方式,而不是“合并(merge)”的方式,这就能保证产生结果的顺序性,也就是说提交给订阅者的结果是按照顺序提交的,不会存在交叉的情况。

    concatMap的流程如下:
    RxJava的操作符(2)

    concatMap的调用例子与flatMap类似,这里不做重复

    switchMap操作符

    switchMap操作符与flatMap操作符类似,都是把Observable产生的结果转换成多个Observable,然后把这多个Observable“扁平化”成一个Observable,并依次提交产生的结果给订阅者。

    与flatMap操作符不同的是,switchMap操作符会保存最新的Observable产生的结果而舍弃旧的结果,举个例子来说,比如源Observable产生A、B、C三个结果,通过switchMap的自定义映射规则,映射后应该会产生A1、A2、B1、B2、C1、C2,但是在产生B2的同时,C1已经产生了,这样最后的结果就变成A1、A2、B1、C1、C2,B2被舍弃掉了!流程图如下:
    RxJava的操作符(2)

    以下是flatMap、concatMap和switchMap的运行实例对比:


    //flatMap操作符的运行结果
    Observable.just(10, 20, 30).flatMap(new Func1<Integer, Observable<Integer>>() {
    @Override
    public Observable<Integer> call(Integer integer) {
    //10的延迟执行时间为200毫秒、20和30的延迟执行时间为180毫秒
    int delay = 200;
    if (integer > 10)
    delay = 180;

    return Observable.from(new Integer[]{integer, integer / 2}).delay(delay, TimeUnit.MILLISECONDS);
    }
    }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
    System.out.println("flatMap Next:" + integer);
    }
    });

    //concatMap操作符的运行结果
    Observable.just(10, 20, 30).concatMap(new Func1<Integer, Observable<Integer>>() {
    @Override
    public Observable<Integer> call(Integer integer) {
    //10的延迟执行时间为200毫秒、20和30的延迟执行时间为180毫秒
    int delay = 200;
    if (integer > 10)
    delay = 180;

    return Observable.from(new Integer[]{integer, integer / 2}).delay(delay, TimeUnit.MILLISECONDS);
    }
    }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
    System.out.println("concatMap Next:" + integer);
    }
    });

    //switchMap操作符的运行结果
    Observable.just(10, 20, 30).switchMap(new Func1<Integer, Observable<Integer>>() {
    @Override
    public Observable<Integer> call(Integer integer) {
    //10的延迟执行时间为200毫秒、20和30的延迟执行时间为180毫秒
    int delay = 200;
    if (integer > 10)
    delay = 180;

    return Observable.from(new Integer[]{integer, integer / 2}).delay(delay, TimeUnit.MILLISECONDS);
    }
    }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
    System.out.println("switchMap Next:" + integer);
    }
    });
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    RxJava的操作符(2)
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54

    运行结果如下:
    flatMap Next:20
    flatMap Next:10
    flatMap Next:30
    flatMap Next:15
    flatMap Next:10
    flatMap Next:5

    switchMap Next:30
    switchMap Next:15

    concatMap Next:10
    concatMap Next:5
    concatMap Next:20
    concatMap Next:10
    concatMap Next:30
    concatMap Next:15

    groupBy操作符

    groupBy操作符是对源Observable产生的结果进行分组,形成一个类型为GroupedObservable的结果集,GroupedObservable中存在一个方法为getKey(),可以通过该方法获取结果集的Key值(类似于HashMap的key)。

    值得注意的是,由于结果集中的GroupedObservable是把分组结果缓存起来,如果对每一个GroupedObservable不进行处理(既不订阅执行也不对其进行别的操作符运算),就有可能出现内存泄露。因此,如果你对某个GroupedObservable不进行处理,最好是对其使用操作符take(0)处理。

    groupBy操作符的流程图如下:
    RxJava的操作符(2)

    调用例子如下:

    Observable.interval(1, TimeUnit.SECONDS).take(10).groupBy(new Func1<Long, Long>() {
    @Override
    public Long call(Long value) {
    //按照key为0,1,2分为3组
    return value % 3;
    }
    }).subscribe(new Action1<GroupedObservable<Long, Long>>() {
    @Override
    public void call(GroupedObservable<Long, Long> result) {
    result.subscribe(new Action1<Long>() {
    @Override
    public void call(Long value) {
    System.out.println("key:" + result.getKey() +", value:" + value);
    }
    });
    }
    });
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    RxJava的操作符(2)
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    运行结果如下:
    key:0, value:0
    key:1, value:1
    key:2, value:2
    key:0, value:3
    key:1, value:4
    key:2, value:5
    key:0, value:6
    key:1, value:7
    key:2, value:8
    key:0, value:9

    map操作符

    map操作符是把源Observable产生的结果,通过映射规则转换成另一个结果集,并提交给订阅者进行处理。

    map操作符的流程图如下:
    RxJava的操作符(2)

    调用例子如下:

    Observable.just(1,2,3,4,5,6).map(new Func1<Integer, Integer>() {
    @Override
    public Integer call(Integer integer) {
    //对源Observable产生的结果,都统一乘以3处理
    return integer*3;
    }
    }).subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
    System.out.println("next:" + integer);
    }
    });
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    RxJava的操作符(2)
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    运行结果如下:
    next:3
    next:6
    next:9
    next:12
    next:15
    next:18

    cast操作符

    cast操作符类似于map操作符,不同的地方在于map操作符可以通过自定义规则,把一个值A1变成另一个值A2,A1和A2的类型可以一样也可以不一样;而cast操作符主要是做类型转换的,传入参数为类型class,如果源Observable产生的结果不能转成指定的class,则会抛出ClassCastException运行时异常。

    cast操作符的流程图如下:
    RxJava的操作符(2)

    调用例子如下:

            Observable.just(1,2,3,4,5,6).cast(Integer.class).subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer value) {
    System.out.println("next:"+value);
    }
    });
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    RxJava的操作符(2)
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    运行结果如下:
    next:1
    next:2
    next:3
    next:4
    next:5
    next:6

    scan操作符

    scan操作符通过遍历源Observable产生的结果,依次对每一个结果项按照指定规则进行运算,计算后的结果作为下一个迭代项参数,每一次迭代项都会把计算结果输出给订阅者。

    scan操作符的流程图如下:
    RxJava的操作符(2)

    调用例子如下:

    Observable.just(1, 2, 3, 4, 5)
    .scan(new Func2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer sum, Integer item) {
    //参数sum就是上一次的计算结果
    return sum + item;
    }
    }).subscribe(new Subscriber<Integer>() {
    @Override
    public void onNext(Integer item) {
    System.out.println("Next: " + item);
    }

    @Override
    public void onError(Throwable error) {
    System.err.println("Error: " + error.getMessage());
    }

    @Override
    public void onCompleted() {
    System.out.println("Sequence complete.");
    }
    });
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    RxJava的操作符(2)
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    运行结果如下:
    Next: 1
    Next: 3
    Next: 6
    Next: 10
    Next: 15
    Sequence complete.

    window操作符

    window操作符非常类似于buffer操作符,区别在于buffer操作符产生的结果是一个List缓存,而window操作符产生的结果是一个Observable,订阅者可以对这个结果Observable重新进行订阅处理。

    window操作符有很多个重载方法,这里只举一个简单的例子,其流程图如下:
    RxJava的操作符(2)

    调用例子如下:

    Observable.interval(1, TimeUnit.SECONDS).take(12)
    .window(3, TimeUnit.SECONDS)
    .subscribe(new Action1<Observable<Long>>() {
    @Override
    public void call(Observable<Long> observable) {
    System.out.println("subdivide begin......");
    observable.subscribe(new Action1<Long>() {
    @Override
    public void call(Long aLong) {
    System.out.println("Next:" + aLong);
    }
    });
    }
    });
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    RxJava的操作符(2)
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    运行结果如下:
    subdivide begin……
    Next:0
    Next:1
    subdivide begin……
    Next:2
    Next:3
    Next:4
    subdivide begin……
    Next:5
    Next:6
    Next:7
    subdivide begin……
    Next:8
    Next:9
    Next:10
    subdivide begin……
    Next:11

    Filtering Observables(Observable的过滤操作符)

    debounce操作符

    debounce操作符对源Observable每产生一个结果后,如果在规定的间隔时间内没有别的结果产生,则把这个结果提交给订阅者处理,否则忽略该结果。

    值得注意的是,如果源Observable产生的最后一个结果后在规定的时间间隔内调用了onCompleted,那么通过debounce操作符也会把这个结果提交给订阅者。

    debounce操作符的流程图如下:
    RxJava的操作符(2)

    调用例子如下:

    Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
    if(subscriber.isUnsubscribed()) return;
    try {
    //产生结果的间隔时间分别为100、200、300...900毫秒
    for (int i = 1; i < 10; i++) {
    subscriber.onNext(i);
    Thread.sleep(i * 100);
    }
    subscriber.onCompleted();
    }catch(Exception e){
    subscriber.onError(e);
    }
    }
    }).subscribeOn(Schedulers.newThread())
    .debounce(400, TimeUnit.MILLISECONDS) //超时时间为400毫秒
    .subscribe(
    new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
    System.out.println("Next:" + integer);
    }
    }, new Action1<Throwable>() {
    @Override
    public void call(Throwable throwable) {
    System.out.println("Error:" + throwable.getMessage());
    }
    }, new Action0() {
    @Override
    public void call() {
    System.out.println("completed!");
    }
    });
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    RxJava的操作符(2)
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    运行结果如下:
    Next:4
    Next:5
    Next:6
    Next:7
    Next:8
    Next:9
    completed!

    distinct操作符

    distinct操作符对源Observable产生的结果进行过滤,把重复的结果过滤掉,只输出不重复的结果给订阅者,非常类似于SQL里的distinct关键字。

    distinct操作符的流程图如下:
    RxJava的操作符(2)

    调用例子如下:

    Observable.just(1, 2, 1, 1, 2, 3)
    .distinct()
    .subscribe(new Subscriber<Integer>() {
    @Override
    public void onNext(Integer item) {
    System.out.println("Next: " + item);
    }

    @Override
    public void onError(Throwable error) {
    System.err.println("Error: " + error.getMessage());
    }

    @Override
    public void onCompleted() {
    System.out.println("Sequence complete.");
    }
    });
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    RxJava的操作符(2)
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    运行结果如下:
    Next: 1
    Next: 2
    Next: 3
    Sequence complete.

    elementAt操作符

    elementAt操作符在源Observable产生的结果中,仅仅把指定索引的结果提交给订阅者,索引是从0开始的。其流程图如下:
    RxJava的操作符(2)

    调用例子如下:

    Observable.just(1,2,3,4,5,6).elementAt(2)
    .subscribe(
    new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
    System.out.println("Next:" + integer);
    }
    }, new Action1<Throwable>() {
    @Override
    public void call(Throwable throwable) {
    System.out.println("Error:" + throwable.getMessage());
    }
    }, new Action0() {
    @Override
    public void call() {
    System.out.println("completed!");
    }
    });
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    RxJava的操作符(2)
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    运行结果如下:
    Next:3
    completed!

    filter操作符

    filter操作符是对源Observable产生的结果按照指定条件进行过滤,只有满足条件的结果才会提交给订阅者,其流程图如下:
    RxJava的操作符(2)

    调用例子如下:

    Observable.just(1, 2, 3, 4, 5)
    .filter(new Func1<Integer, Boolean>() {
    @Override
    public Boolean call(Integer item) {
    return( item < 4 );
    }
    }).subscribe(new Subscriber<Integer>() {
    @Override
    public void onNext(Integer item) {
    System.out.println("Next: " + item);
    }

    @Override
    public void onError(Throwable error) {
    System.err.println("Error: " + error.getMessage());
    }

    @Override
    public void onCompleted() {
    System.out.println("Sequence complete.");
    }
    });
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    RxJava的操作符(2)
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    运行结果如下:
    Next: 1
    Next: 2
    Next: 3
    Sequence complete.

    ofType操作符

    ofType操作符类似于filter操作符,区别在于ofType操作符是按照类型对结果进行过滤,其流程图如下:
    RxJava的操作符(2)
    调用例子如下:

    Observable.just(1, "hello world", true, 200L, 0.23f)
    .ofType(Float.class)
    .subscribe(new Subscriber<Object>() {
    @Override
    public void onNext(Object item) {
    System.out.println("Next: " + item);
    }

    @Override
    public void onError(Throwable error) {
    System.err.println("Error: " + error.getMessage());
    }

    @Override
    public void onCompleted() {
    System.out.println("Sequence complete.");
    }
    });
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    RxJava的操作符(2)
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    运行结果如下:
    Next: 0.23
    Sequence complete.

    first操作符

    first操作符是把源Observable产生的结果的第一个提交给订阅者,first操作符可以使用elementAt(0)和take(1)替代。其流程图如下:
    RxJava的操作符(2)
    调用例子如下:

            Observable.just(1,2,3,4,5,6,7,8)
    .first()
    .subscribe(new Subscriber<Integer>() {
    @Override
    public void onNext(Integer item) {
    System.out.println("Next: " + item);
    }

    @Override
    public void onError(Throwable error) {
    System.err.println("Error: " + error.getMessage());
    }

    @Override
    public void onCompleted() {
    System.out.println("Sequence complete.");
    }
    });
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    RxJava的操作符(2)
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    运行结果如下:
    Next: 1
    Sequence complete.

    single操作符

    single操作符是对源Observable的结果进行判断,如果产生的结果满足指定条件的数量不为1,则抛出异常,否则把满足条件的结果提交给订阅者,其流程图如下:
    RxJava的操作符(2)

    调用例子如下:

    Observable.just(1,2,3,4,5,6,7,8)
    .single(new Func1<Integer, Boolean>() {
    @Override
    public Boolean call(Integer integer) {
    //取大于10的第一个数字
    return integer>10;
    }
    })
    .subscribe(new Subscriber<Integer>() {
    @Override
    public void onNext(Integer item) {
    System.out.println("Next: " + item);
    }

    @Override
    public void onError(Throwable error) {
    System.err.println("Error: " + error.getMessage());
    }

    @Override
    public void onCompleted() {
    System.out.println("Sequence complete.");
    }
    });
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    RxJava的操作符(2)
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    运行结果如下:
    Error: Sequence contains no elements

    last操作符

    last操作符把源Observable产生的结果的最后一个提交给订阅者,last操作符可以使用takeLast(1)替代。其流程图如下:
    RxJava的操作符(2)

    调用例子如下:

    Observable.just(1, 2, 3)
    .last()
    .subscribe(new Subscriber<Integer>() {
    @Override
    public void onNext(Integer item) {
    System.out.println("Next: " + item);
    }

    @Override
    public void onError(Throwable error) {
    System.err.println("Error: " + error.getMessage());
    }

    @Override
    public void onCompleted() {
    System.out.println("Sequence complete.");
    }
    });
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    RxJava的操作符(2)
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    运行结果如下:
    Next: 3
    Sequence complete.

    ignoreElements操作符

    ignoreElements操作符忽略所有源Observable产生的结果,只把Observable的onCompleted和onError事件通知给订阅者。ignoreElements操作符适用于不太关心Observable产生的结果,只是在Observable结束时(onCompleted)或者出现错误时能够收到通知。

    ignoreElements操作符的流程图如下:
    RxJava的操作符(2)

    调用例子如下:

            Observable.just(1,2,3,4,5,6,7,8).ignoreElements()
    .subscribe(new Subscriber<Integer>() {
    @Override
    public void onNext(Integer item) {
    System.out.println("Next: " + item);
    }

    @Override
    public void onError(Throwable error) {
    System.err.println("Error: " + error.getMessage());
    }

    @Override
    public void onCompleted() {
    System.out.println("Sequence complete.");
    }
    });
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    RxJava的操作符(2)
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    运行结果如下:
    Sequence complete.

    sample操作符

    sample操作符定期扫描源Observable产生的结果,在指定的时间间隔范围内对源Observable产生的结果进行采样,其流程图如下:
    RxJava的操作符(2)

    调用例子如下:

    Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
    if(subscriber.isUnsubscribed()) return;
    try {
    //前8个数字产生的时间间隔为1秒,后一个间隔为3秒
    for (int i = 1; i < 9; i++) {
    subscriber.onNext(i);
    Thread.sleep(1000);
    }
    Thread.sleep(2000);
    subscriber.onNext(9);
    subscriber.onCompleted();
    } catch(Exception e){
    subscriber.onError(e);
    }
    }
    }).subscribeOn(Schedulers.newThread())
    .sample(2200, TimeUnit.MILLISECONDS) //采样间隔时间为2200毫秒
    .subscribe(new Subscriber<Integer>() {
    @Override
    public void onNext(Integer item) {
    System.out.println("Next: " + item);
    }

    @Override
    public void onError(Throwable error) {
    System.err.println("Error: " + error.getMessage());
    }

    @Override
    public void onCompleted() {
    System.out.println("Sequence complete.");
    }
    });
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    RxJava的操作符(2)
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    运行结果如下:
    Next: 3
    Next: 5
    Next: 7
    Next: 8
    Sequence complete.

    skip操作符

    skip操作符针对源Observable产生的结果,跳过前面n个不进行处理,而把后面的结果提交给订阅者处理,其流程图如下:
    RxJava的操作符(2)

    调用例子如下:

    Observable.just(1,2,3,4,5,6,7).skip(3)
    .subscribe(new Subscriber<Integer>() {
    @Override
    public void onNext(Integer item) {
    System.out.println("Next: " + item);
    }

    @Override
    public void onError(Throwable error) {
    System.err.println("Error: " + error.getMessage());
    }

    @Override
    public void onCompleted() {
    System.out.println("Sequence complete.");
    }
    });
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    RxJava的操作符(2)
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    运行结果如下:
    Next: 4
    Next: 5
    Next: 6
    Next: 7
    Sequence complete.

    skipLast操作符

    skipLast操作符针对源Observable产生的结果,忽略Observable最后产生的n个结果,而把前面产生的结果提交给订阅者处理,

    值得注意的是,skipLast操作符提交满足条件的结果给订阅者是存在延迟效果的,看以下流程图即可明白:
    RxJava的操作符(2)

    可以看到skipLast操作符把最后的天蓝色球、蓝色球、紫色球忽略掉了,但是前面的红色球等并不是源Observable一产生就直接提交给订阅者,这里有一个延迟的效果。

    调用例子如下:

    Observable.just(1,2,3,4,5,6,7).skipLast(3)
    .subscribe(new Subscriber<Integer>() {
    @Override
    public void onNext(Integer item) {
    System.out.println("Next: " + item);
    }

    @Override
    public void onError(Throwable error) {
    System.err.println("Error: " + error.getMessage());
    }

    @Override
    public void onCompleted() {
    System.out.println("Sequence complete.");
    }
    });
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    RxJava的操作符(2)
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    运行结果如下:
    Next: 1
    Next: 2
    Next: 3
    Next: 4
    Sequence complete.

    take操作符

    take操作符是把源Observable产生的结果,提取前面的n个提交给订阅者,而忽略后面的结果,其流程图如下:
    RxJava的操作符(2)

    调用例子如下:

    Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
    .take(4)
    .subscribe(new Subscriber<Integer>() {
    @Override
    public void onNext(Integer item) {
    System.out.println("Next: " + item);
    }

    @Override
    public void onError(Throwable error) {
    System.err.println("Error: " + error.getMessage());
    }

    @Override
    public void onCompleted() {
    System.out.println("Sequence complete.");
    }
    });
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    RxJava的操作符(2)
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    运行结果如下:
    Next: 1
    Next: 2
    Next: 3
    Next: 4
    Sequence complete.

    takeFirst操作符

    takeFirst操作符类似于take操作符,同时也类似于first操作符,都是获取源Observable产生的结果列表中符合指定条件的前一个或多个,与first操作符不同的是,first操作符如果获取不到数据,则会抛出NoSuchElementException异常,而takeFirst则会返回一个空的Observable,该Observable只有onCompleted通知而没有onNext通知。

    takeFirst操作符的流程图如下:
    RxJava的操作符(2)

    调用例子如下:

    Observable.just(1,2,3,4,5,6,7).takeFirst(new Func1<Integer, Boolean>() {
    @Override
    public Boolean call(Integer integer) {
    //获取数值大于3的数据
    return integer>3;
    }
    })
    .subscribe(new Subscriber<Integer>() {
    @Override
    public void onNext(Integer item) {
    System.out.println("Next: " + item);
    }

    @Override
    public void onError(Throwable error) {
    System.err.println("Error: " + error.getMessage());
    }

    @Override
    public void onCompleted() {
    System.out.println("Sequence complete.");
    }
    });
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    RxJava的操作符(2)
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    运行结果如下:
    Next: 4
    Sequence complete.

    takeLast操作符

    takeLast操作符是把源Observable产生的结果的后n项提交给订阅者,提交时机是Observable发布onCompleted通知之时。其流程图如下:
    RxJava的操作符(2)

    调用例子如下:

    Observable.just(1,2,3,4,5,6,7).takeLast(2)
    .subscribe(new Subscriber<Integer>() {
    @Override
    public void onNext(Integer item) {
    System.out.println("Next: " + item);
    }

    @Override
    public void onError(Throwable error) {
    System.err.println("Error: " + error.getMessage());
    }

    @Override
    public void onCompleted() {
    System.out.println("Sequence complete.");
    }
    });
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    RxJava的操作符(2)
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    运行结果如下:
    Next: 6
    Next: 7
    Sequence complete.

    不知不觉介绍了那么多操作符,篇幅有点长了,下回继续介绍其他的操作符,敬请期待!

    13
    0
       

    我的同类文章

    http://blog.csdn.net