RxJava使用操作符(二)——变换操作符

时间:2021-08-02 17:50:35

1、简介

RxJava 的作用之强大相信每个用过的同学都深有体会,在介绍基本的创建操作符之后,我们继续来看变换操作符,正是因为变换操作符的存在,RxJava的才可以满足不同场景的功能。

变换操作符的作用:对事件序列中的事件 / 整个事件序列 进行加工处理(即变换),使得其转变成不同的事件 / 整个事件序列

2、操作符类型

  • map()
  • flatMap() 
  • ConcatMap ()
  • buffer ()

3、使用介绍

  • map

(1)被观察着发送的每个数据经过固定的函数处理,返回某个数据类型

(2)使用场景 ——数据类型转换

(3)具体实例,将数字转换为字符串:

Observable.just(1, 2, 3)
                .map(new Function<Integer, String>() {
                    @Override
                    public String apply(Integer integer) throws Exception {
                        return String.valueOf(integer);
                    }
                }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                
            }
        });
从上面可以看出,map() 将参数中的 Integer 类型对象转换成一个 String类型 对象后返回,事件的参数类型也转换为String


  • flatMap

(1)将被观察者发送的事件序列进行 拆分 & 单独转换,再合并成一个新的事件序列,最后再进行发送

(2)原理:

  • 为每个分发序列中的事件都创建一个Observable对象
  • 对创建的每个Observable对象都执行相应的方法进行装换并储存
  • 对变换后的Observable对象组合成一个对象并发送给观察者

(3)使用场景——对发送的事件进行拆分装换成新的事件

(4)具体实例——将每个数字转换为集合并添加一个0

 Observable.just(1, 2, 3)
                .flatMap(new Function<Integer, ObservableSource<Integer>>() {
                    @Override
                    public ObservableSource<Integer> apply(Integer integer) throws Exception {
                        ArrayList<Integer> list = new ArrayList<>();
                        list.add(0);
                        list.add(integer);
                        return Observable.fromIterable(list);
                    }
                }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {

            }
        });

输出结果:


注:此时输出的顺序是无序的

  • ConcatMap 

(1)作用和flatMap()相同,不过 拆分 & 重新合并生成的事件序列 的顺序 = 被观察者旧序列生产的顺序

(2)原理与flatMap相同

(3)使用场景——对发送的事件进行拆分装换成新的事件

(4)具体实例——将每个数字转换为集合并添加一个0

 Observable.just(1, 2, 3)
                .flatMap(new Function<Integer, ObservableSource<Integer>>() {
                    @Override
                    public ObservableSource<Integer> apply(Integer integer) throws Exception {
                        ArrayList<Integer> list = new ArrayList<>();
                        list.add(0);
                        list.add(integer);
                        return Observable.fromIterable(list);
                    }
                }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {

            }
        });

输出结果:

05-02 20:16:09.046 15718-15718/com.example.administrator.glide E/concatMap========: 0
05-02 20:16:09.046 15718-15718/com.example.administrator.glide E/concatMap========: 1
05-02 20:16:09.046 15718-15718/com.example.administrator.glide E/concatMap========: 2
05-02 20:16:09.046 15718-15718/com.example.administrator.glide E/concatMap========: 0
05-02 20:16:09.046 15718-15718/com.example.administrator.glide E/concatMap========: 1
05-02 20:16:09.047 15718-15718/com.example.administrator.glide E/concatMap========: 2
05-02 20:16:09.047 15718-15718/com.example.administrator.glide E/concatMap========: 0
05-02 20:16:09.047 15718-15718/com.example.administrator.glide E/concatMap========: 1
05-02 20:16:09.047 15718-15718/com.example.administrator.glide E/concatMap========: 2

注:此时输出的顺序是按照事件分发顺序的

  • buffer 

(1)定期从被观察者中取出固定的事件放到缓存区中,在一起发送

(2)原理:

  • 按照设置的缓存大小和步长拿去事件
  • 发送缓存区的事件

(3)使用场景——缓存发送的事件

(4)使用实例——发送数据

Observable.just(1,2,3,4,5)
                .buffer(3,1)
                .subscribe(new Consumer<List<Integer>>() {
                    @Override
                    public void accept(List<Integer> integers) throws Exception {
                        
                    }
                });

输出结果:

05-02 20:16:09.050 15718-15718/com.example.administrator.glide E/buffer=========: 1
05-02 20:16:09.050 15718-15718/com.example.administrator.glide E/buffer=========: 2
05-02 20:16:09.050 15718-15718/com.example.administrator.glide E/buffer=========: 3
05-02 20:16:09.050 15718-15718/com.example.administrator.glide E/buffer=========: 2
05-02 20:16:09.050 15718-15718/com.example.administrator.glide E/buffer=========: 3
05-02 20:16:09.051 15718-15718/com.example.administrator.glide E/buffer=========: 4
05-02 20:16:09.051 15718-15718/com.example.administrator.glide E/buffer=========: 3
05-02 20:16:09.052 15718-15718/com.example.administrator.glide E/buffer=========: 4
05-02 20:16:09.052 15718-15718/com.example.administrator.glide E/buffer=========: 5
05-02 20:16:09.053 15718-15718/com.example.administrator.glide E/buffer=========: 4
05-02 20:16:09.054 15718-15718/com.example.administrator.glide E/buffer=========: 5
05-02 20:16:09.054 15718-15718/com.example.administrator.glide E/buffer=========: 5

4、开发实例

实现注册后自动登陆的请求,业务逻辑

  • 验证注册信息
  • 注册成功后发起登陆请求
  • 验证登陆信息
  • 登陆成功

模拟注册请求,注册一个用户账号:

 final Observable<User> observableLogin = Observable.create(new ObservableOnSubscribe() {
            @Override
            public void subscribe(ObservableEmitter e) throws Exception {
                e.onNext(new User("AAAAA","123"));
            }
        });

模拟登陆请求:

Observable<User> observableRegister = Observable.create(new ObservableOnSubscribe() {
            @Override
            public void subscribe(ObservableEmitter e) throws Exception {
                e.onNext(new User("AAAAA","123"));
            }
        });

实现业务逻辑:

observableRegister.filter(new Predicate<User>() {
            @Override
            public boolean test(User user) throws Exception {
                return user.getName().equals("AAAAA");
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .doOnNext(new Consumer<User>() {
                    @Override
                    public void accept(User user) throws Exception {
                        Log.e("========",user.getName()+"注册成功");
                    }
                }).observeOn(Schedulers.io())
                .flatMap(new Function<User, ObservableSource<User>>() {
                    @Override
                    public ObservableSource<User> apply(User user) throws Exception {
                        return observableLogin;
                    }
                }).filter(new Predicate<User>() {
            @Override
            public boolean test(User user) throws Exception {
                return user.getName().equals("AAAAA") && user.getPswd().equals("123");
            }
        }).observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<User>() {
                    @Override
                    public void accept(User user) throws Exception {
                        Log.e("========",user.getName() + "登陆成功");
                    }
                });

上面过程使用observableRegister在注册时先验证注册信息是否为“AAAAA”,验证成功后完成注册,注册成功后切换为登陆的被观察者对象,完成登陆功能。

输出结果:

05-03 01:36:40.142 2310-2310/? E/========: AAAAA注册成功
05-03 01:36:40.316 2310-2310/? E/========: AAAAA登陆成功

这种使用在平时的业务逻辑中会起很大作用,可以在实现更能的同时使代码逻辑更清晰。