Flink - 算子链合并方法时间:2022-12-01 01:06:07- **方法如下:** 相邻的算子:当两个算子直接连接在一起,没有其他算子或键控操作符将它们分离时,它们就会被自动合并为一个算子链。 相同的操作类型:当两个算子拥有相同的操作类型时,它们就可以被合并为一个算子链。 算子的并行度相同:当相邻的两个算子拥有相同的并行度时,它们可以被合并为一个算子链。 用户手动合并:如果用户通过调用 DataStream.transform() 方法来显式地定义算子链,那么这些算子也会被合并为一个算子链。 - **相邻的算子** ```java public class WordCountExample { public static void main(String[] args) throws Exception { // 获取运行参数 final ParameterTool params = ParameterTool.fromArgs(args); // 创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 获取输入数据 DataStream text = env.readTextFile(params.get("input")); // 将每行字符串拆分为单词 DataStream words = text.flatMap(new FlatMapFunction() { @Override public void flatMap(String value, Collector out) throws Exception { for (String word : value.split("\\s")) { out.collect(word); } } }); // 计算单词出现的次数 DataStream> counts = words .map(new MapFunction>() { @Override public Tuple2 map(String value) throws Exception { return new Tuple2(value, 1); } }) .keyBy(0) .sum(1); // 输出结果 if (params.has("output")) { counts.writeAsText(params.get("output")); } else { counts.print(); } // 启动执行程序 env.execute("Word Count"); } } ``` flatMap 和 map 算子被连接起来,并形成了一个算子链,这是由于 flatMap 直接连接到 map 上,它们没有任何其他算子或键控操作符将它们分离。 - **相同的操作类型** ```java public class OperatorChainingExample { public static void main(String[] args) throws Exception { // 获取运行参数 final ParameterTool params = ParameterTool.fromArgs(args); // 创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 获取输入数据 DataStream text = env.readTextFile(params.get("input")); // 转换为大写字母 DataStream upperCaseStream = text .map(new MapFunction() { @Override public String map(String value) throws Exception { return value.toUpperCase(); } }) .map(new MapFunction() { @Override public String map(String value) throws Exception { return value + "!"; } }); // 输出结果 if (params.has("output")) { upperCaseStream.writeAsText(params.get("output")); } else { upperCaseStream.print(); } // 启动执行程序 env.execute("Operator Chaining"); } } ``` map 算子被连接起来,并形成了一个算子链,这是由于 map 算子被重复使用,并且它们的操作类型相同。 - **算子的并行度相同** ```java DataStream lines = env.socketTextStream("localhost", 9999).setParallelism(4); DataStream words = lines.flatMap(new FlatMapFunction() { @Override public void flatMap(String line, Collector out) { for (String word : line.split(" ")) { out.collect(word); } } }).setParallelism(4); DataStream> counts = words .map(new MapFunction>() { @Override public Tuple2 map(String word) { return new Tuple2(word, 1); } }).setParallelism(4) .keyBy(new KeySelector, String>() { @Override public String getKey(Tuple2 tuple) { return tuple.f0; } }).sum(1).setParallelism(4); counts.print().setParallelism(4); env.execute("WordCount"); ``` 相邻的两个算子 flatMap 和 map 的并行度都设置为了 4。因此,它们被合并为了一个算子链。 - **用户手动合并** ```java DataStream lines = env.socketTextStream("localhost", 9999); DataStream> wordCounts = lines .transform("MyWordCount", TypeInformation.of(new TypeHint>() {}), new MyWordCountFunction()) .setParallelism(4); wordCounts.print(); env.execute("User-defined Operator Chain Example"); ```