java基于mongodb实现分布式锁的示例代码

时间:2022-05-19 00:18:11

原理

通过线程安全findAndModify 实现锁

实现

定义锁存储对象:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
/**
 * mongodb 分布式锁
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
@Document(collection = "distributed-lock-doc")
public class LockDocument {
    @Id
    private String id;
    private long expireAt;
    private String token;
}

定义Lock API:

?
1
2
3
4
5
6
7
8
public interface LockService {
 
    String acquire(String key, long expiration);
 
    boolean release(String key, String token);
 
    boolean refresh(String key, String token, long expiration);
}

获取锁:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Override
  public String acquire(String key, long expiration) {
      Query query = Query.query(Criteria.where("_id").is(key));
      String token = this.generateToken();
      Update update = new Update()
          .setOnInsert("_id", key)
          .setOnInsert("expireAt", System.currentTimeMillis() + expiration)
          .setOnInsert("token", token);
 
      FindAndModifyOptions options = new FindAndModifyOptions().upsert(true)
                                                               .returnNew(true);
      LockDocument doc = mongoTemplate.findAndModify(query, update, options,
                                                     LockDocument.class);
      boolean locked = doc.getToken() != null && doc.getToken().equals(token);
 
      // 如果已过期
      if (!locked && doc.getExpireAt() < System.currentTimeMillis()) {
          DeleteResult deleted = this.mongoTemplate.remove(
              Query.query(Criteria.where("_id").is(key)
                                  .and("token").is(doc.getToken())
                                  .and("expireAt").is(doc.getExpireAt())),
              LockDocument.class);
          if (deleted.getDeletedCount() >= 1) {
              // 成功释放锁, 再次尝试获取锁
              return this.acquire(key, expiration);
          }
      }
 
      log.debug("Tried to acquire lock for key {} with token {} . Locked: {}",
                key, token, locked);
      return locked ? token : null;
  }

原理:

  • 先尝试upsert锁对象,如果成功且token一致,说明拿到锁
  • 否则加锁失败
  • 如果未拿到锁,但是锁已过期,尝试删除锁
    • 如果删除成功,再次尝试拿锁
    • 如果失败,说明锁可能已经续期了

释放和续期锁:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
@Override
public boolean release(String key, String token) {
    Query query = Query.query(Criteria.where("_id").is(key)
                                      .and("token").is(token));
    DeleteResult deleted = mongoTemplate.remove(query, LockDocument.class);
    boolean released = deleted.getDeletedCount() == 1;
    if (released) {
        log.debug("Remove query successfully affected 1 record for key {} with token {}",
                  key, token);
    } else if (deleted.getDeletedCount() > 0) {
        log.error("Unexpected result from release for key {} with token {}, released {}",
                  key, token, deleted);
    } else {
        log.error("Remove query did not affect any records for key {} with token {}",
                  key, token);
    }
 
    return released;
}
 
@Override
public boolean refresh(String key, String token,
                       long expiration) {
    Query query = Query.query(Criteria.where("_id").is(key)
                                      .and("token").is(token));
    Update update = Update.update("expireAt",
                                  System.currentTimeMillis() + expiration);
    UpdateResult updated =
        mongoTemplate.updateFirst(query, update, LockDocument.class);
 
    final boolean refreshed = updated.getModifiedCount() == 1;
    if (refreshed) {
        log.debug("Refresh query successfully affected 1 record for key {} " +
                  "with token {}", key, token);
    } else if (updated.getModifiedCount() > 0) {
        log.error("Unexpected result from refresh for key {} with token {}, " +
                  "released {}", key, token, updated);
    } else {
        log.warn("Refresh query did not affect any records for key {} with token {}. " +
                 "This is possible when refresh interval fires for the final time " +
                 "after the lock has been released",
                 key, token);
    }
 
    return refreshed;
}

使用  

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
private  LockService lockService;
 
private void tryAcquireLockAndSchedule() {
        while (!this.stopSchedule) {
            // 尝试拿锁
            this.token = this.lockService.acquire(SCHEDULER_LOCK, 20000);
            if (this.token != null) {
    // 拿到锁
            } else {
                // 等待LOCK_EXPIRATION, 再次尝试
                Thread.sleep(LOCK_EXPIRATION);
            }
        }
    }
  • 先尝试拿锁,如果获取到token,说明拿锁成功
  • 否则可以sleep一段时间后再拿锁

完整代码,可到github查看 https://github.com/jadepeng/docker-pipeline/blob/main/pipeline-master/src/main/java/com/github/jadepeng/pipeline/service/impl/MongoLockService.java

到此这篇关于java基于mongodb实现分布式锁的示例代码的文章就介绍到这了,更多相关java mongodb实现分布式锁内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!

原文链接:https://www.cnblogs.com/xiaoqi/p/mongodb-lock.html