概述:RxJava系列文章第一篇 RxJava入门篇 介绍RxJava是什么和基本使用。
一 RxJava是什么
1 来自官网的介绍
a library for composing asynchronous and event-based programs using observable sequences for the Java VM 这是来自RxJava在GitHub的自我介绍 翻译过来就是 一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库。
2 来自个人的剖析
我想大多人第一次见识这段话的时候都是懵逼的,当然包括我自己。所以不管你都是看英文,还是看中文。这段话太难理解了。为了更好的理解这段话,我们对这段话进行一次剖析拿到这几个词 “可观测” “异步”“事件”“库”。
3 关键词:“可观测” “异步” “事件” “库”
① “可观测”,当你看到这个词的时候,你的脑海里想到了什么? 是的,没错。就是观察者模式。现在我们来聊聊观察者模式 举一个生活中给的例子 你妈妈和儿子的栗子 ,你看着你的妈妈 一直不说话 突然妈妈手里有了面包,儿子就说“我要吃”。这里 儿子就是观察者 ,妈妈就是被观察者,面包就是 被观察者 发生的改变。“我要吃”就是观察者作出的响应。如下图:
通过“观察”这个工作建立关系
被观察者发生改变 观察者作出响应
RxJava就是一种拓展的观察者模式 。让我们先来记住几个名词:“Observer”(带er就是什么人,那这里就是观察的人,即观察者),“Observable”(带able 就是可以什么什么的,那这里就是可以被观察的人,即被观察者)“subscribe”(订阅,即观察这个动作)如下图:
② “异步”
在安卓开发中,我们总会听到这样一句话,在主线程中不能执行耗时操作,在子线程中不能操作UI。这样就引出了一个“异步这个关键词”,通常我们的做法是使用Asynctask或者handler来执行异步的耗时操作通过接口回调在主线程中拿到数据修改UI。然后你发现大片的代码,大片的嵌套,还有回调 。让我们来感谢RxJava的诞生,有了RxJava ,妈妈再也不用担心我的异步操作了。在RxJava中有“线程调度”这样的说法。可以*的切换方法执行的线程。让我们来看看这个神奇的东东: observeOn(未知线程) ,subscribeOn(未知线程)。这就是在RxJava中使用的线程调度器。我们需要在这两个的方法中 指定调度到哪个线程。让我们记住这两个名词吧,在本篇的例子中会更详细的解释异步和线程调度。
③ “事件”
“事件”这个词,其实也挺抽象的,好吧,让我们回到妈妈和儿子的栗子。在这个模型中,妈妈拿出一个面包,这个行为 就是 被观察者创造出的一个事件,我们先称为“面包事件”而面包就是事件的参数。 儿子想吃面包那么就需要通过妈妈通过事件传递的方法将“面包事件”传递给儿子。那么在RxJava中有哪些事件的传递的方法呢。这里就不介绍了,在后面的栗子中,我们就会详细的介绍。
④ “库”
看到这里我只想说一句“酷”,这就意味着我们使用RxJava是非常简单的。作为Android开发者,我们只需要在gradle中添加依赖就可以了。
4 是时候来一句总结了
RxJava是什么呢?说了这么久,相信大家对RxJava有了一定的了解。那么是时候来一句总结了:RxJava是一个 通过拓展的观察者模式 使用线程调度器来解决异步操作 进行事件传递 的 库。(总结的不好,你来打我啊,哈哈)
二 RxJava怎么用
RxJava到现在已经有 RxJava1.x 和 RxJava2.0 两个版本了。我们分别来看看这个版本是怎么使用的。
1 RxJava1.x的用法:
① 引用库
compile 'io.reactivex:rxjava:1.0.10'
② 创建 观察者 Observer
Observer<String> observer=new Observer<String>() {
@Override
public void onNext(String s) {
Log.i("onNext","传递面包事件");
}
@Override
public void onCompleted() {
Log.i("onCompleted","传递面包事件完成");
}
@Override
public void onError(Throwable e) {
Log.i("onError","传递面包事件出错了");
}
};
在创建观察者的方法中,我们可以看到 有三个传递事件的方法:onNext (向下传递事件时调用) onCompleted(事件传递完成时调用) onError(事件传递出错时调用)。
③ 创建 被观察者 Observable
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("面包事件");
}
});
这里可以看到 通过create 方法传入了一个 onSubscribe对象 作为参数。当 Observable 被订阅的时候,OnSubscribe 的 call() 方法会自动被调用,事件序列就会依照设定依次触发(对于上面的代码,就是观察者Subscriber 将会被调用onNext()方法,将“面包事件”进行传递。由被观察者调用了观察者的回调方法,就实现了由被观察者向观察者的事件传递
④ 建立观察关系
observable.subscribe(observer);
被观察者 调用 subscribe 方法 传入 观察者 observer 建立订阅关系。
⑤ 订阅结果
a 只调用 onNext()方法时
subscriber.onNext("面包事件");
com.rxjava1xdemo I/onNext: 面包事件
可以清楚的看到 在 观察者的onNext()方法中 成功的打印出来 “面包事件” 这样就完成的了从 被观察者 到 观察者的事件传递。
b 调用 两次 onNext()方法时
subscriber.onNext("面包事件");
subscriber.onNext("面包事件2");
03-17 08:57:21.691 2513-2513/com.rxjava1xdemo I/onNext: 面包事件
03-17 08:57:21.691 2513-2513/com.rxjava1xdemo I/onNext: 面包事件2
可以清楚看到 观察者的调用了两次onNext()方法。可以说明 被观察者可以发射很多次事件。
c 调用 onNext() 和 onCompleted()时
subscriber.onNext("面包事件");
subscriber.onCompleted();
subscriber.onNext("面包事件2");
03-17 09:01:37.708 2513-2513/com.rxjava1xdemo I/onNext: 面包事件
03-17 09:01:37.708 2513-2513/com.rxjava1xdemo I/onCompleted: 传递面包事件完成
可以看到 当调用 onCompleted()方法时 在其之后再调用onNext()方法 将不会在进行事件传递。
d 当调用onError()方法时
subscriber.onNext("面包事件");
subscriber.onError(new NullPointerException());
subscriber.onNext("面包事件2");
03-17 09:04:45.273 2513-2513/com.rxjava1xdemo I/onNext: 面包事件
03-17 09:04:45.273 2513-2513/com.rxjava1xdemo I/onError: 传递面包事件出错了
可以看到 当调用 onError()方法时 其后在调用onNext()方法 不在进行事件传递。
e 当 onCompleted()方法和onError() 都调用时。
onError方法先调用时。
subscriber.onNext("面包事件");
subscriber.onError(new NullPointerException());
subscriber.onCompleted();
subscriber.onNext("面包事件2");
03-17 09:07:43.065 2513-2513/com.rxjava1xdemo I/onNext: 面包事件
03-17 09:07:43.065 2513-2513/com.rxjava1xdemo I/onError: 传递面包事件出错了
可以看到只调用了 onError()方法。
onCompleted方法先调用时
subscriber.onNext("面包事件");
subscriber.onCompleted();
subscriber.onError(new NullPointerException());
subscriber.onNext("面包事件2");
03-17 09:09:01.728 2513-2513/com.rxjava1xdemo I/onNext: 面包事件
03-17 09:09:01.728 2513-2513/com.rxjava1xdemo I/onCompleted: 传递面包事件完成
可以看到只调用了 onCompleted()方法
总结 1 onNext()方法可以多次调用,但是只能在onError()方法或者onCompleted()方法之前调用。2 onError()方法和onCompleted()方法同时只能出现一个。当同时出现出现时,只会调用 第一个时间出现的。
⑥ RxJava1.X 观察者Observer的其他写法
除了 Observer 接口之外,RxJava 还内置了一个实现了 Observer 的抽象类:Subscriber。
Subscriber<String> subscriber=new Subscriber<String>() {
@Override
public void onStart() {
super.onStart();
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
}
};
return subscriber;
可以看到 Subscriber 是一个抽象类 它实现了 Observer接口 并 增加了一个 onStart(); 对Observer进行了一点拓展。但是他们的使用方法还是一样的。同样可以通过 subscribe()方法 实现订阅关系。
observable.subscribe(subscriber);
除了 subscribe(Observer) 和 subscribe(Subscriber) ,subscribe() 还支持不完整定义的回调,RxJava 会自动根据定义创建出 Subscriber 。形式如下:
Action1<String> onNextAction = new Action1<String>() {
// onNext()
@Override
public void call(String s) {
Log.d(tag, s);
}
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
// onError()
@Override
public void call(Throwable throwable) {
// Error handling
}
};
Action0 onCompletedAction = new Action0() {
// onCompleted()
@Override
public void call() {
Log.d(tag, "completed");
}
};
// 自动创建 Subscriber ,并使用 onNextAction 来定义 onNext()
observable.subscribe(onNextAction);
// 自动创建 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自动创建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
在这里 action1 和 action0 都是接口 因为 onNext()和onError()方法都是有参的 所以实现了 action1 接口 而 onCompleted()方法是无参的 所以实现了 action0接口 。
⑦ RxJava1.X 被观察者 Observable 的其他创建方式 。
create() 方法是 RxJava 最基本的创造事件序列的方法。基于这个方法, RxJava 还提供了一些方法用来快捷创建事件队列
⑴ 使用 just 方式 将传入的参数 发射出来
Observable<String> observable=Observable.just("面包事件1","面包事件2","面包事件3");
03-17 10:07:02.804 18123-18123/com.rxjava1xdemo I/onNext: 面包事件1
03-17 10:07:02.804 18123-18123/com.rxjava1xdemo I/onNext: 面包事件2
03-17 10:07:02.804 18123-18123/com.rxjava1xdemo I/onNext: 面包事件3
03-17 10:07:02.804 18123-18123/com.rxjava1xdemo I/onCompleted: 传递面包事件完成
⑵ 使用 from 方式 将传入的数组或 Iterable 拆分成具体对象后,依次发送出来。
String[] actions = {"面包事件1", "面包事件2", "面包事件3"};
Observable observable = Observable.from(actions);
03-17 10:12:17.600 18123-18123/com.rxjava1xdemo I/onNext: 面包事件1
03-17 10:12:17.600 18123-18123/com.rxjava1xdemo I/onNext: 面包事件2
03-17 10:12:17.600 18123-18123/com.rxjava1xdemo I/onNext: 面包事件3
03-17 10:12:17.600 18123-18123/com.rxjava1xdemo I/onCompleted: 传递面包事件完成
2 RxJava2.0的用法
① 引用库
compile 'io.reactivex.rxjava2:rxjava:2.0.7'
② 创建 观察者 Observer
Observer observer=new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i("onNext",d.isDisposed()+"");
}
@Override
public void onNext(String s) {
Log.i("onNext",s);
}
@Override
public void onError(Throwable e) {
Log.i("onError",e.toString());
}
@Override
public void onComplete() {
Log.i("onComplete","onComplete");
}
};
在创建观察者的方法中,我们可以看到 有四个传递事件的方法:onSubscribe (是否解除订阅关系,相当于发射事件的开关) onNext (向下传递事件时调用) onCompleted(事件传递完成时调用) onError(事件传递出错时调用)。
③ 创建 被观察者 Observable
Observable observable=Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
e.onNext("面包事件1");
e.onNext("面包事件2");
e.onNext("面包事件3");
e.onComplete();
}
});
这里可以看到 通过create 方法传入了一个 ObservableOnSubscribe对象 作为参数。它有一个subscribe的方法 在这里 ObservableEmitter 事件发射器的意思。通过这个发射器 发射事件。也就是调用 onNext onCompleted onError方法。
④ 建立观察关系
observable.subscribe(observer);
被观察者 调用 subscribe 方法 传入 观察者 observer 建立订阅关系。
⑤ 订阅结果
03-17 11:02:01.561 25410-25410/com.rxjava20demo I/onNext: false
03-17 11:02:01.561 25410-25410/com.rxjava20demo I/onNext: 面包事件1
03-17 11:02:01.562 25410-25410/com.rxjava20demo I/onNext: 面包事件2
03-17 11:02:01.562 25410-25410/com.rxjava20demo I/onNext: 面包事件3
03-17 11:02:01.562 25410-25410/com.rxjava20demo I/onComplete: onComplete
⑥ 实现订阅关系的其他写法
我们在用subscribe(subscriber)方式实现订阅关系时 可以看到 subscribe还有其他的重载方法:
我们可以看到几个熟悉的词(标记的词) onNext onError onCompleted 。
也就是在subscribe方法里 可以传入不同的参数。从而调用不同的方法。
observable.subscribe(new Consumer() {
@Override
public void accept(@NonNull Object o) throws Exception {
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
}
}, new Action() {
@Override
public void run() throws Exception {
}
});
这里的重载方法 就相当于 subscribe(onNext ,onError,onCompleted ) 。
三 RxJava1.X 与RxJava2.0的区别
1 在使用create方式创建Observable时 传入的参数不同。
RxJava1.X:
Observable observable = Observable.create(new Observable.OnSubscribe<String>()
RxJava2.0:
Observable observable=Observable.create(new ObservableOnSubscribe()
2 subscribe方法 重载的参数不同
RxJava1.X
RxJava2.0
可以明显看到 RxJava2.0中 不在支持 Subscriber的实现类参数。
不再支持 Action1 的实现类参数。