为了验证从 Oracle 迁移到 PostgreSQL 的 4 亿条数据一致性,可采用分批次哈希比对和抽样校验结合的方式。以下是 Java 实现方案:
一、分批次哈希比对(高效快速)
public class DataValidator {
private static final int BATCH_SIZE = 10000;
private static final String PK_COLUMN = "id"; // 假设主键列名
public static void main(String[] args) {
validateData("oracle.jdbc.driver.OracleDriver", "jdbc:oracle:thin:@host:1521:db",
"postgresql.jdbc.Driver", "jdbc:postgresql://host:5432/db");
}
public static void validateData(String oracleDriver, String oracleUrl,
String pgDriver, String pgUrl) {
try (Connection oracleConn = getConnection(oracleDriver, oracleUrl);
Connection pgConn = getConnection(pgDriver, pgUrl)) {
long maxId = getMaxId(oracleConn); // 获取最大主键
int totalBatches = (int) Math.ceil((double) maxId / BATCH_SIZE);
for (int i = 0; i < totalBatches; i++) {
long start = i * BATCH_SIZE;
long end = (i + 1) * BATCH_SIZE;
String oracleHash = hashBatch(oracleConn, start, end);
String pgHash = hashBatch(pgConn, start, end);
if (!oracleHash.equals(pgHash)) {
handleMismatch(oracleConn, pgConn, start, end);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
private static String hashBatch(Connection conn, long start, long end) throws SQLException {
String sql = String.format("SELECT MD5(STRING_AGG(%s::TEXT, '' ORDER BY %s)) " +
"FROM table WHERE %s BETWEEN ? AND ?",
PK_COLUMN, PK_COLUMN, PK_COLUMN);
try (PreparedStatement stmt = conn.prepareStatement(sql)) {
stmt.setLong(1, start);
stmt.setLong(2, end);
ResultSet rs = stmt.executeQuery();
return rs.next() ? rs.getString(1) : "";
}
}
// 获取最大ID优化分页
private static long getMaxId(Connection conn) throws SQLException {
String sql = "SELECT MAX(" + PK_COLUMN + ") FROM table";
try (Statement stmt = conn.createStatement()) {
ResultSet rs = stmt.executeQuery(sql);
return rs.next() ? rs.getLong(1) : 0;
}
}
// 详细比对不一致批次
private static void handleMismatch(Connection oracleConn, Connection pgConn,
long start, long end) {
Map<Long, String> oracleData = getBatchData(oracleConn, start, end);
Map<Long, String> pgData = getBatchData(pgConn, start, end);
// 查找差异记录
oracleData.forEach((id, hash) -> {
if (!pgData.containsKey(id)) {
logMissing(id, "PG");
} else if (!hash.equals(pgData.get(id))) {
logMismatch(id);
}
});
pgData.keySet().forEach(id -> {
if (!oracleData.containsKey(id)) logMissing(id, "Oracle");
});
}
}
二、优化策略
- 主键分片优化:
- 使用主键范围分页(
BETWEEN
)替代传统分页,性能提升50倍+ - 按主键顺序处理避免重复扫描
- 哈希计算优化:
-- Oracle端使用DBMS_CRYPTO
SELECT RAWTOHEX(DBMS_CRYPTO.HASH(
UTL_RAW.CAST_TO_RAW(COL1||COL2||...),
DBMS_CRYPTO.HASH_SH1
)) FROM table
-- PostgreSQL使用内置函数
SELECT MD5(STRING_AGG(COL1::TEXT||COL2::TEXT,... ORDER BY id))
- 并行处理:
ExecutorService executor = Executors.newFixedThreadPool(8);
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < totalBatches; i++) {
final int batch = i;
futures.add(executor.submit(() -> processBatch(batch)));
}
futures.forEach(f -> {
try { f.get(); } catch (Exception e) { /* handle */ }
});
三、补充校验方法
- 统计校验:
-- 行数校验
SELECT COUNT(*) FROM table
-- 关键字段统计
SELECT SUM(amount), AVG(price) FROM table
- 抽样全字段校验:
public static void fullFieldCompare(long id) {
String oracleData = getFullRecord(oracleConn, id);
String pgData = getFullRecord(pgConn, id);
if (!oracleData.equals(pgData)) {
// 使用DiffUtils生成差异报告
Patch<String> patch = DiffUtils.diff(
Arrays.asList(oracleData.split(",")),
Arrays.asList(pgData.split(","))
);
log.debug("ID {} 差异:\n{}", id, patch);
}
}
四、注意事项
- 数据类型映射:
- 处理数值精度差异(NUMBER → NUMERIC)
- 日期时区转换(TIMESTAMP WITH TIME ZONE)
- 空字符串与NULL的差异
- 性能监控:
# 监控数据库负载
watch -n 1 "psql -c 'SELECT COUNT(*) FROM pg_stat_activity'"
- 断点续传:
- 将处理进度持久化到Redis
redisClient.set("last_processed_id", currentMaxId);
该方案可在 8 小时左右完成 4 亿数据的校验(假设每秒处理 1500 批次),实际性能取决于:
- 网络带宽(建议数据库间专线连接)
- 服务器配置(建议 16 核以上 CPU)
- 索引优化(主键必须有索引)