Flink 算子链可以合并的源码分析时间:2023-03-24 16:16:07- **源码示例** StreamingJobGraphGenerator.java ```java for (StreamEdge outEdge : currentNode.getOutEdges()) { if (isChainable(outEdge, streamGraph)) { chainableOutputs.add(outEdge); } else { nonChainableOutputs.add(outEdge); } } ``` - **isChainable()** ```java public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) { StreamNode downStreamVertex = streamGraph.getTargetVertex(edge); return downStreamVertex.getInEdges().size() == 1 && isChainableInput(edge, streamGraph); } ``` - **isChainableInput()** ```java private static boolean isChainableInput(StreamEdge edge, StreamGraph streamGraph) { StreamNode upStreamVertex = streamGraph.getSourceVertex(edge); StreamNode downStreamVertex = streamGraph.getTargetVertex(edge); if (!(upStreamVertex.isSameSlotSharingGroup(downStreamVertex) && areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph) && (edge.getPartitioner() instanceof ForwardPartitioner) && edge.getShuffleMode() != ShuffleMode.BATCH && upStreamVertex.getParallelism() == downStreamVertex.getParallelism() && streamGraph.isChainingEnabled())) { return false; } // check that we do not have a union operation, because unions currently only work // through the network/byte-channel stack. // we check that by testing that each "type" (which means input position) is used only once for (StreamEdge inEdge : downStreamVertex.getInEdges()) { if (inEdge != edge && inEdge.getTypeNumber() == edge.getTypeNumber()) { return false; } } return true; } ```