使用java代码如何快速同步4个亿的数据,从Oracle到pg数据库

时间:2025-03-23 11:23:35

一、整体架构设计

  1. 分阶段处理
    将同步拆分为全量迁移增量同步两个阶段:
    全量迁移:首次一次性迁移所有历史数据(4亿条)
    增量同步:后续通过监听Oracle日志实现实时同步(参考物化视图日志或触发器)
    注:若需实时同步,可结合Oracle GoldenGate或Debezium实现CDC(变更数据捕获)
  2. 模块化设计
    数据读取层:分页查询Oracle数据
    转换层:处理数据类型差异(如Oracle的NUMBER转PostgreSQL的NUMERIC
    写入层:批量插入PostgreSQL
    监控层:记录进度、失败重试及性能指标

二、关键代码实现(核心逻辑)

1. 分页查询优化(Oracle端)
// 使用ROWID分页避免深分页性能问题
String sql = "SELECT * FROM oracle_table WHERE ROWID IN (" +
             "SELECT rid FROM (" +
             "   SELECT ROWID rid, ROWNUM rn FROM oracle_table ORDER BY ROWID" +
             ") WHERE rn BETWEEN ? AND ?)";

try (Connection oraConn = oraDataSource.getConnection();
     PreparedStatement pstmt = oraConn.prepareStatement(sql)) {
    pstmt.setInt(1, start);
    pstmt.setInt(2, end);
    ResultSet rs = pstmt.executeQuery();
    // 流式处理结果集
}

优化点
• 通过ROWID分段读取,避免OFFSET导致的性能衰减
• 每批次处理10万~100万条数据(根据内存调整)

2. 多线程并行处理
ExecutorService executor = Executors.newFixedThreadPool(8); // 根据CPU核数调整
List<Future<Integer>> futures = new ArrayList<>();

// 将4亿数据划分为1000个区间(每个区间40万条)
for (int i = 0; i < 1000; i++) {
    int start = i * 400000 + 1;
    int end = (i + 1) * 400000;
    futures.add(executor.submit(() -> syncRange(start, end)));
}

// 等待所有任务完成
for (Future<Integer> future : futures) {
    future.get();
}
3. 批量写入优化(PostgreSQL端)
// 使用COPY命令实现高速导入(比INSERT快10倍以上)
String copySql = "COPY pg_table (col1, col2) FROM STDIN WITH (FORMAT binary)";
try (Connection pgConn = pgDataSource.getConnection();
     CopyManager copyManager = new CopyManager((BaseConnection) pgConn)) {
    
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    DataOutputStream dos = new DataOutputStream(buffer);
    
    while (hasNext()) {
        // 将数据转换为PostgreSQL二进制格式
        writeBinaryData(dos, data);
        if (buffer.size() > 10_485_760) { // 每10MB提交一次
            copyManager.copyIn(copySql, new ByteArrayInputStream(buffer.toByteArray()));
            buffer.reset();
        }
    }
    // 提交剩余数据
    if (buffer.size() > 0) {
        copyManager.copyIn(copySql, new ByteArrayInputStream(buffer.toByteArray()));
    }
}

优化点
• 采用PostgreSQL的COPY命令代替INSERT • 二进制传输减少序列化开销
• 批量提交控制内存使用


三、性能调优策略

  1. 连接池配置
HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:postgresql://host/db");
config.setUsername("user");
config.setPassword("pass");
config.setMaximumPoolSize(20); // 根据数据库连接数限制调整
config.setConnectionTimeout(30000);
HikariDataSource ds = new HikariDataSource(config);
  1. JVM参数优化
-Xmx16G -Xms16G 
-XX:+UseG1GC 
-XX:MaxGCPauseMillis=200
  1. 数据库层面优化
    • PostgreSQL:
ALTER TABLE pg_table SET (autovacuum_enabled = off); -- 导入期间关闭自动清理
DROP INDEX idx_pg_table; -- 导入前删除索引,完成后重建

• Oracle:

ALTER SESSION SET NLS_DATE_FORMAT = 'YYYY-MM-DD HH24:MI:SS'; -- 统一日期格式

四、异常处理机制

  1. 断点续传
    记录已处理的数据区间到Redis或文件,重启时跳过已完成部分:
try {
    processBatch(start, end);
    markProcessed(start, end); // 记录到Redis
} catch (Exception e) {
    log.error("Batch {}-{} failed: {}", start, end, e.getMessage());
    retryQueue.add(new BatchRange(start, end));
}
  1. 错误重试策略
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3)); // 最多重试3次
retryTemplate.execute(context -> {
    copyManager.copyIn(...);
    return null;
});

五、预估性能指标

优化项

未优化速度

优化后速度

单线程插入速度

~5k rec/s

~50k rec/s

8线程并行+COPY命令

-

~300k rec/s

4亿条数据预计耗时

~220小时

~4小时

注:实际性能取决于硬件配置(建议使用SSD存储、万兆网络)


六、扩展方案

  1. 分布式架构:若单机性能不足,可采用Spark分布式处理:
JavaRDD<Row> oraData = sparkSession.read()
    .format("jdbc")
    .option("url", oraUrl)
    .option("dbtable", "(SELECT * FROM table) tmp")
    .load()
    .javaRDD();

oraData.foreachPartition(partition -> {
    // 每个分区写入PostgreSQL
});
  1. 增量同步实现
    • 使用Oracle的物化视图日志(MATERIALIZED VIEW LOG)
    • 通过ORA_ROWSCN字段识别变更数据
    • 结合Kafka实现异步消息队列

以上方案通过多级优化组合,能够在保证数据一致性的前提下,显著提升海量数据同步效率。建议先在小规模数据(如100万条)验证流程,再逐步放大到全量数据。