一、整体架构设计
-
分阶段处理
将同步拆分为全量迁移和增量同步两个阶段:
• 全量迁移:首次一次性迁移所有历史数据(4亿条)
• 增量同步:后续通过监听Oracle日志实现实时同步(参考物化视图日志或触发器)
注:若需实时同步,可结合Oracle GoldenGate或Debezium实现CDC(变更数据捕获) -
模块化设计
• 数据读取层:分页查询Oracle数据
• 转换层:处理数据类型差异(如Oracle的NUMBER
转PostgreSQL的NUMERIC
)
• 写入层:批量插入PostgreSQL
• 监控层:记录进度、失败重试及性能指标
二、关键代码实现(核心逻辑)
1. 分页查询优化(Oracle端)
// 使用ROWID分页避免深分页性能问题
String sql = "SELECT * FROM oracle_table WHERE ROWID IN (" +
"SELECT rid FROM (" +
" SELECT ROWID rid, ROWNUM rn FROM oracle_table ORDER BY ROWID" +
") WHERE rn BETWEEN ? AND ?)";
try (Connection oraConn = oraDataSource.getConnection();
PreparedStatement pstmt = oraConn.prepareStatement(sql)) {
pstmt.setInt(1, start);
pstmt.setInt(2, end);
ResultSet rs = pstmt.executeQuery();
// 流式处理结果集
}
优化点:
• 通过ROWID分段读取,避免OFFSET
导致的性能衰减
• 每批次处理10万~100万条数据(根据内存调整)
2. 多线程并行处理
ExecutorService executor = Executors.newFixedThreadPool(8); // 根据CPU核数调整
List<Future<Integer>> futures = new ArrayList<>();
// 将4亿数据划分为1000个区间(每个区间40万条)
for (int i = 0; i < 1000; i++) {
int start = i * 400000 + 1;
int end = (i + 1) * 400000;
futures.add(executor.submit(() -> syncRange(start, end)));
}
// 等待所有任务完成
for (Future<Integer> future : futures) {
future.get();
}
3. 批量写入优化(PostgreSQL端)
// 使用COPY命令实现高速导入(比INSERT快10倍以上)
String copySql = "COPY pg_table (col1, col2) FROM STDIN WITH (FORMAT binary)";
try (Connection pgConn = pgDataSource.getConnection();
CopyManager copyManager = new CopyManager((BaseConnection) pgConn)) {
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(buffer);
while (hasNext()) {
// 将数据转换为PostgreSQL二进制格式
writeBinaryData(dos, data);
if (buffer.size() > 10_485_760) { // 每10MB提交一次
copyManager.copyIn(copySql, new ByteArrayInputStream(buffer.toByteArray()));
buffer.reset();
}
}
// 提交剩余数据
if (buffer.size() > 0) {
copyManager.copyIn(copySql, new ByteArrayInputStream(buffer.toByteArray()));
}
}
优化点:
• 采用PostgreSQL的COPY
命令代替INSERT
• 二进制传输减少序列化开销
• 批量提交控制内存使用
三、性能调优策略
- 连接池配置
HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:postgresql://host/db");
config.setUsername("user");
config.setPassword("pass");
config.setMaximumPoolSize(20); // 根据数据库连接数限制调整
config.setConnectionTimeout(30000);
HikariDataSource ds = new HikariDataSource(config);
- JVM参数优化
-Xmx16G -Xms16G
-XX:+UseG1GC
-XX:MaxGCPauseMillis=200
-
数据库层面优化
• PostgreSQL:
ALTER TABLE pg_table SET (autovacuum_enabled = off); -- 导入期间关闭自动清理
DROP INDEX idx_pg_table; -- 导入前删除索引,完成后重建
• Oracle:
ALTER SESSION SET NLS_DATE_FORMAT = 'YYYY-MM-DD HH24:MI:SS'; -- 统一日期格式
四、异常处理机制
-
断点续传
记录已处理的数据区间到Redis或文件,重启时跳过已完成部分:
try {
processBatch(start, end);
markProcessed(start, end); // 记录到Redis
} catch (Exception e) {
log.error("Batch {}-{} failed: {}", start, end, e.getMessage());
retryQueue.add(new BatchRange(start, end));
}
- 错误重试策略
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3)); // 最多重试3次
retryTemplate.execute(context -> {
copyManager.copyIn(...);
return null;
});
五、预估性能指标
优化项 |
未优化速度 |
优化后速度 |
单线程插入速度 |
~5k rec/s |
~50k rec/s |
8线程并行+COPY命令 |
- |
~300k rec/s |
4亿条数据预计耗时 |
~220小时 |
~4小时 |
注:实际性能取决于硬件配置(建议使用SSD存储、万兆网络)
六、扩展方案
- 分布式架构:若单机性能不足,可采用Spark分布式处理:
JavaRDD<Row> oraData = sparkSession.read()
.format("jdbc")
.option("url", oraUrl)
.option("dbtable", "(SELECT * FROM table) tmp")
.load()
.javaRDD();
oraData.foreachPartition(partition -> {
// 每个分区写入PostgreSQL
});
-
增量同步实现:
• 使用Oracle的物化视图日志(MATERIALIZED VIEW LOG)
• 通过ORA_ROWSCN
字段识别变更数据
• 结合Kafka实现异步消息队列
以上方案通过多级优化组合,能够在保证数据一致性的前提下,显著提升海量数据同步效率。建议先在小规模数据(如100万条)验证流程,再逐步放大到全量数据。