业务中的数据库使用的mongo,离线使用spark计算的每天统计指标需要累加到历史指标中,然后将结果写到mongo库中。如果mongo库中已经有这条记录则覆盖,mongo库中没有此记录则为新增。
官方文档:/spark-connector/master/java/write-to-mongodb/
根据此文档进行RDD写入,只会覆盖原有数据,无法更新,不符合需求。
Mongo Spark Connector不支持更新RDD,只有Dataset的数据形式才能更新;
所以将结果Dataset<Row>进行Append的形式写入mongo即可,因为mongo的主键是_id,所以要将Row的主键改成_id的列名。
具体实操如下:
Dataset<Row> rowDataset = ("select * from A");
Dataset<Row> mongoData = ("itemId", "_id");
// Create a custom WriteConfig Map<String, String> writeOverrides = new HashMap<>(); ("collection", "test_collection"); WriteConfig writeConfig = (jsc).withOptions(writeOverrides); (().mode(), writeConfig);
如果是Dataset<Object>形式的数据则调用.toDF()先转成Dataset<Row>
参考资料:/p/65f16fb61e96