(转载)http://www.jianshu.com/p/3b3345e6cea0
RxJava操作符
创建操作
Range
Range操作符根据初始值n和数目m发射一系列大于等于n的m个值
Observable.range(5, 5).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d(TAG, "integer=" + integer); //5,6,7,8,9
}
});
Defer
Defer操作符只有当有Subscriber来订阅的时候才会创建一个新的Observable对象,也就是说每次订阅都会得到一个刚创建的最新的Observable对象,这可以确保Observable对象里的数据是最新的。
Repeat
Repeat会将一个Observable对象重复发射,我们可以指定其发射的次数。
Observable.just(1, 2, 3).repeat(5).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d(TAG, "integer=" + integer); // 1,2,3,1,2,3...重复5次
}
});
Timer
Timer会在指定时间后发射一个数字0,该操作符运行在Computation Scheduler。
Observable.timer(3, TimeUnit.SECONDS).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
Log.d(TAG, "aLong=" + aLong); // 延时3s
}
});
Interval
创建一个按固定时间间隔发射整数序列的Observable.
interval默认在computation调度器上执行。你也可以传递一个可选的Scheduler参数来指定调度器。
// 间隔时间1秒
Observable.interval(1, TimeUnit.SECONDS, AndroidSchedulers.mainThread())
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
Log.d(TAG, "aLong=" + aLong); //
}
});
变换操作
Buffer
Buffer操作符定期收集Observable的数据放进一个数据包裹,然后发射这些数据包裹,而不是一次发射一个值。
Buffer操作符将一个Observable变换为另一个,原来的Observable正常发射数据,变换产生的Observable发射这些数据的缓存集合。
RxView.clickEvents(mButton)
.buffer(2, TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<List<ViewClickEvent>>() {
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {}
@Override
public void onNext(List<ViewClickEvent> viewClickEvents) {
if (viewClickEvents.size() > 0) {
Toast.makeText(MainActivity.this, "2秒内点击了" + viewClickEvents.size() + "次", Toast.LENGTH_SHORT).show();
} else {
}
}
});
如果原来的Observable发射了一个onError通知,Buffer会立即传递这个通知,而不是首先发射缓存的数据,即使在这之前缓存中包含了原始Observable发射的数据。
GroupBy
GroupBy操作符将原始Observable发射的数据按照key来拆分成一些小的Observable,然后这些小的Observable分别发射其所包含的的数据。
Observable.just(1, 2, 3, 4, 5, 6)
.groupBy(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer % 2 == 0;
}
})
.subscribe(new Action1<GroupedObservable<Boolean, Integer>>() {
@Override
public void call(final GroupedObservable<Boolean, Integer> observable) {
//toList方法转换为Observable<List<T>>
observable.toList().subscribe(new Action1<List<Integer>>() {
@Override
public void call(List<Integer> integers) {
Log.d(TAG, "key=" + observable.getKey() + ",values=" + integers);
//key=false,values=[1, 3, 5]
//key=true,values=[2, 4, 6]
}
});
}
});
Scan
Scan操作符对一个序列的数据应用一个函数,并将这个函数的结果发射出去作为下个数据应用这个函数时候的第一个参数使用。
Observable.from(new Integer[]{1,2,3,4,5,6,7,8,9,10}).scan(new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer x, Integer y) {
return x+y;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d(TAG, "integer=" + integer);// 1,3,6,10,15,21,28,36,45,55
}
});
过滤操作
Debounce
ThrottleWithTimeout
Distinct/DistinctUntilChanged
Distinct操作符用来除去重复数据。
Observable.from(new Integer[]{1,2,2,3,3,3,2,2,1}).distinct().subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d(TAG, "integer=" + integer); //1,2,3
}
});
DistinctUntilChanged操作符用来过滤掉连续的重复数据。
Observable.from(new Integer[]{1,2,2,3,3,3,2,2,1}).distinctUntilChanged().subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d(TAG, "integer=" + integer); //1,2,3,2,1
}
});
ElementAt
ElementAt只会返回指定位置的数据。
Observable.from(new Integer[]{1,2,3,4,5,6,7,8,9})
.elementAt(4)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d(TAG, "integer=" + integer); //5
}
});
Filter
Filter返回满足过滤条件的数据。
Observable.from(new Integer[]{1, 2, 3, 4, 5, 6, 7, 8, 9})
.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer < 5;
}
})
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d(TAG, "integer=" + integer); //1,2,3,4
}
});
First
First操作符返回第一条数据或者返回满足条件的第一条数据。
Observable.from(new Integer[]{1, 2, 3, 4, 5, 6, 7, 8, 9})
.first()
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d(TAG, "integer=" + integer); //1 返回第一条数据
}
});
Observable.from(new Integer[]{1, 2, 3, 4, 5, 6, 7, 8, 9})
.first(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer > 3;
}
})
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d(TAG, "integer=" + integer); //4 返回满足条件的第一条数据
}
});
Last
Last操作符返回最后一条数据或者满足条件的最后一条数据。
Skip
Skip操作符将源Observable发射的数据过滤掉前n项。
Observable.from(new Integer[]{1, 2, 3, 4, 5, 6, 7, 8, 9})
.skip(6)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d(TAG, "integer=" + integer); //7,8,9
}
});
Take
Take操作符只取前n项。
Observable.from(new Integer[]{1, 2, 3, 4, 5, 6, 7, 8, 9})
.take(2)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d(TAG, "integer=" + integer); //1,2
}
});
Sample
Sample操作符会定时地发射源Observable最近发射的数据,其他的都会被过滤掉。
ThrottleLast
结合操作
CombineLatest
当两个Observables中的任何一个发射了数据时,使用一个函数结合每个Observable发射的最近数据项,并且基于这个函数的结果发射数据。
// (200)(400)(600) (800) (1000)
// ---0--- 5---10----15----20
// (300) (600) (900) (1200)(1500)
//------0------5------10--- 15----20
Observable<Long> observable1=Observable.interval(200,TimeUnit.MILLISECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong*5;
}
}).take(5);
Observable<Long> observable2=Observable.interval(300,TimeUnit.MILLISECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong*5;
}
}).take(5);
Observable.combineLatest(observable1, observable2, new Func2<Long, Long, Long>() {
@Override
public Long call(Long aLong, Long aLong2) {
return aLong+aLong2;
}
}).subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
Log.d(TAG, "aLong=" + aLong);
//0(300) ,5+0(400) ,10+0(600),10+5(600),15+5(800),15+10(900)
//20+10(1000),20+15(1200),20+20(1500)
}
});
在实际的开发中我们可以利用该操作符结合RxBinding来实现表单提交的校验。
final Observable<TextViewTextChangeEvent> usernameChangeObservable = RxTextView.textChangeEvents(mUsernameEditText);
final Observable<TextViewTextChangeEvent> passwordChangeObservable = RxTextView.textChangeEvents(mPasswordEditText);
submitButton.setEnabled(false);
Observable.combineLatest(usernameChangeObservable, passwordChangeObservable,
new Func2<TextViewTextChangeEvent, TextViewTextChangeEvent, Boolean>() {
@Override
public Boolean call(TextViewTextChangeEvent event1, TextViewTextChangeEvent event2) {
boolean emailCheck = event1.text().length() >= 3;
boolean passwordCheck = event2.text().length() >= 3;
return emailCheck && passwordCheck;
}
})
.subscribe(new Action1<Boolean>() {
@Override
public void call(Boolean aBoolean) {
submitButton.setEnabled(aBoolean);
}
});
Join
Merge
Merge操作符将多个Observable发射的数据整合起来发射,就如同是一个Observable发射的数据一样。
当某一个Observable发出onError的时候,merge的过程会被停止并将错误分发给Subscriber,如果不想让错误终止merge的过程,可以使用MeregeDelayError操作符,会将错误在merge结束后再分发。
Observable<Long> observable1 = Observable.interval(200, TimeUnit.MILLISECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong * 5;
}
}).take(5);
Observable<Long> observable2 = Observable.interval(300, TimeUnit.MILLISECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong * 5;
}
}).take(5);
Observable.merge(observable1, observable2).subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
Log.d(TAG, "aLong=" + aLong); //0,0,5,10,5,15,10,20,15,20
}
});
StartWith
StartWith操作符会在源Observable发射的数据前面插上一些数据。
Observable.just(1, 2, 3, 4).startWith(-1, 0)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d(TAG,"integer="+integer); // -1,0,1,2,3,4
}
});
Observable.just(1,2,3,4).startWith(Observable.just(-1,0))
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d(TAG,"integer="+integer); // -1,0,1,2,3,4
}
});
Zip
Zip操作符将多个Observable发射的数据按顺序组合起来,每个数据只能组合一次,而且都是有序的。最终组合的数据的数量由发射数据最少的Observable来决定。
// (200)(400)(600) (800) (1000)
// ---0--- 5---10----15----20
// (300) (600) (900) (1200)(1500)
//------0------5------10--- 15----20
Observable<Long> observable1=Observable.interval(200,TimeUnit.MILLISECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong*5;
}
}).take(5);
Observable<Long> observable2=Observable.interval(300,TimeUnit.MILLISECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong*5;
}
}).take(5);
Observable.zip(observable1, observable2, new Func2<Long, Long, Long>() {
@Override
public Long call(Long aLong, Long aLong2) {
return aLong+aLong2;
}
}).subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
Log.d(TAG, "aLong=" + aLong); //0,10,20,30,40
}
});
Delay操作符
Delay操作符让发射数据的时机延后一段时间,这样所有的数据都会依次延后一段时间发射。在Rxjava中将其实现为Delay和DelaySubscription。不同之处在于Delay是延时数据的发射,而DelaySubscription是延时注册Subscriber。
Do操作符
Do操作符就是给Observable的生命周期的各个阶段加上一系列的回调监听,当Observable执行到这个阶段的时候,这些回调就会被触发。在Rxjava实现了很多的doXxx操作符。
doOnEach可以给Observable加上这样的样一个回调:Observable每发射一个数据的时候就会触发这个回调,不仅包括onNext还包括onError和onCompleted。
DoOnNext则只有onNext的时候才会被触发。
doOnNext则只有onNext的时候才会被触发。
doOnError会在OnError发生的时候触发回调,并将Throwable对象作为参数传进回调函数里
doOnComplete会在OnCompleted发生的时候触发回调。
doOnTerminate会在Observable结束前触发回调,无论是正常还是异常终止;finallyDo会在Observable结束后触发回调,无论是正常还是异常终止。
doOnSubscribe和doOnUnSubscribe则会在Subscriber进行订阅和反订阅的时候触发回调。当一个Observable通过OnError或者OnCompleted结束的时候,会反订阅所有的Subscriber。
Observable.just(1, 2, 3, 4)
.doOnEach(new Action1<Notification<? super Integer>>() {
@Override
public void call(Notification<? super Integer> notification) {
Log.d(TAG, "doOnEach" + notification.getKind().name()); // onNext,onNext,onNext,onNext,onCompleted
}
})
.doOnSubscribe(new Action0() {
@Override
public void call() {
Log.d(TAG, "doOnSubscribe");
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
Log.d(TAG, "doOnUnsubscribe");
}
})
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d(TAG, "integer = " + integer);
}
});
Meterialize操作符
Meterialize操作符将OnNext/OnError/OnComplete都转化为一个Notification对象并按照原来的顺序发射出来,而DeMeterialize则是执行相反的过程。
Observable.just(1,2,3)
.materialize()
.subscribe(new Action1<Notification<Integer>>() {
@Override
public void call(Notification<Integer> integerNotification) {
Log.d(TAG,"kind="+integerNotification.getKind().name()+"value="+integerNotification.getValue());
}
});
SubscribOn/ObserverOn
SubscribOn用来指定Observable在哪个线程上运行。
ObserverOn用来指定观察者所运行的线程。
TimeInterval/TimeStamp
TimeInterval会拦截发射出来的数据,然后发射两个发射数据的间隔时间。对于第一个发射的数据,其时间间隔为订阅后到首次发射的间隔。
Observable.interval(1,TimeUnit.SECONDS,AndroidSchedulers.mainThread()).
timeInterval().subscribe(new Action1<TimeInterval<Long>>() {
@Override
public void call(TimeInterval<Long> longTimeInterval) {
Log.d(TAG,"value = "+longTimeInterval.getIntervalInMilliseconds());//
}
});
TimeStamp会将每个数据项给重新包装一下,加上了一个时间戳来标明每次发射的时间。
Timeout
Timeout操作符给Observable加上超时时间,每发射一个数据后就重置计时器,当超过预定的时间还没有发射下一个数据,就抛出一个超时的异常。
All操作符
All操作符根据一个函数对源Observable发射的所有数据进行判断,最终返回的结果就是这个判断结果。这个函数使用发射的数据作为参数,内部判断所有的数据是否满足我们定义好的判断条件,如果全部都满足则返回true,否则就返回false。
Observable.from(new Integer[]{1,2,3,4,5,6,7,8,9,10}).all(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer<=10;
}
}).subscribe(new Action1<Boolean>() {
@Override
public void call(Boolean aBoolean) {
Log.d(TAG,"result is "+ aBoolean); //result is true
}
});
Amb操作符
Amb操作符可以将至多9个Observable结合起来,让他们竞争。哪个Observable首先发射了数据(包括onError和onComplete)就会继续发射这个Observable的数据,其他的Observable所发射的数据都会别丢弃。
Observable<Integer> delay3 = Observable.just(1, 2, 3).delay(3000, TimeUnit.MILLISECONDS);
Observable<Integer> delay2 = Observable.just(4, 5, 6).delay(2000, TimeUnit.MILLISECONDS);
Observable<Integer> delay1 = Observable.just(7, 8, 9).delay(1000, TimeUnit.MILLISECONDS);
Observable.amb(delay1, delay2, delay3).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d(TAG, "integer=" + integer); // 7,8,9
}
});
Contains操作符
Contains操作符用来判断源Observable所发射的数据是否包含某一个数据,如果包含会返回true,如果源Observable已经结束了却还没有发射这个数据则返回false。
Observable.from(new Integer[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10})
.contains(11)
.subscribe(new Action1<Boolean>() {
@Override
public void call(Boolean aBoolean) {
Log.d(TAG, "result is " + aBoolean);//result is false
}
});
IsEmpty操作符
IsEmpty操作符用来判断源Observable是否发射过数据,如果发射过就会返回false,如果源Observable已经结束了却还没有发射这个数据则返回true。
Observable.create(new Observable.OnSubscribe<Object>() {
@Override
public void call(Subscriber<? super Object> subscriber) {
subscriber.onCompleted();
}
}).isEmpty().subscribe(new Action1<Boolean>() {
@Override
public void call(Boolean aBoolean) {
Log.d(TAG, "result is " + aBoolean);//result is true
}
});
DefaultIfEmpty
DefaultIfEmpty操作符会判断源Observable是否发射数据,如果源Observable发射了数据则正常发射这些数据,如果没有则发射一个默认的数据。
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onCompleted();
}
}).defaultIfEmpty(100).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d(TAG, "integer= " + integer); // 100
}
});
SequenceEqual操作符
SequenceEqual操作符用来判断两个Observable发射的数据序列是否相同(发射的数据相同,数据的序列相同,结束的状态相同),如果相同返回true,否则返回false。
Observable.sequenceEqual(Observable.just(1,2,3),Observable.just(1,2,3)).subscribe(new Action1<Boolean>() {
@Override
public void call(Boolean aBoolean) {
Log.d(TAG,"result is "+ aBoolean);//result is true
}
});
Observable.sequenceEqual(Observable.just(1,2),Observable.just(1,2,3)).subscribe(new Action1<Boolean>() {
@Override
public void call(Boolean aBoolean) {
Log.d(TAG,"result is "+ aBoolean);//result is false
}
});
SkipUntil操作符
SkipUnitl根据一个标志Observable来跳过一些数据,当这个标志Observable没有发射数据的时候,所有源Observable发射的数据都会被跳过;当标志Observable发射了一个数据,则开始正常地发射数据。
Observable.interval(1, TimeUnit.SECONDS)
.skipUntil(Observable.timer(3, TimeUnit.SECONDS)) //延迟3s
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
Log.d(TAG, "aLong = " + aLong); //2,3,4...
}
});
SkipWhile操作符
SkipWhile根据一个函数来判断是否跳过数据,当函数返回值为true的时候则一直跳过源Observable发射的数据;当函数返回false的时候则开始正常发射数据。
Observable.from(new Integer[]{1, 2, 3, 4, 5, 6, 5, 4,3,2,1})
.skipWhile(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) { //1,2,3,4,5
Log.d(TAG,"integer -> "+integer ); //如果首次为true后面的将不进行判断
return integer<5; //
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d(TAG, "integer = " + integer); //5, 6, 5, 4,3,2,1
}
});
TakeUntil操作符
TakeUntil使用一个标志Observable是否发射数据来判断,当标志Observable没有发射数据时,正常发射数据,而一旦标志Observable发射过了数据则后面的数据都会被丢弃。
Observable.interval(1, TimeUnit.SECONDS)
.takeUntil(Observable.timer(3, TimeUnit.SECONDS)) //延迟3s
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
Log.d(TAG, "aLong = " + aLong); //0,1
}
});
TakeWhile操作符
TakeWhile则是根据一个函数来判断是否发射数据,当函数返回值为true的时候正常发射数据;当函数返回false的时候丢弃所有后面的数据。
Observable.from(new Integer[]{1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1})
.takeWhile(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) { //1,2,3,4,5
Log.d(TAG, "integer -> " + integer); //如果首次为false后面的将不进行判断
return integer < 5; //
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d(TAG, "integer = " + integer); //1,2,3,4,5
}
});
Concat操作符
Concat操作符将多个Observable结合成一个Observable并发射数据,并且严格按照先后顺序发射数据,前一个Observable的数据没有发射完,是不能发射后面Observable的数据的。
Observable<Integer> observable1 = Observable.just(1,2,3);
Observable<Integer> observable2 = Observable.just(4,5,6);
Observable<Integer> observable3 = Observable.just(7,8,9);
Observable.concat(observable1,observable2,observable3).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {}
@Override
public void onNext(Integer integer) {
Log.d(TAG,"integer="+integer);// 1,2,3,4,5,6,7,8,9
}
});
当一个Observable发生错误的时候,发射会终止。
Observable<Integer> observable1 = Observable.just(1, 2, 3);
Observable<Integer> observable2 = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onError(new Throwable("error"));
}
});
Observable<Integer> observable3 = Observable.just(7, 8, 9);
Observable.concat(observable1, observable2, observable3).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {
Log.e(TAG, e.getMessage());
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "integer=" + integer);// 1,2,3,error
}
});
Count操作符
Count操作符用来统计源Observable发射了多少个数据,最后将数目给发射出来;如果源Observable发射错误,则会将错误直接报出来;在源Observable没有终止前,count是不会发射统计数据的。
Observable.just(1, 2, 3).count().subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "integer=" + integer); // integer=3
}
});
Reduce
Reduce操作符接收Observable发射的数据并利用提供的函数的计算结果作为下次计算的参数,输出最后的结果。首次没有计算结果传入前两个参数。
Observable.from(new Integer[]{1,2,3,4,5,6,7,8,9,10}).reduce(new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer x, Integer y) {
return x+y; // 1+2+3+4+5+6+7+8+9+10
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d(TAG,"result="+ integer); // result = 55
}
});
参考
- RxJava Wiki
- ReactiveX文档中文翻译
- 使用RxJava实现延迟订阅
- party-tricks-with-rxjava-rxandroid-retrolambda
- RxJava操作符
- RxJava变换操作符:.concatMap( )与.flatMap( )的比较
- 避免打断链式结构:使用.compose( )操作符
文/Carve_Time(简书作者)
原文链接:http://www.jianshu.com/p/3b3345e6cea0
著作权归作者所有,转载请联系作者获得授权,并标注“简书作者”。