RxJava 的使用详解(一)
1.作用
RxJava的目的就是异步。
RxJava的特点就是可以非常简便的实现异步调用,可以在逻辑复杂的代码逻辑中以比较轻易的方式实现异步调用。随着逻辑的复杂,需求的更改,代码可依然能保持极强的阅读性,在深入的使用过程中一定对这点深有体会。
2.工程引用
要应用RxJava,需要在项目中引入依赖:
compile ‘io.reactivex:rxandroid:1.2.1’
compile ‘io.reactivex.rxjava2:rxjava:2.0.4’
3.对RxJava的理解
RxJava实质使用的是观察者模式,对于观察者模式我在这里首先做一个介绍,观察者模式主要由以下几个角色实现:
Observable:被观察者,他可以发出一些事件。
Observer:观察者,他时刻关注着被观察者,并接受来自被观察者的事件,并对这些事件作出反应。
subscribe:订阅,观察者通过subscribe()来订阅,也就是观察者绑定被观察者
Subscriber:也是一种观察者,在2.0中 它与Observer没什么实质的区别,不同的是 Subscriber要与Flowable(也是一种被观察者)联合使用,Obsesrver用于订阅Observable,而Subscriber用于订阅Flowable。
观察者模式(又被称为发布-订阅(Publish/Subscribe)模式,属于行为型模式的一种,它定义了一种一对多的依赖关系,让多个观察者对象同时监听某一个主题对象。这个主题对象在状态变化时,会通知所有的观察者对象,使他们能够自动更新自己。
而RxJava中的观察者模式与传统的观察者模式还是有些区别的,他不仅有普通事件的onNext()方法,还定义了onError(),onComplete(),onSubscribe().
onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
onComplete(): 事件队列完结时调用该方法。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。
onSubscribe():RxJava 2.0 中新增的,传递参数为Disposable ,Disposable 相当于RxJava1.x中的Subscription,用于解除订阅。
介绍完了RxJava的基本构成与概念,那我们就该谈谈他的异步了,RxJava他就是处理异步的,可以完全替代AsycTask和Handler,用户可以让被观察者Observable去开启线程,执行操作,执行完之后触发回调,然后观察者Observer去更新UI视图。
3.具体使用
RxJava有很多种创建方法,我们先以最基本的构建方法来认识RxJava:
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
//执行一些其他操作
//执行完毕,触发回调,通知观察者
e.onNext("我发生变化了");
}
});
创建Observer对象
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
//2.0新增方法,用于解绑
}
@Override
public void onNext(String aLong) {
//观察者接收来自被观察者的信息,并作出反应
System.out.println("我感觉到你的变化了");
}
@Override
public void onError(Throwable e) {
//事件队列异常。在事件处理过程中出异常时
}
@Override
public void onComplete() {
//事件队列完结时调用该方法
}
};
观察者订阅被观察者:
observable.subscribe(observer);
以上就是RxJava使用create方法创建实例的具体使用方法,我相信大家都明白了。下面我们来看其他创建方式:
just方式:
Observable observable = Observable.just(“我只是一个字符串”);
使用just( ),将为你创建一个Observable并自动为你调用onNext( )发射数据。通过just( )方式 直接触发onNext(),just中传递的参数将直接在Observer的onNext()方法中接收到。
defer()方式:
Observable<String> observable = Observable.defer(new Callable<ObservableSource<? extends String>>() {
@Override
public ObservableSource<? extends String> call() throws Exception {
return Observable.just("我还是一个字符串");
}
});
当观察者订阅时,才创建Observable,并且针对每个观察者创建都是一个新的Observable。以何种方式创建这个Observable对象,当满足回调条件后,就会进行相应的回调。
fromIterable()方式:
List<String> list = new ArrayList<String>();
for(int i =0;i<5;i++){
list.add("我不想作字符串"+i);
}
Observable<String> observable = Observable.fromIterable((Iterable<String>) list);
使用fromIterable(),遍历集合,发送每个item。相当于多次回调onNext()方法,每次传入一个item。
range( )方式:
Observable<Integer> observable = Observable.range(1,10);
创建一个发射特定整数序列的Observable,第一个参数为起始值,第二个为发送的个数,如果为0则不发送,负数则抛异常。上述表示发射1到10的数。即调用10次nNext()方法,依次传入1-10数字。
timer( )方式:
Observable<Integer> observable = Observable.timer(2, TimeUnit.SECONDS);
创建一个Observable,它在一个给定的延迟后发射一个特殊的值,即表示延迟2秒后,调用onNext()方法。
除此之外,RxJava中还有许多操作符。操作符就是用于在Observable和最终的Observer之间,通过转换Observable为其他观察者对象的过程,修改发出的事件,最终将最简洁的数据传递给Observer对象。下面我们介绍一些比较常用的操作符。
map()操作符:
Observable<Integer> observable = Observable.just("hello").map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
return s.length();
}
});
map()操作符,就是把原来的Observable对象转换成另一个Observable对象,同时将传输的数据进行一些灵活的操作,方便Observer获得想要的数据形式。
flatMap()操作符:
Observable<Object> observable = Observable.just(list).flatMap(new Function<List<String>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(List<String> strings) throws Exception {
return Observable.fromIterable(strings);
}
});
flatMap()对于数据的转换比map()更加彻底,如果发送的数据是集合,flatmap()重新生成一个Observable对象,并把数据转换成Observer想要的数据形式。它可以返回任何它想返回的Observable对象。
filter()操作符:
Observable.just(list).flatMap(new Function<List<String>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(List<String> strings) throws Exception {
return Observable.fromIterable(strings);
}
}).filter(new Predicate<Object>() {
@Override
public boolean test(Object s) throws Exception {
String newStr = (String) s;
if (newStr.charAt(5) - '0' > 5) {
return true;
}
return false;
}
}).subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
System.out.println((String)o);
}
});
filter()操作符根据test()方法中,根据自己想过滤的数据加入相应的逻辑判断,返回true则表示数据满足条件,返回false则表示数据需要被过滤。最后过滤出的数据将加入到新的Observable对象中,方便传递给Observer想要的数据形式。
take()操作符:
Observable.just(list).flatMap(new Function<List<String>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(List<String> strings) throws Exception {
return Observable.fromIterable(strings);
}
}).take(5).subscribe(new Consumer<Object>() {
@Override
public void accept(Object s) throws Exception {
System.out.println((String)s);
}
});
输出最多指定数量的结果。
doOnNext():
Observable.just(list).flatMap(new Function<List<String>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(List<String> strings) throws Exception {
return Observable.fromIterable(strings);
}
}).take(5).doOnNext(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
System.out.println("准备工作");
}
}).subscribe(new Consumer<Object>() {
@Override
public void accept(Object s) throws Exception {
System.out.println((String)s);
}
});
doOnNext()允许我们在每次输出一个元素之前做一些额外的事情。
以上就是一些常用的操作符,通过操作符的使用。我们每次调用一次操作符,就进行一次观察者对象的改变,同时将需要传递的数据进行转变,最终Observer对象获得想要的数据。
以网络加载为例,我们通过Observable开启子线程,进行一些网络请求获取数据的操作,获得到网络数据后,然后通过操作符进行转换,获得我们想要的形式的数据,然后传递给Observer对象。