/*--------------- StreamingJobGraphGenerator ------------------*/
// Recursively create the JobVertex objects from the StreamNodes
/**
 * Builds all operator chains of the stream graph.
 *
 * <p>The chain entry points are obtained from {@code buildChainedInputsAndGetHeadInputs} and
 * visited in ascending key order; {@code createChain} is started once per entry point.
 *
 * @param hashes passed through to {@code buildChainedInputsAndGetHeadInputs}
 * @param legacyHashes passed through to {@code buildChainedInputsAndGetHeadInputs}
 */
private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes) {
    final Map<Integer, OperatorChainInfo> entryPointsByNodeId =
            buildChainedInputsAndGetHeadInputs(hashes, legacyHashes);

    // Snapshot the entry points up front, ordered by key: createChain() may register
    // further entry points in the map while the chains are being built.
    final Collection<OperatorChainInfo> orderedEntryPoints =
            entryPointsByNodeId.entrySet().stream()
                    .sorted(Map.Entry.comparingByKey())
                    .map(Map.Entry::getValue)
                    .collect(Collectors.toList());

    // Chain indices start at 1; index 0 is the source.
    final int firstChainIndex = 1;

    // Create one operator chain per entry point.
    orderedEntryPoints.forEach(
            entryPoint ->
                    createChain(
                            entryPoint.getStartNodeId(),
                            firstChainIndex,
                            entryPoint,
                            entryPointsByNodeId));
}
// Create the operator chain
/**
 * Recursively builds the operator chain that the given node belongs to and writes the
 * resulting configuration for the node.
 *
 * <p>The out edges of the current node are split into chainable and non-chainable ones (see
 * {@code isChainable}). Chainable edges are followed recursively with the same
 * {@code chainInfo}; the target of every non-chainable edge is processed as the start of a
 * chain of its own. If the chain's start node is already contained in {@code builtVertices},
 * nothing is done.
 *
 * @param currentNodeId ID of the StreamNode to process
 * @param chainIndex position of the current node within its chain; stored in the node's
 *     StreamConfig
 * @param chainInfo chain the current node is added to; also provides the chain's start node ID
 * @param chainEntryPoints chain infos keyed by node ID; an entry is added for the target of a
 *     non-chainable edge when none exists yet
 * @return the transitive out edges collected for the current node: its non-chainable out edges
 *     plus the edges returned by the recursive calls over its chainable out edges; an empty
 *     list if the chain's start node is already contained in {@code builtVertices}
 */
private List<StreamEdge> createChain(
final Integer currentNodeId,
final int chainIndex,
final OperatorChainInfo chainInfo,
final Map<Integer, OperatorChainInfo> chainEntryPoints) {
// ID of the node at which the current chain starts
Integer startNodeId = chainInfo.getStartNodeId();
// builtVertices holds the IDs of the StreamNodes that have already been built, to avoid
// building them twice.
// NOTE(review): nothing in this method adds to builtVertices; presumably createJobVertex
// does — confirm.
if (!builtVertices.contains(startNodeId)) {
// transitiveOutEdges stores the out edges of the whole operator chain
List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
// chainableOutputs stores all StreamEdges that can be chained
List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
// nonChainableOutputs stores the StreamEdges that cannot be chained
List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
// Look up the StreamNode currently being processed
StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);
// Split all out edges of the current node into chainable and non-chainable ones
for (StreamEdge outEdge : currentNode.getOutEdges()) {
if (isChainable(outEdge, streamGraph)) {
chainableOutputs.add(outEdge);
} else {
nonChainableOutputs.add(outEdge);
}
}
// For every chainable StreamEdge, call createChain recursively (same chainInfo, next chain
// index) and add the returned edges to transitiveOutEdges.
// The recursion ends when:
// 1. the current node has no more out edges;
// 2. the current node has already been converted
for (StreamEdge chainable : chainableOutputs) {
transitiveOutEdges.addAll(
createChain(
chainable.getTargetId(),
chainIndex + 1,
chainInfo,
chainEntryPoints));
}
// Every non-chainable StreamEdge is added to transitiveOutEdges; its target is then
// processed as the start of its own chain (an entry point is registered if absent).
for (StreamEdge nonChainable : nonChainableOutputs) {
transitiveOutEdges.add(nonChainable);
createChain(
nonChainable.getTargetId(),
1, // operators start at position 1 because 0 is for chained source inputs
chainEntryPoints.computeIfAbsent(
nonChainable.getTargetId(),
(k) -> chainInfo.newChain(nonChainable.getTargetId())),
chainEntryPoints);
}
// Set the name of the operator chain
chainedNames.put(
currentNodeId,
createChainedName(
currentNodeId,
chainableOutputs,
Optional.ofNullable(chainEntryPoints.get(currentNodeId))));
// Set the minimum resources required by the operator chain
chainedMinResources.put(
currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
// Set the preferred resources of the operator chain
chainedPreferredResources.put(
currentNodeId,
createChainedPreferredResources(currentNodeId, chainableOutputs));
// Add the current node to the chain info under its chained name; the returned OperatorID
// is used for the format containers and the operator config below.
OperatorID currentOperatorId =
chainInfo.addNodeToChain(currentNodeId, chainedNames.get(currentNodeId));
// Input/output formats are registered on the format container of the chain's start node.
if (currentNode.getInputFormat() != null) {
getOrCreateFormatContainer(startNodeId)
.addInputFormat(currentOperatorId, currentNode.getInputFormat());
}
if (currentNode.getOutputFormat() != null) {
getOrCreateFormatContainer(startNodeId)
.addOutputFormat(currentOperatorId, currentNode.getOutputFormat());
}
// If currentNodeId equals startNodeId, this node starts a new chain and a JobVertex is
// created for it; otherwise the node only gets a fresh StreamConfig.
StreamConfig config =
currentNodeId.equals(startNodeId)
? createJobVertex(startNodeId, chainInfo)
: new StreamConfig(new Configuration());
// Write the vertex properties into config
setVertexConfig(
currentNodeId,
config,
chainableOutputs,
nonChainableOutputs,
chainInfo.getChainedSources());
if (currentNodeId.equals(startNodeId)) {
// Start connecting a new operator chain
config.setChainStart();
config.setChainIndex(chainIndex);
config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
// For each operator chain, connect it with the out edges that lead to the next chains
for (StreamEdge edge : transitiveOutEdges) {
connect(startNodeId, edge);
}
// Store the chain's out edges and the configs of the operators chained to this head on the
// head config. Within this method, a chainedConfigs entry is only created in the else
// branch below, so the lookup may yield null when nothing was chained to the head.
config.setOutEdgesInOrder(transitiveOutEdges);
config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));
} else {
// Chained (non-head) operator: record its config under the chain's start node.
chainedConfigs.computeIfAbsent(
startNodeId, k -> new HashMap<Integer, StreamConfig>());
config.setChainIndex(chainIndex);
StreamNode node = streamGraph.getStreamNode(currentNodeId);
config.setOperatorName(node.getOperatorName());
chainedConfigs.get(startNodeId).put(currentNodeId, config);
}
config.setOperatorID(currentOperatorId);
// A node without chainable out edges is the end of its chain.
if (chainableOutputs.isEmpty()) {
config.setChainEnd();
}
return transitiveOutEdges;
} else {
// The chain's start node has already been built: nothing to contribute.
return new ArrayList<>();
}
}
// Determine whether an operator chain can be formed
/**
 * Tells whether the given edge can be chained.
 *
 * <p>The edge is chainable only if its target vertex has exactly one in edge and
 * {@code isChainableInput} accepts the edge.
 *
 * @param edge edge to check
 * @param streamGraph graph used to resolve the edge's target vertex
 * @return {@code true} if the edge can be chained
 */
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
    final StreamNode target = streamGraph.getTargetVertex(edge);
    if (target.getInEdges().size() != 1) {
        return false;
    }
    return isChainableInput(edge, streamGraph);
}
/**
 * Checks whether the given edge qualifies as a chainable input of its target vertex.
 *
 * <p>All of the following must hold, checked in this order: both vertices are in the same slot
 * sharing group, {@code areOperatorsChainable} accepts the pair, the edge uses a
 * {@code ForwardPartitioner}, the shuffle mode is not {@code BATCH}, both vertices have the
 * same parallelism, and chaining is enabled on the graph. In addition, no other in edge of the
 * target vertex may carry the same type number as this edge.
 *
 * @param edge edge to check
 * @param streamGraph graph used to resolve the edge's source and target vertices
 * @return {@code true} if the edge is a chainable input
 */
private static boolean isChainableInput(StreamEdge edge, StreamGraph streamGraph) {
    final StreamNode producer = streamGraph.getSourceVertex(edge);
    final StreamNode consumer = streamGraph.getTargetVertex(edge);

    final boolean violatesChainingRule =
            !producer.isSameSlotSharingGroup(consumer)
                    || !areOperatorsChainable(producer, consumer, streamGraph)
                    || !(edge.getPartitioner() instanceof ForwardPartitioner)
                    || edge.getShuffleMode() == ShuffleMode.BATCH
                    || producer.getParallelism() != consumer.getParallelism()
                    || !streamGraph.isChainingEnabled();
    if (violatesChainingRule) {
        return false;
    }

    // Reject the edge if another in edge of the target carries the same type number.
    for (StreamEdge candidate : consumer.getInEdges()) {
        if (candidate == edge) {
            continue;
        }
        if (candidate.getTypeNumber() == edge.getTypeNumber()) {
            return false;
        }
    }
    return true;
}
/**
 * Compares the slot sharing group of this node with that of the given node.
 *
 * @param downstreamVertex node whose slot sharing group is compared with this node's
 * @return {@code true} if both groups are {@code null}, or if this node's group is non-null
 *     and equal to the other node's group
 */
public boolean isSameSlotSharingGroup(StreamNode downstreamVertex) {
    if (slotSharingGroup == null) {
        return downstreamVertex.slotSharingGroup == null;
    }
    return slotSharingGroup.equals(downstreamVertex.slotSharingGroup);
}
/**
 * Determines the entry points from which the operator chains are built.
 *
 * <p>Every source node of the stream graph is inspected. A source is chained into its target
 * when its operator factory is a {@code SourceOperatorFactory}, it has exactly one out edge,
 * the target's chaining strategy is {@code HEAD_WITH_SOURCES}, and {@code isChainableInput}
 * accepts the edge. In that case the source is recorded as a chained source (chain index 0),
 * an entry point keyed by the target node ID is created if absent, and the source's
 * coordinator provider is added to it. Every other source becomes an entry point itself.
 *
 * @param hashes node ID to hash; used to build the {@code OperatorID} of chained sources and
 *     handed to every created {@code OperatorChainInfo}
 * @param legacyHashes handed to every created {@code OperatorChainInfo}
 * @return the chain entry points keyed by node ID
 */
private Map<Integer, OperatorChainInfo> buildChainedInputsAndGetHeadInputs(
        final Map<Integer, byte[]> hashes, final List<Map<Integer, byte[]>> legacyHashes) {

    final Map<Integer, ChainedSourceInfo> chainedSources = new HashMap<>();
    final Map<Integer, OperatorChainInfo> chainEntryPoints = new HashMap<>();

    // Walk over all source StreamNodes.
    for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
        final StreamNode sourceNode = streamGraph.getStreamNode(sourceNodeId);
        boolean chainedIntoTarget = false;

        if (sourceNode.getOperatorFactory() instanceof SourceOperatorFactory
                && sourceNode.getOutEdges().size() == 1) {

            final StreamEdge onlyOutEdge = sourceNode.getOutEdges().get(0);
            final StreamNode targetNode = streamGraph.getStreamNode(onlyOutEdge.getTargetId());
            final ChainingStrategy targetStrategy =
                    targetNode.getOperatorFactory().getChainingStrategy();

            if (targetStrategy == ChainingStrategy.HEAD_WITH_SOURCES
                    && isChainableInput(onlyOutEdge, streamGraph)) {

                final OperatorID sourceOperatorId = new OperatorID(hashes.get(sourceNodeId));
                final StreamConfig.SourceInputConfig sourceInputConfig =
                        new StreamConfig.SourceInputConfig(onlyOutEdge);

                // Config of the chained source operator itself.
                final StreamConfig sourceConfig = new StreamConfig(new Configuration());
                setVertexConfig(
                        sourceNodeId,
                        sourceConfig,
                        Collections.emptyList(),
                        Collections.emptyList(),
                        Collections.emptyMap());
                sourceConfig.setChainIndex(0); // sources are always first
                sourceConfig.setOperatorID(sourceOperatorId);
                sourceConfig.setOperatorName(sourceNode.getOperatorName());

                chainedSources.put(
                        sourceNodeId, new ChainedSourceInfo(sourceConfig, sourceInputConfig));

                final SourceOperatorFactory<?> sourceFactory =
                        (SourceOperatorFactory<?>) sourceNode.getOperatorFactory();
                final OperatorCoordinator.Provider coordinatorProvider =
                        sourceFactory.getCoordinatorProvider(
                                sourceNode.getOperatorName(), sourceOperatorId);

                // The chain is entered at the target node; reuse its entry point if another
                // source already created it.
                final OperatorChainInfo targetChain =
                        chainEntryPoints.computeIfAbsent(
                                onlyOutEdge.getTargetId(),
                                (ignoredKey) ->
                                        new OperatorChainInfo(
                                                onlyOutEdge.getTargetId(),
                                                hashes,
                                                legacyHashes,
                                                chainedSources,
                                                streamGraph));
                targetChain.addCoordinatorProvider(coordinatorProvider);

                chainedIntoTarget = true;
            }
        }

        if (!chainedIntoTarget) {
            // The source is not chained into its target: it is an entry point of its own.
            chainEntryPoints.put(
                    sourceNodeId,
                    new OperatorChainInfo(
                            sourceNodeId, hashes, legacyHashes, chainedSources, streamGraph));
        }
    }

    return chainEntryPoints;
}
/*---------------------------------------------------------*/
/*--------------- ChainingStrategy ------------------*/
/** Defines how an operator is chained with the operators before and after it. */
public enum ChainingStrategy {

    /** Chain with the preceding and following operators as far as possible. */
    ALWAYS,

    /** The operator is not chained with preceding or following operators. */
    NEVER,

    /** The operator chains only with following operators, never with preceding ones. */
    HEAD,

    /** Head operator that chains with as many source operators as possible. */
    HEAD_WITH_SOURCES;

    /** The default chaining strategy is {@link #ALWAYS}. */
    public static final ChainingStrategy DEFAULT_CHAINING_STRATEGY = ChainingStrategy.ALWAYS;
}
/*--------------------------------------------------*/