Mathematical and Aggregate Operators(算术和聚合操作符)
count
count 函数和 Java 集合中的 size 或者 length 一样。用来统计源 Observable 完成的时候一共发射了多少个数据。
Observable<Integer> values = Observable.range(0, 3);
values.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
log("values:Complete!");
}
@Override
public void onError(Throwable e) {
log("values:" + e.getMessage().toString());
}
@Override
public void onNext(Integer integer) {
log("values:" + integer);
}
});
values.count()
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
log("count:Complete!");
}
@Override
public void onError(Throwable e) {
log("count:" + e.getMessage().toString());
}
@Override
public void onNext(Integer integer) {
log("count:" + integer);
}
});
结果:
values: 0
values: 1
values: 2
values: Complete!
count: 3
count: Complete!
如果发射数据的个数超过了 int 最大值,则可以使用 countLong 函数。
reudce
你可能从 [MapReduce] 中了解过 reduce。该思想是使用源 Observable 中的所有数据两两组合来生成一个单一的 数据。在大部分重载函数中都需要一个函数用来定义如何组合两个数据变成一个。
public final Observable<T> reduce(Func2<T,T,T> accumulator)
Observable<Integer> values = Observable.range(0,5);
values.reduce(new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) {
return integer+integer2;
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
log("Sum:Complete!");
}
@Override
public void onError(Throwable e) {
log("Sum:"+e.getMessage().toString());
}
@Override
public void onNext(Integer integer) {
log("Sum:"+integer);
}
});
values.reduce(new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) {
return (integer>integer2) ? integer2 : integer;
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
log("Min:Complete!");
}
@Override
public void onError(Throwable e) {
log("Min:"+e.getMessage().toString());
}
@Override
public void onNext(Integer integer) {
log("Min:"+integer);
}
});
结果:
Sum: 10
Sum: Complete!
Min: 0
Min: Complete!
- Rx 中的 reduce 和并行系统中的 reduce 不一样。
- 在并行系统中的 reduce 是指,计算的取值是不相关的,这样多个机器可以独立并行工作。
- 在 Rx 中是使用从数据流中第一个数据到最后一个数据(从左往右)中的数据来调用 参数 accumulator ,accumulator 用前一次返回的结果和下一个数据来再次调用 accumulator (累加器)。
- 下面这个重载函数更加暴露了这个设计意图。
public final <R> Observable<R> reduce(R initialValue, Func2<R,? super T,R> accumulator)
Observable<String> values = Observable.just("Rx", "is", "easy");
values.reduce(0, new Func2<Integer, String, Integer>() {
@Override
public Integer call(Integer integer, String s) {
return integer+1;
}
})
//实现Last
// values.reduce("", new Func2<String, String, String>() {
// @Override
// public String call(String s, String s2) {
// return s2;
// }
// })
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
log("Count:Complete!");
}
@Override
public void onError(Throwable e) {
log("Count:"+e.getMessage().toString());
}
@Override
public void onNext(Integer integer) {
log("Count:"+integer);
}
});
结果:
Count:3
Count: Complete!
上面示例中的 accumulator 参数为 new Func2<String, String, String>()
,其中call(integer,s)
需要两个参数 integer和s, 当第一个数据从 源 Observable 发射出来的时候,由于 call(integer,s)
还没有调用过,所以使用 初始值 0 来替代 integer,使用第一个字符串“Rx” 来调用 accumulator (也就是new Func2()
),这样 call(integer,s)
返回的值就是 integer+ 1 (而 integer为初始值 0 ,所以返回 1, 可以看到 这个 s参数 为源 Observable 的值在这里是没有用的);这样 源Observable 每次发射一个数据,accumulator 就把上一次的结果加1 返回。和 count 的功能一样。
对于前面只返回一个数据结果的操作函数,大部分都可以通过 reduce 来实现。对于那些 源 Observable 没有完成就返回的操作函数来说,是不能使用 reduce 来实现的。所以 可以用 reduce 来实现 last,但是用 reduce 实现的 all 函数和原来的 all 是不太一样的。
Aggregation to collections(把数据聚合到集合中)
使用 reduce 可以把源Observable 发射的数据放到一个集合中:
Observable<Integer> values = Observable.range(10,5);
values
.reduce(new ArrayList<Integer>(), new Func2<ArrayList<Integer>, Integer, ArrayList<Integer>>() {
@Override
public ArrayList<Integer> call(ArrayList<Integer> integers, Integer integer) {
integers.add(integer);
return integers;
}
})
.subscribe(new Observer<ArrayList<Integer>>() {
@Override
public void onCompleted() {
log("Complete!");
}
@Override
public void onError(Throwable e) {
log( e.getMessage().toString());
}
@Override
public void onNext(ArrayList<Integer> arrayList) {
log(arrayList.toString());
}
});
reduce 的参数初始值为 new ArrayList(), reduce把源Observable 发射的数据添加到这个 List 中。当 源Observable 完成的时候,返回这个 List 对象。
结果:
[10, 11, 12, 13, 14]
Complete!
collect
- 前面提到的,使用 reduce 可以把源Observable 发射的数据放到一个集合中,其实并不太符合 Rx 操作符的原则。
- 操作符有个原则是不能修改其他对象的状态。
- 所以符合原则的实现应该是在每次转换中都创建一个新的 ArrayList 对象。
下面是一个符合原则但是效率很低的实现:.
.reduce(new ArrayList<Integer>(), new Func2<ArrayList<Integer>, Integer, ArrayList<Integer>>() {
@Override
public ArrayList<Integer> call(ArrayList<Integer> integers, Integer integer) {
ArrayList<Integer> newAcc = (ArrayList<Integer>) integers.clone();
newAcc.add(integer);
return integers;
}
})
上面每一个值都创建一个新对象的性能是无法接受的,需要通过文档说明你没有遵守 Rx 的原则使用不可变对象,避免其他人误解。为此,
- Rx 提供了一个 collect 函数来实现该功能,该函数使用了一个可变的 accumulator 。
Observable<Integer> values = Observable.range(10,5);
values
.collect(new Func0<ArrayList<Integer>>() {
@Override
public ArrayList<Integer> call() {
return new ArrayList<Integer>();
}
}, new Action2<ArrayList<Integer>, Integer>() {
@Override
public void call(ArrayList<Integer> arrayList, Integer integer) {
arrayList.add(integer);
}
})
.subscribe(new Action1<ArrayList<Integer>>() {
@Override
public void call(ArrayList<Integer> integers) {
log(integers.toString());
}
});
结果:
[10, 11, 12, 13, 14]
toList
Observable<Integer> values = Observable.range(10,5);
values
.toList()
.subscribe(new Action1<List<Integer>>() {
@Override
public void call(List<Integer> integers) {
log(integers.toString());
}
});.
结果:
[10, 11, 12, 13, 14]
toSortedList
toSortedList 和前面类似,返回一个排序后的 list,下面是该函数的定义:
public final Observable<java.util.List<T>> toSortedList()
public final Observable<java.util.List<T>> toSortedList(
Func2<? super T,? super T,java.lang.Integer> sortFunction)
可以使用默认的比较方式来比较对象,也可以提供一个比较参数。该比较参数和 Comparator 接口语义一致。
下面通过一个自定义的比较参数来返回一个倒序排列的整数集合:
Observable<Integer> values = Observable.range(10,5);
values
.toSortedList(new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) {
return integer2 - integer;
}
})
.subscribe(new Action1<List<Integer>>() {
@Override
public void call(List<Integer> integers) {
log(integers.toString());
}
});
结果:
[14, 13, 12, 11, 10]
toMap
toMap 把数据流 T 变为一个 Map<TKey,T>。 该函数有三个重载形式:
public final <K> Observable<java.util.Map<K,T>> toMap(
Func1<? super T,? extends K> keySelector)
public final <K,V> Observable<java.util.Map<K,V>> toMap(
Func1<? super T,? extends K> keySelector,
Func1<? super T,? extends V> valueSelector)
public final <K,V> Observable<java.util.Map<K,V>> toMap(
Func1<? super T,? extends K> keySelector,
Func1<? super T,? extends V> valueSelector,
Func0<? extends java.util.Map<K,V>> mapFactory)
- keySelector 功能是从一个值 T 中获取他对应的 key。
- valueSelector 功能是从一个值 T 中获取需要保存 map 中的值。
- mapFactory 功能是创建该 map 对象。
来看看一个示例:
有这么一个 Person 对象:
class Person {
public final String name;
public final Integer age;
public Person(String name, int age) {
this.name = name;
this.age = age;
}
}
下面的代码使用 Person 的 name 作为 key, Person 作为 map 的value:
Observable<Person> values = Observable.just(
new Person("Will", 25),
new Person("Nick", 40),
new Person("Saul", 35)
);
values
.toMap(new Func1<Person, String>() {
@Override
public String call(Person person) {
return person.name;
}
})
.subscribe(new Action1<Map>() {
@Override
public void call(Map mMap) {
log(mMap.toString());
}
});.
结果:
Saul=Person@5280919c, Will=$Person@5280914c, Nick=$Person@52809174}
还可以用 Person 的 age 作为map 的value:
values
.toMap(new Func1<Person, Object>() {
@Override
public Object call(Person person) {
return person.name;
}
}, new Func1<Person, Object>() {
@Override
public Object call(Person person) {
return person.age;
}
})
结果:
{Saul=35, Will=25, Nick=40}
还可以自定义如何生成这个 map 对象:
values
.toMap(new Func1<Person, Object>() {
@Override
public Object call(Person person) {
return person.name;
}
}, new Func1<Person, Object>() {
@Override
public Object call(Person person) {
return person.age;
}
}, new Func0<Map<Object, Object>>() {
@Override
public Map<Object, Object> call() {
return new HashMap();
}
})
最后一个参数为工厂函数,每次一个新的 Subscriber 订阅的时候, 都会返回一个新的 map 对象。
toMultimap
通常情况下多个 value 的 key 可能是一样的。 一个 key 可以映射多个 value 的数据结构为
multimap,multimap 的 value 为一个集合。该过程被称之为 “grouping” (分组)。
public final <K> Observable<java.util.Map<K,java.util.Collection<T>>> toMultimap(
Func1<? super T,? extends K> keySelector)
public final <K,V> Observable<java.util.Map<K,java.util.Collection<V>>> toMultimap(
Func1<? super T,? extends K> keySelector,
Func1<? super T,? extends V> valueSelector)
public final <K,V> Observable<java.util.Map<K,java.util.Collection<V>>> toMultimap(
Func1<? super T,? extends K> keySelector,
Func1<? super T,? extends V> valueSelector,
Func0<? extends java.util.Map<K,java.util.Collection<V>>> mapFactory)
public final <K,V> Observable<java.util.Map<K,java.util.Collection<V>>> toMultimap(
Func1<? super T,? extends K> keySelector,
Func1<? super T,? extends V> valueSelector,
Func0<? extends java.util.Map<K,java.util.Collection<V>>> mapFactory,
Func1<? super K,? extends java.util.Collection<V>> collectionFactory)
下面是通过 age 来分组 Person 的实现:
Observable<Person> values = Observable.just(
new Person("Will", 35),
new Person("Nick", 40),
new Person("Saul", 35)
);
values
.toMultimap(
new Func1<Person, Object>() {
@Override
public Object call(Person person) {
return person.age;
}
}, new Func1<Person, Object>() {
@Override
public Object call(Person person) {
return person.name;
}
})
.subscribe(new Action1<Map>() {
@Override
public void call(Map mMap) {
log(mMap.toString());
}
});
结果:
{35=[Will, Saul], 40=[Nick]}
toMultimap 的参数和 toMap 类似,最后一个 collectionFactory 参数是用来创建 value 的集合对象的,collectionFactory 使用 key 作为参数,这样你可以根据 key 来做不同的处理。下面示例代码中没有使用这个 key 参数:
Observable<Person> values = Observable.just(
new Person("Will", 35),
new Person("Nick", 40),
new Person("Saul", 35)
);
values
.toMultimap(
new Func1<Person, Object>() {
@Override
public Object call(Person person) {
return person.age;
}
}, new Func1<Person, Object>() {
@Override
public Object call(Person person) {
return person.name;
}, new Func0<Map<Object, Collection<Object>>>() {
@Override
public Map<Object, Collection<Object>> call() {
return new HashMap();
}
}, new Func1<Object, Collection<Object>>() {
@Override
public Collection<Object> call(Object o) {
return new ArrayList();
}
}) // 没有使用这个 key 参数
注意事项
这些操作函数都有非常有限的用法。这些函数只是用来给初学者把数据收集到集合中使用的,并且内部使用传统的方式来处理数据。这些方式不应该在实际项目中实现,因为他们和使用Rx 的理念并不相符。
Concatenation一个数据流发射完后继续发射下一个数据流是一种很常见的组合方法。
concat
concat 操作函数把多个数据流按照顺序一个一个的发射数据。第一个数据流发射完后,继续发射下一个。 concat 函数有多个重载函数:
public static final <T> Observable<T> concat(
Observable<? extends Observable<? extends T>> observables)
public static final <T> Observable<T> concat(
Observable<? extends T> t1,
Observable<? extends T> t2)
public static final <T> Observable<T> concat(Observable<? extends T> t1,
Observable<? extends T> t2,
Observable<? extends T> t3)
public static final <T> Observable<T> concat(Observable<? extends T> t1,
Observable<? extends T> t2,
Observable<? extends T> t3,
Observable<? extends T> t4)
// All the way to 10 observables
示例:
Observable<Integer> seq1 = Observable.range(0, 3);
Observable<Integer> seq2 = Observable.range(10, 3);
Observable.concat(seq1, seq2)
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
log(o.toString());
}
});
结果:
0
1
2
10
11
12
如果需要组合的数据流是动态的,则依然可以使用 concat 来组合返回多个 Observable 的情况。
下面的示例中,使用groupBy 来把一个 Observable 中的数据流分组为多个 Observable ,这样 groupBy 返回的是多个
Observable, 然后使用 concat 把这些 动态生成的 Observable 给组合起来:
Observable<String> words = Observable.just(
"First",
"Second",
"Third",
"Fourth",
"Fifth",
"Sixth"
);
Observable
.concat(words.groupBy(new Func1<String, Object>() {
@Override
public Object call(String s) {
return s.charAt(0);
}
}))
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
log(o.toString());
}
});
结果:
First
Fourth
Fifth
Second
Sixth
Third
concat 的行为有点像 concatMap 操作函数的扁平处理(flattening phase)。事实上, concatMap 等价于 先应用 map 操作函数然后再使用 concat。
concatWith
concatWith 函数是 concat 的另外一种使用方式,可以通过串联的方法来一个一个的组合数据流。
Observable<Integer> seq1 = Observable.range(0, 3);
Observable<Integer> seq2 = Observable.range(10, 3);
Observable<Integer> seq3 = Observable.just(20);
seq1.concatWith(seq2)
.concatWith(seq3)
.subscribe(System.out::println);
}
结果:
0
1
2
10
11
12
20
amb
amb 的参数为多个 Observable,使用第一个先发射数据的 Observable ,其他的 Observable 被丢弃。
public static final <T> Observable<T> amb(
java.lang.Iterable<? extends Observable<? extends T>> sources)
public static final <T> Observable<T> amb(
Observable<? extends T> o1,
Observable<? extends T> o2)
public static final <T> Observable<T> amb(
Observable<? extends T> o1,
Observable<? extends T> o2,
Observable<? extends T> o3)
Observable.amb(
Observable.timer(100, TimeUnit.MILLISECONDS)
.map(new Func1<Long, Object>() {
@Override
public Object call(Long aLong) {
return "First";
}
}),
Observable.timer(50, TimeUnit.MILLISECONDS)
.map(new Func1<Long, Object>() {
@Override
public Object call(Long aLong) {
return "Second";
}
})
)
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
log(o.toString());
}
});
结果:
Second
- 由于第二个 Observable 先开始发射数据,所以第一个 Observable 被丢弃了, 使用 第二个 Observable。
- 该操作函数可以用于如下情况:
你有多个廉价的资源提供方,但是这些资源提供方返回数据的时间是不一样的。例如一个天气预报应用,可以从多个数据源获取数据,当其中一个数据源返回数据的时候,就丢弃其的请求,而使用这个数据源。
ambWith
- 同样,还有一个 ambWith 版本的函数,可以通过链式调用每个 Observable。让代码看起来更优雅一些:
servable.timer(100, TimeUnit.MILLISECONDS)
.map(new Func1<Long, Object>() {
@Override
public Object call(Long aLong) {
return "First";
}
})
.ambWith(Observable.timer(50, TimeUnit.MILLISECONDS).map(new Func1<Long, Object>() {
@Override
public Object call(Long aLong) {
return "Second";
}
}))
.ambWith(Observable.timer(70, TimeUnit.MILLISECONDS).map(new Func1<Long, Object>() {
@Override
public Object call(Long aLong) {
return "Third";
}
}))
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
log(o.toString());
}
});
结果:
Second
Sum
计算并发射数据序列的和
Average
计算Observable发射的数据序列的平均值,然后发射这个结果
Max
计算并发射数据序列的最大值
Min
计算并发射数据序列的最小值
项目源码 GitHub求赞,谢谢!
引用:
RxJava 教程第二部分:事件流基础之 聚合 - 云在千峰
Android RxJava使用介绍(三) RxJava的操作符 - 呼啸而过的专栏 - 博客频道 - CSDN.NET