服务注册中心之ZooKeeper系列(三) 实现分布式锁

时间:2023-03-09 02:56:08
服务注册中心之ZooKeeper系列(三) 实现分布式锁

  通过ZooKeeper的有序节点、节点路径不回重复、还有节点删除会触发Wathcer事件的这些特性,我们可以实现分布式锁。

一、思路

  1. zookeeper中创建一个根节点Locks,用于后续各个客户端的锁操作。
  2. 当要获取锁的时候,在Locks节点下创建“Lock_序号”的零时有序节点(临时节点为了客户端突发断开连接,则此节点消失)。
  3. 如果没有得到锁,就监控排在自己前面的序号节点,等待它的释放。
  4. 当前面的锁被释放后,触发Process方法,然后继续获取当前子节点,判断当前节点是不是第一个,是 返回锁,否 获取锁失败。

二、实现

  在实现是要了解一个类 AutoResetEvent。AutoResetEvent 常常被用来在两个线程之间进行信号发送。它有两个重要的方法:

  Set() :发送信号到等待线程以继续其工作。

  bool WaitOne():等待另一个线程发信号,只有收到信号,线程才继续往下执行 ,会一直等待下去,返回值表示是否收到信号。

  bool WaitOne(int millisecondsTimeout):等待指定时间,如果没有收到信号继续执行,返回值表示是否收到信号。

下面为具体实现方法:

public class ZooKeeperLock
{
private MyWatcher myWatcher; private string lockNode; private org.apache.zookeeper.ZooKeeper zooKeeper; public ZooKeeperLock()
{
myWatcher = new MyWatcher();
} /// <summary>
/// 获取锁
/// </summary>
/// <param name="millisecondsTimeout">等待时间</param>
/// <returns></returns>
public async Task<bool> TryLock(int millisecondsTimeout = )
{
try
{
zooKeeper = new org.apache.zookeeper.ZooKeeper("127.0.0.1", , new MyWatcher()); //创建锁节点
if (await zooKeeper.existsAsync("/Locks") == null)
await zooKeeper.createAsync("/Locks", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); //新建一个临时锁节点
lockNode = await zooKeeper.createAsync("/Locks/Lock_", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); //获取锁下所有节点
var lockNodes = await zooKeeper.getChildrenAsync("/Locks"); lockNodes.Children.Sort(); //判断如果创建的节点就是最小节点 返回锁
if (lockNode.Split("/").Last() == lockNodes.Children[])
return true;
else
{
//当前节点的位置
var location = lockNodes.Children.FindIndex(n => n == lockNode.Split("/").Last());
//获取当前节点 前面一个节点的路径
var frontNodePath = lockNodes.Children[location - ];
//在前面一个节点上加上Watcher ,当前面那个节点删除时,会触发Process方法
await zooKeeper.getDataAsync("/Locks/" + frontNodePath, myWatcher); //如果时间为0 一直等待下去
if (millisecondsTimeout == )
myWatcher.AutoResetEvent.WaitOne();
else //如果时间不为0 等待指定时间后,返回结果
{
var result = myWatcher.AutoResetEvent.WaitOne(millisecondsTimeout); if (result)//如果返回True,说明在指定时间内,前面的节点释放了锁(但是可能是中间节点主机宕机 导致,所以不一定触发了Process方法就是得到了锁。需要重新判断是不是第一个节点)
{
//获取锁下所有节点
lockNodes = await zooKeeper.getChildrenAsync("/Locks");
//判断如果创建的节点就是最小节点 返回锁
if (lockNode.Split("/").Last() == lockNodes.Children[])
return true;
else
return false;
}
else
return false; }
}
}
catch (KeeperException e)
{
await UnLock();
throw e;
}
return false;
} /// <summary>
/// 释放锁
/// </summary>
/// <returns></returns>
public async Task UnLock()
{
try
{
myWatcher.AutoResetEvent.Dispose();await zooKeeper.deleteAsync(lockNode);
}
catch (KeeperException e)
{
throw e;
}
} }

Process方法实现:

 public class MyWatcher : Watcher
{
public AutoResetEvent AutoResetEvent; public MyWatcher()
{
this.AutoResetEvent = new AutoResetEvent(false);
} public override Task process(WatchedEvent @event)
{
if (@event.get_Type() == EventType.NodeDeleted)
{
AutoResetEvent.Set();
}
return null;
}
}

本文源代码在:分布式实现代码

如果你认为文章写的不错,就点个【推荐】吧