最近使用mongodb实现一个需求,需要批量处理数据,并且存在则修改,不存在则添加,updateMany方法,无法达到要求,经查,发现mongodb中存在一个BulkWrite方法可以实现该需求,该操作可实现类似于mysql的insert into ... on duplicate key udpdate ...
使用如下命令创建用于测试的db和collection以及添加index并查看
use mockdb db.mockcoll.createIndex({"flag":1}) db.mockcoll.getIndexes()
mockcoll中只存在一个字段flag,并在该字段建立唯一索引
下面为代码示例
public interface MongoCons { String mockdbName = "mockdb"; String mockcollName = "mockcoll"; static MongoCollection<Document> getCollection(MongoTemplate mongoTemplate){ MongoDatabase mongoDatabase = mongoTemplate.getMongoDbFactory().getMongoDatabase(mockdbName); MongoCollection<Document> collection = mongoDatabase.getCollection(mockcollName); return collection; } }
@Component public class MongoBulkDao { private Logger logger = LoggerFactory.getLogger(MongoBulkDao.class); @Autowired private MongoTemplate mongoTemplate; public void addOrUpdate(List<String> dataList) { try { MongoCollection<Document> collection = MongoCons.getCollection(mongoTemplate); List<WriteModel<Document>> list = new ArrayList<>(); UpdateOptions upsert = new UpdateOptions().upsert(true); Iterator<String> iterator = dataList.iterator(); String next = null; while (iterator.hasNext()) { next = iterator.next(); BasicDBObject filter = new BasicDBObject().append("flag", next); BasicDBObject update = new BasicDBObject().append("$set", new BasicDBObject().append("flag", next)); UpdateOneModel<Document> um = new UpdateOneModel<Document>(filter, update, upsert); list.add(um); // 删除掉不需要的数据 iterator.remove(); } collection.bulkWrite(list, new BulkWriteOptions().ordered(false)); } catch (Exception e) { logger.error("", e); } } }
上述代码那种调用了bulkWrite的其中一个构造方法,new BulkWriteOptions().ordered(false)中orderd默认为ture,即如果其中一条执行错误,后面的将不再执行,为false,其中的一条执行错误,跳过去,继续执行下面的命令。