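Why Stream is needed
Stream in Java 8 enhances the capabilities of collection (Collection) objects. It focuses on convenient, efficient aggregate operations and bulk data operations over collections. Combined with the Lambda expressions that were also introduced in Java 8, the Stream API greatly improves programming efficiency and code readability. It offers two modes for aggregation, serial and parallel; the parallel mode takes full advantage of multi-core processors by using fork/join to split tasks and speed up processing. Writing parallel code is usually hard and error-prone, but with the Stream API you can write high-performance concurrent code without writing a single line of multithreading code. java.util.stream, which first appeared in Java 8, is a product of the combined influence of functional languages and the multi-core era.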
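As a rough illustration of the serial and parallel modes described above, the following sketch sums the same range both ways. The class name SerialVsParallel and the range bound are assumptions made for this example, and whether the parallel version is actually faster depends on the data size and the hardware.

```java
import java.util.stream.LongStream;

class SerialVsParallel {
    // Sums 1..n on the calling thread.
    static long serialSum(long n) {
        return LongStream.rangeClosed(1, n).sum();
    }

    // Same pipeline; parallel() lets the fork/join framework split the range.
    static long parallelSum(long n) {
        return LongStream.rangeClosed(1, n).parallel().sum();
    }

    public static void main(String[] args) {
        System.out.println(serialSum(1_000_000));   // 500000500000
        System.out.println(parallelSum(1_000_000)); // 500000500000
    }
}
```

The only difference between the two methods is the call to parallel(); no thread handling appears in user code.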
What is an aggregate operation
TODO
(Requirement: find all transactions of type grocery, then return their transaction IDs sorted by transaction value in descending order)
public class Transaction {
    private final int id;
    private final Integer value;
    private final Type type;

    public Transaction(int id, Integer value, Type type) {
        this.id = id;
        this.value = value;
        this.type = type;
    }

    public enum Type {
        A, B, C, D, GEOCERY
    }

    public int getId() { return id; }
    public Integer getValue() { return value; }
    public Type getType() { return type; }
}
Listing 1. Sorting and extracting values in Java 7
public static void main(String[] args) {
    List<Transaction> transactions = new ArrayList<>();
    transactions.add(new Transaction(1, 10, Transaction.Type.GEOCERY));
    transactions.add(new Transaction(3, 30, Transaction.Type.GEOCERY));
    transactions.add(new Transaction(6, 60, Transaction.Type.GEOCERY));
    transactions.add(new Transaction(5, 50, Transaction.Type.GEOCERY));
    transactions.add(new Transaction(2, 20, Transaction.Type.A));
    transactions.add(new Transaction(4, 40, Transaction.Type.C));

    // JDK 7: find all transactions of type grocery
    List<Transaction> groceryTransactions = new ArrayList<>();
    for (Transaction t : transactions) {
        if (t.getType() == Transaction.Type.GEOCERY) {
            groceryTransactions.add(t);
        }
    }

    // Sort the collection by transaction value, descending
    Collections.sort(groceryTransactions, new Comparator<Transaction>() {
        @Override
        public int compare(Transaction o1, Transaction o2) {
            return o2.getValue().compareTo(o1.getValue());
        }
    });

    // Collect the transaction IDs
    List<Integer> transactionIds = new ArrayList<>();
    for (Transaction t : groceryTransactions) {
        transactionIds.add(t.getId());
    }
    System.out.println(transactionIds); // [6, 5, 3, 1]
}
Listing 2. Sorting and extracting values in Java 8
// JDK 8: find all transactions of type grocery, then return the
// transaction IDs sorted by transaction value in descending order
List<Integer> transactionsIds = transactions.parallelStream()
        .filter(t -> t.getType() == Transaction.Type.GEOCERY)
        .sorted(Comparator.comparing(Transaction::getValue).reversed())
        .map(Transaction::getId)
        .collect(Collectors.toList());
System.out.println(transactionsIds); // [6, 5, 3, 1]
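Stream overview
Stream operations fall into two types:
Intermediate: a stream can be followed by zero or more intermediate operations. Each one takes the stream, applies some mapping or filtering to the data, and returns a new stream for the next operation to use. These operations are lazy: merely calling them does not start traversing the stream.
Terminal: a stream can have only one terminal operation. Once it has executed, the stream is used up and cannot be operated on again, so it must be the last operation on the stream. Only executing the terminal operation actually starts the traversal, and it produces a result or a side effect.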
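The laziness of intermediate operations can be made visible with a small sketch. The class name LazyDemo and the log list are assumptions introduced only for this illustration; the sketch records when each stage actually runs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

class LazyDemo {
    // Records the order in which the pipeline stages actually run.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Stream<String> pipeline = Stream.of("a", "b")
                .filter(s -> { log.add("filter " + s); return true; });
        log.add("pipeline built"); // nothing has been filtered yet
        pipeline.forEach(s -> log.add("forEach " + s)); // terminal operation starts the traversal
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
        // [pipeline built, filter a, forEach a, filter b, forEach b]
    }
}
```

The filter lambda runs only after forEach is called, and each element travels through the whole pipeline before the next one starts.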
Listing 3. An example of stream operations
// JDK 8
import java.util.ArrayList;
import java.util.List;

public class Widget {
    private final Color color;
    private final int weight;

    enum Color { RED, BLACK, BLUE }

    public Widget(Color color, int weight) {
        this.color = color;
        this.weight = weight;
    }

    public Color getColor() { return color; }
    public int getWeight() { return weight; }

    public static void main(String[] args) {
        List<Widget> widgets = new ArrayList<>();
        widgets.add(new Widget(Color.RED, 1));
        widgets.add(new Widget(Color.RED, 2));
        widgets.add(new Widget(Color.BLACK, 3));
        widgets.add(new Widget(Color.BLUE, 4));
        // stream() obtains the source; filter and mapToInt are intermediate operations
        // that select and convert the data; sum is the terminal operation that adds up
        // the weights of all widgets matching the condition
        int sum = widgets.stream()
                .filter(w -> w.getColor() == Color.RED)
                .mapToInt(w -> w.getWeight())
                .sum();
        System.out.println(sum); // 3
    }
}
Listing 4. Common ways to construct a stream
// JDK 8
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class SteamConstruct {
    public static void main(String[] args) {
        // 1. Individual values
        Stream<String> stream = Stream.of("a1", "b1", "c1");
        stream.forEach(System.out::print); // prints a1b1c1

        // 2. Arrays (two equivalent ways)
        String[] strArray = new String[] {"a2", "b2", "c2"};
        stream = Stream.of(strArray);
        stream = Arrays.stream(strArray);
        System.out.println(stream.collect(Collectors.joining(","))); // prints a2,b2,c2

        // 3. Collections
        List<String> list = Arrays.asList(strArray);
        stream = list.stream();
    }
}
Listing 5. Constructing numeric streams (for primitive numeric types there are currently three specialized Stream types: 1. IntStream 2. LongStream 3. DoubleStream)
// JDK 8
import java.util.stream.IntStream;

public class BasicStream {
    // IntStream, LongStream, DoubleStream. We could also use Stream<Integer>, Stream<Long>
    // and Stream<Double>, but boxing and unboxing are costly, so dedicated Stream types
    // are provided for these three primitive numeric types.
    public static void main(String[] args) {
        IntStream.of(new int[] {1, 2, 3}).forEach(System.out::print); // 123
        IntStream.range(1, 3).forEach(System.out::print);             // [1,3) prints 12
        IntStream.rangeClosed(1, 3).forEach(System.out::print);       // [1,3] prints 123
    }
}
Listing 6. Converting a stream to other data structures (a Stream can be used only once; reusing it raises an error)
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.Stack;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class StreamExchange {
    public static void main(String[] args) {
        // A new stream is created before each conversion; reusing a consumed one throws
        // IllegalStateException: stream has already been operated upon or closed

        // 1. Array
        Stream<String> stream = Stream.of("a1", "b1", "c1");
        String[] strArray1 = stream.toArray(String[]::new);
        for (String s : strArray1) { System.out.print(s); } // a1b1c1

        // 2. Collection: List
        stream = Stream.of("a1", "b1", "c1");
        List<String> list1 = stream.collect(Collectors.toList());
        for (String s : list1) { System.out.print(s); } // a1b1c1

        // 2. Collection: ArrayList
        stream = Stream.of("a1", "b1", "c1");
        List<String> list2 = stream.collect(Collectors.toCollection(ArrayList::new));
        for (String s : list2) { System.out.print(s); } // a1b1c1

        // 2. Collection: Set
        stream = Stream.of("a1", "b1", "c1");
        Set<String> set = stream.collect(Collectors.toSet());
        for (String s : set) { System.out.print(s); } // a1c1b1 (set iteration order is not guaranteed)

        // 2. Collection: Stack
        stream = Stream.of("a1", "b1", "c1");
        Stack<String> stack = stream.collect(Collectors.toCollection(Stack::new));
        for (String s : stack) { System.out.print(s); } // a1b1c1

        // 3. String
        stream = Stream.of("a1", "b1", "c1");
        String str = stream.collect(Collectors.joining());
        System.out.print(str); // a1b1c1
    }
}
Stream operations
- Intermediate
map (mapToInt, flatMap, etc.), filter, distinct, sorted, peek, limit, skip, parallel, sequential, unordered
- Terminal
forEach, forEachOrdered, toArray, reduce, collect, min, max, count, anyMatch, allMatch, noneMatch, findFirst, findAny, iterator
- Short-circuiting
anyMatch, allMatch, noneMatch, findFirst, findAny, limit
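To show what short-circuiting means in practice, here is a minimal sketch that applies findFirst and limit to an infinite stream. The class and method names (ShortCircuitDemo, firstMultipleOf, countLimited) are made up for this example.

```java
import java.util.Optional;
import java.util.stream.Stream;

class ShortCircuitDemo {
    // findFirst stops as soon as one element passes the filter,
    // so the infinite source 1, 2, 3, ... is never fully traversed.
    static int firstMultipleOf(int k) {
        Optional<Integer> first = Stream.iterate(1, n -> n + 1)
                .filter(n -> n % k == 0)
                .findFirst();
        return first.get();
    }

    // limit(n) is a short-circuiting intermediate operation:
    // it cuts the infinite stream down to n elements.
    static long countLimited(int n) {
        return Stream.iterate(1, x -> x + 1).limit(n).count();
    }

    public static void main(String[] args) {
        System.out.println(firstMultipleOf(7)); // 7
        System.out.println(countLimited(5));    // 5
    }
}
```

Without a short-circuiting operation somewhere in the pipeline, either method would never return.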
Map/flatMap
Listing 7. Converting to upper case, written three ways: [.map(String::toUpperCase)], [.map(s -> { return s.toUpperCase(); })] and [.map(s -> s.toUpperCase())]
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ToUpperCase {
    public static void main(String[] args) {
        // Method reference
        Stream<String> stream = Stream.of("hello", "world", "java8", "stream");
        List<String> wordList = stream.map(String::toUpperCase).collect(Collectors.toList());
        System.out.println(wordList); // [HELLO, WORLD, JAVA8, STREAM]

        // Block lambda
        stream = Stream.of("hello", "world", "java8", "stream");
        wordList = stream.map(s -> { return s.toUpperCase(); }).collect(Collectors.toList());
        System.out.println(wordList); // [HELLO, WORLD, JAVA8, STREAM]

        // Expression lambda
        stream = Stream.of("hello", "world", "java8", "stream");
        wordList = stream.map(s -> s.toUpperCase()).collect(Collectors.toList());
        System.out.println(wordList); // [HELLO, WORLD, JAVA8, STREAM]
    }
}
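Listing 8. Squares (map produces a 1:1 mapping: each input element is converted to another element according to the rule)
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ToSquare {
    public static void main(String[] args) {
        Stream<Integer> stream = Arrays.asList(1, 2, 3, 4).stream();
        List<Integer> squareList = stream.map(n -> n * n).collect(Collectors.toList());
        System.out.println(squareList); // [1, 4, 9, 16]
    }
}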
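Listing 9. One to many (flatMap flattens the hierarchy in the input stream, pulling the lowest-level elements out and putting them together; the output Stream no longer contains any List, only the numbers themselves)
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ManyToOne {
    public static void main(String[] args) {
        Stream<List<Integer>> inputStream =
                Stream.of(Arrays.asList(1), Arrays.asList(2, 3), Arrays.asList(4, 5, 6));
        Stream<Integer> outputStream = inputStream.flatMap(childList -> childList.stream());
        System.out.print(outputStream.collect(Collectors.toList())); // [1, 2, 3, 4, 5, 6]
    }
}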
Filter
Listing 10. Keeping the even numbers
import java.util.Arrays;
import java.util.stream.Stream;

public class KeepEvenNumber {
    public static void main(String[] args) {
        Integer[] sixNums = {1, 2, 3, 4, 5, 6};
        Integer[] evens = Stream.of(sixNums).filter(n -> n % 2 == 0).toArray(Integer[]::new);
        System.out.println(Arrays.toString(evens)); // [2, 4, 6]
    }
}
Listing 11. Picking out the words (first use flatMap to gather the words of each line into a new Stream, then keep those whose length is not 0; the result is all the words in the whole text)
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PickAllWords {
    public static void main(String[] args) {
        Path path = Paths.get(System.getProperty("user.dir")
                + "/src/main/java/com/wdxxl/jdk8/ibm/stream/PickAllWords.java");

        // 1. Java 8 Files.lines + Stream
        try (Stream<String> stream = Files.lines(path)) {
            List<String> output = stream.flatMap(line -> Stream.of(line.split(" ")))
                    .filter(word -> word.length() > 0)
                    .collect(Collectors.toList());
            System.out.println(output);
        } catch (IOException e) {
            e.printStackTrace();
        }

        // 2. BufferedReader + Stream
        try (BufferedReader br = Files.newBufferedReader(path)) {
            List<String> output = br.lines().flatMap(line -> Stream.of(line.split(" ")))
                    .filter(word -> word.length() > 0)
                    .collect(Collectors.toList());
            System.out.println(output);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
ForEach
Listing 12. Printing names (forEach compared with pre-Java 8) [the lambda passed to forEach cannot modify the local variables it captures, and cannot use keywords such as break/return to end the loop early]
import java.util.ArrayList;
import java.util.List;

public class TestForEach {
    public static void main(String[] args) {
        List<Person> roster = new ArrayList<>();
        roster.add(new Person(Person.Sex.FEMALE, "Lisa"));
        roster.add(new Person(Person.Sex.MALE, "King"));
        roster.add(new Person(Person.Sex.MALE, "Jake"));

        // JDK 8
        roster.stream().filter(p -> p.gender == Person.Sex.MALE)
                .forEach(p -> System.out.println(p.name));

        // JDK 7
        for (Person p : roster) {
            if (p.gender == Person.Sex.MALE) {
                System.out.println(p.name);
            }
        }
    }
}

class Person {
    Sex gender;
    String name;

    public enum Sex { MALE, FEMALE }

    public Person(Sex gender, String name) {
        this.gender = gender;
        this.name = name;
    }
}
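The two forEach restrictions noted in Listing 12 usually have a stream-native alternative. The following is a sketch under assumptions: the class ForEachLimits and its methods are hypothetical names, and a plain list of names stands in for the roster.

```java
import java.util.Arrays;
import java.util.List;

class ForEachLimits {
    // A lambda cannot assign to a captured local variable, so
    //     int count = 0; names.forEach(n -> count++);   // does not compile
    // is rejected. A reduction such as count() expresses the same intent.
    static long countStartingWith(List<String> names, String prefix) {
        return names.stream().filter(n -> n.startsWith(prefix)).count();
    }

    // forEach has no break; a short-circuiting operation such as anyMatch
    // stops at the first match instead.
    static boolean containsName(List<String> names, String wanted) {
        return names.stream().anyMatch(wanted::equals);
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Lisa", "King", "Jake");
        System.out.println(countStartingWith(names, "K")); // 1
        System.out.println(containsName(names, "Jake"));   // true
    }
}
```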
Listing 13. peek performs an action on each element and returns a new Stream [peek: to take a quick look]. Note the execution order.
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class Peek {
    public static void main(String[] args) {
        Stream.of("one", "two", "three", "four")
                .filter(p -> p.length() > 3)
                .peek(v -> System.out.println("Filtered Value:" + v))
                .map(String::toUpperCase)
                .peek(v -> System.out.println("Mapped Value:" + v))
                .collect(Collectors.toList());
        // 1. Filtered Value:three
        // 2. Mapped Value:THREE
        // 3. Filtered Value:four
        // 4. Mapped Value:FOUR
    }
}
Listing 14. Two use cases of Optional [code that uses Optional is more readable, and because it provides compile-time checking it can greatly reduce the impact of NPEs on a program]
import java.util.Optional;

public class OptionalTest {
    public static void main(String[] args) {
        String strA = " abcd", strB = null;
        print(strA);
        print(" ");
        print(strB);
        System.out.println(getLength(strA));
        System.out.println(getLength(" "));
        System.out.println(getLength(strB));
    }

    public static void print(String text) {
        // JDK 8
        Optional.ofNullable(text).ifPresent(System.out::println);
        // Pre-JDK 8 (both variants run here, so each non-null text is printed twice)
        if (text != null) {
            System.out.println(text);
        }
    }

    public static int getLength(String text) {
        // JDK 8
        return Optional.ofNullable(text).map(String::length).orElse(-1);
        // Pre-JDK 8
        // return (text != null) ? text.length() : -1;
    }
}
reduce
Listing 15. Use cases of reduce
import java.util.stream.Stream;

public class ReduceTest {
    public static void main(String[] args) {
        // 1. Sum: 10
        Integer sum = Stream.of(1, 2, 3, 4).reduce(0, (a, b) -> a + b);
        sum = Stream.of(1, 2, 3, 4).reduce(0, Integer::sum);     // with an initial value
        sum = Stream.of(1, 2, 3, 4).reduce(Integer::sum).get();  // without an initial value

        // 2. Minimum: minValue = -3.0
        double minValue = Stream.of(-1.5, 1.0, -3.0, -2.0).reduce(Double.MAX_VALUE, Double::min);
        minValue = Stream.of(-1.5, 1.0, -3.0, -2.0).reduce(Double::min).get();

        // 2. Maximum: maxValue = 1.0
        // Double.MIN_VALUE is the smallest positive double, not the most negative one,
        // so NEGATIVE_INFINITY is used as the identity for max.
        double maxValue = Stream.of(-1.5, 1.0, -3.0, -2.0).reduce(Double.NEGATIVE_INFINITY, Double::max);
        maxValue = Stream.of(-1.5, 1.0, -3.0, -2.0).reduce(Double::max).get();

        // 3. String concatenation: "ABCD"
        String concat = Stream.of("A", "B", "C", "D").reduce("", String::concat);

        // 4. Filter and concatenate: "ace"
        concat = Stream.of("a", "B", "c", "D", "e", "F")
                .filter(x -> x.compareTo("Z") > 0)
                .reduce("", String::concat);
    }
}
limit/skip (limit returns the first n elements of a Stream; skip discards the first n elements; it was renamed from a method called subStream.)
Listing 16. The effect of limit and skip on the number of executions
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class LimitSkipTest {
    public static void main(String[] args) {
        List<LimitSkipTest.User> users = new ArrayList<>();
        LimitSkipTest limitSkipTest = new LimitSkipTest();
        for (int i = 0; i < 100; i++) {
            users.add(limitSkipTest.new User(i, "name_" + i)); // inner class construction
        }
        List<String> userList = users.stream()
                .map(User::getName) // prints name_0name_1name_2name_3name_4name_5name_6name_7name_8name_9
                .limit(10)
                .skip(3)
                .collect(Collectors.toList());
        System.out.println(userList); // [name_3, name_4, name_5, name_6, name_7, name_8, name_9]
    }

    // Inner class; getName prints the name so that each call is visible
    class User {
        public int no;
        private final String name;

        public User(int no, String name) {
            this.no = no;
            this.name = name;
        }

        public String getName() {
            System.out.print(name);
            return name;
        }
    }
}
Listing 17. limit and skip do not reduce the number of executions when they come after sorted
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class LimitSkipTest2 {
    public static void main(String[] args) {
        List<LimitSkipTest2.User> users = new ArrayList<>();
        LimitSkipTest2 limitSkipTest2 = new LimitSkipTest2();
        for (int i = 0; i < 5; i++) {
            users.add(limitSkipTest2.new User(i, "name_" + i));
        }
        // The whole 5-element Stream is sorted first, and only then is limit applied,
        // so getName is still called for every comparison made by the sort.
        List<String> userList = users.stream()
                .sorted((p1, p2) -> p1.getName().compareTo(p2.getName()))
                .map(User::getName) // getName call order: name_1, name_0, name_2, name_1, name_3, name_2, name_4, name_3, name_0, name_1
                .limit(2)
                .collect(Collectors.toList());
        System.out.println(userList); // [name_0, name_1]
    }

    // Inner class; getName prints the name so that each call is visible
    class User {
        public int no;
        private final String name;

        public User(int no, String name) {
            this.no = no;
            this.name = name;
        }

        public String getName() {
            System.out.print(name);
            return name;
        }
    }
}
sorted
Listing 18. Applying limit and skip before sorting (this optimization is limited by business logic: it only applies when you do not need to take the values after sorting)
List<String> userList = users.stream()
        .limit(2)
        .sorted((p1, p2) -> p1.getName().compareTo(p2.getName()))
        .map(User::getName) // getName call order: name_1, name_0, name_0, name_1
        .collect(Collectors.toList());
System.out.println(userList); // [name_0, name_1]
min/max/distinct [what min and max do can also be achieved by sorting the Stream elements and then calling findFirst, but min and max perform better: they are O(n), whereas sorted costs O(n log n)]
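The two approaches mentioned in the note above can be put side by side. This is only a sketch; the class MinVsSorted and the sample numbers are assumptions chosen for the illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

class MinVsSorted {
    // Single pass over the elements: O(n).
    static Optional<Integer> viaMin(List<Integer> xs) {
        return xs.stream().min(Integer::compare);
    }

    // Sorts every element before taking the first one: O(n log n).
    static Optional<Integer> viaSort(List<Integer> xs) {
        return xs.stream().sorted().findFirst();
    }

    public static void main(String[] args) {
        List<Integer> xs = Arrays.asList(5, 3, 8, 1);
        System.out.println(viaMin(xs));  // Optional[1]
        System.out.println(viaSort(xs)); // Optional[1]
    }
}
```

Both return the same value; they differ only in how much work the pipeline does to find it.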
Listing 19. Finding the length of the longest line
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class FindLongestLine {
    public static void main(String[] args) {
        Path path = Paths.get(System.getProperty("user.dir")
                + "/src/main/java/com/wdxxl/jdk8/ibm/stream/FindLongestLine.java");
        // BufferedReader + Stream
        try (BufferedReader br = Files.newBufferedReader(path)) {
            int output = br.lines()
                    .mapToInt(String::length)
                    .max()
                    .getAsInt();
            System.out.println(output); // 83
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
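Listing 20. Finding all the words in the text, converting them to lower case, removing duplicates, and sorting them
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class OperateWords {
    public static void main(String[] args) {
        Path path = Paths.get(System.getProperty("user.dir")
                + "/src/main/java/com/wdxxl/jdk8/ibm/stream/OperateWords.java");
        // BufferedReader + Stream
        try (BufferedReader br = Files.newBufferedReader(path)) {
            List<String> output = br.lines()
                    .flatMap(line -> Stream.of(line.split(" ")))
                    .map(String::toLowerCase)
                    .distinct()
                    .sorted()
                    .collect(Collectors.toList());
            System.out.println(output);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}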
Match
- allMatch: returns true if all elements of the Stream match the given predicate
- anyMatch: returns true if at least one element of the Stream matches the given predicate
- noneMatch: returns true if no element of the Stream matches the given predicate
Listing 21. Using Match
import java.util.ArrayList;
import java.util.List;

public class MatchTest {
    public static void main(String[] args) {
        List<MatchTest.User> users = new ArrayList<>();
        MatchTest matchTest = new MatchTest();
        for (int i = 0; i < 5; i++) {
            users.add(matchTest.new User(i, "name_" + i, i * 5)); // ages 0, 5, 10, 15, 20
        }

        boolean isAllAdult = users.stream().allMatch(p -> {
            // Prints only 0: allMatch short-circuits at the first element that fails
            System.out.println(p.age);
            return p.age > 18;
        });
        System.out.println("All are adult? " + isAllAdult); // All are adult? false

        boolean isAnyChild = users.stream().anyMatch(p -> p.age < 12);
        System.out.println("Any Child? " + isAnyChild); // Any Child? true

        boolean noneOldThan19 = users.stream().noneMatch(p -> p.age > 19);
        System.out.println("none Old Than 19? " + noneOldThan19); // none Old Than 19? false

        boolean noneOldThan50 = users.stream().noneMatch(p -> p.age > 50);
        System.out.println("none Old Than 50? " + noneOldThan50); // none Old Than 50? true
    }

    class User {
        public int no;
        public String name;
        private final int age;

        public User(int no, String name, int age) {
            this.no = no;
            this.name = name;
            this.age = age;
        }
    }
}
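Advanced: generating your own streams
Stream.generate
By implementing the Supplier interface you can control how the stream is generated yourself. This is typically used for streams of random numbers or constants, or for streams that need to maintain some state between successive elements. The Stream produced by passing a Supplier instance to Stream.generate() is serial by default (as opposed to parallel) but unordered (as opposed to ordered). Because it is infinite, the pipeline must use an operation such as limit to bound the size of the Stream.
Listing 22. Generating 10 random integers
import java.util.Random;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class RandomTest {
    public static void main(String[] args) {
        Random seed = new Random();
        Supplier<Integer> random = seed::nextInt;
        Stream.generate(random)
                .limit(10)
                .forEach(System.out::println);

        // Another way
        IntStream.generate(() -> (int) (System.nanoTime() % 100))
                .limit(10)
                .forEach(System.out::println);
    }
}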
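Listing 23. A custom Supplier [Stream.generate also accepts a Supplier you implement yourself, for example when building large volumes of test data, to assign each variable a value by some automatic rule, or to compute each element of the Stream from a formula. These are all cases that maintain state]
import java.util.Random;
import java.util.function.Supplier;
import java.util.stream.Stream;

public class SupplierTest {
    public static void main(String[] args) {
        SupplierTest supplierTest = new SupplierTest();
        Stream.generate(supplierTest.new UserSupplier()).limit(10)
                .forEach(p -> System.out.println(p.name + ":" + p.age));
    }

    class UserSupplier implements Supplier<User> {
        private int index = 0; // state kept between successive elements
        private final Random random = new Random();

        @Override
        public User get() {
            // index++ is evaluated first, so the name suffix is one ahead of no
            return new User(index++, "name_" + index, random.nextInt(100));
        }
    }

    class User {
        public int no;
        private final String name;
        private final int age;

        public User(int no, String name, int age) {
            this.no = no;
            this.name = name;
            this.age = age;
        }
    }
}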
Listing 24. Generating an arithmetic sequence
import java.util.stream.Stream;

public class Sequence {
    public static void main(String[] args) {
        Stream.iterate(0, n -> n + 3)
                .limit(10).forEach(x -> System.out.print(x + " ")); // 0 3 6 9 12 15 18 21 24 27
        Stream.iterate(4, n -> n + 3)
                .limit(10).forEach(x -> System.out.print(x + " ")); // 4 7 10 13 16 19 22 25 28 31
    }
}
Advanced: using Collectors for reduction operations
groupingBy/partitioningBy
Listing 25. groupingBy: grouping by age
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class AdultGroup {
    public static void main(String[] args) {
        AdultGroup adultGroup = new AdultGroup();
        Map<Integer, List<User>> children = Stream.generate(adultGroup.new UserSupplier())
                .limit(100)
                .collect(Collectors.groupingBy(User::getAge));
        // Print how many users fall into each age group
        for (Map.Entry<Integer, List<User>> users : children.entrySet()) {
            System.out.println("Age: " + users.getKey() + "=" + users.getValue().size());
        }
    }

    class UserSupplier implements Supplier<User> {
        private int index = 0;
        private final Random random = new Random();

        @Override
        public User get() {
            return new User(index++, "name_" + index, random.nextInt(100));
        }
    }

    class User {
        public int no;
        public String name;
        public int age;

        public User(int no, String name, int age) {
            this.no = no;
            this.name = name;
            this.age = age;
        }

        public int getAge() { return age; }
    }
}
Listing 26. partitioningBy: grouping into minors and adults
After partitioning on the condition "age is at least 18", minors under 18 form one group (key false) and adults form the other (key true).
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class AdultPartition {
    public static void main(String[] args) {
        AdultPartition adultPartition = new AdultPartition();
        Map<Boolean, List<User>> children = Stream.generate(adultPartition.new UserSupplier())
                .limit(100)
                .collect(Collectors.partitioningBy(p -> p.age >= 18)); // >= so that 18-year-olds count as adults
        System.out.println("Children number:" + children.get(false).size());
        System.out.println("Adult number:" + children.get(true).size());
    }

    class UserSupplier implements Supplier<User> {
        private int index = 0;
        private final Random random = new Random();

        @Override
        public User get() {
            return new User(index++, "name_" + index, random.nextInt(100));
        }
    }

    class User {
        public int no;
        public String name;
        public int age;

        public User(int no, String name, int age) {
            this.no = no;
            this.name = name;
            this.age = age;
        }
    }
}
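When only the group sizes are needed, as in the println calls of Listing 26, partitioningBy can take a downstream collector so that the lists are never built. A minimal sketch, assuming fixed sample ages instead of random users; the class CountingCollectors and its method are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class CountingCollectors {
    // Partitions ages into minors (false) and adults (true) and counts each side.
    static Map<Boolean, Long> countByAdult(List<Integer> ages) {
        return ages.stream()
                .collect(Collectors.partitioningBy(a -> a >= 18, Collectors.counting()));
    }

    public static void main(String[] args) {
        System.out.println(countByAdult(Arrays.asList(5, 17, 18, 40))); // {false=2, true=2}
    }
}
```

The result always contains both keys, even when one side of the partition is empty.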
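Conclusion
In summary, the characteristics of Stream are:
- Not a data structure
    - It has no internal storage; it only pulls data from a source (data structure, array, generator function, IO channel) through a pipeline of operations.
    - It never modifies the underlying data structure it wraps. For example, the filter operation of a Stream produces a new Stream that does not contain the filtered-out elements, rather than removing those elements from the source.
- Almost all Stream operations take a lambda expression as a parameter
- No indexed access
    - You can ask for the first element, but not the second, the third, or the last. However, see the next item.
- Easy to turn into an array or a List
- Lazy
    - Many Stream operations are deferred until it is known how much data is finally needed.
    - Intermediate operations are always lazy.
- Parallel capability
    - When a Stream is parallel, there is no need to write multithreading code; all operations on it run in parallel automatically.
- Can be infinite
    - Collections have a fixed size; Streams need not. Short-circuiting operations such as limit(n) and findFirst() can operate on an infinite Stream and finish quickly.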
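Two of the points above, that a Stream never modifies its source and that it can be consumed only once, can be checked with a short sketch. The class StreamTraits and its methods are names invented for this illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class StreamTraits {
    // filter produces a new result; the source list keeps all its elements.
    static List<Integer> evens(List<Integer> source) {
        return source.stream().filter(n -> n % 2 == 0).collect(Collectors.toList());
    }

    // A second terminal operation on the same Stream throws IllegalStateException.
    static boolean reuseFails() {
        Stream<String> s = Stream.of("a", "b");
        s.count();
        try {
            s.count();
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 3, 4);
        System.out.println(evens(source)); // [2, 4]
        System.out.println(source);        // [1, 2, 3, 4]
        System.out.println(reuseFails());  // true
    }
}
```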
Original article: IBM: Java 8 中的 Streams API 详解 (The Streams API in Java 8 explained in detail)