flatMap基本使用
flatMap是变换操作符,使用一个指定的函数对原始Observable发射的每一项数据执行变换操作,这个函数返回一个本身也发射数据的Observable,然后flatMap合并这些Observable发射的数据,最后将合并后的结果当作它自己的数据序列发射。
注意:flatMap对这些Observable发射的数据做的是合并(merge)操作,因此它们可能是交错的。
我们可以用代码示例说明:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
}).flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
String flatMap = "I am value " + integer;
return Observable.just(flatMap);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
println("flatMap : accept : " + s + "\n");
}
});
输出结果:
flatMap : accept : I am value 1
flatMap : accept : I am value 2
flatMap : accept : I am value 3
下面我们将从源码的角度来分析下:
这里我们首先使用create操作符创建一个Observable,并且发射指定的数据。关于create如何创建Observable对象,我们这里不做分析,前面文章中已经说明。我们直接分析flatMap的源码:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
return flatMap(mapper, false);
}
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors) {
return flatMap(mapper, delayErrors, Integer.MAX_VALUE);
}
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency) {
return flatMap(mapper, delayErrors, maxConcurrency, bufferSize());
}
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
if (this instanceof ScalarCallable) {
@SuppressWarnings("unchecked")
T v = ((ScalarCallable<T>)this).call();
if (v == null) {
return empty();
}
return ObservableScalarXMap.scalarXMap(v, mapper);
}
return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
}
flatMap方法具有多个重载方法,在调用flatMap方法时,首先需要传入一个Function的对象,但对是Function类中的泛型参数有要求,第二个参数必须是继承自ObservableSource类,然后在抽象的apply方法中返回该类型参数。而我们都知道Observable就是继承自该类。所以在Function中,最后必须返回的是一个Observable的对象。
我们知道在执行操作符方法时,最后都会生成一个具体的Observable对象,这个对象作为Observable的具体实现类,里面实现了具体的业务逻辑,然后通过多态的方式进行调用,而flatMap中对应的就是ObservableFlatMap类。
public final class ObservableFlatMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
final boolean delayErrors;
final int maxConcurrency;
final int bufferSize;
public ObservableFlatMap(ObservableSource<T> source,
Function<? super T, ? extends ObservableSource<? extends U>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize) {
super(source);
this.mapper = mapper;
this.delayErrors = delayErrors;
this.maxConcurrency = maxConcurrency;
this.bufferSize = bufferSize;
}
.......
}
ObservableFlatMap的构造方法中,传入了五个参数。如下:
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize) {
......
return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
}
其中this参数,代表的是当前创建的Observable对象,本例子中使用的是create操作符创建的对象,所以具体的对象就是ObservableCreate实例。
mapper就是Function中的第二个参数,是一个新的Observable对象。其他参数都是系统默认参数。
完成了被观察者的准备工作后,接下来就是订阅观察者了。从之前的分析我们知道在订阅观察者时,实际调用的都是subscribeActual的方法。那么现在我们具体来分析ObservableFlatMap类中的方法:
ObservableFlatMap#subscribeActual
@Override
public void subscribeActual(Observer<? super U> t) {
if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
return;
}
source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
里面再次调用了source.subscribe方法,而我们刚才分析了这个source就是this这个参数指代的对象,这里即ObservableCreate对象。
Observable#subscribe
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
参数observer就是上面新建的MergeObserver对象。由于subscribeActual是个抽象方法,这里执行的是ObservableCreate中的方法。我们在看一下该类中的方法:
ObservableCreate#subscribeActual
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
这里面的通过调用ObservableOnSubscribe类中的subscribe方法,开发发射数据。但是参数observer就是刚才的MergeObserver对象。
在通过onNext发射数据时,其实执行的就是MergeObserver中的onNext的对象。
@Override
public void onNext(T t) {
// safeguard against misbehaving sources
if (done) {
return;
}
ObservableSource<? extends U> p;
try {
p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
s.dispose();
onError(e);
return;
}
if (maxConcurrency != Integer.MAX_VALUE) {
synchronized (this) {
if (wip == maxConcurrency) {
sources.offer(p);
return;
}
wip++;
}
}
subscribeInner(p);
}
方法中,首先接收了发射的数据,然后在通过mapper.apply(t)将数据转化为具有发射数据的ObservableSource对象,这里就对应了之前所说的Function将发射的数据转换为ObservableSource的对象。然后将新生成的ObservableSource的数据发射,然后观察者接收新发射的数据。这里面代码很多逻辑比较复杂就不再一一分析。