JDK8----集合之流式(Streams)操作

时间:2022-09-03 07:51:29

JDK8 —- 集合之流式(Streams)操作

为什么需要 Stream

  Stream 作为 Java 8 的一大亮点,它与 java.io 包里的 InputStream 和 OutputStream 是完全不同的概念。它也不同于 StAX 对 XML 解析的 Stream,也不是 Amazon Kinesis 对大数据实时处理的 Stream。Java 8 中的 Stream 是对集合(Collection)对象功能的增强,它专注于对集合对象进行各种非常便利、高效的聚合操作(aggregate operation),或者大批量数据操作 (bulk data operation)。Stream API 借助于同样新出现的 Lambda表达式,极大的提高编程效率和程序可读性。同时它提供串行和并行两种模式进行汇聚操作,并发模式能够充分利用多核处理器的优势,使用 fork/join 并行方式来拆分任务和加速处理过程。通常编写并行代码很难而且容易出错, 但使用 Stream API 无需编写一行多线程的代码,就可以很方便地写出高性能的并发程序。所以说,Java 8 中首次出现的 java.util.stream 是一个函数式语言+多核时代综合影响的产物

什么是聚合操作
  在传统的 J2EE 应用中,Java 代码经常不得不依赖于关系型数据库的聚合操作来完成诸如:
  * 客户每月平均消费金额
  * 最昂贵的在售商品
  * 本周完成的有效订单(排除了无效的)
  * 取十个数据样本作为首页推荐
这类的操作。
  但在当今这个数据大爆炸的时代,在数据来源多样化、数据海量化的今天,很多时候不得不脱离 RDBMS,或者以底层返回的数据为基础进行更上层的数据统计。而 Java 的集合 API 中,仅仅有极少量的辅助型方法,更多的时候是程序员需要用 Iterator 来遍历集合,完成相关的聚合应用逻辑。这是一种远不够高效、笨拙的方法。在 Java 7 中,如果要发现 type 为 grocery 的所有交易,然后返回以交易值降序排序好的交易 ID 集合,我们需要这样写:

        List<Transaction> groceryTransactions = new ArrayList<>();
for (Transaction t : transactions) {
if (t.getType() == Transaction.GROCERY) {
groceryTransactions.add(t);
}
}
Collections.sort(groceryTransactions, new Comparator() {
public int compare(Transaction t1, Transaction t2) {
return t2.getValue().compareTo(t1.getValue());
}
});

List<Integer> transactionIds = new ArrayList<>();
for (Transaction t : groceryTransactions) {
transactionIds.add(t.getId());
}

  而在 Java 8 使用 Stream,代码更加简洁易读;而且使用并发模式,程序执行速度更快。

        List<Integer> transactionsIds = transactions.parallelStream().
filter(t -> t.getType() == Transaction.GROCERY).
sorted(comparing(Transaction::getValue).reversed()).
map(Transaction::getId).
collect(toList());

集合之流式操作
  Java 8 引入了流式操作(Stream),通过该操作可以实现对集合(Collection)的并行处理和函数式操作。根据操作返回的结果不同,流式操作分为中间操作和最终操作两种。最终操作返回一特定类型的结果,而中间操作返回流本身,这样就可以将多个操作依次串联起来。根据流的并发性,流又可以分为串行和并行两种。流式操作实现了集合的过滤、排序、映射等功能。
Stream 和 Collection 集合的区别:Collection 是一种静态的内存数据结构,而 Stream 是有关计算的。前者是主要面向内存,存储在内存中,后者主要是面向 CPU,通过 CPU 实现计算。

串行和并行的流
  流有串行和并行两种,串行流上的操作是在一个线程中依次完成,而并行流则是在多个线程上同时执行。并行与串行的流可以相互切换:通过 stream.sequential() 返回串行的流,通过 stream.parallel() 返回并行的流。相比较串行的流,并行的流可以很大程度上提高程序的执行效率。

Stream 总览

什么是流
  Stream 不是集合元素,它不是数据结构并不保存数据,它是有关算法和计算的,它更像一个高级版本的 Iterator。原始版本的 Iterator,用户只能显式地一个一个遍历元素并对其执行某些操作;高级版本的 Stream,用户只要给出需要对其包含的元素执行什么操作,比如 “过滤掉长度大于 10 的字符串”、“获取每个字符串的首字母”等,Stream 会隐式地在内部进行遍历,做出相应的数据转换。
  Stream 就如同一个迭代器(Iterator),单向,不可往复,数据只能遍历一次,遍历过一次后即用尽了,就好比流水从面前流过,一去不复返。
  而和迭代器又不同的是,Stream 可以并行化操作,迭代器只能命令式地、串行化操作。顾名思义,当使用串行方式去遍历时,每个 item 读完后再读下一个 item。而使用并行去遍历时,数据会被分成多个段,其中每一个都在不同的线程中处理,然后将结果一起输出。Stream 的并行操作依赖于 Java7 中引入的 Fork/Join 框架(JSR166y)来拆分任务和加速处理过程。Java 的并行 API 演变历程基本如下:
  1.0-1.4 中的 java.lang.Thread
  5.0 中的 java.util.concurrent
  6.0 中的 Phasers 等
  7.0 中的 Fork/Join 框架
  8.0 中的 Lambda
Stream 的另外一大特点是,数据源本身可以是无限的

流的构成
  当我们使用一个流的时候,通常包括三个基本步骤:
  获取一个数据源(source)→ 数据转换→执行操作获取想要的结果,每次转换原有 Stream 对象不改变,返回一个新的 Stream 对象(可以有多次转换),这就允许对其操作可以像链条一样排列,变成一个管道,如下图所示。
图 1. 流管道 (Stream Pipeline) 的构成

流的操作类型分为两种:
  Intermediate:一个流可以后面跟随零个或多个 intermediate 操作。其目的主要是打开流,做出某种程度的数据映射/过滤,然后返回一个新的流,交给下一个操作使用。这类操作都是惰性化的(lazy),就是说,仅仅调用到这类方法,并没有真正开始流的遍历。
  Terminal:一个流只能有一个 terminal 操作,当这个操作执行后,流就被使用“光”了,无法再被操作。所以这必定是流的最后一个操作。Terminal 操作的执行,才会真正开始流的遍历,并且会生成一个结果,或者一个 side effect。
  
  在对于一个 Stream 进行多次转换操作 (Intermediate 操作),每次都对 Stream 的每个元素进行转换,而且是执行多次,这样时间复杂度就是 N(转换次数)个 for 循环里把所有操作都做掉的总和吗?其实不是这样的,转换操作都是 lazy 的,多个转换操作只会在 Terminal 操作的时候融合起来,一次循环完成。我们可以这样简单的理解,Stream 里有个操作函数的集合,每次转换操作就是把转换函数放入这个集合中,在 Terminal 操作的时候循环 Stream 对应的集合,然后对每个元素执行所有的函数。

还有一种操作被称为 short-circuiting。用以指:
  对于一个 intermediate 操作,如果它接受的是一个无限大(infinite/unbounded)的 Stream,但返回一个有限的新 Stream。
  对于一个 terminal 操作,如果它接受的是一个无限大的 Stream,但能在有限的时间计算出结果。
当操作一个无限大的 Stream,而又希望在有限时间内完成操作,则在管道内拥有一个 short-circuiting 操作是必要非充分条件。

流的使用详解
  简单说,对 Stream 的使用就是实现一个 filter-map-reduce 过程,产生一个最终结果,或者导致一个副作用(side effect)。
  
流的构造与转换
  下面提供最常见的几种构造 Stream 的样例。

        // 1. Individual values
Stream stream = Stream.of("a", "b", "c");

// 2. Arrays
String[] strArray = new String[]{"a", "b", "c"};
stream = Stream.of(strArray);
stream = Arrays.stream(strArray);

// 3. Collections
List<String> list = Arrays.asList(strArray);
stream = list.stream();

需要注意的是,对于基本数值型,目前有三种对应的包装类型 Stream:
  IntStream、LongStream、DoubleStream。当然我们也可以用 Stream、Stream >、Stream,但是 boxing 和 unboxing 会很耗时,所以特别为这三种基本数值型提供了对应的 Stream。
  Java 8 中还没有提供其它数值型 Stream,因为这将导致扩增的内容较多。而常规的数值型聚合运算可以通过上面三种 Stream 进行。