RxJava组合操作符实例

时间:2022-01-15 14:40:47

结合操作符

组合操作符的作用是可以结合多个Observable进行操作。

CombineLatest 操作符

他可以组合两个Observable,进行一定的操作之后,再次发射下去,例如:

 Observable.combineLatest(Observable.range(5,2), Observable.range(10, 4), new Func2<Integer, Integer, String>() {
@Override
public String call(Integer integer, Integer integer2) {
return integer+"=="+integer2;
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG, s + "=combineLatest");
}
});

结果是:

10-19 15:31:45.776 20501-20501/com.example.user.testproject I/RxJavaTest: 6==10=combineLatest
10-19 15:31:45.776 20501-20501/com.example.user.testproject I/RxJavaTest: 6==11=combineLatest
10-19 15:31:45.776 20501-20501/com.example.user.testproject I/RxJavaTest: 6==12=combineLatest
10-19 15:31:45.776 20501-20501/com.example.user.testproject I/RxJavaTest: 6==13=combineLatest
10-19 15:31:45.776 20501-20501/com.example.user.testproject I/RxJavaTest: 6==14=combineLatest
10-19 15:31:45.776 20501-20501/com.example.user.testproject I/RxJavaTest: 6==15=combineLatest
10-19 15:31:45.776 20501-20501/com.example.user.testproject I/RxJavaTest: 6==16=combineLatest
10-19 15:31:45.776 20501-20501/com.example.user.testproject I/RxJavaTest: 6==17=combineLatest
10-19 15:31:45.776 20501-20501/com.example.user.testproject I/RxJavaTest: 6==18=combineLatest
10-19 15:31:45.776 20501-20501/com.example.user.testproject I/RxJavaTest: 6==19=combineLatest
10-19 15:31:45.776 20501-20501/com.example.user.testproject I/RxJavaTest: 6==20=combineLatest
10-19 15:31:45.776 20501-20501/com.example.user.testproject I/RxJavaTest: 6==21=combineLatest
10-19 15:31:45.776 20501-20501/com.example.user.testproject I/RxJavaTest: 6==22=combineLatest
10-19 15:31:45.776 20501-20501/com.example.user.testproject I/RxJavaTest: 6==23=combineLatest
10-19 15:31:45.776 20501-20501/com.example.user.testproject I/RxJavaTest: 6==24=combineLatest
10-19 15:31:45.776 20501-20501/com.example.user.testproject I/RxJavaTest: 6==25=combineLatest
10-19 15:31:45.776 20501-20501/com.example.user.testproject I/RxJavaTest: 6==26=combineLatest
10-19 15:31:45.776 20501-20501/com.example.user.testproject I/RxJavaTest: 6==27=combineLatest
10-19 15:31:45.776 20501-20501/com.example.user.testproject I/RxJavaTest: 6==28=combineLatest
10-19 15:31:45.776 20501-20501/com.example.user.testproject I/RxJavaTest: 6==29=combineLatest

它继续发射的前提是:其中的一个Observable还有数据没有发射,那么,他讲两个Observable目前最新发射的数据组合在一起,比如上面,第一个Observable最新的数据是6,然后第二个的依次在变,然后再把他们组合在一起。
重载方法

combineLatest(List<? extends Observable<? extends T>> sources, FuncN<? extends R> combineFunction)
或者是:
combineLatest(Iterable<? extends Observable<? extends T>> sources, FuncN<? extends R> combineFunction)

他们都是输入一堆的List

 List<Observable<Integer>> list = new ArrayList<>();
list.add(A.range(10, 1));
list.add(A.range(10, 1));
list.add(A.range(10, 1));
Observable.combineLatest(list, new FuncN<Object>() {
@Override
public Object call(Object... args) {
for (Object obj : args) {
Log.i(TAG, obj.toString());
}
return args;
}
}).subscribe(new Action1<Object>() {
@Override
public void call(Object o) {

}
});

RxJava组合操作符实例

join 组合操作符,他的声明如下:

public final <TRight, TLeftDuration, TRightDuration, R> Observable<R> join(Observable<TRight> right, Func1<T, Observable<TLeftDuration>> leftDurationSelector,
Func1<TRight, Observable<TRightDuration>> rightDurationSelector,
Func2<T, TRight, R> resultSelector)

例子调用:

Observable<String> left =
Observable.interval(100, TimeUnit.MILLISECONDS)
.map(new Func1<Long, String>() {
@Override
public String call(Long aLong) {
return "L" + aLong;
}
});
Observable<String> right =
Observable.interval(200, TimeUnit.MILLISECONDS)
.map(new Func1<Long, String>() {
@Override
public String call(Long aLong) {
return "R" + aLong;
}
});
left.join(right, new Func1<String, Observable<Integer>>() {
@Override
public Observable<Integer> call(String s) {
Log.i(TAG, s + "==");
return Observable.never();
}
},
new Func1<String, Observable<Long>>() {
@Override
public Observable<Long> call(String s) {
Log.i(TAG, s + "=====");
return Observable.timer(0, TimeUnit.MILLISECONDS);
}
}
, new Func2<String, String, String>() {
@Override
public String call(String s, String s2) {
return s + "-" + s2;
}
})
.take(10)
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG, s);
}
});

例子来源:RxJava 教程第四部分:并发 之意外情况处理
结果输出:

L0==
L1==
R0=====
L1-R0
L0-R0
L2==
L3==
R1=====
L1-R1
L0-R1
L3-R1
L2-R1
L4==
R2=====
L4-R2
L1-R2
L0-R2
L3-R2

可以看到,在right join如left之后,她会结合每一次left的发射的Observable,然后再次发射,但是他的前提是left窗口还有数据在发射。假如left窗口没有数据了,那么right窗口也就不会再去跟left窗口接口再去发射了,比如:

Observable.range(10,10).join(Observable.range(10, 2), new Func1<Integer, Observable<Object>>() {
@Override
public Observable<Object> call(Integer integer) {
return Observable.just(integer);
}
}, new Func1<Integer, Observable<Object>>() {
@Override
public Observable<Object> call(Integer integer) {
return Observable.never();
}
}, new Func2<Integer, Integer, String>() {
@Override
public String call(Integer integer, Integer integer2) {
return integer + "-" + integer2;
}
}).take(10).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG, s + "join");
}
});

将不会有输出。
图:
RxJava组合操作符实例

merge 组合操作符

用于合并多个Observable,他们需要同类型,按照前到后的顺历依次发射所有的Observable例子:

 Observable.merge(Observable.range(10,2),Observable.range(20,3)).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.i(TAG, integer + "merge");
}
});

输出:

10-19 15:47:39.973 2140-2140/? I/RxJavaTest: 10merge
10-19 15:47:39.973 2140-2140/? I/RxJavaTest: 11merge
10-19 15:47:39.973 2140-2140/? I/RxJavaTest: 20merge
10-19 15:47:39.973 2140-2140/? I/RxJavaTest: 21merge
10-19 15:47:39.973 2140-2140/? I/RxJavaTest: 22merge

图:
RxJava组合操作符实例

zip 组合操作符

用于多两个Observable进行再次操作之后再次发射,他是有顺序的,他会按照顺序去结合多个Observable之间的数据,按照最短的数据为zip的func2的调用次数,例子

 Observable.zip(Observable.range(10, 10), Observable.range(5, 2), new Func2<Integer, Integer, String>() {
@Override
public String call(Integer integer, Integer integer2) {
return integer+"-"+integer2;
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG,s);
}
});

输出:

10-19 15:57:57.482 11123-11123/com.example.user.testproject I/RxJavaTest: 10-5
10-19 15:57:57.482 11123-11123/com.example.user.testproject I/RxJavaTest: 11-6

图:
RxJava组合操作符实例

zipWith

跟zip类型,但是他是非静态的,需要在另外一个Observable操作之上,他接受两个参数,一种是一个Observable和Func2,另外一个是多个组合, Iterable和Func2例子:

 Observable.range(10,2).zipWith(Observable.range(10,1),new Func2<Integer,Integer,String>(){

@Override
public String call(Integer o, Integer o2) {
return o+"="+o2;
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG,s);
}
});

RxJava组合操作符实例

switchOnNext

将一个发射多个Observable对象装换为一个Observable发射

 Observable.switchOnNext(Observable.just(Observable.range(1,1),Observable.range(2,1))).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.i(TAG, integer + "switchOnNext");
}
});

输出:

10-19 16:13:26.296 24641-24641/? I/RxJavaTest: 1switchOnNext
10-19 16:13:26.296 24641-24641/? I/RxJavaTest: 2switchOnNext

图:
![](
http://reactivex.io/documentation/operators/images/switch.c.png)

startWith

在源Observable之前插入一个或者是多个数据,例如

 Observable.range(5,2).startWith(6).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.i(TAG, integer + "switchOnNext");
}
});

结果:

10-19 16:16:10.690 27055-27055/? I/RxJavaTest: 6switchOnNext
10-19 16:16:10.690 27055-27055/? I/RxJavaTest: 5switchOnNext
10-19 16:16:10.690 27055-27055/? I/RxJavaTest: 6switchOnNext

图:
RxJava组合操作符实例