spark如何将数据写入mongo

时间:2025-04-12 07:34:21

业务中的数据库使用的mongo,离线使用spark计算的每天统计指标需要累加到历史指标中,然后将结果写到mongo库中。如果mongo库中已经有这条记录则覆盖,mongo库中没有此记录则为新增。

 

官方文档:/spark-connector/master/java/write-to-mongodb/

根据此文档进行RDD写入,只会覆盖原有数据,无法更新,不符合需求。

 

Mongo Spark Connector不支持更新RDD,只有Dataset的数据形式才能更新;

所以将结果Dataset<Row>进行Append的形式写入mongo即可,因为mongo的主键是_id,所以要将Row的主键改成_id的列名。

 

具体实操如下:

 

Dataset<Row> rowDataset = ("select * from A");
Dataset<Row> mongoData = ("itemId", "_id");
// Create a custom WriteConfig
Map<String, String> writeOverrides = new HashMap<>();
("collection", "test_collection");
WriteConfig writeConfig = (jsc).withOptions(writeOverrides);

(().mode(), writeConfig);
如果是Dataset<Object>形式的数据则调用.toDF()先转成Dataset<Row>

 

参考资料:/p/65f16fb61e96