Rx 有两个最基础的类型,和其他一些扩展这两种类型的类。两个核心的类为: Observable 和 Observer。Subject 是同时继承了 Observable 和 Observer。
Rx 是在 Observer 模式之上建立起来的。这种模式很常见,在 Java 中有很多地方都使用了该模式,比如 JavaFx 中的 EventHandler。 这些简单的使用方式和 Rx 对比有如下区别:
- 使用 event handler 来处理事件很难组合使用
- 无法延时处理查询事件
- 可能会导致内存泄露
- 没有标准的标示完成的方式
- 需要手工的来处理并行和多线程
1. Observable
Observable 是第一个核心类。该类包含了 Rx 中的很多实现,以及所有核心的操作函数(operator、或者说 操作符)。在本系列教程中会逐步介绍每个操作函数。现在我们只需要理解 subscribe 函数即可,下面是该函数的一种定义:
public final Subscription subscribe(Subscriber<? super T> subscriber)
该函数是用来接收 observable 发射的事件的。当事件被发射后,他们就丢给了 subscriber, subscriber 是用来处理事件的实现。这里的 Subscriber 参数实现了 Observer 接口
一个 Observable 发射 三种类型的事件:
- Values (数据)
- 完成状态,告诉 Subscriber 事件(数据) 发射完毕,没有其他数据了
- Error, 错误状态,如果在发射数据的过程中出现错误了。会发送该事件
2. Observer
Subscriber 是 Observer 的一个实现。 Subscriber 实现了其他一些额外的功能,可以作为我们实现 Observer 的基类。现在先看看 Observer 的接口定义:
interface Observer<T> {
void onCompleted();
void onError(java.lang.Throwable e);
void onNext(T t);
}
每次 Observable 发射事件的时候就会执行这三个对应的函数。Observer 的 onNext 函数会被调用0次或者多次,然后会调用 onCompleted 或者 onError。在 onCompleted 或者 onError 发生以后就不会再有其他事件发射出来了。
在使用 Rx 开发的过程中,你会看到很多 Observable,但是 Observer 出场的时候很少。但是理解 Observer 的概念是非常重要的,虽然有很多简写方式来帮助更加简洁的使用 Observer
3. 实现 Observable 和 Observer
你可以手工的实现 Observer 或者扩展 Observable。 在真实场景中并不需要这样做,Rx 已经提供了很多可以直接使用的工厂方法了。使用 Rx 提供的工具来创建 Observable 和 Observer 比手工实现要更加安全和简洁。
要订阅到一个 Observable,并不需要提供一个 Observer 示例。subscribe 函数有各种重载方法可以使用,你可以只订阅 onNext 事件,有可以只订阅 onError 事件,这样就不用提供 Observer 对象就可以接受事件了。每次只需要提供你关心的函数即可,例如 如果你不关心 error 和完成事件,则只提供 onNext 来接收每次发送的数据即可。
配合 Java 8 的 Lambda 表达式则使用起来代码看起来会更加简洁,所以本系列示例代码会使用 lambda 表达式,如果你不了解的话,可以先看看掌握 Java 8 Lambda 表达式。
4. Subjec
官方文档:Subject可以看成是一个桥梁或者代理,在某些ReactiveX实现中(如RxJava),它同时充当了Observer和Observable的角色。因为它是一个Observer,它可以订阅一个或多个Observable;又因为它是一个Observable,它可以转发它收到(Observe)的数据,也可以发射新的数据
Subject 是 Observable 的一个扩展,同时还实现了 Observer 接口,总结就是:
Subject 可以像 Observer 一样接收事件,同时还可以像 Observable 一样把接收到的事件再发射出去
它可以充当Observable
它可以充当Observer
- 是Observable和Observer之间的桥梁
这种特性非常适合 Rx 中的接入点,当你的事件来至于 Rx 框架之外的代码的时候,你可以把这些数据先放到 Subject 中,然后再把 Subject转换为一个 Observable,就可以在 Rx 中使用它们了。你可以把 Subject 当做 Rx 中的 事件管道。
Subject 有两个参数类型:输入参数和输出参数。这样设计是为了抽象 而不是为了转换数据类型。转换数据应该使用转换操作函数来完成,后面我们将介绍各种操作函数。
- AsyncSubject
- BehaviorSubject
- PublishSubject
- ReplaySubject
4.1 PublishSubject
PublishSubject
比较容易理解,相对比其他Subject
常用,它的Observer
只会接收到PublishSubject
被订阅之后发送的数据。示例代码如下:
PublishSubject<String> publishSubject = PublishSubject.create();
publishSubject.onNext("publishSubject1");
publishSubject.onNext("publishSubject2");
publishSubject.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
LogUtil.log("publishSubject observer1:"+s);
}
});
publishSubject.onNext("publishSubject3");
publishSubject.onNext("publishSubject4");
输出结果:
behaviorSubject3
behaviorSubject4
4.2 ReplaySubject
ReplaySubject 可以缓存所有发射给他的数据。当一个新的订阅者订阅的时候,缓存的所有数据都会发射给这个订阅者。 由于使用了缓存,所以每个订阅者都会收到所以的数据:
ReplaySubject<String> replaySubject = ReplaySubject.create();
// 创建默认初始缓存容量大小为16的ReplaySubject,当数据条目超过16会重新分配内存空间,使用这种方式,不论ReplaySubject何时被订阅,Observer都能接收到数据
// replaySubject =
// ReplaySubject.create(100);//创建指定初始缓存容量大小为100的ReplaySubject
// replaySubject = ReplaySubject.createWithSize(2);//只缓存订阅前最后发送的2条数据
// replaySubject=ReplaySubject.createWithTime(1,TimeUnit.SECONDS,Schedulers.computation());
// replaySubject被订阅前的前1秒内发送的数据才能被接收
replaySubject.onNext("replaySubject:pre1");
replaySubject.onNext("replaySubject:pre2");
replaySubject.onNext("replaySubject:pre3");
replaySubject.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println("replaySubject:" + s);
}
});
replaySubject.onNext("replaySubject:after1");
replaySubject.onNext("replaySubject:after2");
结果
replaySubject:replaySubject:pre1
replaySubject:replaySubject:pre2
replaySubject:replaySubject:pre3
replaySubject:replaySubject:after1
replaySubject:replaySubject:after2
不管是何时订阅的,每个订阅者都收到了所有的数据。
4.3 BehaviorSubject
BehaviorSubject
会接收到BehaviorSubject
被订阅之前的最后一个数据,再接收其他发射过来的数据,如果BehaviorSubject
被订阅之前没有发送任何数据,则会发送一个默认数据。
注意跟AsyncSubject
的区别,AsyncSubject
要手动调用onCompleted()
,且它的Observer
会接收到onCompleted()
前发送的最后一个数据,之后不会再接收数据,而BehaviorSubject
不需手动调用onCompleted()
,它的Observer
接收的是BehaviorSubject
被订阅前发送的最后一个数据,两个的分界点不一样,且之后还会继续接收数据。示例代码如下:
BehaviorSubject<String> behaviorSubject = BehaviorSubject.create("default");
behaviorSubject.onNext("behaviorSubject1");
behaviorSubject.onNext("behaviorSubject2");
behaviorSubject.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
LogUtil.log("behaviorSubject:complete");
}
@Override
public void onError(Throwable e) {
LogUtil.log("behaviorSubject:error");
}
@Override
public void onNext(String s) {
LogUtil.log("behaviorSubject:"+s);
}
});
behaviorSubject.onNext("behaviorSubject3");
behaviorSubject.onNext("behaviorSubject4");
结果
behaviorSubject2
behaviorSubject3
behaviorSubject4
如果在behaviorSubject.subscribe()
之前不发送behaviorSubject1
、behaviorSubject2
,则Observer
会先接收到default
,再接收behaviorSubject3
、behaviorSubject4
4.4 AsyncSubject
Observer
会接收AsyncSubject
的onComplete()
之前的最后一个数据,如果因异常而终止,AsyncSubject
将不会释放任何数据,但是会向Observer传递一个异常通知。示例代码如下:
AsyncSubject<String> asyncSubject = AsyncSubject.create();
asyncSubject.onNext("asyncSubject1");
asyncSubject.onNext("asyncSubject2");
asyncSubject.onNext("asyncSubject3");
asyncSubject.onCompleted();
asyncSubject.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
LogUtil.log("asyncSubject onCompleted"); //输出 asyncSubject onCompleted
}
@Override
public void onError(Throwable e) {
LogUtil.log("asyncSubject onError"); //不输出(异常才会输出)
}
@Override
public void onNext(String s) {
LogUtil.log("asyncSubject:"+s); //输出asyncSubject:asyncSubject3
}
});
输出结果:
asyncSubject3
以上代码,Observer
只会接收asyncSubject
的onCompleted()
被调用前的最后一个数据,即“asyncSubject3”
,如果不调用onCompleted()
,Subscriber
将不接收任何数据。
5. 隐含的规则
如果你把 Subject
当作一个 Subscriber
使用,不要从多个线程中调用它的onNext
方法(包括其它的on系列方法),这可能导致同时(非顺序)调用,这会违反Observable
协议,给Subject
的结果增加了不确定性。要避免此类问题,官方提出了“串行化”,你可以将 Subject
转换为一个 SerializedSubject
,类似于这样:
SerializedSubject<String, Integer> ser = new SerializedSubject(publishSubject);
Rx 中有一些隐含的规则在代码中并不太容易看到。一个重要的规则就是当一个事件流结束(onError 或者 onCompleted 都会导致事件流结束)后就不会发射任何数据了。这些 Subject 的实现都遵守这个规则,subscribe 函数也拒绝违反该规则的情况。
Subject<Integer, Integer> s = ReplaySubject.create();
s.subscribe(v -> System.out.println(v));
s.onNext(0);
s.onCompleted();
s.onNext(1);
s.onNext(2);
结果
0
但是在 Rx 实现中并没有完全确保这个规则,所以你在使用 Rx 的过程中要注意遵守该规则,否则会出现意料不到的情况。
6. 代码示例
6.1 创建Observable并发射数据
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("I'm Observable");
subscriber.onCompleted();
}
});
用Subject实现为:
PublishSubject<String> publishSubject = PublishSubject.create();
publishSubject.onNext("as Observable");
publishSubject.onCompleted();
6.2 创建Observer订阅Observable并接收数据:
mObservable.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
//接收数据
}
});
用Subject实现为:
publishSubject.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
}
});
6.3 把Subject当作Observer传入subscribe()中
PublishSubject<String> publishSubject = PublishSubject.create();
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("as Observer");
subscriber.onCompleted();
}
}).subscribe(publishSubject);
有没有发现问题?publishSubject
没有重写onNext()
方法啊,在哪接收的数据?这就是前面说的“桥梁”的问题了,尽管把Subject作为Observer传入subscribe(),但接收数据还是要通过Observer来接收,借用Subject来连接Observable和Observer,整体代码如下:
PublishSubject<String> publishSubject = PublishSubject.create();
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("as Bridge");
subscriber.onCompleted();
}
}).subscribe(publishSubject);
publishSubject.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
LogUtil.log("subject:"+s); //接收到 as Bridge
}
});
这就是桥梁的意思!