RxJava系列教程:
1. RxJava使用介绍 【视频教程】
2. RxJava操作符
• Creating Observables(Observable的创建操作符) 【视频教程】
• Transforming Observables(Observable的转换操作符) 【视频教程】
• Filtering Observables(Observable的过滤操作符) 【视频教程】
• Combining Observables(Observable的组合操作符) 【视频教程】
• Error Handling Operators(Observable的错误处理操作符) 【视频教程】
• Observable Utility Operators(Observable的辅助性操作符) 【视频教程】
• Conditional and Boolean Operators(Observable的条件和布尔操作符) 【视频教程】
• Mathematical and Aggregate Operators(Observable数学运算及聚合操作符) 【视频教程】
• 其他如observable.toList()、observable.connect()、observable.publish()等等; 【视频教程】
3. RxJava Observer与Subcriber的关系 【视频教程】
4. RxJava线程控制(Scheduler) 【视频教程】
5. RxJava 并发之数据流发射太快如何办(背压(Backpressure)) 【视频教程】
RxJava到底是什么?使用RxJava到底有什么好处呢?其实RxJava是ReactiveX中使用Java语言实现的版本,目前ReactiveX已经实现的语言版本有:
Java: RxJava
JavaScript: RxJS
C#: Rx.NET
C#(Unity): UniRx
Scala: RxScala
Clojure: RxClojure
C++: RxCpp
Ruby: Rx.rb
Python: RxPY
Groovy: RxGroovy
JRuby:RxJRuby
Kotlin: RxKotlin
可以看出ReactiveX在开发应用中如此的火爆。那到底什么是ReactiveX呢?简单来说,ReactiveX就是”观察者模式+迭代器模式+函数式编程”,它扩展了观察者模式,通过使用可观察的对象序列流来表述一系列事件,订阅者进行占点观察并对序列流做出反应(或持久化或输出显示等等);借鉴迭代器模式,对多个对象序列进行迭代输出,订阅者可以依次处理不同的对象序列;使用函数式编程思想(functional programming),极大简化问题解决的步骤。
RxJava的基本概念
RxJava最核心的两个东西就是Observables(被观察者,也就是事件源)和Subscribers(观察者),由Observables发出一系列的事件,Subscribers进行订阅接收并进行处理,看起来就好像是设计模式中的观察者模式,但是跟观察者模式不同的地方就在于,如果没有观察者(即Subscribers),Observables是不会发出任何事件的。
由于Observables发出的事件并不仅限于一个,有可能是多个的,如何确保每一个事件都能发送到Subscribers上进行处理呢?这里就借鉴了设计模式的迭代器模式,对事件进行迭代轮询(next()、hasNext()),在迭代过程中如果出现异常则直接抛出(throws Exceptions),下表是Observable和迭代器(Iterable)的对比:
事件(event) | 迭代器(Iterable) | Observable |
---|---|---|
接收数据 | T next() | onNext(T) |
发现错误 | throws Exception | onError(Exception) |
迭代完成 | !hasNext() | onCompleted() |
与迭代器模式不同的地方在于,迭代器模式在事件处理上采用的是“同步/拉式”的方式,而Observable采用的是“异步/推式”的方式,对于Subscriber(观察者)而言,这种方式会更加灵活。
Rxjava的看起来很像设计模式中的观察者模式,但是有一点明显不同,那就是如果一个Observerble没有任何的的Subscriber,那么这个Observable是不会发出任何事件的。
补充概念
- Observable:发射源,英文释义“可观察的”,在观察者模式中称为“被观察者”或“可观察对象”;
- Observer:接收源,英文释义“观察者”,没错!就是观察者模式中的“观察者”,可接收Observable、Subject发射的数据;
- Subject:Subject是一个比较特殊的对象,既可充当发射源,也可充当接收源,为避免初学者被混淆,本章将不对Subject做过多的解释和使用,重点放在Observable和Observer上,先把最基本方法的使用学会,后面再学其他的都不是什么问题;
- Subscriber:“订阅者”,也是接收源,那它跟Observer有什么区别呢?Subscriber实现了Observer接口,比Observer多了一个最重要的方法unsubscribe( ),用来取消订阅,当你不再想接收数据了,可以调用unsubscribe( )方法停止接收,Observer 在 subscribe() 过程中,最终也会被转换成 Subscriber 对象,一般情况下,建议使用Subscriber作为接收源;
- Subscription :Observable调用subscribe( )方法返回的对象,同样有unsubscribe( )方法,可以用来取消订阅事件;
- Action0:RxJava中的一个接口,它只有一个无参call()方法,且无返回值,同样还有Action1,Action2…Action9等,Action1封装了含有 1 个参的call()方法,即call(T t),Action2封装了含有 2 个参数的call方法,即call(T1 t1,T2 t2),以此类推;
- Func0:与Action0非常相似,也有call()方法,但是它是有返回值的,同样也有Func0、Func1…Func9;
Hello World
创建一个Observable对象很简单,直接调用Observable.create即可。
Observable<String> mObservable = Observable.create(
new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> sub) {
sub.onNext("Hello, world!");
sub.onCompleted();
}
}
);
这里定义的Observable对象仅仅发出一个Hello World字符串,然后就结束了。接着我们创建一个Subscriber来处理Observable对象发出的字符串。
Subscriber<String> mSubscriber = new Subscriber<String>() {
@Override
public void onNext(String s) { System.out.println(s); }
@Override
public void onCompleted() { }
@Override
public void onError(Throwable e) { }
};
这里subscriber仅仅就是打印observable发出的字符串。通过subscribe函数就可以将我们定义的mObservable对象和mSubscriber对象关联起来,这样就完成了mSubscriber对mObservable的订阅。
mObservable.subscribe(mSubscriber);
一旦mSubscriber订阅了mObservable,mObservable就是调用mSubscriber对象的onNext和onComplete方法,mSubscriber就会打印出Hello World!
更简洁的代码(RxJava的流式API调用)
上面的代码最终可以写成这样:
//这就是RxJava的流式API调用
Observable.just("Hello, world!")
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
});
just是RxJava中的创建操作符,RxJava的强大性就来自于它所定义的操作符。后续文章我们会逐一介绍RxJava的操作符。
上面就是一个非常简易的RxJava流式API的调用:同一个调用主体一路调用下来,一气呵成。
由于被观察者产生事件,是事件的起点,那么开头就是用Observable这个主体调用来创建被观察者,产生事件,为了保证流式API调用规则,就直接让Observable作为唯一的调用主体,一路调用下去。
流程图如下:
至此,我们就把RxJava的骨架就讲完了,总结一下:
- 创建被观察者,产生事件
- 设置事件传递过程中的过滤,合并,变换等加工操作。
- 订阅一个观察者对象,实现事件最终的处理。
Tips: 当调用订阅操作(即调用Observable.subscribe()方法)的时候,被观察者才真正开始发出事件。