RxJava系列第一弹——RxJava入门篇

时间:2023-01-20 17:45:08

概述:RxJava系列文章第一篇 RxJava入门篇 介绍RxJava是什么和基本使用。

一 RxJava是什么

1 来自官网的介绍

a library for composing asynchronous and event-based programs using observable sequences for the Java VM 这是来自RxJava在GitHub的自我介绍 翻译过来就是 一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库。

2 来自个人的剖析

我想大多人第一次见识这段话的时候都是懵逼的,当然包括我自己。所以不管你都是看英文,还是看中文。这段话太难理解了。为了更好的理解这段话,我们对这段话进行一次剖析拿到这几个词 “可观测” “异步”“事件”“库”。

3 关键词:“可观测” “异步” “事件” “库”

① “可观测”,当你看到这个词的时候,你的脑海里想到了什么? 是的,没错。就是观察者模式。现在我们来聊聊观察者模式 举一个生活中给的例子 你妈妈和儿子的栗子 ,你看着你的妈妈 一直不说话 突然妈妈手里有了面包,儿子就说“我要吃”。这里 儿子就是观察者 ,妈妈就是被观察者,面包就是 被观察者 发生的改变。“我要吃”就是观察者作出的响应。如下图:

通过“观察”这个工作建立关系
RxJava系列第一弹——RxJava入门篇

被观察者发生改变 观察者作出响应
RxJava系列第一弹——RxJava入门篇

RxJava就是一种拓展的观察者模式 。让我们先来记住几个名词:“Observer”(带er就是什么人,那这里就是观察的人,即观察者),“Observable”(带able 就是可以什么什么的,那这里就是可以被观察的人,即被观察者)“subscribe”(订阅,即观察这个动作)如下图:
RxJava系列第一弹——RxJava入门篇

② “异步”

在安卓开发中,我们总会听到这样一句话,在主线程中不能执行耗时操作,在子线程中不能操作UI。这样就引出了一个“异步这个关键词”,通常我们的做法是使用Asynctask或者handler来执行异步的耗时操作通过接口回调在主线程中拿到数据修改UI。然后你发现大片的代码,大片的嵌套,还有回调 。让我们来感谢RxJava的诞生,有了RxJava ,妈妈再也不用担心我的异步操作了。在RxJava中有“线程调度”这样的说法。可以*的切换方法执行的线程。让我们来看看这个神奇的东东: observeOn(未知线程) ,subscribeOn(未知线程)。这就是在RxJava中使用的线程调度器。我们需要在这两个的方法中 指定调度到哪个线程。让我们记住这两个名词吧,在本篇的例子中会更详细的解释异步和线程调度。

③ “事件”

“事件”这个词,其实也挺抽象的,好吧,让我们回到妈妈和儿子的栗子。在这个模型中,妈妈拿出一个面包,这个行为 就是 被观察者创造出的一个事件,我们先称为“面包事件”而面包就是事件的参数。 儿子想吃面包那么就需要通过妈妈通过事件传递的方法将“面包事件”传递给儿子。那么在RxJava中有哪些事件的传递的方法呢。这里就不介绍了,在后面的栗子中,我们就会详细的介绍。

RxJava系列第一弹——RxJava入门篇

④ “库”

看到这里我只想说一句“酷”,这就意味着我们使用RxJava是非常简单的。作为Android开发者,我们只需要在gradle中添加依赖就可以了。

4 是时候来一句总结了

RxJava是什么呢?说了这么久,相信大家对RxJava有了一定的了解。那么是时候来一句总结了:RxJava是一个 通过拓展的观察者模式 使用线程调度器来解决异步操作 进行事件传递 的 库。(总结的不好,你来打我啊,哈哈)

二 RxJava怎么用

RxJava到现在已经有 RxJava1.x 和 RxJava2.0 两个版本了。我们分别来看看这个版本是怎么使用的。

1 RxJava1.x的用法:

① 引用库

compile 'io.reactivex:rxjava:1.0.10'

② 创建 观察者 Observer

       Observer<String> observer=new Observer<String>() {
@Override
public void onNext(String s) {
Log.i("onNext","传递面包事件");
}
@Override
public void onCompleted() {
Log.i("onCompleted","传递面包事件完成");
}

@Override
public void onError(Throwable e) {
Log.i("onError","传递面包事件出错了");
}
};

在创建观察者的方法中,我们可以看到 有三个传递事件的方法:onNext (向下传递事件时调用) onCompleted(事件传递完成时调用) onError(事件传递出错时调用)。

③ 创建 被观察者 Observable

        Observable observable = Observable.create(new Observable.OnSubscribe<String>() {

@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("面包事件");
}
});

这里可以看到 通过create 方法传入了一个 onSubscribe对象 作为参数。当 Observable 被订阅的时候,OnSubscribe 的 call() 方法会自动被调用,事件序列就会依照设定依次触发(对于上面的代码,就是观察者Subscriber 将会被调用onNext()方法,将“面包事件”进行传递。由被观察者调用了观察者的回调方法,就实现了由被观察者向观察者的事件传递

④ 建立观察关系

 observable.subscribe(observer);

被观察者 调用 subscribe 方法 传入 观察者 observer 建立订阅关系。

⑤ 订阅结果

a 只调用 onNext()方法时

  subscriber.onNext("面包事件");
com.rxjava1xdemo I/onNext: 面包事件

可以清楚的看到 在 观察者的onNext()方法中 成功的打印出来 “面包事件” 这样就完成的了从 被观察者 到 观察者的事件传递。

b 调用 两次 onNext()方法时

subscriber.onNext("面包事件");
subscriber.onNext("面包事件2");
03-17 08:57:21.691 2513-2513/com.rxjava1xdemo I/onNext: 面包事件
03-17 08:57:21.691 2513-2513/com.rxjava1xdemo I/onNext: 面包事件2

可以清楚看到 观察者的调用了两次onNext()方法。可以说明 被观察者可以发射很多次事件。
c 调用 onNext() 和 onCompleted()时

subscriber.onNext("面包事件");
subscriber.onCompleted();
subscriber.onNext("面包事件2");
03-17 09:01:37.708 2513-2513/com.rxjava1xdemo I/onNext: 面包事件
03-17 09:01:37.708 2513-2513/com.rxjava1xdemo I/onCompleted: 传递面包事件完成

可以看到 当调用 onCompleted()方法时 在其之后再调用onNext()方法 将不会在进行事件传递。

d 当调用onError()方法时

subscriber.onNext("面包事件");
subscriber.onError(new NullPointerException());
subscriber.onNext("面包事件2");
03-17 09:04:45.273 2513-2513/com.rxjava1xdemo I/onNext: 面包事件
03-17 09:04:45.273 2513-2513/com.rxjava1xdemo I/onError: 传递面包事件出错了

可以看到 当调用 onError()方法时 其后在调用onNext()方法 不在进行事件传递。

e 当 onCompleted()方法和onError() 都调用时。

onError方法先调用时。

subscriber.onNext("面包事件");
subscriber.onError(new NullPointerException());
subscriber.onCompleted();
subscriber.onNext("面包事件2");
03-17 09:07:43.065 2513-2513/com.rxjava1xdemo I/onNext: 面包事件
03-17 09:07:43.065 2513-2513/com.rxjava1xdemo I/onError: 传递面包事件出错了

可以看到只调用了 onError()方法。

onCompleted方法先调用时

subscriber.onNext("面包事件");
subscriber.onCompleted();
subscriber.onError(new NullPointerException());
subscriber.onNext("面包事件2");
03-17 09:09:01.728 2513-2513/com.rxjava1xdemo I/onNext: 面包事件
03-17 09:09:01.728 2513-2513/com.rxjava1xdemo I/onCompleted: 传递面包事件完成

可以看到只调用了 onCompleted()方法

总结 1 onNext()方法可以多次调用,但是只能在onError()方法或者onCompleted()方法之前调用。2 onError()方法和onCompleted()方法同时只能出现一个。当同时出现出现时,只会调用 第一个时间出现的。

⑥ RxJava1.X 观察者Observer的其他写法

除了 Observer 接口之外,RxJava 还内置了一个实现了 Observer 的抽象类:Subscriber。

      Subscriber<String> subscriber=new Subscriber<String>() {
@Override
public void onStart() {
super.onStart();
}

@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(String s) {

}
};
return subscriber;

可以看到 Subscriber 是一个抽象类 它实现了 Observer接口 并 增加了一个 onStart(); 对Observer进行了一点拓展。但是他们的使用方法还是一样的。同样可以通过 subscribe()方法 实现订阅关系。

observable.subscribe(subscriber);

除了 subscribe(Observer) 和 subscribe(Subscriber) ,subscribe() 还支持不完整定义的回调,RxJava 会自动根据定义创建出 Subscriber 。形式如下:

Action1<String> onNextAction = new Action1<String>() {
// onNext()
@Override
public void call(String s) {
Log.d(tag, s);
}
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
// onError()
@Override
public void call(Throwable throwable) {
// Error handling
}
};
Action0 onCompletedAction = new Action0() {
// onCompleted()
@Override
public void call() {
Log.d(tag, "completed");
}
};

// 自动创建 Subscriber ,并使用 onNextAction 来定义 onNext()
observable.subscribe(onNextAction);
// 自动创建 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自动创建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);

在这里 action1 和 action0 都是接口 因为 onNext()和onError()方法都是有参的 所以实现了 action1 接口 而 onCompleted()方法是无参的 所以实现了 action0接口 。

⑦ RxJava1.X 被观察者 Observable 的其他创建方式 。

create() 方法是 RxJava 最基本的创造事件序列的方法。基于这个方法, RxJava 还提供了一些方法用来快捷创建事件队列
⑴ 使用 just 方式 将传入的参数 发射出来

Observable<String> observable=Observable.just("面包事件1","面包事件2","面包事件3");
03-17 10:07:02.804 18123-18123/com.rxjava1xdemo I/onNext: 面包事件1
03-17 10:07:02.804 18123-18123/com.rxjava1xdemo I/onNext: 面包事件2
03-17 10:07:02.804 18123-18123/com.rxjava1xdemo I/onNext: 面包事件3
03-17 10:07:02.804 18123-18123/com.rxjava1xdemo I/onCompleted: 传递面包事件完成

⑵ 使用 from 方式 将传入的数组或 Iterable 拆分成具体对象后,依次发送出来。

String[] actions = {"面包事件1", "面包事件2", "面包事件3"};
Observable observable = Observable.from(actions);
03-17 10:12:17.600 18123-18123/com.rxjava1xdemo I/onNext: 面包事件1
03-17 10:12:17.600 18123-18123/com.rxjava1xdemo I/onNext: 面包事件2
03-17 10:12:17.600 18123-18123/com.rxjava1xdemo I/onNext: 面包事件3
03-17 10:12:17.600 18123-18123/com.rxjava1xdemo I/onCompleted: 传递面包事件完成

2 RxJava2.0的用法

① 引用库

compile 'io.reactivex.rxjava2:rxjava:2.0.7'

② 创建 观察者 Observer

  Observer observer=new Observer<String>() {

@Override
public void onSubscribe(Disposable d) {
Log.i("onNext",d.isDisposed()+"");
}

@Override
public void onNext(String s) {
Log.i("onNext",s);
}


@Override
public void onError(Throwable e) {
Log.i("onError",e.toString());
}

@Override
public void onComplete() {
Log.i("onComplete","onComplete");
}
};

在创建观察者的方法中,我们可以看到 有四个传递事件的方法:onSubscribe (是否解除订阅关系,相当于发射事件的开关) onNext (向下传递事件时调用) onCompleted(事件传递完成时调用) onError(事件传递出错时调用)。

③ 创建 被观察者 Observable

        Observable observable=Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
e.onNext("面包事件1");
e.onNext("面包事件2");
e.onNext("面包事件3");
e.onComplete();
}
});

这里可以看到 通过create 方法传入了一个 ObservableOnSubscribe对象 作为参数。它有一个subscribe的方法 在这里 ObservableEmitter 事件发射器的意思。通过这个发射器 发射事件。也就是调用 onNext onCompleted onError方法。

④ 建立观察关系

 observable.subscribe(observer);

被观察者 调用 subscribe 方法 传入 观察者 observer 建立订阅关系。

⑤ 订阅结果

03-17 11:02:01.561 25410-25410/com.rxjava20demo I/onNext: false
03-17 11:02:01.561 25410-25410/com.rxjava20demo I/onNext: 面包事件1
03-17 11:02:01.562 25410-25410/com.rxjava20demo I/onNext: 面包事件2
03-17 11:02:01.562 25410-25410/com.rxjava20demo I/onNext: 面包事件3
03-17 11:02:01.562 25410-25410/com.rxjava20demo I/onComplete: onComplete

⑥ 实现订阅关系的其他写法

我们在用subscribe(subscriber)方式实现订阅关系时 可以看到 subscribe还有其他的重载方法:
RxJava系列第一弹——RxJava入门篇
我们可以看到几个熟悉的词(标记的词) onNext onError onCompleted 。
也就是在subscribe方法里 可以传入不同的参数。从而调用不同的方法。

        observable.subscribe(new Consumer() {
@Override
public void accept(@NonNull Object o) throws Exception {

}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {

}
}, new Action() {
@Override
public void run() throws Exception {

}
});

这里的重载方法 就相当于 subscribe(onNext ,onError,onCompleted ) 。

三 RxJava1.X 与RxJava2.0的区别

1 在使用create方式创建Observable时 传入的参数不同。

RxJava1.X:

Observable observable = Observable.create(new Observable.OnSubscribe<String>() 

RxJava2.0:

Observable observable=Observable.create(new ObservableOnSubscribe()

2 subscribe方法 重载的参数不同

RxJava1.X
RxJava系列第一弹——RxJava入门篇
RxJava2.0
RxJava系列第一弹——RxJava入门篇

可以明显看到 RxJava2.0中 不在支持 Subscriber的实现类参数。
不再支持 Action1 的实现类参数。

四 总结

在RxJava系列的第一篇中 我们引入了 RxJava的概念。剖析了一些关键字,通过这些关键字,对RxJava进行了一层基本的认识。然后就是RxJava1.X的api用法。和RxJava2.0Api的用法。最后进行了 两个版本的比较。相信大家已经基本会使用RxJava了。再后续第二篇 我使用RxJava来实现一些经典场景。敬请期待吧。