获取锁实现思路:
1. 首先创建一个作为锁目录(znode),通常用它来描述锁定的实体,称为:/lock_node
2. 希望获得锁的客户端在锁目录下创建znode,作为锁/lock_node的子节点,并且节点类型为有序临时节点(EPHEMERAL_SEQUENTIAL);
例如:有两个客户端创建znode,分别为/lock_node/lock-1和/lock_node/lock-2
3. 当前客户端调用getChildren(/lock_node)得到锁目录所有子节点,不设置watch,接着获取小于自己(步骤2创建)的兄弟节点
4. 步骤3中获取小于自己的节点不存在 && 最小节点与步骤2中创建的相同,说明当前客户端顺序号最小,获得锁,结束。
5. 客户端监视(watch)相对自己次小的有序临时节点状态
6. 如果监视的次小节点状态发生变化,则跳转到步骤3,继续后续操作,直到退出锁竞争。
public synchronized boolean lock() throws KeeperException, InterruptedException { if (isClosed()) { return false; } // 如果锁目录不存在, 创建锁目录 节点类型为永久类型 ensurePathExists(dir); // 创建锁节点,节点类型EPHEMERAL_SEQUENTIAL // 如果不存在小于自己的节点 并且最小节点 与当前创建的节点相同 获得锁 // 未获得成功,对当前次小节点设置watch return (Boolean) retryOperation(zop); }创建锁目录
protected void ensurePathExists(String path) { ensureExists(path, null, acl, CreateMode.PERSISTENT); }protected void ensureExists(final String path, final byte[] data, final List<ACL> acl, final CreateMode flags) { try { retryOperation(new ZooKeeperOperation() { public boolean execute() throws KeeperException, InterruptedException { // 创建锁目录 Stat stat = zookeeper.exists(path, false); // 节点如果存在 直接返回 if (stat != null) { return true; } // 创建节点 // data为null // flags为持久化节点 zookeeper.create(path, data, acl, flags); return true; } }); } catch (KeeperException e) { LOG.warn("Caught: " + e, e); } catch (InterruptedException e) { LOG.warn("Caught: " + e, e); } }
创建锁节点,获得锁目录下的所有节点, 如果为最小节点 获得锁成功
/** * the command that is run and retried for actually * obtaining the lock * @return if the command was successful or not */ public boolean execute() throws KeeperException, InterruptedException { do { if (id == null) { long sessionId = zookeeper.getSessionId(); String prefix = "x-" + sessionId + "-"; // lets try look up the current ID if we failed // in the middle of creating the znode findPrefixInChildren(prefix, zookeeper, dir); idName = new ZNodeName(id); } if (id != null) { List<String> names = zookeeper.getChildren(dir, false); if (names.isEmpty()) { LOG.warn("No children in: " + dir + " when we've just " + "created one! Lets recreate it..."); // lets force the recreation of the id id = null; } else { // lets sort them explicitly (though they do seem to come back in order ususally :) SortedSet<ZNodeName> sortedNames = new TreeSet<ZNodeName>(); for (String name : names) { sortedNames.add(new ZNodeName(dir + "/" + name)); } // 获得最小节点 ownerId = sortedNames.first().getName(); // lock_1, lock_2, lock_3 传入参数lock_2 返回lock_1 SortedSet<ZNodeName> lessThanMe = sortedNames.headSet(idName); if (!lessThanMe.isEmpty()) { ZNodeName lastChildName = lessThanMe.last(); lastChildId = lastChildName.getName(); if (LOG.isDebugEnabled()) { LOG.debug("watching less than me node: " + lastChildId); } // 次小节点设置watch Stat stat = zookeeper.exists(lastChildId, new LockWatcher()); if (stat != null) { return Boolean.FALSE; } else { LOG.warn("Could not find the" + " stats for less than me: " + lastChildName.getName()); } } else { // 锁目录下的最小节点 与当前客户端创建相同 if (isOwner()) { if (callback != null) { callback.lockAcquired(); } // 获得锁 return Boolean.TRUE; } } } } } while (id == null); return Boolean.FALSE; } };private void findPrefixInChildren(String prefix, ZooKeeper zookeeper, String dir) throws KeeperException, InterruptedException { // 获取锁目录下的所有子节点 List<String> names = zookeeper.getChildren(dir, false); for (String name : names) { //x-sessionId- if (name.startsWith(prefix)) { id = name; if (LOG.isDebugEnabled()) { LOG.debug("Found id created last time: " + id); } break; } } // 当前锁目录下 没有与当前会话对应的子节点 创建子节点 节点类型为临时顺序节点 if (id == null) { // dir/x-sessionId-i id = zookeeper.create(dir + "/" + prefix, data, getAcl(), EPHEMERAL_SEQUENTIAL); if (LOG.isDebugEnabled()) { LOG.debug("Created id: " + id); } }
释放锁:
释放锁非常简单,删除步骤1中创建的有序临时节点。另外,如果客户端进程死亡或连接失效,对应的节点也会被删除。
public synchronized void unlock() throws RuntimeException { if (!isClosed() && id != null) { // we don't need to retry this operation in the case of failure // as ZK will remove ephemeral files and we don't wanna hang // this process when closing if we cannot reconnect to ZK try { ZooKeeperOperation zopdel = new ZooKeeperOperation() { public boolean execute() throws KeeperException, InterruptedException { // 删除节点 忽略版本 zookeeper.delete(id, -1); return Boolean.TRUE; } }; zopdel.execute(); } catch (InterruptedException e) { LOG.warn("Caught: " + e, e); //set that we have been interrupted. Thread.currentThread().interrupt(); } catch (KeeperException.NoNodeException e) { // do nothing } catch (KeeperException e) { LOG.warn("Caught: " + e, e); throw (RuntimeException) new RuntimeException(e.getMessage()). initCause(e); } finally { if (callback != null) { callback.lockReleased(); } id = null; } } }