最近RxJava越来越流行了,在移动端也越来越多的项目开始使用这个框架了!唯一的问题就是上手不容易,尤其是大部分人之前都是使用命令式编程语言。但是一旦你弄明白了,你就会发现RxJava真是太棒了。
这里仅仅是帮助你了解RxJava,整个系列共有四篇文章,希望你看完这四篇文章之后能够了解RxJava背后的思想,并且喜欢上RxJava。
基础
RxJava最核心的两个东西是Observables(被观察者,事件源)和Subscribers(观察者)。Observables发出一系列事件,Subscribers处理这些事件。这里的事件可以是任何你感兴趣的东西(触摸事件,web接口调用返回的数据。。。)
一个Observable可以发出零个或者多个事件,知道结束或者出错。每发出一个事件,就会调用它的Subscriber的onNext方法,最后调用Subscriber.onNext()或者Subscriber.onError()结束。
Rxjava的看起来很想设计模式中的观察者模式,但是有一点明显不同,那就是如果一个Observerble没有任何的的Subscriber,那么这个Observable是不会发出任何事件的。
Hello World
创建一个Observable对象很简单,直接调用Observable.create即可- Observable<String> myObservable = Observable.create(
- new Observable.OnSubscribe<String>() {
- @Override
- public void call(Subscriber<? super String> sub) {
- sub.onNext("Hello, world!");
- sub.onCompleted();
- }
- }
- );
- Subscriber<String> mySubscriber = new Subscriber<String>() {
- @Override
- public void onNext(String s) { System.out.println(s); }
- @Override
- public void onCompleted() { }
- @Override
- public void onError(Throwable e) { }
- };
- myObservable.subscribe(mySubscriber);
更简洁的代码
是不是觉得仅仅为了打印一个hello world要写这么多代码太啰嗦?我这里主要是为了展示RxJava背后的原理而采用了这种比较啰嗦的写法,RxJava其实提供了很多便捷的函数来帮助我们减少代码。首先来看看如何简化Observable对象的创建过程。RxJava内置了很多简化创建Observable对象的函数,比如Observable.just就是用来创建只发出一个事件就结束的Observable对象,上面创建Observable对象的代码可以简化为一行
- Observable<String> myObservable = Observable.just("Hello, world!");
- Action1<String> onNextAction = new Action1<String>() {
- @Override
- public void call(String s) {
- System.out.println(s);
- }
- };
- myObservable.subscribe(onNextAction, onErrorAction, onCompleteAction);
- myObservable.subscribe(onNextAction);
- // Outputs "Hello, world!"
- Observable.just("Hello, world!")
- .subscribe(new Action1<String>() {
- @Override
- public void call(String s) {
- System.out.println(s);
- }
- });
- Observable.just("Hello, world!")
- .subscribe(s -> System.out.println(s));
变换
让我们做一些更有趣的事情吧!比如我想在hello world中加上我的签名,你可能会想到去修改Observable对象:
- Observable.just("Hello, world! -Dan")
- .subscribe(s -> System.out.println(s));
那么在Subscriber中对事件进行修改怎么样呢?比如下面的代码:
- Observable.just("Hello, world!")
- .subscribe(s -> System.out.println(s + " -Dan"));
操作符(Operators)
操作符就是为了解决对Observable对象的变换的问题,操作符用于在Observable和最终的Subscriber之间修改Observable发出的事件。RxJava提供了很多很有用的操作符。比如map操作符,就是用来把把一个事件转换为另一个事件的。
- Observable.just("Hello, world!")
- .map(new Func1<String, String>() {
- @Override
- public String call(String s) {
- return s + " -Dan";
- }
- })
- .subscribe(s -> System.out.println(s));
- Observable.just("Hello, world!")
- .map(s -> s + " -Dan")
- .subscribe(s -> System.out.println(s));
map操作符进阶
map操作符更有趣的一点是它不必返回Observable对象返回的类型,你可以使用map操作符返回一个发出新的数据类型的observable对象。比如上面的例子中,subscriber并不关心返回的字符串,而是想要字符串的hash值
- Observable.just("Hello, world!")
- .map(new Func1<String, Integer>() {
- @Override
- public Integer call(String s) {
- return s.hashCode();
- }
- })
- .subscribe(i -> System.out.println(Integer.toString(i)));
- Observable.just("Hello, world!")
- .map(s -> s.hashCode())
- .subscribe(i -> System.out.println(Integer.toString(i)));
- Observable.just("Hello, world!")
- .map(s -> s.hashCode())
- .map(i -> Integer.toString(i))
- .subscribe(s -> System.out.println(s));
不服?
是不是觉得我们的例子太简单,不足以说服你?你需要明白下面的两点:1.Observable和Subscriber可以做任何事情
Observable可以是一个数据库查询,Subscriber用来显示查询结果;Observable可以是屏幕上的点击事件,Subscriber用来响应点击事件;Observable可以是一个网络请求,Subscriber用来显示请求结果。
2.Observable和Subscriber是独立于中间的变换过程的。
在Observable和Subscriber中间可以增减任何数量的map。整个系统是高度可组合的,操作数据是一个很简单的过程。 首先先看一个例子:
准备工作
假设我有这样一个方法:这个方法根据输入的字符串返回一个网站的url列表(啊哈,搜索引擎)
- Observable<List<String>> query(String text);
- query("Hello, world!")
- .subscribe(urls -> {
- for (String url : urls) {
- System.out.println(url);
- }
- });
当然,我可以使用map操作符,map的输入是urls列表,处理的时候还是要for each遍历,一样很蛋疼。
万幸,还有Observable.from()方法,它接收一个集合作为输入,然后每次输出一个元素给subscriber:
- Observable.from("url1", "url2", "url3")
- .subscribe(url -> System.out.println(url));
- query("Hello, world!")
- .subscribe(urls -> {
- Observable.from(urls)
- .subscribe(url -> System.out.println(url));
- });
改进
救星来了,他就是flatMap()。Observable.flatMap()接收一个Observable的输出作为输入,同时输出另外一个Observable。直接看代码:
- query("Hello, world!")
- .flatMap(new Func1<List<String>, Observable<String>>() {
- @Override
- public Observable<String> call(List<String> urls) {
- return Observable.from(urls);
- }
- })
- .subscribe(url -> System.out.println(url));
- query("Hello, world!")
- .flatMap(urls -> Observable.from(urls))
- .subscribe(url -> System.out.println(url));
这部分也是我当初学RxJava的时候最难理解的部分,一旦我突然领悟了,RxJava的很多疑问也就一并解决了。
还可以更好
flatMap()实在不能更赞了,它可以返回任何它想返回的Observable对象。比如下面的方法:
- // 返回网站的标题,如果404了就返回null
- Observable<String> getTitle(String URL);
- query("Hello, world!")
- .flatMap(urls -> Observable.from(urls))
- .flatMap(new Func1<String, Observable<String>>() {
- @Override
- public Observable<String> call(String url) {
- return getTitle(url);
- }
- })
- .subscribe(title -> System.out.println(title));
- query("Hello, world!")
- .flatMap(urls -> Observable.from(urls))
- .flatMap(url -> getTitle(url))
- .subscribe(title -> System.out.println(title));
不止这些,我还将两个API的调用组合到一个链式调用中了。我们可以将任意多个API调用链接起来。大家应该都应该知道同步所有的API调用,然后将所有API调用的回调结果组合成需要展示的数据是一件多么蛋疼的事情。这里我们成功的避免了callback hell(多层嵌套的回调,导致代码难以阅读维护)。现在所有的逻辑都包装成了这种简单的响应式调用。
丰富的操作符
目前为止,我们已经接触了两个操作符,RxJava中还有更多的操作符,那么我们如何使用其他的操作符来改进我们的代码呢?getTitle()返回null如果url不存在。我们不想输出"null",那么我们可以从返回的title列表中过滤掉null值!
- query("Hello, world!")
- .flatMap(urls -> Observable.from(urls))
- .flatMap(url -> getTitle(url))
- .filter(title -> title != null)
- .subscribe(title -> System.out.println(title));
如果我们只想要最多5个结果:
- query("Hello, world!")
- .flatMap(urls -> Observable.from(urls))
- .flatMap(url -> getTitle(url))
- .filter(title -> title != null)
- .take(5)
- .subscribe(title -> System.out.println(title));
如果我们想在打印之前,把每个标题保存到磁盘:
- query("Hello, world!")
- .flatMap(urls -> Observable.from(urls))
- .flatMap(url -> getTitle(url))
- .filter(title -> title != null)
- .take(5)
- .doOnNext(title -> saveTitle(title))
- .subscribe(title -> System.out.println(title));
看到这里操作数据流是多么简单了么。你可以添加任意多的操作,并且不会搞乱你的代码。
RxJava包含了大量的操作符。操作符的数量是有点吓人,但是很值得你去挨个看一下,这样你可以知道有哪些操作符可以使用。弄懂这些操作符可能会花一些时间,但是一旦弄懂了,你就完全掌握了RxJava的威力。
你甚至可以编写自定义的操作符!这篇blog不打算将自定义操作符,如果你想的话,清自行Google吧。
感觉如何?
好吧,你是一个怀疑主义者,并且还很难被说服,那为什么你要关心这些操作符呢?因为操作符可以让你对数据流做任何操作。
将一系列的操作符链接起来就可以完成复杂的逻辑。代码被分解成一系列可以组合的片段。这就是响应式函数编程的魅力。用的越多,就会越多的改变你的编程思维。
另外,RxJava也使我们处理数据的方式变得更简单。在最后一个例子里,我们调用了两个API,对API返回的数据进行了处理,然后保存到磁盘。但是我们的Subscriber并不知道这些,它只是认为自己在接收一个Observable<String>对象。良好的封装性也带来了编码的便利!
错误处理
到目前为止,我们都没怎么介绍onComplete()和onError()函数。这两个函数用来通知订阅者,被观察的对象将停止发送数据以及为什么停止(成功的完成或者出错了)。
下面的代码展示了怎么使用这两个函数:
代码中的potentialException() 和 anotherPotentialException()有可能会抛出异常。每一个Observerable对象在终结的时候都会调用onCompleted()或者onError()方法,所以Demo中会打印”Completed!”或者”Ouch!”。
这种模式有以下几个优点:
1.只要有异常发生onError()一定会被调用
这极大的简化了错误处理。只需要在一个地方处理错误即可以。
2.操作符不需要处理异常
将异常处理交给订阅者来做,Observerable的操作符调用链中一旦有一个抛出了异常,就会直接执行onError()方法。
3.你能够知道什么时候订阅者已经接收了全部的数据。
知道什么时候任务结束能够帮助简化代码的流程。(虽然有可能Observable对象永远不会结束)
我觉得这种错误处理方式比传统的错误处理更简单。传统的错误处理中,通常是在每个回调中处理错误。这不仅导致了重复的代码,并且意味着每个回调都必须知道如何处理错误,你的回调代码将和调用者紧耦合在一起。
使用RxJava,Observable对象根本不需要知道如何处理错误!操作符也不需要处理错误状态-一旦发生错误,就会跳过当前和后续的操作符。所有的错误处理都交给订阅者来做。
调度器
假设你编写的Android app需要从网络请求数据(感觉这是必备的了,还有单机么?)。网络请求需要花费较长的时间,因此你打算在另外一个线程中加载数据。那么问题来了!
编写多线程的Android应用程序是很难的,因为你必须确保代码在正确的线程中运行,否则的话可能会导致app崩溃。最常见的就是在非主线程更新UI。
使用RxJava,你可以使用subscribeOn()指定观察者代码运行的线程,使用observerOn()指定订阅者运行的线程:
myObservableServices.retrieveImage(url)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(bitmap -> myImageView.setImageBitmap(bitmap));
是不是很简单?任何在我的Subscriber前面执行的代码都是在I/O线程中运行。最后,操作view的代码在主线程中运行.
最棒的是我可以把subscribeOn()和observerOn()添加到任何Observable对象上。这两个也是操作符!。我不需要关心Observable对象以及它上面有哪些操作符。仅仅运用这两个操作符就可以实现在不同的线程中调度。
如果使用AsyncTask或者其他类似的,我将不得不仔细设计我的代码,找出需要并发执行的部分。使用RxJava,我可以保持代码不变,仅仅在需要并发的时候调用这两个操作符就可以。
订阅(Subscriptions)
当调用Observable.subscribe(),会返回一个Subscription对象。这个对象代表了被观察者和订阅者之间的联系。
ubscription subscription = Observable.just("Hello, World!")
.subscribe(s -> System.out.println(s));
你可以在后面使用这个Subscription对象来操作被观察者和订阅者之间的联系.
subscription.unsubscribe();
System.out.println("Unsubscribed=" + subscription.isUnsubscribed());
// Outputs "Unsubscribed=true"
RxJava的另外一个好处就是它处理unsubscribing的时候,会停止整个调用链。如果你使用了一串很复杂的操作符,调用unsubscribe将会在他当前执行的地方终止。不需要做任何额外的工作!
总结
记住这个系列仅仅是对RxJava的一个入门介绍。RxJava中有更多的我没介绍的功能等你探索(比如backpressure)。当然我也不是所有的代码都使用响应式的方式–仅仅当代码复杂到我想将它分解成简单的逻辑的时候,我才使用响应式代码。
在第1,2,3篇中,我大概介绍了RxJava是怎么使用的。下面我会介绍如何在Android中使用RxJava.
RxAndroid
RxAndroid是RxJava的一个针对Android平台的扩展。它包含了一些能够简化Android开发的工具。
首先,AndroidSchedulers提供了针对Android的线程系统的调度器。需要在UI线程中运行某些代码?很简单,只需要使用AndroidSchedulers.mainThread():
1234567 | retrofitService.getImage(url) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(bitmap -> myImageView.setImageBitmap(bitmap)); |
如果你已经创建了自己的Handler,你可以使用HandlerThreadScheduler1将一个调度器链接到你的handler上。
接着要介绍的就是AndroidObservable,它提供了跟多的功能来配合Android的生命周期。bindActivity()和bindFragment()方法默认使用AndroidSchedulers.mainThread()来执行观察者代码,这两个方法会在Activity或者Fragment结束的时候通知被观察者停止发出新的消息。
AndroidObservable.bindActivity(this, retrofitService.getImage(url))
.subscribeOn(Schedulers.io())
.subscribe(bitmap -> myImageView.setImageBitmap(bitmap);
我自己也很喜欢AndroidObservable.fromBroadcast()方法,它允许你创建一个类似BroadcastReceiver的Observable对象。下面的例子展示了如何在网络变化的时候被通知到:
IntentFilter filter = new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION);
AndroidObservable.fromBroadcast(context, filter)
.subscribe(intent -> handleConnectivityChange(intent));
最后要介绍的是ViewObservable,使用它可以给View添加了一些绑定。如果你想在每次点击view的时候都收到一个事件,可以使用ViewObservable.clicks(),或者你想监听TextView的内容变化,可以使用ViewObservable.text()。
ViewObservable.clicks(mCardNameEditText, false)
.subscribe(view -> handleClick(view));
Retrofit
大名鼎鼎的Retrofit库内置了对RxJava的支持。通常调用发可以通过使用一个Callback对象来获取异步的结果:
@GET("/user/{id}/photo")
void getUserPhoto(@Path("id") int id, Callback cb);
使用RxJava,你可以直接返回一个Observable对象。
@GET("/user/{id}/photo")
Observable getUserPhoto(@Path("id") int id);
1
2
现在你可以随意使用Observable对象了。你不仅可以获取数据,还可以进行变换。
Retrofit对Observable的支持使得它可以很简单的将多个REST请求结合起来。比如我们有一个请求是获取照片的,还有一个请求是获取元数据的,我们就可以将这两个请求并发的发出,并且等待两个结果都返回之后再做处理:
Observable.zip(
service.getUserPhoto(id),
service.getPhotoMetadata(id),
(photo, metadata) -> createPhotoWithData(photo, metadata))
.subscribe(photoWithData -> showPhoto(photoWithData));
在第二篇里我展示过一个类似的例子(使用flatMap())。这里我只是想展示以下使用RxJava+Retrofit可以多么简单地组合多个REST请求。
遗留代码,运行极慢的代码
Retrofit可以返回Observable对象,但是如果你使用的别的库并不支持这样怎么办?或者说一个内部的内码,你想把他们转换成Observable的?有什么简单的办法没?
绝大多数时候Observable.just() 和 Observable.from() 能够帮助你从遗留代码中创建 Observable 对象:
private Object oldMethod() { ... }
public Observable newMethod() {
return Observable.just(oldMethod());
上面的例子中如果oldMethod()足够快是没有什么问题的,但是如果很慢呢?调用oldMethod()将会阻塞住他所在的线程。
为了解决这个问题,可以参考我一直使用的方法–使用defer()来包装缓慢的代码:
private Object slowBlockingMethod() { ... }
public Observable newMethod() {
return Observable.defer(() -> Observable.just(slowBlockingMethod()));
现在,newMethod()的调用不会阻塞了,除非你订阅返回的observable对象。
生命周期
我把最难的不分留在了最后。如何处理Activity的生命周期?主要就是两个问题:
1.在configuration改变(比如转屏)之后继续之前的Subscription。
比如你使用Retrofit发出了一个REST请求,接着想在listview中展示结果。如果在网络请求的时候用户旋转了屏幕怎么办?你当然想继续刚才的请求,但是怎么搞?
2.Observable持有Context导致的内存泄露
这个问题是因为创建subscription的时候,以某种方式持有了context的引用,尤其是当你和view交互的时候,这太容易发生!如果Observable没有及时结束,内存占用就会越来越大。
不幸的是,没有银弹来解决这两个问题,但是这里有一些指导方案你可以参考。
第一个问题的解决方案就是使用RxJava内置的缓存机制,这样你就可以对同一个Observable对象执行unsubscribe/resubscribe,却不用重复运行得到Observable的代码。cache() (或者 replay())会继续执行网络请求(甚至你调用了unsubscribe也不会停止)。这就是说你可以在Activity重新创建的时候从cache()的返回值中创建一个新的Observable对象。
Observable request = service.getUserPhoto(id).cache();
Subscription sub = request.subscribe(photo -> handleUserPhoto(photo));
// ...When the Activity is being recreated...
sub.unsubscribe();
// ...Once the Activity is recreated...
request.subscribe(photo -> handleUserPhoto(photo));
注意,两次sub是使用的同一个缓存的请求。当然在哪里去存储请求的结果还是要你自己来做,和所有其他的生命周期相关的解决方案一延虎,必须在生命周期外的某个地方存储。(retained fragment或者单例等等)。
第二个问题的解决方案就是在生命周期的某个时刻取消订阅。一个很常见的模式就是使用CompositeSubscription来持有所有的Subscriptions,然后在onDestroy()或者onDestroyView()里取消所有的订阅。
123456789101112131415161718192021 | private = new private doSomething() { mCompositeSubscription.add( AndroidObservable.bindActivity( this , Observable.just( "Hello, World!" )) .subscribe(s -> System.out.println(s))); } @Overrideprotected onDestroy() { super .onDestroy(); mCompositeSubscription.unsubscribe(); } |
你可以在Activity/Fragment的基类里创建一个CompositeSubscription对象,在子类中使用它。
注意! 一旦你调用了 CompositeSubscription.unsubscribe(),这个CompositeSubscription对象就不可用了, 如果你还想使用CompositeSubscription,就必须在创建一个新的对象了。
两个问题的解决方案都需要添加额外的代码,如果谁有更好的方案,欢迎告诉我。
总结
RxJava还是一个很新的项目,RxAndroid更是。RxAndroid目前还在活跃开发中,也没有多少好的例子。我打赌一年之后我的一些建议就会被看做过时了。
事实上市,RxAndroid之前的版本确实是有点换乱,因此最近进行了一次大得重构。这里有详细的说明,概括来说就是:
从头开始对RxAndroid进行模化的改造,让这个库变成一个可服用的,可组合的模块。
这个目标已经达成,但是如果你升级到1.0,你可能会很奇怪:东西都跑到哪里去了,如何才能让我的代码通过编译?
RxAndroid
AndroidSchedulers是RxAndroid中唯一保留下来的,但是一些方法签名已经变了。
迁移部分
WidgetObservable和ViewObservable被打包进了RxBinding项目中,并且做了一些改进。
LifecycleObservable迁移到了RxLifecycle项目中。另外需要注意的是,这里进行了一些相对比较大幅度的重构,所以使用的时候请参考一下修改日志。
ContentObservable.fromSharedPreferencesChanges()迁移到了rx-preferences项目。
删除部分
AppObservable连同它的bind方法已经被完全删除掉了。AppObservable本身有很多问题:
AppObservable尝试来做自动unsubscribe,但是仅仅是在Activity或者Fragment已经paused之后Observable再发出一个事件,才会触发自动unsubscribe。也就是说,如果Activity或者Fragment如果没有paused,一个不会complete的Observable将永远不会被unsubscribe。
AppObservable被设计用来在pause之后避免继续受到消息,但是因为HandlerScheduler的一个bug,导致某些场景存在缺陷。
AppObservable自动调用了observeOn(AndroidSchedulers.mainThread()),不管你是不是想在主线程这么做。
换句话来说,AppObservable并没有做到它所描述的功能,它的可定制性也比较差,并且还会有一些非期望的副作用。
删除AppObservable的时候,可以这样做:
手动的处理Subscription(或者使用RxLifecycle),来在适当的时机做unsubscribe。检查一下你是否需要使用observeOn(AndroidSchedulers.mainThread())。