RxJava使用详解系列文章
详细的例子可以查看文章末尾的源码
这篇文章主要讲RxJava中常见的组合操作符
1.combineLatest操作符把两个Observable产生的结果进行合并,合并的结果组成一个新的Observable。下面的栗子是ob2中的每一个数据项都与ob1中的最后一项进行相加,将生成的结果组成一个新的Observable对象.
combineLatest操作符可以接受2-9个Observable作为参数,最后一个Observable中的每一个数据项,都与前面Observable中的最后一项进行规则运算。也就是call方法中的最后一个值参数是最后一个Observable的每一项数据,3.merge操作符 将多个Observalbe发射的数据项,合并到一个Observable中再发射出去,可能会让合并的Observable发射的数据交错(concat是连接不会出现交错),如果在合并的途中出现错误,就会立即将错误提交给订阅者,将终止合并后的Observable
前面的参数是前面每一个Observable的最后一项数据,固定不变的。
combineLatest(List,FuncN)操作符可以接受一个Observable的list集合,集合中最后一个Observable中的每一项数据,会跟前面每一个Observable对象的最后一项数据进行规则运算
默认不在任何特定的调度器上执行。Observable<Integer> ob1 = Observable.just(1,2,3);输出结果:
Observable<Integer> ob2 = Observable.just(4,5,6);
Observable<Integer> ob3 = Observable.just(7,8,9);
List<Observable<Integer>> list = new ArrayList<>();
list.add(ob1);list.add(ob2);list.add(ob3);
Observable.combineLatest(ob1, ob2, new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) {
System.out.println("combineLatest(o1,o2,Func2):"+"o1:" + integer +" o2:"+ integer2 );
return integer + integer2;//这里进行合并的规则,可以用函数进行运算返回一个数据
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("combineLatest(o1,o2,Func2) onNext:" + integer);
}
});
combineLatest(o1,o2,Func2):o1:3 o2:4
combineLatest(o1,o2,Func2) onNext:7
combineLatest(o1,o2,Func2):o1:3 o2:5
combineLatest(o1,o2,Func2) onNext:8
combineLatest(o1,o2,Func2):o1:3 o2:6
combineLatest(o1,o2,Func2) onNext:9Observable.combineLatest(ob1, ob2, ob3, new Func3<Integer, Integer, Integer, Integer>() { @Override//这里进行合并的规则,可以用函数进行运算返回一个数据 public Integer call(Integer integer, Integer integer2, Integer integer3) { System.out.println("combineLatest(o1,o2,o3,Func3):"+"o1:" + integer +" o2:"+ integer2 +" o3:"+ integer3); return integer + integer2 + integer3; }}).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println("combineLatest(o1,o2,o3,Func3) onNext:" + integer); }});输出结果:
combineLatest(o1,o2,o3,Func3):o1:3 o2:6 o3:7
combineLatest(o1,o2,o3,Func3) onNext:16
combineLatest(o1,o2,o3,Func3):o1:3 o2:6 o3:8
combineLatest(o1,o2,o3,Func3) onNext:17
combineLatest(o1,o2,o3,Func3):o1:3 o2:6 o3:9
combineLatest(o1,o2,o3,Func3) onNext:18Observable.combineLatest(list, new FuncN<String>() { @Override//这里进行合并的规则,可以用函数进行运算返回一个数据 public String call(Object... args) { String concat = ""; for (Object value : args){ System.out.println("combineLatest(List,FuncN) value:" + value); concat += value; } return concat; }}).subscribe(new Action1<String>() { @Override public void call(String s) { System.out.println("combineLatest(List,FuncN) onNext:" + s); }});输出结果:
combineLatest(List,FuncN) value:3
combineLatest(List,FuncN) value:6
combineLatest(List,FuncN) value:7
combineLatest(List,FuncN) onNext:367
combineLatest(List,FuncN) value:3
combineLatest(List,FuncN) value:6
combineLatest(List,FuncN) value:8
combineLatest(List,FuncN) onNext:368
combineLatest(List,FuncN) value:3
combineLatest(List,FuncN) value:6
combineLatest(List,FuncN) value:9
combineLatest(List,FuncN) onNext:369
2.join操作符将两个Observable产生的结果合并成一个新Observable对象,join操作符可以控制每个Observable产生结果的生命周期。参数解释 ob1.join(ob2,ob1产生结果生命周期控制函数,ob2产生结果生命周期控制函数,ob1和ob2合并结果的规则)groupJoin()操作符第四个参数与join操作符不同,详细的运行栗子查看
Observable<Integer> ob1 = Observable.just(1,2);Observable<Integer> ob2 = Observable.just(3,4);//join操作符ob1.join(ob2, new Func1<Integer, Observable<Integer>>() {//ob1产生结果生命周期控制函数 @Override public Observable<Integer> call(Integer integer) { //使ob1延迟200毫秒执行 return Observable.just(integer).delay(200, TimeUnit.MILLISECONDS); }}, new Func1<Integer, Observable<Integer>>() {//ob2产生结果声明周期控制函数 @Override public Observable<Integer> call(Integer integer) { //使ob2延迟200毫秒执行 return Observable.just(integer).delay(200, TimeUnit.MILLISECONDS); }}, new Func2<Integer, Integer, Integer>() {//ob1 和ob2产生结果的合并规则 @Override public Integer call(Integer integer1, Integer integer2) { System.out.println("join(ob2,Func1,Func1,Func2) " + "integer1:" +integer1+ " integer2:" + integer2); return integer1 + integer2; }}).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println("join(ob2,Func1,Func1,Func2) " + integer); }});输出结果:
join(ob2,Func1,Func1,Func2) integer1:1 integer2:3
join(ob2,Func1,Func1,Func2) 4
join(ob2,Func1,Func1,Func2) integer1:2 integer2:3
join(ob2,Func1,Func1,Func2) 5
join(ob2,Func1,Func1,Func2) integer1:1 integer2:4
join(ob2,Func1,Func1,Func2) 5
join(ob2,Func1,Func1,Func2) integer1:2 integer2:4
join(ob2,Func1,Func1,Func2) 6//groupJoin操作符ob1.groupJoin(ob2, new Func1<Integer, Observable<Integer>>() {//ob1产生结果生命周期控制函数 @Override public Observable<Integer> call(Integer integer) { //使ob1延迟1600毫秒执行 return Observable.just(integer).delay(1600, TimeUnit.MILLISECONDS); }}, new Func1<Integer, Observable<Integer>>() {//ob2产生结果声明周期控制函数 @Override public Observable<Integer> call(Integer integer) { //使ob2延迟600毫秒执行 return Observable.just(integer).delay(600, TimeUnit.MILLISECONDS); }}, new Func2<Integer, Observable<Integer>, Observable<Integer>>() { @Override public Observable<Integer> call(final Integer integer1, Observable<Integer> observable) { return observable.map(new Func1<Integer, Integer>() { @Override public Integer call(Integer integer2) { System.out.println("groupJoin(ob2,Func1,Func1,Func2) " + "integer1:" + integer1 + " integer2:" + integer2); return integer1 + integer2; } }); }}). subscribe(new Action1<Observable<Integer>>() { @Override public void call(Observable<Integer> observable) { observable.subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println("groupJoin(ob2,Func1,Func1,Func2) onNnext:" + integer); } }); }});输出结果:
groupJoin(ob2,Func1,Func1,Func2) integer1:1 integer2:3
groupJoin(ob2,Func1,Func1,Func2) onNnext:4
groupJoin(ob2,Func1,Func1,Func2) integer1:2 integer2:3
groupJoin(ob2,Func1,Func1,Func2) onNnext:5
groupJoin(ob2,Func1,Func1,Func2) integer1:1 integer2:4
groupJoin(ob2,Func1,Func1,Func2) onNnext:5
groupJoin(ob2,Func1,Func1,Func2) integer1:2 integer2:4
groupJoin(ob2,Func1,Func1,Func2) onNnext:6mergeDelayError操作符类似于merge操作符,唯一不同就是如果在合并途中出现错误,不会立即发射错误通知,而是保留错误直到合并后的Observable将所有的数据发射完成,
此时才会将onError提交给订阅者。
合并多个Observable也可以通过传递一个Observalbe列表List、数组。Observable<Integer> ob1 = Observable.just(1,2,3).delay(100, TimeUnit.MILLISECONDS);输出结果:
Observable<Integer> ob2 = Observable.just(4,5,6)/*.delay(100, TimeUnit.MILLISECONDS)*/;
Observable.merge(ob1,ob2).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("merge(ob1,ob2) onNext:" + integer);
}
});12-20 13:18:42.341 31603-31603/com.dingmouren.rxjavademo I/System.out: merge(ob1,ob2) onNext:4
12-20 13:18:42.341 31603-31603/com.dingmouren.rxjavademo I/System.out: merge(ob1,ob2) onNext:5
12-20 13:18:42.341 31603-31603/com.dingmouren.rxjavademo I/System.out: merge(ob1,ob2) onNext:6
12-20 13:18:42.440 31603-31672/com.dingmouren.rxjavademo I/System.out: merge(ob1,ob2) onNext:1
12-20 13:18:42.441 31603-31672/com.dingmouren.rxjavademo I/System.out: merge(ob1,ob2) onNext:2
12-20 13:18:42.441 31603-31672/com.dingmouren.rxjavademo I/System.out: merge(ob1,ob2) onNext:3
4.startWith操作符是在源Observable提交结果之前插入指定的数据,可以是数值,也可以是Observable对象
Observable.just(1,2,3).startWith(0).subscribe(new Action1<Integer>() {输出结果:
@Override
public void call(Integer integer) {
System.out.println("startWith(T) onNext:" + integer);
}
});
System.out.println(" - - - - - - - - ");
Observable<Integer> ob2 = Observable.just(4,5,6);
Observable.just(1,2,3).startWith(ob2).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("startWith(T) onNext:" + integer);
}
});startWith(T) onNext:0
startWith(T) onNext:1
startWith(T) onNext:2
startWith(T) onNext:3
- - - - - - - -
startWith(T) onNext:4
startWith(T) onNext:5
startWith(T) onNext:6
startWith(T) onNext:1
startWith(T) onNext:2
startWith(T) onNext:3
5.switchOnNext操作符是把一组Observable转换成一个Observable,转换规则为:对于这组Observable中的每一个Observable所产生的结果,如果在同一个时间内存在两个或多个Observable提交的结果,只取最后一个Observable提交的结果给订阅者(看源码中的例子)
6.zip操作符严格按照顺序进行组合Observable,假设两个Observable合并,ob1发射2个数据,ob2发射3个数据,最终合并的胡发射2个合并的数据。
zipWith操作符与上面类似,具体的看下面的例子
默认不在特定的调度器上执行
Observable<Integer> ob1 = Observable.just(1,2,3);输出结果:
Observable<Integer> ob2 = Observable.just(4,5,6);
Observable.zip(ob1, ob2, new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer1, Integer integer2) {
System.out.println("zip(ob1,ob2,Func2) integer1:" + integer1 +" integer2:"+integer2);
return integer1 + integer2;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("zip(ob1,ob2,Func2) onNext:" + integer);
}
});
System.out.println("- - - - - - - -");
ob1.zipWith(ob2, new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer1, Integer integer2) {
System.out.println("ob1.zipWith(ob2,Func2) integer1:" + integer1 +" integer2:"+integer2);
return integer1 + integer2;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("ob1.zipWith(ob2,Func2) " + integer );
}
});zip(ob1,ob2,Func2) integer1:1 integer2:4
zip(ob1,ob2,Func2) onNext:5
zip(ob1,ob2,Func2) integer1:2 integer2:5
zip(ob1,ob2,Func2) onNext:7
zip(ob1,ob2,Func2) integer1:3 integer2:6
zip(ob1,ob2,Func2) onNext:9
- - - - - - - -
ob1.zipWith(ob2,Func2) integer1:1 integer2:4
ob1.zipWith(ob2,Func2) 5
ob1.zipWith(ob2,Func2) integer1:2 integer2:5
ob1.zipWith(ob2,Func2) 7
ob1.zipWith(ob2,Func2) integer1:3 integer2:6
ob1.zipWith(ob2,Func2) 9更多详细内容和例子,可以查看源码