RxJava2.0新特性
简单总结介绍下:
- 不再支持传null,传了直接正常结束或者抛异常;
- Observable不再支持背压,新加入Flowable支持非阻塞的背压,并且所有操作符都强制支持背压;
- Single类可单独发送onSuccess或onError消息;
- Completable只是改变了命名;
- 新增Maybe,可以说是Single和Completable结合体,只能发送0或1个事件或错误;
- 很多基础类实现了类似Publisher<T>的接口;
- 支持背压的都是FlowableProcessor<T>的子类,Subject不再支持T -> R的转换;
- TestSubject被废弃;
- SerializedSubject 不再是公共方法,需要使用Subject.toSerialized()和FlowableProcessor.toSerialized()替代;
- GroupedObservable编程抽象类;
- 可以自己实现功能接口,并且所有功能接口都会抛出异常,不需要try-catch;
- 减少组件数量,Action0由io.reactivex.functions.Action和Scheduler代替,Action1重命名Consumer,Action2重命名BiConsumer,ActionN被Consumer<Object[]>代替;
- Functions按照Java8明明风格命名;
- 使用轻量级Subscriber接口,整合请求管理和取消,为Flowable定义抽象类,支持外部取消dispose(),onCompleted重命名,
- request前必须完成初始化工作;
- 1.0中Subscription重命名成Disposable;
- Reactive-Streams规范的操作符支持背压,异常会在onNext抛出,Observable完全不支持背压;
- Reactive-Streams compliance;
- 重新设计RxJavaPlugins,RxJavaHooks功能被加入到了RxJavaPlugins;
- 调度类调整;
- 每个基础类都有create操作符用于支持背压和取消;
- Subscriber和Observer不允许主动抛出异常;
- 为了支持内部测试,所有基础类都有test()方法;
- TestObserver可在订阅前取消TestSubscriber/TestObserver;
- 阻塞终端成为可能;
- 1.x中使用Mockito和Observer的用户需要使用Subscriber.onSubscribe进行初始化请求;
- 大部分操作符仍然保留,但进行了重载;
- 1.x Observable 到 2.x Flowable;
- 实例方法;
- 不同返回值;
- 移除方法;
- doOnCancel/doOnDispose/unsubscribeOn变化说明;
以上是个人大致总结,详情还是看官方文档 What's different in 2.0或中文翻译,虽然变动很多,但在实际使用中估计就背压和部分类的改变可能有所影响。
RxBus
RxBus不是一个库,而是一种模式,是使用RxJava来实现EventBus。用RxBus来替代EventBus减少了程序对第三方库的应用。
直接上代码
public class RxBus {单例实现,使用中只需要订阅事件subscribe和发送事件post即可,与RxJava1.x实现的RxBus几乎一样,熟悉的用法,不一样的实现而已。
private static RxBus instance;
public static RxBus getInstance() {
if (instance == null) {
synchronized (RxBus.class) {
if (instance == null) {
instance = new RxBus();
}
}
}
return instance;
}
private Subject<Object> subject;
private HashMap<Object, CompositeDisposable> disposableHashMap;
private RxBus() {
subject = PublishSubject.create().toSerialized();
disposableHashMap = new HashMap<>();
}
/**
* 发送事件
*
* @param event 递送的事件
*/
public void post(Object event) {
subject.onNext(event);
}
/**
* 返回指定类型的带背压的Flowable实例
*
* @param type
* @param <T>
* @return
*/
private <T> Flowable<T> getObservable(Class<T> type) {
return subject.toFlowable(BackpressureStrategy.BUFFER).ofType(type);
}
/**
* 订阅事件
* @param subscriber 订阅者对象
* @param type 事件的类型
* @param next 事件的处理程序
* @param error 事件的异常处理
* @param <T>
*/
public <T> void subscribe(Object subscriber, Class<T> type, Consumer<T> next, Consumer<Throwable> error) {
Disposable disposable = getObservable(type)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(next, error);
addSubscription(subscriber, disposable);
}
/**
* 保存订阅后的disposable
*
* @param subscriber
* @param disposable
*/
private void addSubscription(Object subscriber, Disposable disposable) {
if (disposableHashMap.get(subscriber) != null) {
disposableHashMap.get(subscriber).add(disposable);
} else {
CompositeDisposable compositeDisposable = new CompositeDisposable();
compositeDisposable.add(disposable);
disposableHashMap.put(subscriber, compositeDisposable);
}
}
/**
* 是否有观察者订阅
*
* @return
*/
public boolean hasObservers() {
return subject.hasObservers();
}
/**
* 取消订阅
*
* @param subscriber
*/
public void unSubscribe(Object subscriber) {
if (disposableHashMap.containsKey(subscriber)) {
disposableHashMap.get(subscriber).dispose();
disposableHashMap.remove(subscriber);
}
}
}