一、啥是RXJava
1.1 简介
一个在Java Jvm上使用可观测的序列来组成异步的、基于事件的程序的库。
概念很复杂,没所谓。我们先学会使用。
RxJava github地址:
https://github.com/ReactiveX/RxJava
RxJava github地址:
https://github.com/ReactiveX/RxAndroid
建议查看他的wiki,可以看它内容,进行通篇的认识。
1.2 三个概念
- Observable:被观察者(主题Subject)
- Observer/Subscriber:观察者
- Subscribe: 订阅
Observable和Observer 通过subscribe() 方法实现订阅关系
//以前是通过add到被观察者里面进行订阅,然后change来进行通知。 RxJava是一旦订阅就发送。
二、使用三部曲
前提是先依赖了
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
// Because RxAndroid releases are few and far between, it is recommended you also
// explicitly depend on RxJava's latest version for bug fixes and new features.
compile 'io.reactivex.rxjava2:rxjava:2.0.1'
2.1 方法1: create
2.1.1 创建 被观察者Observable
/**
* 定义 被观察者
* @return
*/
public Observable<String> getObservable(){
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
//只要有三个方法onNext、onError、onCompleted
//onNext类似观察者模式的change
e.onNext("HI"); //发送数据
e.onNext("天平"); //发送数据
//发射完成,这种方法需要手动调用onCompleted,才会回调Observer的onCompleted方法
e.onComplete();
//onComplete 和 onError 只调用一个,两个都用只认前面那个
//e.onError(new Exception("错误啦!"));
}
});
2.1.2 创建观察者 Observer
/**
* 生成观察者
* @return
*/
public Observer<String> getObserver(){
return new Observer<String>() {
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: "+e.getMessage());
//发生错误调用
}
@Override
public void onComplete() {
Log.d(TAG, "onCompleted: ");
//数据接收完成时调用
}
/**
*
* @param d
*/
@Override
public void onSubscribe(Disposable d) {
//最先回调,没有执行onNext、onComplete、onError也会回调
Log.d(TAG, "onSubscribe: "+d.toString());
//d.dispose(); //移除订阅关系
//d.isDisposed(); //判断取消了订阅关系,为真就是没有订阅,假就是有订阅中
}
/**
* 被观察者调用onNext这里就会回调
* @param s 参数
*/
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: "+s);
//正常接收数据调用
System.out.print(s); //将接收到来自sender的问候"Hi,Weavey!"
}
};
}
2.1.3 订阅
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
Observable<String> observable = getObservable();
Observer<String> observer = getObserver();
//关联观察者和被观察者-> 订阅
observable.subscribe(observer);
}
2.2 方法2: create
2.2.1 上面那样创建被观察者
2.2.2 创建观察者
已经包含订阅
observable.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
//这个accept就等于观察者的onNext
Log.e(TAG, "accept: "+s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
//onError
}
}, new Action() {
@Override
public void run() throws Exception {
}
});
2.3 方法3: just
2.3.1 生成被观察者
/**
* 生成 被观察者
* @return
*/
public Observable<String> getObservable(){
//依次发送"just1"和"just2"
return Observable.just("just1","just2");
}
2.3.2 定义 观察者
包含了订阅
Observable<String> observable = getObservable();
observable.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
//这个accept就等于观察者的onNext
Log.e(TAG, "accept: "+s);
}
});
2.4 方法4: fromArray
2.4.1 生成被观察者
/**
* 生成 被观察者
* @return
*/
public Observable<String> getObservable(){
return Observable.fromArray("from1","from2","from3");
}
- 定义观察者和上面一致
2.5 方法5: fromCallable
2.5.1 生成被观察者
/**
* 生成 被观察者
* @return
*/
public Observable<String> getObservable(){
return Observable.fromCallable(new Callable<String>() {
@Override
public String call() throws Exception {
return "fromCallable";
}
});
}
2.5.2 定义观察者和上面一致
还有其他的看下面的列表。
三. 方法列表
名称 | 解析 |
---|---|
just() | 将一个或多个对象转换成发射这个或这些对象的一个Observable |
fromArray() | 将一个Iterable, 一个Future, 或者一个数组转换成一个Observable |
repeat() | 创建一个重复发射指定数据或数据序列的Observable |
repeatWhen() | 创建一个重复发射指定数据或数据序列的Observable,它依赖于另一个Observable发射的数据 |
create() | 使用一个函数从头创建一个Observable |
defer() | 只有当订阅者订阅才创建Observable;为每个订阅创建一个新的Observable |
range() | 创建一个发射指定范围的整数序列的Observable |
interval() | 创建一个按照给定的时间间隔发射整数序列的Observable |
timer() | 创建一个在给定的延时之后发射单个数据的Observable |
empty() | 创建一个什么都不做直接通知完成的Observable |
error() | 创建一个什么都不做直接通知错误的Observable |
never() | 创建一个不发射任何数据的Observable |