Android RxJava转换操作符

时间:2021-01-27 17:48:24

Rxjava由于其基于事件流的链式调用、逻辑简洁 & 使用简单的特点,深受各大 Android开发者的欢迎。因此在学习过程中全面的了解了下RxJava的转换操作符。

map()

原理图
Android RxJava转换操作符
方法:

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper)

作用:
可以将被观察者发送的数据进行数据类型转换
代码:
比如说想将String数组的数据转换成Integer

        Observable.fromArray(new String[]{"1","12","123"}).map(new Function<String, Integer>() {
            @Override
            public Integer apply(String s) throws Exception {
                return Integer.valueOf(s);
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(Integer integer) {
                Log.e("----","----onNext:"+integer);
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onComplete() {
            }
        });

flatMap()

原理图
Android RxJava转换操作符
方法:

 public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) 

作用:
这个方法可以将事件序列中的元素进行整合加工,返回一个新的被观察者。
1、将传入的事件对象装换成一个Observable对象;
2、这时不会直接发送这个Observable, 而是将这个Observable激活让它自己开始发送事件;
3、每一个创建出来的Observable发送的事件,都被汇入同一个Observable,这个Observable负责将这些事件统一交给Subscriber的回调方法。
代码:
假设有一个People类,定义如下:

class People {
    private String name;
    private ArrayList<Book> saveBooks;//收藏书籍

    public People(String name, ArrayList<Book> saveBooks){
        this.name = name;
        this.saveBooks = saveBooks;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public ArrayList<Book> getSaveBooks() {
        return saveBooks;
    }

    public void setSaveBooks(ArrayList<Book> saveBooks) {
        this.saveBooks = saveBooks;
    }
}

People类里有name和saveBooks两个变量,分别代表姓名和收藏得书籍清单
Book定义如下:

class Book{
    private String name;
    private ArrayList<String> history;//修订记录

    public Book(String name,ArrayList<String> history){
        this.name = name;
        this.history = history;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public ArrayList<String> getHistory() {
        return history;
    }

    public void setHistory(ArrayList<String> history) {
        this.history = history;
    }
}

初始化一些简单数据

 ArrayList<Book> books = new ArrayList<>(3); books.add(new Book("开发艺术探索",new ArrayList<String>(){{add("2018.01.01 第一版");add("2018.01.02 第二版");}})); books.add(new Book("第一行代码",new ArrayList<String>(){{add("2019.01.01 第一版");add("2019.01.02 第二版");}})); ArrayList<People> libraries = new ArrayList<>(2); libraries.add(new People("Tom",books)); libraries.add(new People("Jerry",books));

如果我们得需求是打印每个人收藏得书籍相关信息,可以用map来实现

        Observable.fromIterable(libraries).map(new Function<People, ArrayList<Book>>() {
            @Override
            public ArrayList<Book> apply(People people) throws Exception {
                return people.getSaveBooks();
            }
        }).subscribe(new Observer<ArrayList<Book>>() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(ArrayList<Book> books) {
                //打印想要数据
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onComplete() {
            }
        });

但是如果这里我们有进一步得需求,那就是只打印每个人收藏得每本书得每一次修订记录,如果再次用map得话就得在onNext里面获取Book对象进行遍历获取数据,但是如果用flatMap()可以很方便得实现

 Observable.fromIterable(libraries).flatMap(new Function<People, ObservableSource<Book>>() {
           @Override
           public ObservableSource<Book> apply(People people) throws Exception {
           //获取当前传递进来得用户得每一本收藏
               return Observable.fromIterable(people.getSaveBooks());
           }
       }).flatMap(new Function<Book, ObservableSource<String>>() {
           @Override
           public ObservableSource<String> apply(Book book) throws Exception {
           //获取每本书得修订记录
               return Observable.fromIterable(book.getHistory());
           }
       }).subscribe(new Observer<String>() {
           @Override
           public void onSubscribe(Disposable d) {
           }

           @Override
           public void onNext(String s) {
                //打印修订记录
           }

           @Override
           public void onError(Throwable e) {
           }

           @Override
           public void onComplete() {
           }
       });

从上述代码可以看出,经过两次flagMap()对数据进行转换,就可以完成需求,代码逻辑比较清晰。

concatMap()

原理图
Android RxJava转换操作符
方法:

public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)

作用:
功能同flatMap()基本上是一样的,只不过concatMap() 转发出来的事件是有序的,而 flatMap() 是无序的。
代码:
将上述flatMap()代码做如下更改

  Observable.fromIterable(libraries).flatMap(new Function<People, ObservableSource<Book>>() {
           @Override
           public ObservableSource<Book> apply(People people) throws Exception {
               if("Tom".equals(people.getName())){
                   return Observable.fromIterable(people.getSaveBooks()).delay(3, TimeUnit.SECONDS);
               }
               return Observable.fromIterable(people.getSaveBooks());
           }
       }).flatMap(new Function<Book, ObservableSource<String>>() {
           @Override
           public ObservableSource<String> apply(Book book) throws Exception {
               return Observable.fromIterable(book.getHistory());
           }
       }).subscribe(new Observer<String>() {
           @Override
           public void onSubscribe(Disposable d) {
           }

           @Override
           public void onNext(String s) {
                //打印修订记录
               Log.e("----",s);
           }

           @Override
           public void onError(Throwable e) {
           }

           @Override
           public void onComplete() {
           }
       });

将用户名为Tom得用户数据延迟3秒再发送,打印日志如下

06-04 18:17:23.084 25841-25841/ E/----: 2018.01.01 第一版
    2018.01.02 第二版
    2019.01.01 第一版
    2019.01.02 第二版
06-04 18:17:26.086 25841-25874/ E/----: 2018.01.01 第一版
    2018.01.02 第二版
    2019.01.01 第一版
    2019.01.02 第二版

会发现Tom得数据是3秒之后接收到得,如果我们把flatMap() 改成concatMap()打印日志如下

06-04 18:19:47.002 26620-26644/ E/----: 2018.01.01 第一版
    2018.01.02 第二版
    2019.01.01 第一版
    2019.01.02 第二版
    2018.01.01 第一版
    2018.01.02 第二版
    2019.01.01 第一版
    2019.01.02 第二版

这就代表 concatMap() 转换后发送的事件序列是有序的了。

flatMapIterable()

原理图
Android RxJava转换操作符
方法:

public final <U> Observable<U> flatMapIterable(final Function<? super T, ? extends Iterable<? extends U>> mapper)

作用:
flatMapIterable() 和flatMap()功能在流程上大体一致,唯一不同的是,flatMap是转一个Observable转换成多个Observable,每一个Observable最后又返回一个Observable。而flatMapInterable是将一个Observable转换成多个Observable,但是每一个Observable最后返回得是Iterable。Iterable,可以理解成返回一个list
代码:

        Observable.fromArray(new String[]{"1","2","3"}).flatMapIterable(new Function<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(final String s) throws Exception {
                return new ArrayList<String>(){{add("a"+s);add("b"+s);}};
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String s) {
                Log.e("----",s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

上述代码打印日志如下

06-04 18:35:15.400 30351-30351/ E/----: a1
    b1
    a2
    b2
    a3
    b3

switchMap()

原理图
Android RxJava转换操作符
用法:

public final <R> Observable<R> switchMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)

作用:
switchMap()和flatMap()很像,除了一点:每当源Observable发射一个新的数据项(Observable)时,它将取消订阅并停止监视之前那个数据项产生的Observable,并开始监视当前发射的这一个。

buffer()

原理图
Android RxJava转换操作符
用法:

public final Observable<List<T>> buffer(int count, int skip) 

count:缓冲区元素的数量
skip:缓冲区满了之后,发送下次事件序列的时候要跳过多少元素
作用:
从需要发送的事件当中获取一定数量事件,并将这些事件放到缓冲区当中一并发送
代码:

        Observable.just(1,2,3,4,5,6).buffer(3,2).subscribe(new Observer<List<Integer>>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(List<Integer> value) {
                Log.e("---","---"+value);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

上述代码是设置缓冲区大小为3,每次跳过2个元素,代码打印结果如下:

06-04 22:20:48.595 2631-2631/ E/---: ---[1, 2, 3]
06-04 22:20:48.595 2631-2631/ E/---: ---[3, 4, 5]
06-04 22:20:48.595 2631-2631/ E/---: ---[5, 6]

scan()

原理图:
Android RxJava转换操作符
用法:

public final Observable<T> scan(BiFunction<T, T, T> accumulator)

作用:
将数据以一定的逻辑聚合起来,并将计算结果发送出去作为下个数据应用函数时的第一个参数使用
代码:

       Observable.just(1,2,3,4,5).scan(new BiFunction<Integer, Integer, Integer>() {
           @Override
           public Integer apply(Integer integer, Integer integer2) throws Exception {
               Log.e("---","integer:"+integer+" integer2:"+integer2);
               return integer + integer2;
           }
       }).subscribe(new Observer<Integer>() {
           @Override
           public void onSubscribe(Disposable d) {

           }

           @Override
           public void onNext(Integer value) {
                Log.e("---","value:"+value);
           }

           @Override
           public void onError(Throwable e) {

           }

           @Override
           public void onComplete() {

           }
       });

上述代码打印日志如下:

06-04 22:35:41.659 11404-11404/? E/---: value:1
06-04 22:35:41.659 11404-11404/? E/---: integer:1  integer2:2
06-04 22:35:41.659 11404-11404/? E/---: value:3
06-04 22:35:41.659 11404-11404/? E/---: integer:3  integer2:3
06-04 22:35:41.659 11404-11404/? E/---: value:6
06-04 22:35:41.659 11404-11404/? E/---: integer:6  integer2:4
06-04 22:35:41.659 11404-11404/? E/---: value:10
06-04 22:35:41.659 11404-11404/? E/---: integer:10  integer2:5
06-04 22:35:41.659 11404-11404/? E/---: value:15

通过上述执行结果可以发现,第一个参数integer就是上一次运算的结果

groupby()

原理图:
Android RxJava转换操作符
用法:

public final <K> Observable<GroupedObservable<K, T>> groupBy(Function<? super T, ? extends K> keySelector)

作用:
将原始Observable发送的数据按照key进行分组,每个分组都会返回一个Observable,这些Observable分别发射其包含的数据。
代码:
下面代码代表将1到5的数据根据是否能对2整除进行分组

        Observable.just(1, 2, 3, 4, 5)
                .groupBy(new Function<Integer, Integer>() {
                    @Override
                    public Integer apply(Integer integer) throws Exception {
                        return integer % 2;
                    }
                }).subscribe(new Observer<GroupedObservable<Integer, Integer>>() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(final GroupedObservable<Integer, Integer> value) {
               value.subscribe(new Observer<Integer>() {
                   @Override
                   public void onSubscribe(Disposable d) {
                   }

                   @Override
                   public void onNext(Integer integer) {
                        Log.e("---",value.getKey()+" "+integer);
                   }

                   @Override
                   public void onError(Throwable e) {
                   }

                   @Override
                   public void onComplete() {
                   }
               });
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onComplete() {
            }
        });

上述代码执行结果如下:

06-04 22:47:41.850 21780-21780/ E/---: 1  1
06-04 22:47:41.850 21780-21780/ E/---: 0  2
06-04 22:47:41.850 21780-21780/ E/---: 1  3
06-04 22:47:41.850 21780-21780/ E/---: 0  4
06-04 22:47:41.850 21780-21780/ E/---: 1  5

window()

用法:

public final Observable<Observable<T>> window(long count)
public final Observable<Observable<T>> window(long count, long skip) 
...

count:事件数量
作用:
发送指定数量事件时,就将这些事件分为一组。
代码:

Observable.just(1,2,3,4,5).window(3).subscribe(new Observer<Observable<Integer>>() {
           @Override
           public void onSubscribe(Disposable d) {
               Log.e("---","---onSubscribe");
           }

           @Override
           public void onNext(final Observable<Integer> value) {
                value.subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.e("---","value---onSubscribe");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.e("---","value---"+integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e("---","value---onError");
                    }

                    @Override
                    public void onComplete() {
                        Log.e("---","value---onComplete");
                    }
                });
           }

           @Override
           public void onError(Throwable e) {
           }

           @Override
           public void onComplete() {
               Log.e("---","---onComplete");
           }
       });

上述代码代表每发送3个事件为一组,执行结果如下:

06-04 22:59:16.776 30167-30167/ E/---: ---onSubscribe
06-04 22:59:16.777 30167-30167/ E/---: value---onSubscribe
06-04 22:59:16.777 30167-30167/ E/---: value---1
06-04 22:59:16.777 30167-30167/ E/---: value---2
06-04 22:59:16.777 30167-30167/ E/---: value---3
06-04 22:59:16.777 30167-30167/ E/---: value---onComplete
06-04 22:59:16.777 30167-30167/ E/---: value---onSubscribe
06-04 22:59:16.777 30167-30167/ E/---: value---4
06-04 22:59:16.777 30167-30167/ E/---: value---5
06-04 22:59:16.777 30167-30167/ E/---: value---onComplete
06-04 22:59:16.777 30167-30167/ E/---: ---onComplete