ZooKeeper分布式配置,看这篇就够了

时间:2021-12-27 04:11:46

ZooKeeper分布式配置,看这篇就够了

ZooKeeper 的由来

PS:这个不重要,不感兴趣的,可以直接看下面

来源:《从 Paxos 到 ZooKeeper 》

ZooKeeper 最早起源于雅虎研究院的一个研究小组,在当时,研究人员发现,在雅虎内部有很多的大型系统基本上都需要依赖一个类似的系统来进行分布式协调,但是这些系统往往都存在分布式单点的问题,所有雅虎的开发人员就尝试开发了一个通用的无单点问题的分布式协调框架,以便让开发人员将精力集中在处理业务逻辑上。关于"ZooKeeper"这个项目的名字。也有一个故事,在项目开始初期,因为考虑到内部的很多项目都是用动物的名字来命名的(例如:Pig项目),所以雅虎的工程师也希望给这个项目也取一个动物的名字,这个时候担任研究院的首席科学家 Raghu Ramakrishnan 开玩笑地说:“在这样下去,我们这儿就变成动物园”,此话一出,大家纷纷表示就叫动物园管理员吧,因为各个以动物命名的分布式组件放在一起,这个分布式系统看上去就像一个大型的动物园了,而ZooKeeper 正好要用来进行分布式环境的协调,于是ZooKeeper 的名字也就由此诞生了。

分布式配置中心

在上一期中我们讲解了 ZooKeeper集群的配置和安装,ZooKeeper集群 主要是帮我们做分布式协调的,今天我们用ZK实现分布式配置。关于ZooKeeper 集群的配置大家可以参考上一篇文章《Zookeeper 集群部署的那些事儿》。

为什么需要分布式配置中心

对于刚开始的时候,很多公司的服务器可能是由单个组成,但是随着业务的发展,单一节点的服务无法满足业务的飞速发展,后面就出现了分布式、集群的概念,到了现在形成的微服务,技术的改进能够更好的满足业务的需要。

假设我们线上有很多个微服务分布在不同的服务器上,其中一个微服务,我们就叫它 goods-service,当 goods-service的IP地址需要变更的时候,但是 goods-service又对很多其他的程序提供了服务,这个时候如果没有一个统一配置的东西,每一个应用到 goods-service的应用程序都要做相应的IP地址修改,这是一个很麻烦的事情!

如果使用ZooKeeper来做分布式配置的话,是可以解决这个问题的。

ZooKeeper分布式配置,看这篇就够了

注册中心对比

ZooKeeper分布式配置,看这篇就够了

如果我们只考虑服务治理的话,Eureka是比较合适的,Eureka是比较纯粹的注册中心了,和Eureka不同Apache ZooKeeper 在设计的时候就遵循 CP原则,任何时候对 ZooKeeper 的访问请求都能得到一致的数据结果,同事系统对网络分割具有容错性,今天我们讲解的就是关于ZooKeeper 的注册发现。

配置中心的核心

低延迟: 配置改变( create/update/delete)后能够最快的把最新的配置同步到其他节点中

高可用: 配置中心可以稳定的对外提供服务

其中 低延迟 我们可以通过 ZooKeeper 的 Watcher 机制来实现(等下会讲到Watcher机制)。约定一个节点用来存放配置信息,每个客户端都来监听这个节点的NodeDataChanged事件,当配置发生改变时将最新的配置更新到这个节点上,谁更新无所谓,任何节点都可以更新,当这个节点触发 NodeDataChanged 事件后,在去通知所有监听这个节点的客户端去获取这个节点的最新信息,因为watcher 是一次性的,所以当我们在获取最新信息的时候需要设置监听事件,大部分查询信息都是具有原子性的,所以ZooKeeper中的 getData 也是具有原子性操作,能够保证我们取得的信息是最新的。

对于 高可用 我们首先需要保证的多集群操作来进行ZooKeeper进行部署,在代码层不太需要做过多的工作。

ZooKeeper分布式配置,看这篇就够了

Watch 机制

Watch 是 ZooKeeper 针对节点的一次性观察者机制,就如同我们上面 * 低延迟* 中讲到的,一次触发后就失效,需要手工重新创建Watch。

当Watch监视的数据发生变化的时候,就会通知设置了 Watch 的客户端,就是我们API中的Watcher,Watcher机制就是为了监听Znode节点发生了哪些变化,所以会有对应的事件类型和状态类型,用过代码中switch进行监听,一个客户端可以链接多个节点,只要Znode节点发生变化就会执行 process(WatchedEventevent)。

如下图所示:

ZooKeeper分布式配置,看这篇就够了

从上图中我们可以看到,在ZooKeeper中,Watch采用的是推送机制,而不是客户端轮询,有些中间件采用的是拉取的模式,例如:KafKa。

Watch有两种监听模式,分别为 事件类型和状态类型 :

事件类型:Znode 节点关联,主要是针对节点的操作

  • 创建节点:EventType.NodeCreated
  • 节点数据发生变化:EventType.NodeDataChanged
  • 当前节点的子节点发生变化:EventType.NodeChildrenChanged
  • 删除节点:EventType.NodeDeleted

状态类型:客户端关联,主要是针对于ZooKeeper集群和应用服务之间的状态的变更

  • 未连接:KeeperState.Disconnected
  • 已连接:KeeperState.SyncConnected
  • 认证失败:KeeperState.AuthFailed
  • 过期:KeeperState.Expired
  • 客户端连接到只读服务器:KeeperState.ConnectedReadOnly

watch的特性

一次性触发: 对于ZooKeeper的Watcher事件,是一次性触发的,当 Watch 监视的数据发生变化的时候,通知设置当前Watch 的 Client,就是我们对应的 Watcher,因为ZooKeeper 的监控都是一次性的,所以我们需要在每次触发后设置监控。

客户端串行执行: 客户端Watcher回调的过程是一个串行同步的过程,可以为我们保证顺序的执行。

轻量级: WatchedEvent是ZooKeeper整个Watcher通知机制的最小通知单元,总共包含三个部分(通知状态、事件类型和节点路径),Watcher通知,只会告诉客户端发生事件而不会告知具体内容,需要客户端主动去进行获取,比如 当监听到 WatchedEvent.NodeDataChanged 信息变化的时候,只会告诉我们这个节点的数据发生了变更,你快来获取最新的值吧。

客户端设置的每个监视点与会话关联,如果会话过期,等待中的监视点将会被删除。不过监视点可以跨越不同服务端的连接而保持,例如,当一个ZooKeeper客户端与一个ZooKeeper服务端的连接断开后连接到集合中的另一个服务端,客户端会发送未触发的监视点列表,在注册监视点时,服务端将要检查已监视的znode节点在之前注册监视点之后是否已经变化,如果znode节点已经发生变化,一个监视点的事件就会被发送给客户端,否则在新的服务端上注册监视点。这一机制使得我们可以关心逻辑层会话,而非底层连接本身。

客户端注册

ZooKeeper分布式配置,看这篇就够了

ZooKeeper 注册的时候会向ZooKeeper 服务端请求注册,服务端会返回请求响应,不管成功失败,都会返回响应结果,当响应成功的时候,ZooKeeper服务端会把Watcher对象放到客户端的WatchManager管理并返回响应给客户端

服务端注册

ZooKeeper分布式配置,看这篇就够了

FinalRequestProcessor.ProcessRequest()会判断当前请求是否需要注册Watcher

如果ZooKeeper判断当前客户端需要进行Watcher注册,会将当前的ServerCnxn 对象和数据路径传入 getData 方法中去。ServerCnxn 是ZooKeeper 客户端和服务端之间的连接接口,代表了一个客户端和服务端的连接,可以将 ServerCnxn 当做一个 Watcher 对象,因为它实现了 Watcher 的 process 接口。

WatcherManager

WatcherManager是 ZK服务端 Watcher 的管理器,分为 WatchTable 和 Watch2Paths 两个存储结构,这两个是不同的存储结构 1)WatchTable:从数据节点路径的颗粒度管理 Watcher 2)Watch2Paths:从Watcher的颗粒度来控制时间出发的数据节点

在服务端,DataTree 中会托管两个 WatchManager, 分别是 dataWatches (数据变更Watch) 和 childWatches(子节点变更Watch)。

Watcher 触发逻辑

1)封装WatchedEven:将(KeeperState(通知状态),EventType(事件类型),Path(节点路径))封装成一个 WatchedEvent 对象 2)查询Watcher:根据路径取出对应的Watcher,如果存在,取出数据同时从 WatcherManager(WatchTable/Watch2Paths) 中删除 3)调用Process方法触发Watcher

4.客户端回调 Watcher

1)反序列化:字节流转换成 WatcherEvent 对象 2)处理 chrootPath:如果客户端设置了 chrootPath 属性,那么需要对服务器传过来的完整节点路径进行 chrootPath 处理,生成客户端的一个相对节点路径。比如(/mxn/app/love,经过chrootPath处理,会变成 /love) 3)还原 WatchedEvent:WatcherEvent 转换成 WatchedEvent 4)回调 Watcher:将 WatcherEvent 对象交给 EventThread 线程,在下一个轮询周期中进行 Watcher 回调

EventThread 处理时间通知

1) SendThread 接收到服务端的通知事件后,会通过调用 EventThread.queueEvent 方法将事件传给 EventThread 线程 2)queueEvent 方法首先会根据该通知事件,从 ZKWatchManager 中取出所有相关的 Watcher 客户端识别出 事件类型 EventType 后,会从相应的 Watcher 存储 (即3个注册方法( dataWatches、existWatcher 或 childWatcher)中去除对应的 Watcher 3) 获取到相关的所有 Watcher 后,会将其放入 waitingEvents 这个队列去

代码实现

下面我们就来演示如何使用代码来实现ZooKeeper的配置

首先我们需要引入ZK的jar

  1. org.apache.zookeeper
  2. zookeeper
  3. 3.6.3

配置类

既然我们要做的是分布式配置,首先我们需要模拟一个配置,这个配置用来同步服务的地址

  1. /**
  2. *@program:mxnzookeeper
  3. *@ClassNameMyConf
  4. *@description:配置类
  5. *@author:muxiaonong
  6. *@create:2021-10-1922:18
  7. *@Version1.0
  8. **/
  9. publicclassMyConfig{
  10. privateStringconf;
  11. publicStringgetConf(){
  12. returnconf;
  13. }
  14. publicvoidsetConf(Stringconf){
  15. this.conf=conf;
  16. }
  17. }

Watcher

创建ZooKeeper的时候,我们需要一个Watcher进行监听,后续对Znode节点操作的时候,我们也需要使用到Watcher,但是这两类的功能不一样,所以我们需要定义一个自己的watcher类,如下所示:

  1. importorg.apache.zookeeper.WatchedEvent;
  2. importorg.apache.zookeeper.Watcher;
  3. importjava.util.concurrent.CountDownLatch;
  4. /**
  5. *@program:mxnzookeeper
  6. *@ClassNameDefaultWatch
  7. *@description:
  8. *@author:muxiaonong
  9. *@create:2021-10-1922:02
  10. *@Version1.0
  11. **/
  12. publicclassDefaultWatchimplementsWatcher{
  13. CountDownLatchcc;
  14. publicvoidsetCc(CountDownLatchcc){
  15. this.cc=cc;
  16. }
  17. @Override
  18. publicvoidprocess(WatchedEventevent){
  19. System.out.println(event.toString());
  20. switch(event.getState()){
  21. caseUnknown:
  22. break;
  23. caseDisconnected:
  24. break;
  25. caseNoSyncConnected:
  26. break;
  27. caseSyncConnected:
  28. System.out.println("连接成功。。。。。");
  29. //连接成功后,执行countDown,此时便可以拿zk对象使用了
  30. cc.countDown();
  31. break;
  32. caseAuthFailed:
  33. break;
  34. caseConnectedReadOnly:
  35. break;
  36. caseSaslAuthenticated:
  37. break;
  38. caseExpired:
  39. break;
  40. caseClosed:
  41. break;
  42. }
  43. }
  44. }

由于是异步进行操作的,我们创建一个ZooKeeper对象之后,如果不进行阻塞操作的话,有可能还没有连接完成就执行后续的操作,所以这里我们用 CountDownLatch进行阻塞操作,当监测连接成功后,进行 countDown放行,执行后续的ZK的动作。

当我们连接成功 ZooKeeper 之后,我们需要通过 exists判断是否存在节点,存在就进行 getData操作。这里我们创建一个 WatchCallBack因为 exists和getData都需要一个callback,所以除了实现Watcher以外还需要实现 节点状态:AsyncCallback.StatCallback数据监听:AsyncCallback.DataCallback

  1. importorg.apache.zookeeper.AsyncCallback;
  2. importorg.apache.zookeeper.WatchedEvent;
  3. importorg.apache.zookeeper.Watcher;
  4. importorg.apache.zookeeper.ZooKeeper;
  5. importorg.apache.zookeeper.data.Stat;
  6. importjava.util.concurrent.CountDownLatch;
  7. /**
  8. *@program:mxnzookeeper
  9. *@ClassNameWatchCallBack
  10. *@description:
  11. *@author:muxiaonong
  12. *@create:2021-10-1922:13
  13. *@Version1.0
  14. **/
  15. publicclassWatchCallBackimplementsWatcher,AsyncCallback.StatCallback,AsyncCallback.DataCallback{
  16. ZooKeeperzk;
  17. MyConfigconf;
  18. CountDownLatchcc=newCountDownLatch(1);
  19. publicMyConfiggetConf(){
  20. returnconf;
  21. }
  22. publicvoidsetConf(MyConfigconf){
  23. this.conf=conf;
  24. }
  25. publicZooKeepergetZk(){
  26. returnzk;
  27. }
  28. publicvoidsetZk(ZooKeeperzk){
  29. this.zk=zk;
  30. }
  31. publicvoidaWait(){
  32. //exists的异步实现版本
  33. zk.exists(ZKConstants.ZK_NODE,this,this,"existswatch");
  34. try{
  35. cc.await();
  36. }catch(InterruptedExceptione){
  37. e.printStackTrace();
  38. }
  39. }
  40. /**@Authormxn
  41. *@Description//TODO此回调用于检索节点的stat
  42. *@Date21:242021/10/20
  43. *@paramrc调用返回的code或结果
  44. *@parampath传递给异步调用的路径
  45. *@paramctx传递给异步调用的上下文对象
  46. *@paramstat指定路径上节点的Stat对象
  47. *@return
  48. **/
  49. @Override
  50. publicvoidprocessResult(intrc,Stringpath,Objectctx,Statstat){
  51. if(stat!=null){
  52. //getData的异步实现版本
  53. zk.getData(ZKConstants.ZK_NODE,this,this,"status");
  54. }
  55. }
  56. /**@Authormxn
  57. *@Description//TODO此回调用于检索节点的数据和stat
  58. *@Date21:232021/10/20
  59. *@paramrc调用返回的code或结果
  60. *@parampath传递给异步调用的路径
  61. *@paramctx传递给异步调用的上下文对象
  62. *@paramdata节点的数据
  63. *@paramstat指定节点的Stat对象
  64. *@return
  65. **/
  66. @Override
  67. publicvoidprocessResult(intrc,Stringpath,Objectctx,byte[]data,Statstat){
  68. if(data!=null){
  69. Strings=newString(data);
  70. conf.setConf(s);
  71. cc.countDown();
  72. }
  73. }
  74. /**@Authormxn
  75. *@Description//TODOWatcher接口的实现。
  76. *Watcher接口指定事件处理程序类必须实现的公共接口。
  77. *ZooKeeper客户机将从它连接到的ZooKeeper服务器获取各种事件。
  78. *使用这种客户机的应用程序通过向客户机注册回调对象来处理这些事件。
  79. *回调对象应该是实现监视器接口的类的实例。
  80. *@Date21:242021/10/20
  81. *@ParamwatchedEventWatchedEvent表示监视者能够响应的ZooKeeper上的更改。
  82. *WatchedEvent包含发生了什么,
  83. *ZooKeeper的当前状态,以及事件中涉及的znode的路径。
  84. *@return
  85. **/
  86. @Override
  87. publicvoidprocess(WatchedEventevent){
  88. switch(event.getType()){
  89. caseNone:
  90. break;
  91. caseNodeCreated:
  92. //当一个node被创建后,获取node
  93. //getData中又会触发StatCallback的回调processResult
  94. zk.getData(ZKConstants.ZK_NODE,this,this,"sdfs");
  95. break;
  96. caseNodeDeleted:
  97. //节点删除
  98. conf.setConf("");
  99. //重新开启CountDownLatch
  100. cc=newCountDownLatch(1);
  101. break;
  102. caseNodeDataChanged:
  103. //节点数据被改变了
  104. //触发DataCallback的回调
  105. zk.getData(ZKConstants.ZK_NODE,this,this,"sdfs");
  106. break;
  107. //子节点发生变化的时候
  108. caseNodeChildrenChanged:
  109. break;
  110. }
  111. }
  112. }

当前面准备好了之后,我们可以编写测试用例了:

ZKUtils 工具类

  1. importorg.apache.zookeeper.ZooKeeper;
  2. importjava.util.concurrent.CountDownLatch;
  3. /**
  4. *@program:mxnzookeeper
  5. *@ClassNameZKUtils
  6. *@description:
  7. *@author:muxiaonong
  8. *@create:2021-10-1921:59
  9. *@Version1.0
  10. **/
  11. publicclassZKUtils{
  12. privatestaticZooKeeperzk;
  13. //192.168.5.130:2181/mxn这个后面/mxn,表示客户端如果成功建立了到zk集群的连接,
  14. //那么默认该客户端工作的根path就是/mxn,如果不带/mxn,默认根path是/
  15. //当然我们要保证/mxn这个节点在ZK上是存在的
  16. privatestaticStringaddress="192.18.5.129:2181,192.168.5.130:2181,192.168.5.130:2181/mxn";
  17. privatestaticDefaultWatchwatch=newDefaultWatch();
  18. privatestaticCountDownLatchinit=newCountDownLatch(1);
  19. publicstaticZooKeepergetZK(){
  20. try{
  21. //因为是异步的,所以要await,等到连接上zk集群之后再进行后续操作
  22. zk=newZooKeeper(address,1000,watch);
  23. watch.setCc(init);
  24. init.await();
  25. }catch(Exceptione){
  26. e.printStackTrace();
  27. }
  28. returnzk;
  29. }
  30. }

测试类:

  1. importorg.apache.zookeeper.ZooKeeper;
  2. importorg.junit.Before;
  3. importorg.junit.Test;
  4. /**
  5. *@program:mxnzookeeper
  6. *@ClassNameTestConfig
  7. *@description:
  8. *@author:muxiaonong
  9. *@create:2021-10-1922:04
  10. *@Version1.0
  11. **/
  12. publicclassTestConfig{
  13. ZooKeeperzk;
  14. @Before
  15. publicvoidconn(){
  16. zk=ZKUtils.getZK();
  17. }
  18. /**@Authormxn
  19. *@Description//TODO关闭ZK
  20. *@Date21:162021/10/20
  21. *@Param
  22. *@return
  23. **/
  24. publicvoidclose(){
  25. try{
  26. zk.close();
  27. }catch(Exceptione){
  28. e.printStackTrace();
  29. }
  30. }
  31. @Test
  32. publicvoidgetConf(){
  33. WatchCallBackwatchCallBack=newWatchCallBack();
  34. watchCallBack.setZk(zk);
  35. MyConfigmyConfig=newMyConfig();
  36. watchCallBack.setConf(myConfig);
  37. //阻塞等待
  38. watchCallBack.aWait();
  39. while(true){
  40. if(myConfig.getConf().equals("")){
  41. System.out.println("zknode节点丢失了......");
  42. watchCallBack.aWait();
  43. }else{
  44. System.out.println(myConfig.getConf());
  45. }
  46. //
  47. try{
  48. //每隔500毫秒打印一次
  49. Thread.sleep(500);
  50. }catch(InterruptedExceptione){
  51. e.printStackTrace();
  52. }
  53. }
  54. }

运行测试

首先我们要知道,因为我们连接IP的时候加上了 /mxn这个目录结构,所以我们在服务器初始状态就必须要有这个节点:

集群初始状态:

  1. [zk:localhost:2181(CONNECTED)7]ls/
  2. [mxn,zookeeper]

我们启动程序看看

连接成功

ZooKeeper分布式配置,看这篇就够了

ZooKeeper 下 /mxn 现在也是空

  1. [zk:localhost:2181(CONNECTED)9]ls/mxn
  2. []
  3. [zk:localhost:2181(CONNECTED)10]

现在我们来创建一个 /mxn/myZNode节点数据

  1. [zk:localhost:2181(CONNECTED)10]create/mxn/myZNode"muxiaonong666"
  2. Created/mxn/myZNode

可以看到,创建完成之后,程序马上给出响应,打印出了我配置的值,muxiaonong666

ZooKeeper分布式配置,看这篇就够了

此时,再设置 /mxn/myZNode的值为 muxiaonong6969

ZooKeeper分布式配置,看这篇就够了

啪,很快啊!!!我们就可以看到值瞬间改变了

这个时候我们如果删除 /mxn/myZNode节点,会发生什么呢,前面我们已经写了watch,如果Znode被删除了,,watch and callback执行

  1. caseNodeDeleted:
  2. //节点删除
  3. conf.setConf("");
  4. //重新开启CountDownLatch
  5. cc=newCountDownLatch(1);
  6. break;
  7. if(myConfig.getConf().equals("")){
  8. System.out.println("zknode节点丢失了......");
  9. ////此时应该阻塞住,等待着node重新创建
  10. watchCallBack.aWait();
  11. }

删除 /mxn/myZNode 节点

  1. delete/mxn/myZNode

我们可以看到前面还在打印数据,后面就提示丢失。

ZooKeeper分布式配置,看这篇就够了

但是这个时候我们客户端没有关闭,而是还在等待数据的更新,如果这个时候当重新进行创建 /mxn/myZNode节点的时候,程序又会继续疯狂输出。

  1. create/mxn/myZNode"muxiaonong666"

ZooKeeper分布式配置,看这篇就够了

程序正常运行,并且成功获取到了zk配置的最新数据,到这里基本上就实现了,ZooKeeper的分布式配置中心功能了

在这里我测试用的是 getData,但是在项目实战用可能用的更多的是 子节点的操作 getChildren

总结

到这里我们这篇 ZooKeeper分布式配置注册发现 就讲完了,如果有疑问的地方欢迎进行讨论,ZooKeeper 可以作为分布式配置中心,也可以用来当然微服务的注册,不过现在微服务都有自己的一套服务发现,对于了解ZooKeeper可以我们方便我们在进行技术选型的时候更好的去抉择, ZooKeeper 的高可用和最终一致性也是比较稳定。

本文代码地址:https://github.com/muxiaonong/ZooKeeper/tree/master/mxnzookeeper

原文链接:https://mp.weixin.qq.com/s/FD8q1CIZDxWheRmXeRbl3A