Flink API 解析 Flink Job 依赖的 checkpoint 路径
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.runtime.state.*;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import java.io.*;
import java.util.HashSet;
import java.util.Set;
/*
* @author: david.zhou
* @Flink Version: 1.14.4
* @Date: 2025/2/24 11:25
* @Description: Flink API 解析 Checkpoint 路径
* */
/**
 * Command-line utility that parses a Flink checkpoint {@code _metadata} file and prints the
 * checkpoint root directories that the job's incremental (shared) state files live under.
 *
 * <p>Usage: {@code CheckpointMetadataParser [metadataFile]}. When no argument is given the
 * file {@code /tmp/_metadata} is read.
 */
public class CheckpointMetadataParser {
    /** Metadata file that is read when no path is supplied on the command line. */
    private static final String DEFAULT_METADATA_FILE = "/tmp/_metadata";

    /** Path segment that separates the checkpoint root from a shared state file name. */
    private static final String SHARED_DIR_SEGMENT = "/shared/";

    /** De-duplicated checkpoint root paths collected while walking the metadata. */
    private static final Set<String> ckPath = new HashSet<>();

    /**
     * Loads the checkpoint metadata, walks every stateful operator and subtask, and prints the
     * collected checkpoint root paths.
     *
     * @param args optional; {@code args[0]} is the metadata file to parse
     * @throws IOException if the metadata file cannot be opened or deserialized
     */
    public static void main(String[] args) throws IOException {
        String metadataFile =
                (args != null && args.length > 0) ? args[0] : DEFAULT_METADATA_FILE;
        File f = new File(metadataFile);

        // Parse the metadata file with Flink's Checkpoints helper. The streams are closed as
        // soon as parsing finishes (or fails) instead of being leaked.
        CheckpointMetadata savepoint;
        try (DataInputStream dis =
                new DataInputStream(new BufferedInputStream(new FileInputStream(f)))) {
            savepoint = Checkpoints.loadCheckpointMetadata(
                    dis, CheckpointMetadataParser.class.getClassLoader(), f.getAbsolutePath());
        }

        // Print the id of the checkpoint described by this metadata file.
        System.out.println("CheckpointId:" + savepoint.getCheckpointId());

        // Each OperatorState here corresponds to one operator of the Flink job. Do not confuse
        // it with the "operator state" vs. "keyed state" kinds, which are a different level of
        // concept.
        for (OperatorState operatorState : savepoint.getOperatorStates()) {
            // A state size of 0 means the operator is stateless; nothing to inspect.
            if (operatorState.getStateSize() == 0) {
                continue;
            }
            // Visit every subtask of the current operator.
            for (OperatorSubtaskState operatorSubtaskState : operatorState.getStates()) {
                parseManagedKeyedState(operatorSubtaskState);
                parseManagedOperatorState(operatorSubtaskState);
            }
        }

        for (String path : ckPath) {
            System.out.println("sstable 文件对应的 hdfs 位置:" + path);
        }
    }

    /**
     * Collects the checkpoint root of every file-backed shared state handle referenced by the
     * subtask's managed keyed state.
     *
     * <p>Only {@link IncrementalRemoteKeyedStateHandle} (incremental checkpoints) is inspected;
     * other keyed state handle types are ignored.
     *
     * @param operatorSubtaskState the subtask state to inspect
     */
    private static void parseManagedKeyedState(OperatorSubtaskState operatorSubtaskState) {
        for (KeyedStateHandle keyedStateHandle : operatorSubtaskState.getManagedKeyedState()) {
            if (!(keyedStateHandle instanceof IncrementalRemoteKeyedStateHandle)) {
                continue;
            }
            IncrementalRemoteKeyedStateHandle incrementalStateHandle =
                    (IncrementalRemoteKeyedStateHandle) keyedStateHandle;

            // Shared state of the RocksDB backend, keyed by state handle id.
            for (StateHandleID stateHandleID : incrementalStateHandle.getSharedStateHandleIDs()) {
                StreamStateHandle stateHandle =
                        incrementalStateHandle.getSharedState().get(stateHandleID);
                if (!(stateHandle instanceof FileStateHandle)) {
                    continue;
                }
                String fullPath = ((FileStateHandle) stateHandle).getFilePath().getPath();

                // Use the LAST "/shared/" segment so that a parent directory whose name merely
                // starts with "shared" does not truncate the path too early.
                int sharedIdx = fullPath.lastIndexOf(SHARED_DIR_SEGMENT);
                if (sharedIdx < 0) {
                    // Without this guard substring(0, -1) would throw.
                    System.err.println(
                            "Skipping state file outside a shared directory: " + fullPath);
                    continue;
                }
                ckPath.add(fullPath.substring(0, sharedIdx));
            }
        }
    }

    /**
     * Walks the subtask's managed operator state handles.
     *
     * <p>Nothing is collected from operator state; the method only identifies file-backed
     * delegate handles and serves as an inspection hook.
     *
     * @param operatorSubtaskState the subtask state to inspect
     */
    private static void parseManagedOperatorState(OperatorSubtaskState operatorSubtaskState) {
        for (OperatorStateHandle operatorStateHandle :
                operatorSubtaskState.getManagedOperatorState()) {
            StreamStateHandle delegateStateHandle = operatorStateHandle.getDelegateStateHandle();
            if (delegateStateHandle instanceof FileStateHandle) {
                // Intentionally empty: operator state file paths are not added to ckPath.
                // ((FileStateHandle) delegateStateHandle).getFilePath() yields the file location
                // if it needs to be examined.
            }
        }
    }
}