Zookeeper基础教程(五):C#实现Zookeeper分布式锁

时间:2021-07-20 06:50:51

  分布式锁 

  互联网初期,我们系统一般都是单点部署,也就是在一台服务器完成系统的部署,后期随着用户量的增加,服务器的压力也越来越大,响应速度越来越慢,甚至出现服务器崩溃的情况。

  为解决服务器压力太大,响应慢的特点,分布式系统部署出现了。

  简单的说,就是我们将系统资源部署到多台服务器中,然后使用一台服务器做入口代理,根据一些决策将接收到的请求转发到资源服务器,这也就是我们常说的 反向代理(一般就是使用nginx)

  Zookeeper基础教程(五):C#实现Zookeeper分布式锁

  虽然分布式解决了服务器压力的问题,但也带来了新的问题。

  比如,我们有一个下单统计的功能,当完成下单后,需要执行统计功能,而在高访问的情况下,可能有两个下单请求(A和B)同时完成,然后一起执行了统计功能,这样可能导致的结果就是A请求未将B请求数据统计在内,而B请求可能也未将A请求数据统计在内,这样就造成了数据的统计错,这个问题的产生的根本原因就是统计功能的并发导致的,如果是单点部署的系统,我们简单的使用一个锁操作就能完成了,但是在分布式环境下,A和B请求可能同时运行在两个服务器中,普通的锁就不能起到效果了,这个时候就要使用分布式锁了。

  Zookeeper分布式锁原理

  分布式锁的实现发放有多种,简单的,我们可以使用数据库表去实现它,也可以使用redis去实现它,这里要使用的Zookeeper去实现分布式锁

  Zookeeper分布式锁的原理是巧妙的是使用了znode临时节点的特点和监听(watcher)机制,监听机制很简单,就是我们可以给znode添加一个监听器,当znode节点状态发生改变时(如:数据内容改变,节点被删除),会通知到监听器。

  前面几节介绍过znode有三种类型  

  PERSISTENT:持久节点,即使在创建该特定znode的客户端断开连接后,持久节点仍然存在。默认情况下,除非另有说明,否则所有znode都是持久的。
  EPHEMERAL:临时节点,客户端是连接状态时,临时节点就是有效的。当客户端与ZooKeeper集合断开连接时,临时节点会自动删除。临时节点不允许有子节点。临时节点在leader选举中起着重要作用。
  SEQUENTIAL:顺序节点,可以是持久的或临时的。当一个新的znode被创建为一个顺序节点时,ZooKeeper通过将10位的序列号附加到原始名称来设置znode的路径,顺序节点在锁定和同步中起重要作用。

  其中,顺序节点,可以是持久的或临时的,而临时节点有个特点,就是它属于创建它的那个会话,当会话断开,临时节点就会自动删除,如果在临时节点上注册了监听器,那么监听器就会收到通知,如果临时节点有了时间顺序,那我们为实现分布式锁就又有一个想法:

  假如在Zookeeper中有一个znode节点/Locker

  1、当client1连接Zookeeper时,先判断/Locker节点是否存在子节点,如果没有子节点,那么会在/Locker节点下创建一个临时顺序的znode节点,假如是/client1,表示client1获取了锁状态,client1可以继续执行。

  2、当client2连接Zookeeper时,先判断/Locker节点是否存在子节点,发现已经存在子节点了,然后获取/Locker下的所有子节点,同时按时间顺序排序,在最后一个节点,也就是/client1节点上注册一个监听器(watcher1),同时在/Locker节点下创建一个临时顺序的znode节点,假如是/client2。同时client2将被阻塞,而阻塞状态的释放是在监听器(watcher1)中的。

  3、当client3连接Zookeeper时,先判断/Locker节点是否存在子节点,发现已经存在子节点了,然后获取/Locker下的所有子节点,同时按时间顺序排序,在最后一个节点,也就是/client2节点上注册一个监听器(watcher2),同时在/Locker节点下创建一个临时顺序的znode节点,假如是/client3。同时client2将被阻塞,而阻塞状态的释放是在监听器(watcher2)中的。

  以此类推。

  4、当client1执行完操作了,断开Zookeeper的连接,因为/client1是临时顺序节点,于是将会自动删除,而client2已经往/client1节点中注册了一个监听器(watcher1),于是watcher1将会受到通知,而watcher1又会释放client2的阻塞状态。于是client2获取锁状态,继续执行。

  5、当client2执行完操作了,断开Zookeeper的连接,因为/client2是临时顺序节点,于是将会自动删除,而client3已经往/client2节点中注册了一个监听器(watcher2),于是watcher2将会受到通知,而watcher2又会释放client3的阻塞状态。于是client3获取锁状态,继续执行。

  以此类推。

  这样,不管分布式环境中有几台服务器,都可以保证程序的排队似的执行了。

  C#实现Zookeeper分布式锁

  上一节有封装过一个ZookeeperHelper的辅助类(Zookeeper基础教程(四):C#连接使用Zookeeper),使用这个辅助类实现了一个ZookeeperLocker类:  

  Zookeeper基础教程(五):C#实现Zookeeper分布式锁Zookeeper基础教程(五):C#实现Zookeeper分布式锁
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks; namespace AspNetCore.ZookeeperConsole
{
/// <summary>
/// 基于Zookeeper的分布式锁
/// </summary>
public class ZookeeperLocker : IDisposable
{
/// <summary>
/// 单点锁
/// </summary>
static object locker = new object();
/// <summary>
/// Zookeeper集群地址
/// </summary>
string[] address;
/// <summary>
/// Zookeeper操作辅助类
/// </summary>
ZookeeperHelper zookeeperHelper; /// <summary>
/// 构造函数
/// </summary>
/// <param name="lockerPath">分布式锁的根路径</param>
/// <param name="address">集群地址</param>
public ZookeeperLocker(string lockerPath, params string[] address) : this(lockerPath, 0, address)
{
}
/// <summary>
/// 构造函数
/// </summary>
/// <param name="lockerPath">分布式锁的根路径</param>
/// <param name="sessionTimeout">回话过期时间</param>
/// <param name="address">集群地址</param>
public ZookeeperLocker(string lockerPath, int sessionTimeout, params string[] address)
{
this.address = address.ToArray(); zookeeperHelper = new ZookeeperHelper(address, lockerPath);
if (sessionTimeout > 0)
{
zookeeperHelper.SessionTimeout = sessionTimeout;
}
if (!zookeeperHelper.Connect())
{
throw new Exception("connect failed:" + string.Join(",", address));
}
lock (locker)
{
if (!zookeeperHelper.Exists())//根节点不存在则创建
{
zookeeperHelper.SetData("", "", true);
}
}
}
/// <summary>
/// 生成一个锁
/// </summary>
/// <returns>返回锁名</returns>
public string CreateLock()
{
var path = Guid.NewGuid().ToString().Replace("-", "");
while (zookeeperHelper.Exists(path))
{
path = Guid.NewGuid().ToString().Replace("-", "");
}
return CreateLock(path);
}
/// <summary>
/// 使用指定的路径名称设置锁
/// </summary>
/// <param name="path">锁名,不能包含路径分隔符(/)</param>
/// <returns>返回锁名</returns>
public string CreateLock(string path)
{
if (path.Contains("/"))
{
throw new ArgumentException("invalid path");
}
return zookeeperHelper.SetData(path, "", false, true);
}
/// <summary>
/// 获取锁
/// </summary>
/// <param name="path">锁名</param>
/// <returns>如果获得锁返回true,否则一直等待</returns>
public bool Lock(string path)
{
return LockAsync(path).GetAwaiter().GetResult();
}
/// <summary>
/// 获取锁
/// </summary>
/// <param name="path">锁名</param>
/// <param name="millisecondsTimeout">超时时间,单位:毫秒</param>
/// <returns>如果获得锁返回true,否则等待指定时间后返回false</returns>
public bool Lock(string path, int millisecondsTimeout)
{
return LockAsync(path, millisecondsTimeout).GetAwaiter().GetResult();
}
/// <summary>
/// 异步获取锁等等
/// </summary>
/// <param name="path">锁名</param>
/// <returns>如果获得锁返回true,否则一直等待</returns>
public async Task<bool> LockAsync(string path)
{
return await LockAsync(path, System.Threading.Timeout.Infinite);
}
/// <summary>
/// 异步获取锁等等
/// </summary>
/// <param name="path">锁名</param>
/// <param name="millisecondsTimeout">超时时间,单位:毫秒</param>
/// <returns>如果获得锁返回true,否则等待指定时间后返回false</returns>
public async Task<bool> LockAsync(string path, int millisecondsTimeout)
{
var array = await zookeeperHelper.GetChildrenAsync("", true);
if (array != null && array.Length > 0)
{
var first = array.FirstOrDefault();
if (first == path)//正好是优先级最高的,则获得锁
{
return true;
} var index = array.ToList().IndexOf(path);
if (index > 0)
{
//否则添加监听
var are = new AutoResetEvent(false);
var watcher = new NodeWatcher();
watcher.NodeDeleted += (ze) =>
{
are.Set();
};
if (await zookeeperHelper.WatchAsync(array[index - 1], watcher))//监听顺序节点中的前一个节点
{
if (!are.WaitOne(millisecondsTimeout))
{
return false;
}
} are.Dispose();
}
else
{
throw new InvalidOperationException($"no locker found in path:{zookeeperHelper.CurrentPath}");
}
}
return true;
}
/// <summary>
/// 释放资源
/// </summary>
public void Dispose()
{
zookeeperHelper.Dispose();
}
}
}

  现在写个程序可以模拟一下  

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading; namespace AspNetCore.ZookeeperConsole
{
class Program
{
static void Main(string[] args)
{
//Zookeeper连接字符串,采用host:port格式,多个地址之间使用逗号(,)隔开
string[] address = new string[] { "192.168.209.133:2181", "192.168.209.133:2181", "192.168.209.133:2181" };
//会话超时时间,单位毫秒
int sessionTimeOut = 10000;
//锁节点根路径
string lockerPath = "/Locker"; for (var i = 0; i < 10; i++)
{
string client = "client" + i;
//多线程模拟并发
new Thread(() =>
{
using (ZookeeperLocker zookeeperLocker = new ZookeeperLocker(lockerPath, sessionTimeOut, address))
{
string path = zookeeperLocker.CreateLock();
if (zookeeperLocker.Lock(path))
{
//模拟处理过程
Console.WriteLine($"【{client}】获得锁:{DateTime.Now}");
Thread.Sleep(3000);
Console.WriteLine($"【{client}】处理完成:{DateTime.Now}");
}
else
{
Console.WriteLine($"【{client}】获得锁失败:{DateTime.Now}");
}
}
}).Start();
} Console.ReadKey();
}
}
}

  运行结果如下:

  Zookeeper基础教程(五):C#实现Zookeeper分布式锁

  可以发现,锁功能是实现了的

  如果程序运行中打印日志:Client session timed out, have not heard from server in 8853ms for sessionid 0x1000000ec5500b2

  或者直接抛出异常:org.apache.zookeeper.KeeperException.ConnectionLossException:“Exception_WasThrown”

  只需要适当调整sessionTimeOut时间即可

Zookeeper基础教程(五):C#实现Zookeeper分布式锁的更多相关文章

  1. Zookeeper基础教程(六):&period;net core使用Zookeeper

    Demo代码已提交到gitee,感兴趣的更有可以直接克隆使用,地址:https://gitee.com/shanfeng1000/dotnetcore-demo/tree/master/Zookeep ...

  2. Java基础教程:多线程杂谈——双重检查锁与Volatile

    Java基础教程:多线程杂谈——双重检查锁与Volatile 双重检查锁 有时候可能需要推迟一些高开销的对象初始化操作,并且只有在使用这些对象时才进行初始化.此时程序员可能会采用延迟初始化.但要正确实 ...

  3. zookeeper基础教程

    一.关于zookeeper Zookeeper 作为一个分布式的服务框架,主要用来解决分布式集群中应用系统的一致性问题,它能提供基于类似于文件系统的目录节点树方式的数据存储, Zookeeper 作用 ...

  4. Zookeeper基础教程(二):Zookeeper安装

    上一篇说了,一个Zookeeper集群一般认为至少需要3个节点,所以我们这里安装需要准备三台虚拟机: # 192.168.209.133 test1 # 192.168.209.134 test2 # ...

  5. Zookeeper基础教程(一):认识Zookeeper

    引用百度百科的话 ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件.它是一个为分布式应用提供一致性服 ...

  6. ZooKeeper实践方案:(7) 分布式锁

    1.基本介绍 分布式锁是控制分布式系统之间同步訪问共享资源的一种方式,须要相互排斥来防止彼此干扰来保证一致性. 利用Zookeeper的强一致性能够完毕锁服务.Zookeeper的官方文档是列举了两种 ...

  7. ZooKeeper连接并创建节点以及实现分布式锁操作节点排序输出最小节点Demo

    class LockThread implements Runnable { private DistributedLock lock; public LockThread(int threadId, ...

  8. zookeeper应用:屏障、队列、分布式锁

    zookeeper工具类: 获取连接实例:创建节点:获取子节点:设置节点数据:获取节点数据:访问控制等. package org.windwant.zookeeper; import org.apac ...

  9. SpringCloud2&period;0 Feign 服务发现 基础教程&lpar;五&rpar;

    1.启动[服务中心]集群,即 Eureka Server 参考 SpringCloud2.0 Eureka Server 服务中心 基础教程(二) 2.启动[服务提供者]集群,即 Eureka Cli ...

随机推荐

  1. 对想进入Unity开发新人的一些建议

    提前声明:本文只是写给那些非职业游戏开发人士,只面向那些在校本科生,或已就业但无unity背景的同学们,当然是面对程序员方向的.本人刚工作也没多久,资历尚浅,之前在网上有一位同学让我谈谈一些想法,所以 ...

  2. IIS配置默认文档

    我们在配置IIS的默认文档时是在这里配置的,如下图: 但是,有可能我们的根目录下没有这个文件,而且我们网站运行的时候也不想访问根目录下的这个文件,而是要访问其他文件夹下的某一个文件,比如网站运行的时候 ...

  3. 让浏览器屏蔽js

    有时候为了测试js是否做到了渐进增强,需要屏蔽下js 做法: Internet选项>>安全>>自定义级别 禁用java小程序脚本和活动脚本 有的浏览器调试器直接就有这个功能

  4. SaaS系列介绍之七&colon; SaaS模式分析&lpar;下&rpar;

    1 SaaS模式下的质量管理 质量管理是从事SaaS事业的企业管理的重要课题,质量管理的职能是质量方针.质量目标和质量指标的制定和贯彻实施,中心目标是促进产品质量.提高客户满意度. 软件质量要素包含以 ...

  5. postInvalidate、removeAllViewsInLayout、refreshDrawableState用法

    postInvalidate.invalidate:会调用控件的onDraw()重绘控件 refreshDrawableState:当控件在使用一个对控件状态敏感的Drawable对象时使用,如一个B ...

  6. 多行滚动jQuery循环新闻列表代码

    <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/ ...

  7. LA 4726 再看斜率优化

    感觉最近一批解题报告没写,现在慢慢补吧,算是noip前攒攒rp了 首先感到深深的自责,因为之前对斜率优化没有深入的理解,只是记住了一般步骤,并没有完全了解为什么这样做 先就这道题目而言 首先这种序列题 ...

  8. 201771010134杨其菊《面向对象程序设计(java)》第十七周学习总结

    第十七周学习总结 1. 程序是一段静态的代码,它是应用程序执行的蓝本.进程是程序的一次动态执行,它对应了从代码加载.执行至执行完毕的一个完整过程.操作系统为每个进程分配一段独立的内存空间和系统资源,包 ...

  9. CentOS 7 yum安装zabbix 设置中文界面

    1.  配置安装前环境 2.  安装zabbix 3.  设置中文环境 准备搭建环境 : 系统:CentOS7.5 首先关闭SElinux 和防火墙 安装MariaDB数据库 [root@DaMoWa ...

  10. 山西WebGIS项目总结

    有一段时间没写blog了,说实话,最近的心态一直在变化,看了一部日剧,回想了这一年所学所见,感觉生活目标变了. 做国土项目这段时间不是很忙,由于数据一直给不到位,时间拖得很久,所以在这期间也在继续学习 ...