RxJava 的使用详解(一)

时间:2021-07-09 17:50:54

RxJava 的使用详解(一)
1.作用

RxJava的目的就是异步。
RxJava的特点就是可以非常简便的实现异步调用,可以在逻辑复杂的代码逻辑中以比较轻易的方式实现异步调用。随着逻辑的复杂,需求的更改,代码可依然能保持极强的阅读性,在深入的使用过程中一定对这点深有体会。

2.工程引用

要应用RxJava,需要在项目中引入依赖:

compile ‘io.reactivex:rxandroid:1.2.1’
compile ‘io.reactivex.rxjava2:rxjava:2.0.4’

3.对RxJava的理解
RxJava实质使用的是观察者模式,对于观察者模式我在这里首先做一个介绍,观察者模式主要由以下几个角色实现:
Observable:被观察者,他可以发出一些事件。
Observer:观察者,他时刻关注着被观察者,并接受来自被观察者的事件,并对这些事件作出反应。
subscribe:订阅,观察者通过subscribe()来订阅,也就是观察者绑定被观察者
Subscriber:也是一种观察者,在2.0中 它与Observer没什么实质的区别,不同的是 Subscriber要与Flowable(也是一种被观察者)联合使用,Obsesrver用于订阅Observable,而Subscriber用于订阅Flowable。

观察者模式(又被称为发布-订阅(Publish/Subscribe)模式,属于行为型模式的一种,它定义了一种一对多的依赖关系,让多个观察者对象同时监听某一个主题对象。这个主题对象在状态变化时,会通知所有的观察者对象,使他们能够自动更新自己。
而RxJava中的观察者模式与传统的观察者模式还是有些区别的,他不仅有普通事件的onNext()方法,还定义了onError(),onComplete(),onSubscribe().

onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。

onComplete(): 事件队列完结时调用该方法。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。

onSubscribe():RxJava 2.0 中新增的,传递参数为Disposable ,Disposable 相当于RxJava1.x中的Subscription,用于解除订阅。
介绍完了RxJava的基本构成与概念,那我们就该谈谈他的异步了,RxJava他就是处理异步的,可以完全替代AsycTask和Handler,用户可以让被观察者Observable去开启线程,执行操作,执行完之后触发回调,然后观察者Observer去更新UI视图。

3.具体使用
RxJava有很多种创建方法,我们先以最基本的构建方法来认识RxJava:

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                //执行一些其他操作

                //执行完毕,触发回调,通知观察者
                e.onNext("我发生变化了");
            }
        });

创建Observer对象

Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
        //2.0新增方法,用于解绑
            }

            @Override

            public void onNext(String aLong) {
            //观察者接收来自被观察者的信息,并作出反应
                System.out.println("我感觉到你的变化了");
            }

            @Override
            public void onError(Throwable e) {
        //事件队列异常。在事件处理过程中出异常时
            }

            @Override
            public void onComplete() {
        //事件队列完结时调用该方法
            }
        };

观察者订阅被观察者:

 observable.subscribe(observer);

以上就是RxJava使用create方法创建实例的具体使用方法,我相信大家都明白了。下面我们来看其他创建方式:

just方式:

Observable observable = Observable.just(“我只是一个字符串”);
使用just( ),将为你创建一个Observable并自动为你调用onNext( )发射数据。通过just( )方式 直接触发onNext(),just中传递的参数将直接在Observer的onNext()方法中接收到。

defer()方式:

 Observable<String> observable = Observable.defer(new Callable<ObservableSource<? extends String>>() {
            @Override
            public ObservableSource<? extends String> call() throws Exception {
                return Observable.just("我还是一个字符串");
            }
        });

当观察者订阅时,才创建Observable,并且针对每个观察者创建都是一个新的Observable。以何种方式创建这个Observable对象,当满足回调条件后,就会进行相应的回调。

fromIterable()方式:

 List<String> list = new ArrayList<String>();
        for(int i =0;i<5;i++){
            list.add("我不想作字符串"+i);
        }
        Observable<String> observable = Observable.fromIterable((Iterable<String>) list);

使用fromIterable(),遍历集合,发送每个item。相当于多次回调onNext()方法,每次传入一个item。

range( )方式:

Observable<Integer> observable = Observable.range(1,10);

创建一个发射特定整数序列的Observable,第一个参数为起始值,第二个为发送的个数,如果为0则不发送,负数则抛异常。上述表示发射1到10的数。即调用10次nNext()方法,依次传入1-10数字。

timer( )方式:

Observable<Integer> observable = Observable.timer(2, TimeUnit.SECONDS);

创建一个Observable,它在一个给定的延迟后发射一个特殊的值,即表示延迟2秒后,调用onNext()方法。

除此之外,RxJava中还有许多操作符。操作符就是用于在Observable和最终的Observer之间,通过转换Observable为其他观察者对象的过程,修改发出的事件,最终将最简洁的数据传递给Observer对象。下面我们介绍一些比较常用的操作符。

map()操作符:

Observable<Integer> observable = Observable.just("hello").map(new Function<String, Integer>() {
            @Override
            public Integer apply(String s) throws Exception {
                return s.length();
            }
        });

map()操作符,就是把原来的Observable对象转换成另一个Observable对象,同时将传输的数据进行一些灵活的操作,方便Observer获得想要的数据形式。

flatMap()操作符:

 Observable<Object> observable = Observable.just(list).flatMap(new Function<List<String>, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(List<String> strings) throws Exception {
                return Observable.fromIterable(strings);
            }
        });

flatMap()对于数据的转换比map()更加彻底,如果发送的数据是集合,flatmap()重新生成一个Observable对象,并把数据转换成Observer想要的数据形式。它可以返回任何它想返回的Observable对象。

filter()操作符:

Observable.just(list).flatMap(new Function<List<String>, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(List<String> strings) throws Exception {
                return Observable.fromIterable(strings);
            }
        }).filter(new Predicate<Object>() {
            @Override
            public boolean test(Object s) throws Exception {
                String newStr = (String) s;
                if (newStr.charAt(5) - '0' > 5) {
                    return true;
                }
                return false;
            }
        }).subscribe(new Consumer<Object>() {
            @Override
            public void accept(Object o) throws Exception {
                System.out.println((String)o);
            }
        });

filter()操作符根据test()方法中,根据自己想过滤的数据加入相应的逻辑判断,返回true则表示数据满足条件,返回false则表示数据需要被过滤。最后过滤出的数据将加入到新的Observable对象中,方便传递给Observer想要的数据形式。

take()操作符:

Observable.just(list).flatMap(new Function<List<String>, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(List<String> strings) throws Exception {
                return Observable.fromIterable(strings);
            }
        }).take(5).subscribe(new Consumer<Object>() {
            @Override
            public void accept(Object s) throws Exception {
                System.out.println((String)s);
            }
        });

输出最多指定数量的结果。

doOnNext():

Observable.just(list).flatMap(new Function<List<String>, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(List<String> strings) throws Exception {
                return Observable.fromIterable(strings);
            }
        }).take(5).doOnNext(new Consumer<Object>() {
            @Override
            public void accept(Object o) throws Exception {
                System.out.println("准备工作");
            }
        }).subscribe(new Consumer<Object>() {
            @Override
            public void accept(Object s) throws Exception {
                System.out.println((String)s);
            }
        });

doOnNext()允许我们在每次输出一个元素之前做一些额外的事情。

以上就是一些常用的操作符,通过操作符的使用。我们每次调用一次操作符,就进行一次观察者对象的改变,同时将需要传递的数据进行转变,最终Observer对象获得想要的数据。
以网络加载为例,我们通过Observable开启子线程,进行一些网络请求获取数据的操作,获得到网络数据后,然后通过操作符进行转换,获得我们想要的形式的数据,然后传递给Observer对象。

相关文章