现需求:需要批量将1000个数据先查询在更新到mongodb(如果查询不到数据,则添加数据)
1:工具类BathUpdateOptions
1 import org.springframework.data.mongodb.core.query.Query; 2 import org.springframework.data.mongodb.core.query.Update; 3 4 public class BathUpdateOptions { 5 6 private Query query; 7 private Update update; 8 private boolean upsert = true; 9 private boolean multi = false; 10 11 public Query getQuery() { 12 return query; 13 } 14 15 public void setQuery(Query query) { 16 this.query = query; 17 } 18 19 public Update getUpdate() { 20 return update; 21 } 22 23 public void setUpdate(Update update) { 24 this.update = update; 25 } 26 27 public boolean isUpsert() { 28 return upsert; 29 } 30 31 public void setUpsert(boolean upsert) { 32 this.upsert = upsert; 33 } 34 35 public boolean isMulti() { 36 return multi; 37 } 38 39 public void setMulti(boolean multi) { 40 this.multi = multi; 41 } 42 43 }
2:需要更新的domain
1 @Document(collection = "video_show_view") 2 public class VideoShowView { 3 //唯一值 4 private String id; 5 //节目id 6 private String cid; 7 //app播放次数 8 private String view; 9 //app虚拟播放次数 10 private String virtualViews; 11 //最后更新时间 时间戳 12 private String updateTime; 13 //get set ...... 14 }
3:获取BathUpdateOptions 集合
/** * @author xuyou * @Package com.migu.live.mao * @Description: * @date 2018/6/11 16:13 */ @Repository public class VideoShowViewMao { @Autowired private MongoTemplate mongoTemplate; public BathUpdateOptions getBathUpdateOptions(VideoShowView videoShowView){ BathUpdateOptions options = new BathUpdateOptions(); Query query = new Query(); //查询条件 query.addCriteria(Criteria.where("cid").is(videoShowView.getCid())); query.addCriteria(Criteria.where("types").is(videoShowView.getTypes())); options.setQuery(query); //mongodb 默认是false,只更新找到的第一条记录,如果这个参数为true,就把按条件查出来多条记录全部更新。 options.setMulti(true); Update update = new Update(); //更新内容 update.set("view", videoShowView.getView()); update.set("updateTime", videoShowView.getUpdateTime()); options.setUpdate(update); return options; } public void bathUpdate(List<BathUpdateOptions> bups){ BathUpdateUtil.bathUpdate(mongoTemplate, VideoShowView.class, bups); } }
4:操作mongodb的工具类BathUpdateUtil
1 import java.util.ArrayList; 2 import java.util.List; 3 4 import org.springframework.dao.InvalidDataAccessApiUsageException; 5 import org.springframework.data.mongodb.core.MongoTemplate; 6 import org.springframework.data.mongodb.core.mapping.Document; 7 8 import com.migu.live.data.BathUpdateOptions; 9 import com.mongodb.BasicDBObject; 10 import com.mongodb.CommandResult; 11 import com.mongodb.DBCollection; 12 import com.mongodb.DBObject; 13 14 public class BathUpdateUtil { 15 16 /** 17 * @description:批量更新mongodb数据 18 * @author: xuyou 19 * @date: 2018年8月14日 上午11:28:29 20 */ 21 public static int bathUpdate(MongoTemplate mongoTemplate, Class<?> entityClass, 22 List<BathUpdateOptions> options) { 23 String collectionName = determineCollectionName(entityClass); 24 return doBathUpdate(mongoTemplate.getCollection(collectionName), 25 collectionName, options, true); 26 } 27 28 private static String determineCollectionName(Class<?> entityClass) { 29 if (entityClass == null) { 30 throw new InvalidDataAccessApiUsageException( 31 "No class parameter provided, entity collection can\'t be determined!"); 32 } 33 String collName = entityClass.getSimpleName(); 34 if(entityClass.isAnnotationPresent(Document.class)) { 35 Document document = entityClass.getAnnotation(Document.class); 36 collName = document.collection(); 37 } else { 38 collName = collName.replaceFirst(collName.substring(0, 1) 39 ,collName.substring(0, 1).toLowerCase()) ; 40 } 41 return collName; 42 } 43 44 private static int doBathUpdate(DBCollection dbCollection, String collName, 45 List<BathUpdateOptions> options, boolean ordered) { 46 DBObject command = new BasicDBObject(); 47 command.put("update", collName); 48 List<BasicDBObject> updateList = new ArrayList<BasicDBObject>(); 49 for (BathUpdateOptions option : options) { 50 BasicDBObject update = new BasicDBObject(); 51 update.put("q", option.getQuery().getQueryObject()); 52 update.put("u", option.getUpdate().getUpdateObject()); 53 update.put("upsert", option.isUpsert()); 54 update.put("multi", option.isMulti()); 55 updateList.add(update); 56 } 57 command.put("updates", updateList); 58 command.put("ordered", ordered); 59 CommandResult commandResult = dbCollection.getDB().command(command); 60 return Integer.parseInt(commandResult.get("n").toString()); 61 } 62 }
5:业务代码 可根据实际需求 进行修改
1 /** 2 * @description:执行更新 3 * @param liveKeys 需要更新的集合 4 * @return void 5 * @author: xuyou 6 * @date: 2018年8月14日 上午11:44:06 7 * @throws 8 */ 9 public void bathUpdateMongoDB (Set<String> liveKeys){ 10 //将直播播放次数入mongoDB 11 List<BathUpdateOptions> bupsList = new ArrayList<BathUpdateOptions>(); 12 for (String id : liveKeys) { 13 VideoShowView videoShowView = new VideoShowView(); 14 //设置一些更新条件 此处省略 15 videoShowView.setUpdateTime(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); 16 BathUpdateOptions options = videoShowViewMao.getBathUpdateOptions(videoShowView); 17 bupsList.add(options); 18 if(bupsList.size() >= 1000){ 19 logger.info("bupsList : {}",bupsList.size()); 20 videoShowViewMao.bathUpdate(bupsList); 21 bupsList = new ArrayList<BathUpdateOptions>(); 22 } 23 } 24 //TODO 更新liveList剩余少于1000的数据 25 logger.info("bupsList : {}",bupsList.size()); 26 if (bupsList != null && bupsList.size() > 0) { 27 videoShowViewMao.bathUpdate(bupsList); 28 } 29 }
6;pom文件
1 <!-- 对mongodb的支持 --> 2 <dependency> 3 <groupId>org.springframework.boot</groupId> 4 <artifactId>spring-boot-starter-data-mongodb</artifactId> 5 <version>2.0.2.RELEASE</version> 6 </dependency>
7:配置
spring.data.mongodb.uri=mongodb://10.200.85.97:27017,10.200.85.98:27017,10.200.85.99:27017/data_consumer