Rxjava Subjects

时间:2023-03-08 17:51:13

上次提到调用observable的publish和connect方法后可以将一个Observable发出的对象实时传递到订阅在上的subscriber。

这个和Rxjava中Subject的概念十分相像。Subject是可以理解为桥接或者代理,可以订阅Observable也可以自己作为Observable提供Subscriber订阅。

Rxjava提供PublishSubject其功能与上次提到的publish+connect一样

@Test
public void testPublishSubject() throws InterruptedException {
//创建一个publish subject
PublishSubject<Object> subject = PublishSubject.create();
Observable.create(sub->{
new Thread(()->{
int i=0;
while(i<5){
sub.onNext(i++);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
}
}
}).start();
}).subscribe(subject);//将subject订阅在原始Observable上
//在subject上订阅子subscriber
subject.subscribe(x->{
System.out.println("1st sub "+x);
});
Thread.sleep(1000);
System.out.println("2nd subscriber start");
//一秒后在subject上订阅子subscriber
subject.subscribe(x->{
System.out.println("2nd sub "+x);
});
Thread.sleep(10000);
}
--------输出---------
1st sub 1
1st sub 2
2nd subscriber start
1st sub 3
2nd sub 3
1st sub 4
2nd sub 4

主要subject种类

subject的用法大致都如上所示,可以subscribe也可以被subscribe,subject的不同是可以同时订阅多个observable并同时广播到子subscriber上。

AsyncSubject

Rxjava Subjects

该subject会等到原observable结束并发出原observable最后一个发出的对象或错误。

BehaviorSubject

Rxjava Subjects

该subject会发出最近的一个对象并持续发出接下来的所由对象。

PublishSubject

Rxjava Subjects

上面已经介绍过,该subject会发出订阅后原Observable所发出的对象。

ReplaySubject

Rxjava Subjects

该subject会返回从原始observable开始的所有对象。要注意的是这个可能存在潜在的OOM风险。