RxJava系列(二、详解操作符)

时间:2021-04-04 17:51:04

RxJava中的操作符(Operators),RxJava中的操作符主要分成了三类:
转换类操作符、过滤类操作符、组合类操作符

转换类操作符

  • Map

map()函数接受一个Func1类型的参数,然后把这个Func1应用到每一个由Observable发射的值上,将发射的值转换为我们期望的值。具体可参考上一篇博客。官方原理图如下:

RxJava系列(二、详解操作符)

  • flatMap
    Observable.flatMap()接收一个Observable的输出作为输入,同时输出另外一个Observable。
    先来看一个简单例子:
List<String> dataList = new ArrayList<String>();
dataList.add("1");
dataList.add("2");
dataList.add("3");
Observable.just(dataList)
.subscribe(new Consumer<List<String>>() {
@Override
public void accept(@NonNull List<String> strings) throws Exception {
Log.e("just---->", strings.get(0));
}
});
运行结果为:

E/just—->: 1

注意观察可以看出,Observable.just().subscrible(new Consume
Observable.fromArray(dataList)
.flatMap(new Function<List<String>, Observable<String>>() {
@Override
public Observable<String> apply(@NonNull List<String> strings) throws Exception {
return Observable.fromIterable(strings);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.e("rx---->", s);
}
});

运行结果为:

E/rx—->: 1
E/rx—->: 2
E/rx—->: 3

flatMap()的原理是这样的:
1.将传入的事件对象装换成一个Observable对象;
- 这是不会直接发送这个Observable, 而是将这个Observable激活让它自己开始发送事件;
- 每一个创建出来的Observable发送的事件,都被汇入同一个Observable,这个Observable负责将这些事件统一交给Subscriber的回调方法。
RxJava系列(二、详解操作符)

  • ConcatMap

concatMap()解决了flatMap()的交叉问题,它能够把发射的值连续在一起.

  • FlatMapIterable
    flatMapIterable()和flatMap()几乎是一样的,不同的是flatMapIterable()它转化的多个Observable是使用Iterable作为源数据的。
  • switchMap
    switchMap()和flatMap()很像,除了一点:每当源Observable发射一个新的数据项(Observable)时,它将取消订阅并停止监视之前那个数据项产生的Observable,并开始监视当前发射的这一个。
  • Scan
    scan()操作符是遍历源Observable产生的结果,通过自定义转换规则,依次输出结果给订阅者的第一个参数使用。

    Observable.range(0, 4)
    .scan(new BiFunction<Integer, Integer, Integer>() {
    @Override
    public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception {
    return integer + integer2;
    }
    }).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(@NonNull Integer integer) throws Exception {
    Log.e("scan---->", integer+" ");
    }
    });

    运行结果:

    E/scan—->: 0
    E/scan—->: 1
    E/scan—->: 3
    E/scan—->: 6

  • GroupBy
    和SQL中的groupBy类似,分组。
Observable.range(0, 4)
.groupBy(new Function<Integer, String>() {

@Override
public String apply(@NonNull Integer integer) throws Exception {
return integer % 2==0?"偶数":"奇数";
}
}).subscribe(new Consumer<GroupedObservable<String, Integer>>() {
@Override
public void accept(@NonNull GroupedObservable<String, Integer> stringIntegerGroupedObservable) throws Exception {
if(stringIntegerGroupedObservable.getKey().equals("奇数")){
stringIntegerGroupedObservable.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e("groupby---->", integer +"");
}
});
}
}
});

运行结果:

E/groupby—->: 1
E/groupby—->: 3

官方原理图:
RxJava系列(二、详解操作符)

过滤操作符

这类operators主要用于对事件数据的筛选过滤,只返回满足我们条件的数据。过滤类操作符主要包含: Filter, Take, TakeLast, TakeUntilSkip, SkipLast, ElementAt, Debounce, Distinct, DistinctUntilChanged, First, Last等等。

  • Filter
Observable.fromArray(0, 1, 2, 4, 5, 6)
.filter(new Predicate<Integer>() {
@Override
public boolean test(@NonNull Integer integer) throws Exception {
return integer % 2 == 0;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e("filter---->",integer+"");
}
});

运行结果:
E/filter—->: 0
E/filter—->: 2
E/filter—->: 4
E/filter—->: 6

  • Take
    take(int)用一个整数n作为一个参数,从原始的序列中发射前n个元素。
    RxJava系列(二、详解操作符)

  • TakeLast
    takeLast(int)同样用一个整数n作为参数,只不过它发射的是观测序列中后n个元素。

  • TakeUntil
    订阅并开始发射原始Observable,同时监视我们提供的第二个Observable。如果第二个Observable发射了一项数据或者发射了一个终止通知,takeUntil()返回的Observable会停止发射原始Observable并终止。
    RxJava系列(二、详解操作符)

  • Skip
    跳过Observable的前n项
    RxJava系列(二、详解操作符)

  • SkipLast
    skipLast(int)忽略Observable发射的后n项数据

  • ElementAt
    elementAt(int)用来获取元素Observable发射的事件序列中的第n项数据,并当做唯一的数据发射出去。
    RxJava系列(二、详解操作符)

  • Distinct
    distinct()的过滤规则是只允许还没有发射过的数据通过,所有重复的数据项都只会发射一次.
    RxJava系列(二、详解操作符)

  • First
    它是的Observable只发送观测序列中的第一个数据项.

  • Last
    只发射观测序列中的最后一个数据项.

组合操作符

  • Merge
    把两个Observable发射的事件序列组合并成一个事件序列,就像是一个Observable发射的一样。

    Observable strignObservable = Observable.fromArray("A", "S", "D", "F", "G", "H", "I")
    .skip(1)
    .take(3);

    Observable intObservable = Observable.fromArray(1, 2, 3, 4, 5, 6, 7, 8)
    .filter(new Predicate<Integer>() {
    @Override
    public boolean test(@NonNull Integer integer) throws Exception {
    return integer > 3;
    }
    }).skipLast(1);
    Observable.merge(intObservable,strignObservable)
    .subscribe(new Consumer<Object>() {
    @Override
    public void accept(@NonNull Object o) throws Exception {
    Log.e("merge---->", o + "");
    }
    });

    运行结果:

    E/merge—->: 4
    E/merge—->: 5
    E/merge—->: 6
    E/merge—->: 7
    E/merge—->: S
    E/merge—->: D

    原理图:
    RxJava系列(二、详解操作符)
    合并后结果是无序的,我并未测出这一点,可能与发送时间间隔有关,未做进一步研究。我们可以看一下Merge方法源码:

public static <T> Observable<T> merge(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2) {
ObjectHelper.requireNonNull(source1, "source1 is null");
ObjectHelper.requireNonNull(source2, "source2 is null");
return fromArray(source1, source2).flatMap((Function)Functions.identity(), false, 2);
}
  • StartWith
    startWith(T)用于在源Observable发射的数据前插入数据。
    startWith(Observable)用于在源Observable发射的数据前插入另一个Observable发射的数据(这些数据会被插入到 源Observable发射数据的前面)。

  • Concat

    concat(Observable<? extends T>, Observable<? extends T>) concat(Observable<? extends Observable<T>>)

    用于将多个obserbavle发射的的数据进行合并发射,concat严格按照顺序发射数据,前一个Observable没发射玩是不会发射后一个Observable的数据的。它和merge、startWitch和相似,不同之处在于:
    1.merge:合并后发射的数据是无序的;
    2.startWitch:只能在源Observable发射的数据前插入数据。

    参考:
    1.https://zhuanlan.zhihu.com/p/21926591
    2.http://blog.csdn.net/lzyzsd/article/details/44094895