使用 Flink API 解析 Flink Job 依赖的 Checkpoint 路径

时间:2025-02-25 10:02:12
import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.checkpoint.Checkpoints; import org.apache.flink.runtime.checkpoint.OperatorState; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata; import org.apache.flink.runtime.state.*; import org.apache.flink.runtime.state.filesystem.FileStateHandle; import java.io.*; import java.util.HashSet; import java.util.Set; /* * @author: david.zhou * @Flink Version: 1.14.4 * @Date: 2025/2/24 11:25 * @Description: Flink API 解析 Checkpoint 路径 * */ public class CheckpointMetadataParser { private static Set<String> ckPath = new HashSet<>(); public static void main(String[] args) throws IOException { // 读取元数据文件 File f=new File("/tmp/_metadata"); FileInputStream fis=new FileInputStream(f); BufferedInputStream bis = new BufferedInputStream(fis); DataInputStream dis = new DataInputStream(bis); // 通过 Flink 的 Checkpoints 类解析元数据文件 CheckpointMetadata savepoint = Checkpoints.loadCheckpointMetadata(dis, CheckpointMetadataParser.class.getClassLoader(), f.getAbsolutePath()); // 打印当前的 CheckpointId System.out.println("CheckpointId:" + savepoint.getCheckpointId()); // 遍历 OperatorState,这里的每个 OperatorState 对应一个 Flink 任务的 Operator 算子 // 不要与 O // peratorState 和 KeyedState 混淆,不是一个层级的概念 for(OperatorState operatorState :savepoint.getOperatorStates()) { //System.out.println(operatorState); // 当前算子的状态大小为 0 ,表示算子不带状态,直接退出 if(operatorState.getStateSize() == 0){ continue; } // 遍历当前算子的所有 subtask for(OperatorSubtaskState operatorSubtaskState: operatorState.getStates()) { // 解析 operatorSubtaskState 的 ManagedKeyedState parseManagedKeyedState(operatorSubtaskState); // 解析 operatorSubtaskState 的 ManagedOperatorState parseManagedOperatorState(operatorSubtaskState); } } for(String path: ckPath) { System.out.println("sstable 文件对应的 hdfs 位置:" + path); } } /** * 解析 operatorSubtaskState 的 ManagedKeyedState * @param operatorSubtaskState operatorSubtaskState */ private static 
void parseManagedKeyedState(OperatorSubtaskState operatorSubtaskState) { // 遍历当前 subtask 的 KeyedState for(KeyedStateHandle keyedStateHandle:operatorSubtaskState.getManagedKeyedState()) { // 处理增量 Checkpoint if(keyedStateHandle instanceof IncrementalRemoteKeyedStateHandle) { IncrementalRemoteKeyedStateHandle incrementalStateHandle = (IncrementalRemoteKeyedStateHandle) keyedStateHandle; // 获取 RocksDB 的 sharedState for (StateHandleID stateHandleID : incrementalStateHandle.getSharedStateHandleIDs()) { StreamStateHandle stateHandle = incrementalStateHandle.getSharedState().get(stateHandleID); //System.out.println("sstable 文件名:" + stateHandleID); if (stateHandle instanceof FileStateHandle) { Path filePath = ((FileStateHandle) stateHandle).getFilePath(); //System.out.println("filePath = " + filePath); String ckSubPath = filePath.getPath().substring(0, filePath.getPath().indexOf("/shared")); ckPath.add(ckSubPath); } } } } } /** * 解析 operatorSubtaskState 的 ManagedOperatorState * @param operatorSubtaskState operatorSubtaskState */ private static void parseManagedOperatorState(OperatorSubtaskState operatorSubtaskState) { // 遍历当前 subtask 的 OperatorState for(OperatorStateHandle operatorStateHandle:operatorSubtaskState.getManagedOperatorState()) { StreamStateHandle delegateStateHandle = operatorStateHandle.getDelegateStateHandle(); if(delegateStateHandle instanceof FileStateHandle) { Path filePath = ((FileStateHandle) delegateStateHandle).getFilePath(); //System.out.println("filePath: " + filePath.getPath()); } } } }