springboot 整合 mongodb实现 批量更新数据

时间:2024-03-11 19:17:48

现需求:需要批量将1000个数据先查询在更新到mongodb(如果查询不到数据,则添加数据)

1:工具类BathUpdateOptions

 1 import org.springframework.data.mongodb.core.query.Query;
 2 import org.springframework.data.mongodb.core.query.Update;
 3 
 4 public class BathUpdateOptions {
 5     
 6     private Query query;
 7     private Update update;
 8     private boolean upsert = true;
 9     private boolean multi = false;
10 
11     public Query getQuery() {
12         return query;
13     }
14 
15     public void setQuery(Query query) {
16         this.query = query;
17     }
18 
19     public Update getUpdate() {
20         return update;
21     }
22 
23     public void setUpdate(Update update) {
24         this.update = update;
25     }
26 
27     public boolean isUpsert() {
28         return upsert;
29     }
30 
31     public void setUpsert(boolean upsert) {
32         this.upsert = upsert;
33     }
34 
35     public boolean isMulti() {
36         return multi;
37     }
38 
39     public void setMulti(boolean multi) {
40         this.multi = multi;
41     }
42 
43 }

2:需要更新的domain

 1 @Document(collection = "video_show_view")
 2 public class VideoShowView {
 3     //唯一值
 4     private String id;
 5     //节目id
 6     private String cid;
 7     //app播放次数
 8     private String view;
 9     //app虚拟播放次数
10     private String virtualViews;
11     //最后更新时间 时间戳
12     private String updateTime;
13   //get set ......
14 }

3:获取BathUpdateOptions 集合

/**
 * @author xuyou
 * @Package com.migu.live.mao
 * @Description:
 * @date 2018/6/11 16:13
 */
@Repository
public class VideoShowViewMao {
    @Autowired
    private MongoTemplate mongoTemplate;
    
    public BathUpdateOptions getBathUpdateOptions(VideoShowView videoShowView){
        BathUpdateOptions options = new BathUpdateOptions();
        Query query = new Query();
        //查询条件
        query.addCriteria(Criteria.where("cid").is(videoShowView.getCid()));
        query.addCriteria(Criteria.where("types").is(videoShowView.getTypes()));
        options.setQuery(query);
        //mongodb 默认是false,只更新找到的第一条记录,如果这个参数为true,就把按条件查出来多条记录全部更新。
        options.setMulti(true);
        Update update = new Update();
        //更新内容
        update.set("view", videoShowView.getView());
        update.set("updateTime", videoShowView.getUpdateTime());
        options.setUpdate(update);
        return options;
    }
    
    public void bathUpdate(List<BathUpdateOptions> bups){
        BathUpdateUtil.bathUpdate(mongoTemplate, VideoShowView.class, bups);
    }
}

4:操作mongodb的工具类BathUpdateUtil

 1 import java.util.ArrayList;
 2 import java.util.List;
 3 
 4 import org.springframework.dao.InvalidDataAccessApiUsageException;
 5 import org.springframework.data.mongodb.core.MongoTemplate;
 6 import org.springframework.data.mongodb.core.mapping.Document;
 7 
 8 import com.migu.live.data.BathUpdateOptions;
 9 import com.mongodb.BasicDBObject;
10 import com.mongodb.CommandResult;
11 import com.mongodb.DBCollection;
12 import com.mongodb.DBObject;
13 
14 public class BathUpdateUtil {
15 
16     /**
17      * @description:批量更新mongodb数据
18      * @author: xuyou
19      * @date: 2018年8月14日 上午11:28:29 
20      */
21     public static int bathUpdate(MongoTemplate mongoTemplate, Class<?> entityClass, 
22         List<BathUpdateOptions> options) {
23         String collectionName = determineCollectionName(entityClass);
24         return doBathUpdate(mongoTemplate.getCollection(collectionName), 
25              collectionName, options, true);
26     }
27     
28     private static String determineCollectionName(Class<?> entityClass) {
29         if (entityClass == null) {
30             throw new InvalidDataAccessApiUsageException(
31                     "No class parameter provided, entity collection can\'t be determined!");
32         }
33         String collName = entityClass.getSimpleName();
34         if(entityClass.isAnnotationPresent(Document.class)) {
35             Document document = entityClass.getAnnotation(Document.class);
36             collName = document.collection();
37         } else {
38             collName = collName.replaceFirst(collName.substring(0, 1)
39                       ,collName.substring(0, 1).toLowerCase()) ;
40         }
41         return collName;
42     }
43 
44     private static int doBathUpdate(DBCollection dbCollection, String collName,         
45                        List<BathUpdateOptions> options, boolean ordered) {
46         DBObject command = new BasicDBObject();
47         command.put("update", collName);
48         List<BasicDBObject> updateList = new ArrayList<BasicDBObject>();
49         for (BathUpdateOptions option : options) {
50             BasicDBObject update = new BasicDBObject();
51             update.put("q", option.getQuery().getQueryObject());
52             update.put("u", option.getUpdate().getUpdateObject());
53             update.put("upsert", option.isUpsert());
54             update.put("multi", option.isMulti());
55             updateList.add(update);
56         }
57         command.put("updates", updateList);
58         command.put("ordered", ordered);
59         CommandResult commandResult = dbCollection.getDB().command(command);
60         return Integer.parseInt(commandResult.get("n").toString());
61     }
62 }

5:业务代码  可根据实际需求 进行修改

 1 /**
 2      * @description:执行更新
 3      * @param liveKeys 需要更新的集合
 4      * @return void     
 5      * @author: xuyou
 6      * @date: 2018年8月14日 上午11:44:06 
 7      * @throws
 8      */
 9     public void bathUpdateMongoDB (Set<String> liveKeys){
10         //将直播播放次数入mongoDB
11         List<BathUpdateOptions> bupsList = new ArrayList<BathUpdateOptions>();
12         for (String id : liveKeys) {
13             VideoShowView videoShowView = new VideoShowView();
14             //设置一些更新条件 此处省略
15             videoShowView.setUpdateTime(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
16             BathUpdateOptions options = videoShowViewMao.getBathUpdateOptions(videoShowView);
17             bupsList.add(options);
18             if(bupsList.size() >= 1000){
19                 logger.info("bupsList : {}",bupsList.size());
20                 videoShowViewMao.bathUpdate(bupsList);
21                 bupsList = new ArrayList<BathUpdateOptions>();
22             }
23         }
24         //TODO 更新liveList剩余少于1000的数据
25         logger.info("bupsList : {}",bupsList.size());
26         if (bupsList != null && bupsList.size() > 0) {
27             videoShowViewMao.bathUpdate(bupsList);
28         }
29     }

6;pom文件

1     <!-- 对mongodb的支持 -->
2         <dependency>
3             <groupId>org.springframework.boot</groupId>
4             <artifactId>spring-boot-starter-data-mongodb</artifactId>
5             <version>2.0.2.RELEASE</version>
6         </dependency>

7:配置

spring.data.mongodb.uri=mongodb://10.200.85.97:27017,10.200.85.98:27017,10.200.85.99:27017/data_consumer