Observable Utility Operators(辅助操作符)
delay
顾名思义,Delay操作符就是让发射数据的时机延后一段时间,这样所有的数据都会依次延后一段时间发射。
log("start subscrib:" + System.currentTimeMillis()/1000);
Observable<Long> observable = Observable.create(new Observable.OnSubscribe<Long>() {
@Override
public void call(Subscriber<? super Long> subscriber) {
for (int i = 1; i <= 2; i++) {
Long currentTime=System.currentTimeMillis()/1000;
log("subscrib:" + currentTime);
subscriber.onNext(currentTime);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).subscribeOn(Schedulers.newThread());
observable.delay(2000, TimeUnit.MILLISECONDS).subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
log("delay:"+System.currentTimeMillis()/1000+"---"+(System.currentTimeMillis()/1000-aLong));
}
});
结果:
start subscrib:1462519228
subscrib:1462519228
subscrib:1462519229
delay:1462519230---2
delay:1462519231---2
delaySubscription
不同之处在于Delay是延时数据的发射,而DelaySubscription是延时注册Subscriber。
dealy是延迟发射,delaySubscription则是延迟收到。
log("start subscrib:" + System.currentTimeMillis()/1000);
Observable<Long> observable = Observable.create(new Observable.OnSubscribe<Long>() {
@Override
public void call(Subscriber<? super Long> subscriber) {
for (int i = 1; i <= 2; i++) {
Long currentTime=System.currentTimeMillis()/1000;
log("subscrib:" + currentTime);
subscriber.onNext(currentTime);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).subscribeOn(Schedulers.newThread());
observable.delaySubscription(2000, TimeUnit.MILLISECONDS).subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
log("delaySubscription:"+System.currentTimeMillis()/1000+"---"+(System.currentTimeMillis()/1000-aLong));
}
});
结果:
start subscrib:1462519279
subscrib:1462519281
delaySubscription:1462519281---0
subscrib:1462519282
delaySubscription:1462519282---0
do
do操作符就是给Observable的生命周期的各个阶段加上一系列的回调监听,当Observable执行到这个阶段的时候,这些回调就会被触发。在Rxjava实现了很多的doxxx操作符。
doOnEach
doOnEach可以给Observable加上这样的样一个回调:Observable每发射一个数据的时候就会触发这个回调,不仅包括onNext还包括onError和onCompleted。
Observable observable=Observable.just(1,2,3);
observable.doOnEach(new Action1<Notification>() {
@Override
public void call(Notification notification) {
log("doOnEach send " + notification.getValue() + " type:" + notification.getKind());
}
}).subscribe(new Action1() {
@Override
public void call(Object o) {
log(o.toString());
}
});
Subject<Integer, Integer> values = ReplaySubject.create();
values.doOnEach(new Action1<Notification<? super Integer>>() {
@Override
public void call(Notification<? super Integer> notification) {
log("doOnEach send " + notification.getValue() + " type:" + notification.getKind());
}
}).subscribe(new Action1() {
@Override
public void call(Object o) {
log(o.toString());
}
});
values.onNext(4);
values.onNext(5);
values.onNext(6);
values.onError(new Exception("Oops"));
结果:
doOnEach send 1 type:OnNext
1
doOnEach send 2 type:OnNext
2
doOnEach send 3 type:OnNext
3
doOnEach send null type:OnCompleted
doOnEach send 4 type:OnNext
4
doOnEach send 5 type:OnNext
5
doOnEach send 6 type:OnNext
6
doOnEach send null type:OnError
doOnNext
doOnNext则只有onNext的时候才会被触发。
Subject<Integer, Integer> values = ReplaySubject.create();
values.doOnNext(new Action1<Integer>() {
@Override
public void call(Integer integer) {
log("doOnNext send :"+integer.toString());
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
log(integer.toString());
}
});
values.onNext(4);
values.onError(new Exception("Oops"));
结果:
doOnNext send :4
4
doOnSubscribe
doOnSubscribe会在Subscriber进行订阅的时候触发回调。
Observable observable=Observable.just(1,2);
observable.subscribe(new Action1() {
@Override
public void call(Object o) {
log("first:"+o.toString());
}
});
observable.subscribe(new Action1() {
@Override
public void call(Object o) {
log("second:"+o.toString());
}
});
结果:
I'm be subscribed!
first:1
first:2
I'm be subscribed!
second:1
second:2
doOnUnSubscribe
doOnUnSubscribe则会在Subscriber进行反订阅的时候触发回调。
当一个Observable通过OnError或者OnCompleted结束的时候,会反订阅所有的Subscriber。
Observable observable = Observable.just(1, 2).doOnUnsubscribe(new Action0() {
@Override
public void call() {
log("I'm be unSubscribed!");
}
});
Subscription subscribe1 = observable.subscribe();
Subscription subscribe2 = observable.subscribe();
subscribe1.unsubscribe();
subscribe2.unsubscribe();
结果:
I'm be unSubscribed!
I'm be unSubscribed!
doOnError
doOnError会在OnError发生的时候触发回调,并将Throwable对象作为参数传进回调函数里;
try {
Observable observable = Observable.error(new Throwable("呵呵哒")).doOnError(new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
log(throwable.getMessage().toString());
}
});
observable.subscribe();
}catch (Exception e){
log("catch the exception");
}
结果:
呵呵哒
catch the exception
doOnComplete
doOnComplete会在OnCompleted发生的时候触发回调。
Observable observable = Observable.empty().doOnCompleted(new Action0() {
@Override
public void call() {
log("Complete!");
}
});
observable.subscribe();
结果:
Complete!
doOnTerminate
DoOnTerminate会在Observable结束前触发回调,无论是正常还是异常终止;
Subject<Integer, Integer> values = ReplaySubject.create();
values.doOnTerminate(new Action0() {
@Override
public void call() {
log("order to terminate");
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
log(integer.toString());
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
log(throwable.getMessage().toString());
}
});
values.onNext(4);
values.onError(new Exception("Oops"));
结果:
4
order to terminate
Oops
finallyDo
finallyDo会在Observable结束后触发回调,无论是正常还是异常终止。
Observable observable = Observable.empty().finallyDo(new Action0() {
@Override
public void call() {
log("already terminate");
}
});
observable.subscribe(new Action1() {
@Override
public void call(Object o) {
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
}
}, new Action0() {
@Override
public void call() {
log("Complete!");
}
});
结果:
Complete!
already terminate
materialize
materialize操作符将OnNext/OnError/OnComplete都转化为一个Notification对象并按照原来的顺序发射出来。
public final Observable<Notification<T>> materialize()
元数据中包含了源 Observable 所发射的动作,是调用 onNext 还是 onComplete。注意上图中,源 Observable 结束的时候, materialize 还会发射一个 onComplete 数据,然后才发射一个结束事件。
Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
values.take(3)
.materialize()
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
log(o.toString());
}
});
结果:
meterialize:0--type:OnNext
meterialize:1--type:OnNext
meterialize:2--type:OnNext
meterialize:null--type:OnCompleted
Notification 类包含了一些判断每个数据发射类型的方法,如果出错了还可以获取错误信息 Throwable 对象。
dematerialize
deMeterialize则是与materialize 执行相反的过程。
Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
values.take(3)
.materialize()
.dematerialize()
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
log(o.toString());
}
});
结果:
0
1
2
注意:在调用dematerialize()
之前必须先调用materialize()
,否则会报错。
serialize
强制Observable按次序发射数据并且功能是有效的
如果你无法确保自定义的操作符符合 Rx 的约定,例如从多个源异步获取数据,则可以使用 serialize 操作函数。 serialize 可以把一个不符合约定的 Observable 转换为一个符合约定的 Observable。
下面创建一个不符合约定的 Observable,并且订阅到该 Observable上:
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onCompleted();
subscriber.onNext(3);
subscriber.onCompleted();
}
});
observable.doOnUnsubscribe(new Action0() {
@Override
public void call() {
log("Unsubscribed");
}
}) .subscribe(
new Action1<Integer>() {
@Override
public void call(Integer integer) {
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
}
}, new Action0() {
@Override
public void call() {
log("Complete!");
}
});
结果:
1
2
Complete!
Unsubscribed
先不管上面的 Observable 发射的数据,订阅结束的情况看起来符合 Rx 约定。 这是由于 subscribe 认为当前数据流结束的时候会主动结束这个 Subscription。但实际使用中我们可能并不想直接结束这个Subscription。还有一个函数为 unsafeSubscribe ,该函数不会自动取消订阅。
observable.doOnUnsubscribe(new Action0() {
@Override
public void call() {
log("Unsubscribed");
}
})
.unsafeSubscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
log("Complete!");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
}
});
结果:
1
2
Complete!
3
Complete!
上面的示例最后就没有打印 Unsubscribed 字符串。
unsafeSubscribe 也不能很好的处理错误情况。所以该函数几乎没用。在文档中说:该函数应该仅仅在自定义操作函数中处理嵌套订阅的情况。 为了避免这种操作函数接受到不合法的数据流,我们可以在其上应用 serialize 操作函数:
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onCompleted();
subscriber.onNext(3);
subscriber.onCompleted();
}
})
.cast(Integer.class)
.serialize();
observable.doOnUnsubscribe(new Action0() {
@Override
public void call() {
log("Unsubscribed");
}
})
.unsafeSubscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
log("Complete!");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
}
});
结果:
1
2
Complete!
尽管上面的代码中没有调用unsubscribe, 但是数据流事件依然符合约定。最后也收到了完成事件。
timeout
添加超时机制,如果过了指定的一段时间没有发射数据,就发射一个错误通知
- 我们可以认为timeout()为一个Observable的限时的副本。
- 如果在指定的时间间隔内Observable不发射值的话,它监听的原始的Observable时就会触发onError()函数。
Observable<Long> values = Observable.interval(200, TimeUnit.MILLISECONDS);
Subscription subscription = values
.timeout(300,TimeUnit.MILLISECONDS)
.subscribe(new Observer<Long>() {
@Override
public void onCompleted() {
log("Complete!");
}
@Override
public void onError(Throwable e) {
log(e.getMessage().toString());
}
@Override
public void onNext(Long aLong) {
log(aLong+"");
}
});
结果:
0
1
2
...
Rxjava将Timeout实现为很多不同功能的操作符,比如说超时后用一个备用的Observable继续发射数据等。
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i <= 3; i++) {
try {
Thread.sleep(i * 100);
} catch (InterruptedException e) {
e.printStackTrace();
}
subscriber.onNext(i);
}
subscriber.onCompleted();
}
}).timeout(200, TimeUnit.MILLISECONDS, Observable.just(5, 6)).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
log(integer.toString());
}
});
结果:
0
1
2
5
6
timestamp
给Observable发射的每个数据项添加一个时间戳
timestamp 把数据转换为 Timestamped 类型,里面包含了原始的数据和一个原始数据是何时发射的时间戳。
public final Observable<Timestamped<T>> timestamp()
Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
values.take(3)
.timestamp()
.subscribe(new Action1<Timestamped>() {
@Override
public void call(Timestamped mTimestamped) {
log(mTimestamped.toString());
}
});
结果:
Timestamped(timestampMillis = 1461758360570, value = 0)
Timestamped(timestampMillis = 1461758360670, value = 1)
Timestamped(timestampMillis = 1461758360771, value = 2)
从结果可以看到,上面的数据大概每隔100毫秒发射一个。
timeInterval
将一个Observable转换为发射两个数据之间所耗费时间的Observable
如果你想知道前一个数据和当前数据发射直接的时间间隔,则可以使用 timeInterval 函数。
public final Observable<TimeInterval<T>> timeInterval()
Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
values.take(3)
.timeInterval()
.subscribe(new Action1<TimeInterval>() {
@Override
public void call(TimeInterval mTimeInterval) {
log(mTimeInterval.toString());
}
});
结果:
TimeInterval [intervalInMilliseconds=101, value=0]
TimeInterval [intervalInMilliseconds=99, value=1]
TimeInterval [intervalInMilliseconds=100, value=2]
using
创建一个只在Observable的生命周期内存在的一次性资源
Using操作符创建一个在Observable生命周期内存活的资源,也可以这样理解:我们创建一个资源并使用它,用一个Observable来限制这个资源的使用时间,当这个Observable终止的时候,这个资源就会被销毁。
public static final <T,Resource> Observable<T> using(
Func0<Resource> resourceFactory,
Func1<? super Resource,? extends Observable<? extends T>> observableFactory,
Action1<? super Resource> disposeAction)
using 有三个参数,分别是:
- 1.创建这个一次性资源的函数
- 2.创建Observable的函数
- 3.释放资源的函数
当 Observable 被订阅的时候,resourceFactory 用来获取到需要的资源;observableFactory 用这个资源来发射数据;当 Observable 完成的时候,disposeAction 来释放资源。
Observable observable = Observable.using(new Func0<Animal>() {
@Override
public Animal call() {
return new Animal();
}
}, new Func1<Animal, Observable<?>>() {
@Override
public Observable<?> call(Animal animal) {
return Observable.timer(3, TimeUnit.SECONDS);//三秒后发射一次就completed
// return Observable.timer(4, 2, TimeUnit.SECONDS);//没有completed,不停的发射数据
// return Observable.range(1,3);//一次发射三个数据,马上结束
// return Observable.just(1,2,3);//一次发射三个数据,马上结束
}
}, new Action1<Animal>() {
@Override
public void call(Animal animal) {
animal.relase();
}
});
Subscriber subscriber = new Subscriber() {
@Override
public void onCompleted() {
log("subscriber---onCompleted");
}
@Override
public void onError(Throwable e) {
log("subscriber---onError");
}
@Override
public void onNext(Object o) {
log("subscriber---onNext"+o.toString());//o是发射的次数统计,可以用timer(4, 2, TimeUnit.SECONDS)测试
}
};
observable.count().subscribe(subscriber);
结果:
create animal
animal eat
animal eat
animal eat
subscriber---onNext1
subscriber---onCompleted
animal released
项目源码 GitHub求赞,谢谢!
引用:
RxJava操作符(六)Utility-云少嘎嘎嘎-ChinaUnix博客
RxJava 教程第三部分:驯服数据流之自定义操作函数 - 云在千峰