Oracle数据迁移到PostgreSQL一致性验证方案

时间:2025-04-15 11:34:05

为了验证从 Oracle 迁移到 PostgreSQL 的 4 亿条数据一致性,可采用分批次哈希比对和抽样校验结合的方式。以下是 Java 实现方案:

一、分批次哈希比对(高效快速)

import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.logging.Logger;

/**
 * Compares one table between Oracle and PostgreSQL in primary-key ranges.
 *
 * <p>Each range is read from both databases, every row is normalized to a
 * database-independent text form, and the rows are hashed in Java. Hashing on the client
 * (instead of MD5/STRING_AGG in SQL) is what makes the two sides comparable: Oracle has no
 * STRING_AGG and no MD5() function, and the two databases render numbers, dates and NULLs
 * differently. The cost is that every row crosses the network once per side.
 */
public class DataValidator {
    private static final Logger LOG = Logger.getLogger(DataValidator.class.getName());

    private static final int BATCH_SIZE = 10000;
    /** Rows fetched per round trip; the Oracle JDBC default is only 10. */
    private static final int FETCH_SIZE = 1000;
    private static final String PK_COLUMN = "id"; // assumed primary-key column name
    /** Table to compare (placeholder: the bare word "table" is reserved in both databases). */
    private static final String TABLE_NAME = "target_table";
    /**
     * Columns to compare, qualified by the alias {@code t}. {@code t.*} is only correct when both
     * tables declare their columns in the same order; otherwise list the columns explicitly.
     */
    private static final String SELECT_COLUMNS = "t.*";
    /** Half-open range query [start, end) so adjacent batches never share a boundary row. */
    private static final String BATCH_SQL = "SELECT " + PK_COLUMN + ", " + SELECT_COLUMNS
            + " FROM " + TABLE_NAME + " t WHERE " + PK_COLUMN + " >= ? AND " + PK_COLUMN
            + " < ? ORDER BY " + PK_COLUMN;

    /** Stand-in for NULL; Oracle stores '' as NULL, so empty strings map here as well. */
    private static final String NULL_MARKER = "\\N";
    /** Separators keep ("ab","c") and ("a","bc") from producing the same text. */
    private static final char FIELD_SEPARATOR = (char) 0x1F;
    private static final byte ROW_SEPARATOR = 0x1E;
    private static final String HASH_ALGORITHM = "SHA-256";

    public static void main(String[] args) {
        // Credentials are expected inside the URLs; see getConnection.
        validateData("oracle.jdbc.OracleDriver", "jdbc:oracle:thin:@host:1521:db",
                     "org.postgresql.Driver", "jdbc:postgresql://host:5432/db");
    }

    /**
     * Compares the table in both databases batch by batch and logs every missing or
     * differing row.
     *
     * @param oracleDriver JDBC driver class for Oracle
     * @param oracleUrl    Oracle JDBC URL
     * @param pgDriver     JDBC driver class for PostgreSQL
     * @param pgUrl        PostgreSQL JDBC URL
     * @throws IllegalStateException if a driver cannot be loaded or a query fails; the error is
     *     propagated because a half-finished validation must not look like a passing one
     */
    public static void validateData(String oracleDriver, String oracleUrl,
                                    String pgDriver, String pgUrl) {
        try (Connection oracleConn = getConnection(oracleDriver, oracleUrl);
             Connection pgConn = getConnection(pgDriver, pgUrl)) {

            // Scan the union of both key ranges so rows that exist only in PostgreSQL
            // (or only below/above the other side's range) are found too.
            long minId = Long.MAX_VALUE;
            long maxId = Long.MIN_VALUE;
            for (long[] range : new long[][] {getIdRange(oracleConn), getIdRange(pgConn)}) {
                if (range != null) {
                    minId = Math.min(minId, range[0]);
                    maxId = Math.max(maxId, range[1]);
                }
            }
            if (minId > maxId) {
                LOG.info("Both tables are empty; nothing to compare");
                return;
            }

            long batches = 0;
            long mismatchedBatches = 0;
            // long arithmetic: the old int-based i * BATCH_SIZE overflowed above ~2.1e9.
            for (long start = minId; start <= maxId; start += BATCH_SIZE) {
                long end = start + BATCH_SIZE; // exclusive
                batches++;
                String oracleHash = hashBatch(oracleConn, start, end);
                String pgHash = hashBatch(pgConn, start, end);
                if (!oracleHash.equals(pgHash)) {
                    mismatchedBatches++;
                    handleMismatch(oracleConn, pgConn, start, end);
                }
            }
            LOG.info(String.format("Compared %d batches, %d with differences",
                    batches, mismatchedBatches));
        } catch (SQLException | ClassNotFoundException e) {
            throw new IllegalStateException("Data validation aborted", e);
        }
    }

    /**
     * Opens a connection. Credentials must be embedded in the URL, e.g.
     * {@code jdbc:oracle:thin:user/pass@host:1521:db} or {@code ...?user=u&password=p}.
     */
    private static Connection getConnection(String driver, String url)
            throws ClassNotFoundException, SQLException {
        // Loading the class explicitly fails fast on a misspelled driver name and registers
        // drivers that predate JDBC 4 service loading.
        Class.forName(driver);
        return DriverManager.getConnection(url);
    }

    /** Returns {min, max} of the primary key, or null when the table is empty. */
    private static long[] getIdRange(Connection conn) throws SQLException {
        String sql = "SELECT MIN(" + PK_COLUMN + "), MAX(" + PK_COLUMN + ") FROM " + TABLE_NAME;
        try (Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(sql)) {
            if (!rs.next()) {
                return null;
            }
            long min = rs.getLong(1);
            if (rs.wasNull()) {
                return null; // MIN/MAX over an empty table yield NULL
            }
            return new long[] {min, rs.getLong(2)};
        }
    }

    /** Hashes every normalized row with PK in [start, end), in key order, into one hex digest. */
    private static String hashBatch(Connection conn, long start, long end) throws SQLException {
        MessageDigest digest = newDigest();
        scanBatch(conn, start, end, (id, row) -> {
            digest.update(row.getBytes(StandardCharsets.UTF_8));
            digest.update(ROW_SEPARATOR); // keeps row boundaries unambiguous
        });
        return toHex(digest.digest());
    }

    /** Maps each PK in [start, end) to the hash of its normalized row, in key order. */
    private static Map<Long, String> getBatchData(Connection conn, long start, long end)
            throws SQLException {
        Map<Long, String> rows = new LinkedHashMap<>();
        MessageDigest digest = newDigest(); // digest(byte[]) resets it after every row
        scanBatch(conn, start, end, (id, row) ->
                rows.put(id, toHex(digest.digest(row.getBytes(StandardCharsets.UTF_8)))));
        return rows;
    }

    /** Streams the rows with PK in [start, end), in key order, as (pk, normalized row text). */
    private static void scanBatch(Connection conn, long start, long end,
                                  BiConsumer<Long, String> rowSink) throws SQLException {
        try (PreparedStatement stmt = conn.prepareStatement(BATCH_SQL)) {
            stmt.setFetchSize(FETCH_SIZE);
            stmt.setLong(1, start);
            stmt.setLong(2, end);
            try (ResultSet rs = stmt.executeQuery()) {
                ResultSetMetaData meta = rs.getMetaData();
                int columns = meta.getColumnCount();
                int[] types = new int[columns + 1];
                for (int col = 1; col <= columns; col++) {
                    types[col] = meta.getColumnType(col);
                }
                StringBuilder row = new StringBuilder();
                while (rs.next()) {
                    // Column 1 is the PK selected explicitly by BATCH_SQL; columns are read
                    // once each, left to right, as the JDBC spec recommends.
                    long id = rs.getLong(1);
                    row.setLength(0);
                    row.append(id);
                    for (int col = 2; col <= columns; col++) {
                        row.append(FIELD_SEPARATOR)
                           .append(normalize(readValue(rs, col, types[col])));
                    }
                    rowSink.accept(id, row.toString());
                }
            }
        }
    }

    /**
     * Reads one column through the getter that gives the same Java value on both databases.
     * NOTE(review): vendor-specific type codes (e.g. Oracle TIMESTAMP WITH TIME ZONE, -101)
     * fall through to getString — verify for the actual schema.
     */
    private static Object readValue(ResultSet rs, int column, int sqlType) throws SQLException {
        switch (sqlType) {
            case Types.NUMERIC:
            case Types.DECIMAL:
            case Types.INTEGER:
            case Types.BIGINT:
            case Types.SMALLINT:
            case Types.TINYINT:
                return rs.getBigDecimal(column);
            case Types.BIT:
            case Types.BOOLEAN: {
                // Lets a PostgreSQL boolean match an Oracle NUMBER(1) flag (1 / 0).
                boolean flag = rs.getBoolean(column);
                return rs.wasNull() ? null : (flag ? BigDecimal.ONE : BigDecimal.ZERO);
            }
            case Types.DATE:
            case Types.TIMESTAMP:
            case Types.TIMESTAMP_WITH_TIMEZONE:
                return rs.getTimestamp(column);
            case Types.BINARY:
            case Types.VARBINARY:
            case Types.LONGVARBINARY:
            case Types.BLOB:
                return rs.getBytes(column);
            default:
                return rs.getString(column);
        }
    }

    /**
     * Renders one column value as text that is identical for both databases.
     * NULL and the empty string compare equal (Oracle cannot tell them apart). Numbers are
     * reduced to their plain form without trailing zeros, so equal values with a different
     * scale agree. Byte arrays become lowercase hex.
     */
    static String normalize(Object value) {
        if (value == null || "".equals(value)) {
            return NULL_MARKER;
        }
        if (value instanceof BigDecimal) {
            BigDecimal number = (BigDecimal) value;
            // Explicit zero case: older JDKs did not strip the zeros of "0.00".
            return number.signum() == 0 ? "0" : number.stripTrailingZeros().toPlainString();
        }
        if (value instanceof Number) {
            try {
                return normalize(new BigDecimal(value.toString()));
            } catch (NumberFormatException e) {
                return value.toString(); // NaN / Infinity have no BigDecimal form
            }
        }
        if (value instanceof byte[]) {
            return toHex((byte[]) value);
        }
        return value.toString();
    }

    /** Lowercase hex encoding of {@code bytes}. */
    static String toHex(byte[] bytes) {
        StringBuilder hex = new StringBuilder(bytes.length * 2);
        for (byte b : bytes) {
            hex.append(Character.forDigit((b >> 4) & 0xF, 16))
               .append(Character.forDigit(b & 0xF, 16));
        }
        return hex.toString();
    }

    private static MessageDigest newDigest() {
        try {
            return MessageDigest.getInstance(HASH_ALGORITHM);
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform is required to provide SHA-256.
            throw new IllegalStateException(HASH_ALGORITHM + " unavailable", e);
        }
    }

    /** Re-reads a mismatched batch row by row and logs which PKs are missing or different. */
    private static void handleMismatch(Connection oracleConn, Connection pgConn,
                                       long start, long end) throws SQLException {
        Map<Long, String> oracleData = getBatchData(oracleConn, start, end);
        Map<Long, String> pgData = getBatchData(pgConn, start, end);

        oracleData.forEach((id, hash) -> {
            String pgHash = pgData.get(id);
            if (pgHash == null) {
                logMissing(id, "PG");
            } else if (!hash.equals(pgHash)) {
                logMismatch(id);
            }
        });

        pgData.keySet().forEach(id -> {
            if (!oracleData.containsKey(id)) {
                logMissing(id, "Oracle");
            }
        });
    }

    private static void logMissing(long id, String missingIn) {
        LOG.warning(String.format("Row %s=%d missing in %s", PK_COLUMN, id, missingIn));
    }

    private static void logMismatch(long id) {
        LOG.warning(String.format("Row %s=%d differs between Oracle and PG", PK_COLUMN, id));
    }
}

二、优化策略

  1. 主键分片优化
  • 使用主键范围分页(WHERE id >= ? AND id < ?,半开区间可避免相邻批次重复包含边界行)替代 OFFSET/LIMIT 传统分页:深分页时 OFFSET 需扫描并丢弃前面的所有行,而范围条件可直接走主键索引,偏移量越大性能差距越明显
  • 按主键顺序处理避免重复扫描
  2. 哈希计算优化
-- Oracle (12c+): per-row MD5, lowercased to match PostgreSQL's md5() output.
-- (The previous version used SHA-1 per row on Oracle but MD5 over an aggregate on
-- PostgreSQL, so the two results could never match.)
-- The '|' separator keeps 'ab'||'c' and 'a'||'bc' from hashing alike.
-- NOTE: render NUMBER/DATE columns with explicit TO_CHAR formats that produce exactly the
-- same text as the PostgreSQL side; the default conversions differ between the databases.
-- target_table is a placeholder: TABLE is a reserved word in both databases.
SELECT id,
       LOWER(RAWTOHEX(STANDARD_HASH(COL1 || '|' || COL2 || '|' || ..., 'MD5'))) AS row_hash
FROM target_table
WHERE id >= ? AND id < ?
ORDER BY id

-- PostgreSQL: same text and same algorithm, so row_hash is directly comparable with Oracle's.
-- COALESCE is required: in PostgreSQL NULL || 'x' is NULL, whereas Oracle treats NULL as ''.
-- Compare the (id, row_hash) lists from both sides, or fold them client-side into one batch hash.
SELECT id,
       MD5(COALESCE(COL1::TEXT, '') || '|' || COALESCE(COL2::TEXT, '') || '|' || ...) AS row_hash
FROM target_table
WHERE id >= ? AND id < ?
ORDER BY id
  3. 并行处理
// A JDBC Connection is not guaranteed to be thread-safe: processBatch should obtain its own
// connections (e.g. from a pool) instead of sharing one opened by the submitting thread.
ExecutorService executor = Executors.newFixedThreadPool(8);
List<Integer> failedBatches = new ArrayList<>();
try {
    List<Future<?>> futures = new ArrayList<>(totalBatches);
    for (int i = 0; i < totalBatches; i++) {
        final int batch = i;
        futures.add(executor.submit(() -> processBatch(batch)));
    }

    // futures.get(i) belongs to batch i, so a failed batch is recorded for a later retry
    // instead of being silently dropped.
    for (int i = 0; i < futures.size(); i++) {
        try {
            futures.get(i).get();
        } catch (ExecutionException e) {
            failedBatches.add(i);
            System.err.println("Batch " + i + " failed: " + e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            break;
        }
    }
} finally {
    // Without a shutdown the pool's non-daemon threads keep the JVM alive; shutdownNow also
    // cancels queued batches when the loop above stopped early because of an interrupt.
    executor.shutdownNow();
}

三、补充校验方法

  1. 统计校验
-- Row count (target_table is a placeholder: TABLE is a reserved word in both databases)
SELECT COUNT(*) FROM target_table

-- Key-column aggregates. SUM and COUNT over NUMBER/NUMERIC are exact on both sides.
-- AVG is not compared directly: its result scale and rounding differ between Oracle and
-- PostgreSQL, and it equals SUM/COUNT anyway. COUNT(col) also exposes NULLs introduced by
-- the migration.
SELECT SUM(amount), COUNT(amount), SUM(price), COUNT(price) FROM target_table
  2. 抽样全字段校验
/**
 * Compares every column of one sampled row between Oracle and PostgreSQL and logs a
 * column-level diff when they disagree.
 *
 * <p>NOTE(review): getFullRecord is assumed to return the row as comma-separated text, or
 * null when no row has this id — confirm against its implementation. Values that themselves
 * contain commas will show up as several diff entries.
 *
 * @param id primary key of the row to compare
 */
public static void fullFieldCompare(long id) {
    String oracleData = getFullRecord(oracleConn, id);
    String pgData = getFullRecord(pgConn, id);

    if (oracleData == null && pgData == null) {
        return; // absent on both sides: consistent
    }
    // A row missing on one side is itself a discrepancy (and equals() on null would throw NPE).
    if (oracleData == null || pgData == null) {
        log.warn("ID {} missing in {}", id, oracleData == null ? "Oracle" : "PG");
        return;
    }

    if (!oracleData.equals(pgData)) {
        // Use DiffUtils to build a column-level diff report. The -1 limit keeps trailing empty
        // fields, which split(",") would drop, hiding differences in trailing NULL/empty columns.
        Patch<String> patch = DiffUtils.diff(
            Arrays.asList(oracleData.split(",", -1)),
            Arrays.asList(pgData.split(",", -1))
        );
        // warn, not debug: a data mismatch must be visible at the default log level
        log.warn("ID {} 差异:\n{}", id, patch);
    }
}

四、注意事项

  1. 数据类型映射
  • 处理数值精度差异(NUMBER → NUMERIC)
  • 日期时区转换(TIMESTAMP WITH TIME ZONE)
  • 空字符串与NULL的差异
  2. 性能监控
# Monitor database load: rows in pg_stat_activity (one per server process), refreshed every second
watch --interval=1 "psql --command='SELECT COUNT(*) FROM pg_stat_activity'"
  3. 断点续传
  • 将处理进度持久化到Redis
// Resume checkpoint: call this only after the batch ending at currentMaxId has been fully
// compared, otherwise a restart can skip a batch that was never verified.
// NOTE(review): with Jedis, set(String, String) needs a String value; if currentMaxId is a
// long, pass String.valueOf(currentMaxId) — confirm which Redis client is in use.
redisClient.set("last_processed_id", currentMaxId);

该方案预计可在 8 小时左右完成 4 亿数据的校验(4 亿行 ÷ 每批 10,000 行 = 40,000 批次;8 小时内完成需每秒处理约 1.4 个批次,即约 1.4 万行/秒),实际性能取决于:

  • 网络带宽(建议数据库间专线连接)
  • 服务器配置(建议 16 核以上 CPU)
  • 索引优化(按主键范围分片依赖主键索引;主键约束会自动创建索引,若改用其他列分片,需为该列单独建索引)