RxJava Combining Operators

Date: 2021-07-13 17:50:08

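RxJava provides a number of combining operators, each of which merges several source Observables according to a particular rule. They are introduced one by one below. The examples use the RxJava 1.x API (Func1, Func2, Observable.from):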

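The concat operator emits the items of several Observables in the order in which the Observables are passed in: once O_a has finished emitting, O_b starts. In the example below, from is passed first, so 6 to 10 are emitted before 1 to 5:

Example: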

Observable<Integer> range = Observable.range(1, 5);
List<Integer> data = Arrays.asList(6, 7, 8, 9, 10);
Observable<Integer> from = Observable.from(data);

Observable.concat(from, range).subscribe(new Observer<Integer>() {
    @Override
    public void onCompleted() {
        LogUtils.d("----->onCompleted");
    }

    @Override
    public void onError(Throwable e) {
        LogUtils.d("----->onError");
    }

    @Override
    public void onNext(Integer o) {
        LogUtils.d("----->onNext:" + o);
    }
});

Output:

03-09 15:49:27.845 13639-13639/com.rxandroid.test1 D/----->: ----->onNext:6
03-09 15:49:27.845 13639-13639/com.rxandroid.test1 D/----->: ----->onNext:7
03-09 15:49:27.845 13639-13639/com.rxandroid.test1 D/----->: ----->onNext:8
03-09 15:49:27.845 13639-13639/com.rxandroid.test1 D/----->: ----->onNext:9
03-09 15:49:27.845 13639-13639/com.rxandroid.test1 D/----->: ----->onNext:10
03-09 15:49:27.845 13639-13639/com.rxandroid.test1 D/----->: ----->onNext:1
03-09 15:49:27.845 13639-13639/com.rxandroid.test1 D/----->: ----->onNext:2
03-09 15:49:27.845 13639-13639/com.rxandroid.test1 D/----->: ----->onNext:3
03-09 15:49:27.845 13639-13639/com.rxandroid.test1 D/----->: ----->onNext:4
03-09 15:49:27.845 13639-13639/com.rxandroid.test1 D/----->: ----->onNext:5
03-09 15:49:27.846 13639-13639/com.rxandroid.test1 D/----->: ----->onCompleted
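
The ordering rule can also be checked without RxJava. The following is a minimal plain-Java sketch that models each Observable as a List; the class name ConcatSketch and its concat helper are hypothetical and only illustrate the ordering, not how RxJava implements the operator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ConcatSketch {
    // Drains the first source completely before touching the second one,
    // which is the ordering guarantee that concat gives.
    static <T> List<T> concat(List<T> first, List<T> second) {
        List<T> out = new ArrayList<>(first);
        out.addAll(second);
        return out;
    }

    public static void main(String[] args) {
        List<Integer> from = Arrays.asList(6, 7, 8, 9, 10);
        List<Integer> range = Arrays.asList(1, 2, 3, 4, 5);
        // prints [6, 7, 8, 9, 10, 1, 2, 3, 4, 5]
        System.out.println(concat(from, range));
    }
}
```

The printed order matches the log above: all items of the first argument, then all items of the second.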



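combineLatest() combines the most recent item of each source Observable using a given function, and emits a new result whenever any source emits (once every source has emitted at least one item). In the example below both sources emit synchronously, so range has already emitted all of its items before from starts; its latest item, 5, is therefore combined with every item of from:

Example: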

Observable<Integer> range = Observable.range(1, 5);
List<Integer> data = Arrays.asList(6, 7, 8, 9, 10);
Observable<Integer> from = Observable.from(data);
Observable.combineLatest(range, from, new Func2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer integer, Integer integer2) {
        LogUtils.d("----->i1:" + integer + " i2:" + integer2);
        return integer + integer2;
    }
}).subscribe(new Observer<Integer>() {
    @Override
    public void onCompleted() {
        LogUtils.d("----->onCompleted");
    }

    @Override
    public void onError(Throwable e) {
        LogUtils.d("----->onError");
    }

    @Override
    public void onNext(Integer o) {
        LogUtils.d("----------->onNext:" + o);
    }
});

Output:

03-09 15:59:34.482 3010-3010/com.rxandroid.test1 D/----->: ----->i1:5  i2:6
03-09 15:59:34.482 3010-3010/com.rxandroid.test1 D/----->: ----------->onNext:11
03-09 15:59:34.482 3010-3010/com.rxandroid.test1 D/----->: ----->i1:5  i2:7
03-09 15:59:34.482 3010-3010/com.rxandroid.test1 D/----->: ----------->onNext:12
03-09 15:59:34.482 3010-3010/com.rxandroid.test1 D/----->: ----->i1:5  i2:8
03-09 15:59:34.482 3010-3010/com.rxandroid.test1 D/----->: ----------->onNext:13
03-09 15:59:34.483 3010-3010/com.rxandroid.test1 D/----->: ----->i1:5  i2:9
03-09 15:59:34.486 3010-3010/com.rxandroid.test1 D/----->: ----------->onNext:14
03-09 15:59:34.486 3010-3010/com.rxandroid.test1 D/----->: ----->i1:5  i2:10
03-09 15:59:34.486 3010-3010/com.rxandroid.test1 D/----->: ----------->onNext:15
03-09 15:59:34.487 3010-3010/com.rxandroid.test1 D/----->: ----->onCompleted
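
To see why only 5 is paired with the second source, it can help to model the emissions as one ordered timeline. The sketch below is a plain-Java approximation under that assumption; CombineLatestSketch, Event and sequentialTimeline are hypothetical names introduced for illustration, not RxJava types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

final class CombineLatestSketch {
    // One emission: which source produced it (0 = first, 1 = second) and its value.
    static final class Event {
        final int source;
        final int value;

        Event(int source, int value) {
            this.source = source;
            this.value = value;
        }
    }

    // Walks the timeline in order, remembers the latest value of each source,
    // and emits a combined result once both sources have produced a value.
    static List<Integer> combineLatest(List<Event> timeline, BinaryOperator<Integer> combiner) {
        Integer latestFirst = null;
        Integer latestSecond = null;
        List<Integer> out = new ArrayList<>();
        for (Event e : timeline) {
            if (e.source == 0) {
                latestFirst = e.value;
            } else {
                latestSecond = e.value;
            }
            if (latestFirst != null && latestSecond != null) {
                out.add(combiner.apply(latestFirst, latestSecond));
            }
        }
        return out;
    }

    // Timeline of the example: the first source emits 1..5 before the second emits 6..10.
    static List<Event> sequentialTimeline() {
        List<Event> timeline = new ArrayList<>();
        for (int i = 1; i <= 5; i++) {
            timeline.add(new Event(0, i));
        }
        for (int i = 6; i <= 10; i++) {
            timeline.add(new Event(1, i));
        }
        return timeline;
    }

    public static void main(String[] args) {
        // prints [11, 12, 13, 14, 15]
        System.out.println(combineLatest(sequentialTimeline(), Integer::sum));
    }
}
```

If the two sources were interleaved in time instead, earlier values of the first source would take part in the results as well.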


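join operator: combines two Observables, written a.join(b).

Every item emitted by a or b stays active for a window whose length is determined by the Observable returned from the corresponding duration selector. While an item of a is still inside its window, it is combined with every item of b whose window overlaps it:

Example: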

Observable<Integer> range = Observable.range(1, 5);

List<Integer> data = Arrays.asList(6, 7, 8, 9, 10);
Observable<Integer> from = Observable.from(data);

range.join(from, new Func1<Integer, Observable<Integer>>() {
    @Override
    public Observable<Integer> call(Integer integer) {
        LogUtils.d("->left:" + integer);
        return Observable.just(integer);
    }
}, new Func1<Integer, Observable<Integer>>() {
    @Override
    public Observable<Integer> call(Integer integer) {
        LogUtils.d("->right:" + integer);
        return Observable.just(integer);
    }
}, new Func2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer integer, Integer integer2) {
        LogUtils.d("---->left:" + integer + " right:" + integer2);
        return integer + integer2;
    }
}).subscribe(new Observer<Integer>() {
    @Override
    public void onCompleted() {
        LogUtils.d("----->onCompleted");
    }

    @Override
    public void onError(Throwable e) {
        LogUtils.d("----->onError");
    }

    @Override
    public void onNext(Integer o) {
        LogUtils.d("----------->onNext:" + o);
    }
});


Output:

03-09 16:26:20.605 23851-23851/com.rxandroid.test1 D/----->: ->left:1
03-09 16:26:20.605 23851-23851/com.rxandroid.test1 D/----->: ->left:2
03-09 16:26:20.605 23851-23851/com.rxandroid.test1 D/----->: ->left:3
03-09 16:26:20.605 23851-23851/com.rxandroid.test1 D/----->: ->left:4
03-09 16:26:20.605 23851-23851/com.rxandroid.test1 D/----->: ->left:5
03-09 16:26:20.605 23851-23851/com.rxandroid.test1 D/----->: ----->onCompleted

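Analysis: everything runs on the same thread, and each window (Observable.just(integer)) closes immediately. All windows of a have therefore already ended before b emits anything, so nothing can be combined.


Example (each window is kept open for one second using delay):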

Observable<Integer> range = Observable.range(1, 5);
// Note: subscribeOn returns a new Observable. The result is discarded here,
// so this call has no effect and everything still runs on the calling thread.
range.subscribeOn(Schedulers.newThread());

List<Integer> data = Arrays.asList(6, 7, 8, 9, 10);
Observable<Integer> from = Observable.from(data);
// Same as above: the returned Observable is discarded.
from.subscribeOn(Schedulers.newThread());

range.join(from, new Func1<Integer, Observable<Integer>>() {
    @Override
    public Observable<Integer> call(Integer integer) {
        LogUtils.d("->left:" + integer);
        // Keep the window of each left item open for one second.
        return Observable.just(integer).delay(1, TimeUnit.SECONDS);
    }
}, new Func1<Integer, Observable<Integer>>() {
    @Override
    public Observable<Integer> call(Integer integer) {
        LogUtils.d("->right:" + integer);
        // Keep the window of each right item open for one second.
        return Observable.just(integer).delay(1, TimeUnit.SECONDS);
    }
}, new Func2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer integer, Integer integer2) {
        LogUtils.d("---->left:" + integer + " right:" + integer2);
        return integer + integer2;
    }
}).subscribe(new Observer<Integer>() {
    @Override
    public void onCompleted() {
        LogUtils.d("----->onCompleted");
    }

    @Override
    public void onError(Throwable e) {
        LogUtils.d("----->onError");
    }

    @Override
    public void onNext(Integer o) {
        LogUtils.d("----------->onNext:" + o);
    }
});


Output:

03-09 16:31:17.506 7575-7575/com.rxandroid.test1 D/----->: ->left:1
03-09 16:31:17.506 7575-7575/com.rxandroid.test1 D/----->: ->left:2
03-09 16:31:17.506 7575-7575/com.rxandroid.test1 D/----->: ->left:3
03-09 16:31:17.507 7575-7575/com.rxandroid.test1 D/----->: ->left:4
03-09 16:31:17.507 7575-7575/com.rxandroid.test1 D/----->: ->left:5
03-09 16:31:17.507 7575-7575/com.rxandroid.test1 D/----->: ->right:6
03-09 16:31:17.508 7575-7575/com.rxandroid.test1 D/----->: ---->left:5  right:6
03-09 16:31:17.508 7575-7575/com.rxandroid.test1 D/----->: ----------->onNext:11
03-09 16:31:17.508 7575-7575/com.rxandroid.test1 D/----->: ---->left:2  right:6
03-09 16:31:17.508 7575-7575/com.rxandroid.test1 D/----->: ----------->onNext:8
03-09 16:31:17.508 7575-7575/com.rxandroid.test1 D/----->: ---->left:1  right:6
03-09 16:31:17.508 7575-7575/com.rxandroid.test1 D/----->: ----------->onNext:7
03-09 16:31:17.508 7575-7575/com.rxandroid.test1 D/----->: ---->left:4  right:6
03-09 16:31:17.508 7575-7575/com.rxandroid.test1 D/----->: ----------->onNext:10
03-09 16:31:17.508 7575-7575/com.rxandroid.test1 D/----->: ---->left:3  right:6
03-09 16:31:17.508 7575-7575/com.rxandroid.test1 D/----->: ----------->onNext:9
03-09 16:31:17.508 7575-7575/com.rxandroid.test1 D/----->: ->right:7
03-09 16:31:17.509 7575-7575/com.rxandroid.test1 D/----->: ---->left:5  right:7
03-09 16:31:17.509 7575-7575/com.rxandroid.test1 D/----->: ----------->onNext:12
03-09 16:31:17.509 7575-7575/com.rxandroid.test1 D/----->: ---->left:2  right:7
03-09 16:31:17.509 7575-7575/com.rxandroid.test1 D/----->: ----------->onNext:9
03-09 16:31:17.509 7575-7575/com.rxandroid.test1 D/----->: ---->left:1  right:7
03-09 16:31:17.509 7575-7575/com.rxandroid.test1 D/----->: ----------->onNext:8
03-09 16:31:17.509 7575-7575/com.rxandroid.test1 D/----->: ---->left:4  right:7
03-09 16:31:17.509 7575-7575/com.rxandroid.test1 D/----->: ----------->onNext:11
03-09 16:31:17.509 7575-7575/com.rxandroid.test1 D/----->: ---->left:3  right:7
03-09 16:31:17.509 7575-7575/com.rxandroid.test1 D/----->: ----------->onNext:10
03-09 16:31:17.509 7575-7575/com.rxandroid.test1 D/----->: ->right:8
03-09 16:31:17.511 7575-7575/com.rxandroid.test1 D/----->: ---->left:5  right:8
03-09 16:31:17.511 7575-7575/com.rxandroid.test1 D/----->: ----------->onNext:13
03-09 16:31:17.511 7575-7575/com.rxandroid.test1 D/----->: ---->left:2  right:8
03-09 16:31:17.511 7575-7575/com.rxandroid.test1 D/----->: ----------->onNext:10
03-09 16:31:17.511 7575-7575/com.rxandroid.test1 D/----->: ---->left:1  right:8
03-09 16:31:17.511 7575-7575/com.rxandroid.test1 D/----->: ----------->onNext:9
03-09 16:31:17.511 7575-7575/com.rxandroid.test1 D/----->: ---->left:4  right:8
03-09 16:31:17.511 7575-7575/com.rxandroid.test1 D/----->: ----------->onNext:12
03-09 16:31:17.511 7575-7575/com.rxandroid.test1 D/----->: ---->left:3  right:8
03-09 16:31:17.513 7575-7575/com.rxandroid.test1 D/----->: ----------->onNext:11
03-09 16:31:17.513 7575-7575/com.rxandroid.test1 D/----->: ->right:9
03-09 16:31:17.513 7575-7575/com.rxandroid.test1 D/----->: ---->left:5  right:9
03-09 16:31:17.513 7575-7575/com.rxandroid.test1 D/----->: ----------->onNext:14
03-09 16:31:17.513 7575-7575/com.rxandroid.test1 D/----->: ---->left:2  right:9
03-09 16:31:17.514 7575-7575/com.rxandroid.test1 D/----->: ----------->onNext:11
03-09 16:31:17.514 7575-7575/com.rxandroid.test1 D/----->: ---->left:1  right:9
03-09 16:31:17.514 7575-7575/com.rxandroid.test1 D/----->: ----------->onNext:10
03-09 16:31:17.514 7575-7575/com.rxandroid.test1 D/----->: ---->left:4  right:9
03-09 16:31:17.514 7575-7575/com.rxandroid.test1 D/----->: ----------->onNext:13
03-09 16:31:17.514 7575-7575/com.rxandroid.test1 D/----->: ---->left:3  right:9
03-09 16:31:17.515 7575-7575/com.rxandroid.test1 D/----->: ----------->onNext:12
03-09 16:31:17.515 7575-7575/com.rxandroid.test1 D/----->: ->right:10
03-09 16:31:17.516 7575-7575/com.rxandroid.test1 D/----->: ---->left:5  right:10
03-09 16:31:17.516 7575-7575/com.rxandroid.test1 D/----->: ----------->onNext:15
03-09 16:31:17.516 7575-7575/com.rxandroid.test1 D/----->: ---->left:2  right:10
03-09 16:31:17.516 7575-7575/com.rxandroid.test1 D/----->: ----------->onNext:12
03-09 16:31:17.517 7575-7575/com.rxandroid.test1 D/----->: ---->left:1  right:10
03-09 16:31:17.517 7575-7575/com.rxandroid.test1 D/----->: ----------->onNext:11
03-09 16:31:17.517 7575-7575/com.rxandroid.test1 D/----->: ---->left:4  right:10
03-09 16:31:17.517 7575-7575/com.rxandroid.test1 D/----->: ----------->onNext:14
03-09 16:31:17.517 7575-7575/com.rxandroid.test1 D/----->: ---->left:3  right:10
03-09 16:31:17.517 7575-7575/com.rxandroid.test1 D/----->: ----------->onNext:13
03-09 16:31:17.517 7575-7575/com.rxandroid.test1 D/----->: ----->onCompleted
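
The difference between the two join runs comes down to whether the windows overlap. The following plain-Java sketch models that rule only; JoinSketch, Timed and items are hypothetical names, the times are assumed values in milliseconds, and the order in which pairs are produced is not meant to match RxJava.

```java
import java.util.ArrayList;
import java.util.List;

final class JoinSketch {
    // An item together with the half-open time window [start, end) in which it is active.
    static final class Timed {
        final int value;
        final long start;
        final long end;

        Timed(int value, long start, long windowMillis) {
            this.value = value;
            this.start = start;
            this.end = start + windowMillis;
        }

        boolean overlaps(Timed other) {
            return start < other.end && other.start < end;
        }
    }

    // Emits left + right for every pair of items whose windows overlap.
    static List<Integer> join(List<Timed> left, List<Timed> right) {
        List<Integer> out = new ArrayList<>();
        for (Timed r : right) {
            for (Timed l : left) {
                if (l.overlaps(r)) {
                    out.add(l.value + r.value);
                }
            }
        }
        return out;
    }

    // Builds count consecutive values that all arrive at the same start time.
    static List<Timed> items(int first, int count, long start, long windowMillis) {
        List<Timed> out = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            out.add(new Timed(first + i, start, windowMillis));
        }
        return out;
    }

    public static void main(String[] args) {
        // Windows close immediately: nothing overlaps, prints 0
        System.out.println(join(items(1, 5, 0, 0), items(6, 5, 1, 0)).size());
        // Windows stay open for one second: every pair overlaps, prints 25
        System.out.println(join(items(1, 5, 0, 1000), items(6, 5, 1, 1000)).size());
    }
}
```

With zero-length windows the result is empty, as in the first run; with one-second windows all 5 x 5 pairs are produced, as in the second run.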


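groupJoin operator:

groupJoin works like join, but instead of pairing individual items, the result selector receives each item of the left Observable together with an Observable of the right-hand items whose windows overlap it: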

Observable<Integer> range = Observable.range(1, 5);
// Note: subscribeOn returns a new Observable. The result is discarded here,
// so this call has no effect.
range.subscribeOn(Schedulers.newThread());

List<Integer> data = Arrays.asList(6, 7, 8, 9, 10);
Observable<Integer> from = Observable.from(data);
// Same as above: the returned Observable is discarded.
from.subscribeOn(Schedulers.newThread());

range.groupJoin(from, new Func1<Integer, Observable<Integer>>() {
    @Override
    public Observable<Integer> call(Integer integer) {
        return Observable.just(integer).delay(2, TimeUnit.SECONDS);
    }
}, new Func1<Integer, Observable<Integer>>() {
    @Override
    public Observable<Integer> call(Integer integer) {
        return Observable.just(integer).delay(1, TimeUnit.SECONDS);
    }
}, new Func2<Integer, Observable<Integer>, Observable<Integer>>() {
    @Override
    public Observable<Integer> call(final Integer integer, Observable<Integer> integerObservable) {
        // Add the left item to every right item that falls into its window.
        return integerObservable.map(new Func1<Integer, Integer>() {
            @Override
            public Integer call(Integer i) {
                return integer + i;
            }
        });
    }
}).subscribe(new Subscriber<Observable<Integer>>() {
    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onNext(Observable<Integer> integerObservable) {
        LogUtils.d("------->组:"); // "组" means "group"
        integerObservable.subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                LogUtils.d("------->i:" + integer);
            }
        });
    }
});

Output:

03-18 13:27:52.415 14783-14783/com.rxandroid.test1 D/----->: ------->组:
03-18 13:27:52.416 14783-14783/com.rxandroid.test1 D/----->: ------->组:
03-18 13:27:52.419 14783-14783/com.rxandroid.test1 D/----->: ------->组:
03-18 13:27:52.420 14783-14783/com.rxandroid.test1 D/----->: ------->组:
03-18 13:27:52.420 14783-14783/com.rxandroid.test1 D/----->: ------->组:
03-18 13:27:52.421 14783-14783/com.rxandroid.test1 D/----->: ------->i:11
03-18 13:27:52.421 14783-14783/com.rxandroid.test1 D/----->: ------->i:8
03-18 13:27:52.421 14783-14783/com.rxandroid.test1 D/----->: ------->i:7
03-18 13:27:52.421 14783-14783/com.rxandroid.test1 D/----->: ------->i:10
03-18 13:27:52.421 14783-14783/com.rxandroid.test1 D/----->: ------->i:9
03-18 13:27:52.421 14783-14783/com.rxandroid.test1 D/----->: ------->i:12
03-18 13:27:52.421 14783-14783/com.rxandroid.test1 D/----->: ------->i:9
03-18 13:27:52.421 14783-14783/com.rxandroid.test1 D/----->: ------->i:8
03-18 13:27:52.421 14783-14783/com.rxandroid.test1 D/----->: ------->i:11
03-18 13:27:52.421 14783-14783/com.rxandroid.test1 D/----->: ------->i:10
03-18 13:27:52.422 14783-14783/com.rxandroid.test1 D/----->: ------->i:13
03-18 13:27:52.422 14783-14783/com.rxandroid.test1 D/----->: ------->i:10
03-18 13:27:52.422 14783-14783/com.rxandroid.test1 D/----->: ------->i:9
03-18 13:27:52.422 14783-14783/com.rxandroid.test1 D/----->: ------->i:12
03-18 13:27:52.422 14783-14783/com.rxandroid.test1 D/----->: ------->i:11
03-18 13:27:52.422 14783-14783/com.rxandroid.test1 D/----->: ------->i:14
03-18 13:27:52.422 14783-14783/com.rxandroid.test1 D/----->: ------->i:11
03-18 13:27:52.422 14783-14783/com.rxandroid.test1 D/----->: ------->i:10
03-18 13:27:52.422 14783-14783/com.rxandroid.test1 D/----->: ------->i:13
03-18 13:27:52.422 14783-14783/com.rxandroid.test1 D/----->: ------->i:12
03-18 13:27:52.423 14783-14783/com.rxandroid.test1 D/----->: ------->i:15
03-18 13:27:52.423 14783-14783/com.rxandroid.test1 D/----->: ------->i:12
03-18 13:27:52.423 14783-14783/com.rxandroid.test1 D/----->: ------->i:11
03-18 13:27:52.423 14783-14783/com.rxandroid.test1 D/----->: ------->i:14
03-18 13:27:52.423 14783-14783/com.rxandroid.test1 D/----->: ------->i:13


merge operator:

Emits the items of the two Observables in the chronological order in which they are produced:

Observable<Long> o1 = Observable.interval(5, TimeUnit.SECONDS).map(new Func1<Long, Long>() {
    @Override
    public Long call(Long aLong) {
        return aLong * 1;
    }
}).take(10);

Observable<Long> o2 = Observable.interval(5, TimeUnit.SECONDS).map(new Func1<Long, Long>() {
    @Override
    public Long call(Long aLong) {
        return aLong * 100;
    }
}).take(10);

Observable.merge(o1, o2).subscribe(new Subscriber<Long>() {
    @Override
    public void onCompleted() {
        LogUtils.d("------>onCompleted()");
    }

    @Override
    public void onError(Throwable e) {
        LogUtils.d("------>onError()" + e);
    }

    @Override
    public void onNext(Long integer) {
        LogUtils.d("------>onNext()" + integer);
    }
});


Output:

03-18 13:45:37.972 646-700/com.rxandroid.test1 D/----->: ------>onNext()0
03-18 13:45:37.992 646-701/com.rxandroid.test1 D/----->: ------>onNext()0
03-18 13:45:42.952 646-700/com.rxandroid.test1 D/----->: ------>onNext()1
03-18 13:45:42.952 646-701/com.rxandroid.test1 D/----->: ------>onNext()100
03-18 13:45:47.952 646-700/com.rxandroid.test1 D/----->: ------>onNext()2
03-18 13:45:47.952 646-701/com.rxandroid.test1 D/----->: ------>onNext()200
03-18 13:45:52.952 646-700/com.rxandroid.test1 D/----->: ------>onNext()3
03-18 13:45:52.952 646-701/com.rxandroid.test1 D/----->: ------>onNext()300
03-18 13:45:57.952 646-700/com.rxandroid.test1 D/----->: ------>onNext()4
03-18 13:45:57.952 646-701/com.rxandroid.test1 D/----->: ------>onNext()400
03-18 13:46:02.952 646-700/com.rxandroid.test1 D/----->: ------>onNext()5
03-18 13:46:02.952 646-701/com.rxandroid.test1 D/----->: ------>onNext()500
03-18 13:46:07.962 646-700/com.rxandroid.test1 D/----->: ------>onNext()6
03-18 13:46:07.962 646-701/com.rxandroid.test1 D/----->: ------>onNext()600
03-18 13:46:12.962 646-700/com.rxandroid.test1 D/----->: ------>onNext()7
03-18 13:46:12.962 646-701/com.rxandroid.test1 D/----->: ------>onNext()700
03-18 13:46:17.962 646-700/com.rxandroid.test1 D/----->: ------>onNext()8
03-18 13:46:17.962 646-701/com.rxandroid.test1 D/----->: ------>onNext()800
03-18 13:46:22.952 646-700/com.rxandroid.test1 D/----->: ------>onNext()9
03-18 13:46:22.962 646-700/com.rxandroid.test1 D/----->: ------>onNext()900
03-18 13:46:22.972 646-700/com.rxandroid.test1 D/----->: ------>onCompleted()
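
The interleaving seen in the log can be approximated by merging two timestamped sequences by time. The sketch below is a plain-Java model; MergeSketch, Timed and interval are hypothetical names, and breaking ties in favour of the first source is an assumption made here (RxJava gives no such guarantee for simultaneous emissions).

```java
import java.util.ArrayList;
import java.util.List;

final class MergeSketch {
    // A value together with the time (in seconds) at which it is emitted.
    static final class Timed {
        final long time;
        final long value;

        Timed(long time, long value) {
            this.time = time;
            this.value = value;
        }
    }

    // Interleaves two time-ordered sequences by emission time.
    // Assumption: on a tie, the item of the first sequence goes first.
    static List<Long> merge(List<Timed> a, List<Timed> b) {
        List<Long> out = new ArrayList<>();
        int i = 0;
        int j = 0;
        while (i < a.size() || j < b.size()) {
            boolean takeA = j >= b.size() || (i < a.size() && a.get(i).time <= b.get(j).time);
            out.add(takeA ? a.get(i++).value : b.get(j++).value);
        }
        return out;
    }

    // Models interval(period).map(n -> n * factor).take(count).
    static List<Timed> interval(long periodSeconds, int count, long factor) {
        List<Timed> out = new ArrayList<>();
        for (int n = 0; n < count; n++) {
            out.add(new Timed((n + 1) * periodSeconds, n * factor));
        }
        return out;
    }

    public static void main(String[] args) {
        // prints [0, 0, 1, 100, 2, 200]
        System.out.println(merge(interval(5, 3, 1), interval(5, 3, 100)));
    }
}
```

Unlike concat, neither source has to finish before items of the other one appear in the result.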


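mergeDelayError operator:

With merge, merging stops as soon as any one of the Observables raises an error. The mergeDelayError operator postpones the error until all remaining items have been merged. In the example below o1 fails after emitting 2, while o2 keeps emitting up to 400 before onError is delivered: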

Observable<Long> o1 = Observable.interval(5, TimeUnit.SECONDS).map(new Func1<Long, Long>() {
    @Override
    public Long call(Long aLong) {
        if (aLong > 2) { // simulate an error
            throw new RuntimeException();
        }
        return aLong * 1;
    }
}).take(5);

Observable<Long> o2 = Observable.interval(5, TimeUnit.SECONDS).map(new Func1<Long, Long>() {
    @Override
    public Long call(Long aLong) {
        return aLong * 100;
    }
}).take(5);

Observable.mergeDelayError(o1, o2).subscribe(new Subscriber<Long>() {
    @Override
    public void onCompleted() {
        LogUtils.d("------>onCompleted()");
    }

    @Override
    public void onError(Throwable e) {
        LogUtils.d("------>onError()" + e);
    }

    @Override
    public void onNext(Long integer) {
        LogUtils.d("------>onNext()" + integer);
    }
});

Output:

03-18 13:54:53.652 10298-10344/com.rxandroid.test1 D/----->: ------>onNext()0
03-18 13:54:53.652 10298-10344/com.rxandroid.test1 D/----->: ------>onNext()0
03-18 13:54:58.652 10298-10344/com.rxandroid.test1 D/----->: ------>onNext()1
03-18 13:54:58.652 10298-10345/com.rxandroid.test1 D/----->: ------>onNext()100
03-18 13:55:03.652 10298-10344/com.rxandroid.test1 D/----->: ------>onNext()2
03-18 13:55:03.652 10298-10345/com.rxandroid.test1 D/----->: ------>onNext()200
03-18 13:55:08.652 10298-10345/com.rxandroid.test1 D/----->: ------>onNext()300
03-18 13:55:13.652 10298-10345/com.rxandroid.test1 D/----->: ------>onNext()400
03-18 13:55:13.652 10298-10345/com.rxandroid.test1 D/----->: ------>onError()java.lang.RuntimeException
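
The contrast between stopping at the first error and delaying it can be sketched in plain Java over a single timeline of signals. MergeDelayErrorSketch, Signal, Result and exampleTimeline are hypothetical names; the sketch keeps only the first error, which is a simplification of what RxJava does when several sources fail.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class MergeDelayErrorSketch {
    // Either a value or an error, in the order the merged sources produce them.
    static final class Signal {
        final Long value;
        final RuntimeException error;

        private Signal(Long value, RuntimeException error) {
            this.value = value;
            this.error = error;
        }

        static Signal next(long value) {
            return new Signal(value, null);
        }

        static Signal error(RuntimeException e) {
            return new Signal(null, e);
        }
    }

    // What the subscriber saw: the delivered items and the terminal error, if any.
    static final class Result {
        final List<Long> items;
        final RuntimeException error;

        Result(List<Long> items, RuntimeException error) {
            this.items = items;
            this.error = error;
        }
    }

    // delayError = false models merge: stop at the first error.
    // delayError = true models mergeDelayError: keep going, report the error last.
    static Result merge(List<Signal> timeline, boolean delayError) {
        List<Long> items = new ArrayList<>();
        RuntimeException pending = null;
        for (Signal s : timeline) {
            if (s.error != null) {
                if (!delayError) {
                    return new Result(items, s.error);
                }
                if (pending == null) {
                    pending = s.error;
                }
            } else {
                items.add(s.value);
            }
        }
        return new Result(items, pending);
    }

    // Timeline of the example: the first source fails after 2, the second continues to 400.
    static List<Signal> exampleTimeline() {
        return Arrays.asList(
                Signal.next(0), Signal.next(0), Signal.next(1), Signal.next(100),
                Signal.next(2), Signal.next(200), Signal.error(new RuntimeException()),
                Signal.next(300), Signal.next(400));
    }

    public static void main(String[] args) {
        // prints [0, 0, 1, 100, 2, 200]
        System.out.println(merge(exampleTimeline(), false).items);
        // prints [0, 0, 1, 100, 2, 200, 300, 400]
        System.out.println(merge(exampleTimeline(), true).items);
    }
}
```

In both cases the subscriber ends with an error; the only difference is how many items it receives first.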


startWith operator:

Inserts the specified items before the output of the source Observable:

Observable.range(1, 10).startWith(-2, -1).subscribe(new Subscriber<Integer>() {
    @Override
    public void onCompleted() {
        LogUtils.d("------>onCompleted()");
    }

    @Override
    public void onError(Throwable e) {
        LogUtils.d("------>onError()" + e);
    }

    @Override
    public void onNext(Integer integer) {
        LogUtils.d("------>onNext()" + integer);
    }
});
Output:

03-18 13:58:49.442 14596-14596/com.rxandroid.test1 D/----->: ------>onNext()-2
03-18 13:58:49.442 14596-14596/com.rxandroid.test1 D/----->: ------>onNext()-1
03-18 13:58:49.442 14596-14596/com.rxandroid.test1 D/----->: ------>onNext()1
03-18 13:58:49.442 14596-14596/com.rxandroid.test1 D/----->: ------>onNext()2
03-18 13:58:49.442 14596-14596/com.rxandroid.test1 D/----->: ------>onNext()3
03-18 13:58:49.442 14596-14596/com.rxandroid.test1 D/----->: ------>onNext()4
03-18 13:58:49.442 14596-14596/com.rxandroid.test1 D/----->: ------>onNext()5
03-18 13:58:49.442 14596-14596/com.rxandroid.test1 D/----->: ------>onNext()6
03-18 13:58:49.442 14596-14596/com.rxandroid.test1 D/----->: ------>onNext()7
03-18 13:58:49.442 14596-14596/com.rxandroid.test1 D/----->: ------>onNext()8
03-18 13:58:49.442 14596-14596/com.rxandroid.test1 D/----->: ------>onNext()9
03-18 13:58:49.442 14596-14596/com.rxandroid.test1 D/----->: ------>onNext()10
03-18 13:58:49.442 14596-14596/com.rxandroid.test1 D/----->: ------>onCompleted()


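switchOnNext operator:

Converts an Observable that emits Observables into a single Observable. The result always mirrors the most recently emitted inner Observable: as soon as a new inner Observable arrives, the previous one is unsubscribed and any items it would still have produced are discarded: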

Observable<Observable<Long>> longObservable = Observable.interval(3, TimeUnit.SECONDS).map(new Func1<Long, Observable<Long>>() {
    @Override
    public Observable<Long> call(final Long aLong1) {
        return Observable.interval(1, TimeUnit.SECONDS).map(new Func1<Long, Long>() {
            @Override
            public Long call(Long aLong) {
                return aLong + (aLong1 * 10);
            }
        }).take(3);
    }
}).take(3);
Observable.switchOnNext(longObservable).subscribe(new Observer<Long>() {
    @Override
    public void onCompleted() {
        LogUtils.d("------>onCompleted()");
    }

    @Override
    public void onError(Throwable e) {
        LogUtils.d("------>onError()" + e);
    }

    @Override
    public void onNext(Long integer) {
        LogUtils.d("------>onNext()" + integer);
    }
});


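Output: 2 and 12 are missing, because each of them was due at about the same moment the next inner Observable arrived: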

03-18 14:24:06.332 9782-9815/com.rxandroid.test1 D/----->: ------>onNext()0
03-18 14:24:07.322 9782-9815/com.rxandroid.test1 D/----->: ------>onNext()1
03-18 14:24:09.312 9782-9814/com.rxandroid.test1 D/----->: ------>onNext()10
03-18 14:24:10.322 9782-9814/com.rxandroid.test1 D/----->: ------>onNext()11
03-18 14:24:12.322 9782-9815/com.rxandroid.test1 D/----->: ------>onNext()20
03-18 14:24:13.312 9782-9815/com.rxandroid.test1 D/----->: ------>onNext()21
03-18 14:24:14.322 9782-9815/com.rxandroid.test1 D/----->: ------>onNext()22
03-18 14:24:14.322 9782-9815/com.rxandroid.test1 D/----->: ------>onCompleted()
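
Why exactly 2 and 12 disappear can be reproduced with a small timing model. The plain-Java sketch below is only an approximation: SwitchOnNextSketch and its parameters are hypothetical names, and it assumes that when an inner item and the next inner Observable fall on the same instant, the switch wins (in RxJava this is a race, although it matches the log above).

```java
import java.util.ArrayList;
import java.util.List;

final class SwitchOnNextSketch {
    // The outer source emits inner k at time (k + 1) * outerPeriod.
    // Inner k emits value n + 10 * k at (n + 1) * innerPeriod after it is subscribed.
    // An inner item is delivered only if it is due before the next inner arrives.
    static List<Long> switchOnNext(int outerCount, long outerPeriod, int innerCount, long innerPeriod) {
        List<Long> out = new ArrayList<>();
        for (int k = 0; k < outerCount; k++) {
            long start = (k + 1) * outerPeriod;
            long nextStart = (k + 2) * outerPeriod;
            boolean isLast = (k == outerCount - 1);
            for (int n = 0; n < innerCount; n++) {
                long dueAt = start + (n + 1) * innerPeriod;
                // Assumption: a tie goes to the newer inner Observable.
                if (isLast || dueAt < nextStart) {
                    out.add(n + 10L * k);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Same shape as the example: 3 inner Observables every 3 s, each with 3 items every 1 s.
        // prints [0, 1, 10, 11, 20, 21, 22]
        System.out.println(switchOnNext(3, 3, 3, 1));
    }
}
```

Only the last inner Observable gets to emit all three of its items, because nothing replaces it.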


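zip operator:

Combines the items that sit at the same index position; items left without a partner are discarded (50 in the example below):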

Observable.zip(Observable.just(1, 2, 3, 4), Observable.just(10, 20, 30, 40, 50), new Func2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer integer, Integer integer2) {
        return integer + integer2;
    }
}).subscribe(new Subscriber<Integer>() {
    @Override
    public void onCompleted() {
        LogUtils.d("------>onCompleted()");
    }

    @Override
    public void onError(Throwable e) {
        LogUtils.d("------>onError()" + e);
    }

    @Override
    public void onNext(Integer integer) {
        LogUtils.d("------>onNext()" + integer);
    }
});

Output:

03-18 14:32:23.652 19015-19015/com.rxandroid.test1 D/----->: ------>onNext()11
03-18 14:32:23.652 19015-19015/com.rxandroid.test1 D/----->: ------>onNext()22
03-18 14:32:23.652 19015-19015/com.rxandroid.test1 D/----->: ------>onNext()33
03-18 14:32:23.652 19015-19015/com.rxandroid.test1 D/----->: ------>onNext()44
03-18 14:32:23.652 19015-19015/com.rxandroid.test1 D/----->: ------>onCompleted()
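
The index-wise pairing can be expressed in a few lines of plain Java. This is a minimal sketch that models each Observable as a List; ZipSketch and its zip helper are hypothetical names used for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

final class ZipSketch {
    // Pairs items by index and stops at the end of the shorter source,
    // so surplus items of the longer source are dropped.
    static <A, B, R> List<R> zip(List<A> first, List<B> second, BiFunction<A, B, R> combiner) {
        int n = Math.min(first.size(), second.size());
        List<R> out = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            out.add(combiner.apply(first.get(i), second.get(i)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> a = Arrays.asList(1, 2, 3, 4);
        List<Integer> b = Arrays.asList(10, 20, 30, 40, 50);
        List<Integer> sums = zip(a, b, Integer::sum);
        // prints [11, 22, 33, 44]
        System.out.println(sums);
    }
}
```

The fifth item of the second list, 50, has no partner and never reaches the combiner.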