Zookeeper分布式服务协调组件

时间:2022-09-07 09:12:02

1.简介

Zookeeper分布式服务协调组件

Zookeeper是一个分布式服务协调组件,是Hadoop、Hbase、Kafka重要的依赖组件,它是一个为分布式应用提供一致性服务的组件。

Zookeeper的目标就是封装好复杂易出错的服务,为使用者提供高效、稳定的服务。

 
 

Zookeeper的使用场景:

1.Hadoop、Hbase、Kafka的依赖组件。

2.作为注册中心,用于维护服务列表。

3.作为项目的配置中心,将一些重要的可以动态配置的信息放入zk中,利用zk的通知机制,当状态发生改变时通知客户端,将其变量放入静态变量中。

4.作为本地缓存的更新策略,当信息更新时更新节点的值,利用zk的通知机制,各个应用获取到节点的值,再从数据库中查询,然后更新到本地缓存。

5.中间件的高可用性。

 
 

2.模型

2.1 Zookeeper的文件系统

Zookeeper分布式服务协调组件

Zookeeper维护了一个类似文件系统的数据结构,有根目录 (/) 和若干个子目录 (树形结构 , 与Linux类似 )

*每个目录都称为一个znode,每个znode都包含自身节点的数据且每个znode下可以包含多个znode。

*在创建znode时必须指定znode的数据,可以为null。

*znode下的数据有版本号,当进行更新操作时版本号会+1。

*当使用JAVA进行更新和删除操作时,需要传递版本号,其内部进行CAS判断,当且仅当传递的版本号与当前节点的版本号相同时,才进行操作(并非每个znode下包含多个版本的数据,传递版本号只是用来做CAS,默认值为-1,表示不进行CAS判断)

*删除znode时,若该znode包含子znode,那么必须先删除所有的子znode,否则无法删除。

znode类型

持久化节点:无论客户端的连接是否断开,该节点依然存在。

持久化节点并顺序编号:在持久化节点的基础上,Zookeeper动态的为znode进行自动编号,即创建/p节点,那么Zookeeper将把其节点命名为/p1,当再次创建/p节点,那么Zookeeper将把该节点命名为/p2。

临时节点:当客户端的连接断开,临时节点及其节点中的数据将会被删除。

临时节点并顺序编号:在临时节点的基础上,Zookeeper动态的为znode进行自动编号,与持久化节点并顺序编号的区别是,临时节点并顺序编号会在客户端断开连接之后会自动删除节点以及节点中的数据。

2.2 Zookeeper的通知机制

客户端可以监听它关心的节点,当目标节点发生变化时 (数据版本号改变、被删除、子目录节点增加和删除),Zookeeper会通知客户端,客户端再作出相应的处理。

*zk并不是根据节点的值改变而触发通知的,而是根据节点中数据的版本号。

*当节点中的值重复时,但由于数据的版本号发生改变,因此仍然可以通过通知机制通知客户端。

2.3 Zookeeper集群的一致性同步

Zookeeper分布式服务协调组件

Zookeeper一般是通过集群的方式进行使用,即多台Zookeeper服务构成一个有关系的组。

当搭建了一个Zookeeper集群,Zookeeper会根据选举算法,从多个Zookeeper节点中选取一个作为Leader,剩余的节点作为Follower,Leader会与各个Follower建立一个有效的长连接,保证各个节点的通信正常 (每台服务器都有可能被选取为Leader)

当Zookeeper集群搭建完成后,就可以启动很多个客户端与zk节点进行连接 (长连接方式,保证客户端与服务器能有效持久的连接)

当某个节点收到修改的操作时,首先会把请求转发给Leader,Leader内有处理机制,它会操作修改并且同步操作给所有的Follower节点。

*一旦选取的Leader节点宕机,则会重新组织Zookeeper集群,选取新的Leader,重新与各个节点建立连接 (重新选取的时间很短,大概200ms)

3.Zookeeper的使用

3.1 安装

*由于Zookeeper是由java语言编写的,因此在安装Zookeeper前需要安装好JDK,并且配置环境变量JAVA_HOME

Zookeeper分布式服务协调组件

Zookeeper分布式服务协调组件

Zookeeper官网下载zk进行解压安装:

Zookeeper分布式服务协调组件

3.2 目录结构

bin目录

Zookeeper分布式服务协调组件

zkEnv.sh:用于配置zk服务启动时的环境变量 (包括加载配置文件的路径等)

zkServer.sh:用于启动zk服务,默认监听2181端口。

zkCli.sh:用于启动zk客户端。

zookeeper.out:用于存放zk运行时的日志。

conf目录

Zookeeper分布式服务协调组件

log4j.properties文件:zk运行时的日志配置文件,默认日志信息都将打印到bin目录下的zookeeper.out文件 (当使用Zookeeper遇到异常时应该查看此文件下的内容)

zoo_sample.conf文件:zk服务的配置文件,由Zookeeper官方提供 (默认zk服务启动时将加载conf目录下的zoo.cfg配置文件)

3.3 配置文件

Zookeeper启动时默认加载conf目录下的zoo.cfg配置文件,因此将conf目录下的zoo_sample.conf配置文件更名为zoo.cfg。

配置文件

#基础配置
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/Zookeeper/Zookeeper-3.4.6/zkdata
dataLogDir=/usr/Zookeeper/Zookeeper-3.4.6/zklog
clientPort=2181
autopurge.purgeInterval=1

tickTime:initLimit、syncLimit属性的时间单位,值是毫秒。

initLimit:Zookeeper集群搭建前所允许的初始化时间。

syncLimit:Leader发送心跳给Follower,Follower向Leader回复心跳这一过程所允许的最大时长 (rtt,往返时间),一旦超过了这个时间,则Leader认为该Follower宕机。

dataDir:Zookeeper快照日志的存放目录。

dataLogDir:Zookeeper事务日志的存放目录。

clientPort:Zookeeper服务监听的端口,默认为2181。

*当其中一台zk服务启动后,剩余的zk服务必须在initLimit规定的时间内全都启动,否则zk进行集群的搭建时会认为未启动的zk服务已经失效。

*如果不配置dataLogDir,那么Zookeeper的事务日志将写入到dataDir目录下 (会严重影响zk的性能)

3.4 Zookeeper的启动

使用zkServer.sh命令启动Zookeeper服务。

Zookeeper分布式服务协调组件

使用jps命令查询zk进程是否启动成功,当出现QuorumPeerMain表示zk启动成功。

Zookeeper分布式服务协调组件

3.5 Zookeeper集群搭建

1.修改配置文件,添加zk集群配置

#基础配置
tickTime=
initLimit=
syncLimit=
dataDir=/usr/Zookeeper/Zookeeper-3.4./zkdata
dataLogDir=/usr/Zookeeper/Zookeeper-3.4./zklog
clientPort=
autopurge.purgeInterval=1 #集群配置
server.=192.168.1.119::
server.=192.168.1.122::
server.=192.168.1.125::

在conf文件下使用server.标识属性配置zk集群,使多个zk服务构成一个组 (标识必须为整数)

server.本机zk标识 = zk服务地址:leader和follower之间的通信端口:leader选举端口

server.其他zk标识 = zk服务地址:leader和follower之间的通信端口:leader选举端口

*标识与zk服务进行绑定,因此同一个集群下的zk服务的标识不能相同。

*leader和follower之间的通信端口默认是2888,leader选举端口默认是3888。

2.在快照日志目录下创建myid文件,文件中的值是本台zk服务的唯一标识(给节点编号)

#将1输入到myid文件中
echo "" > myid

Zookeeper分布式服务协调组件

*需要修改要构成集群的其他zk节点的配置文件以及设置其myid文件。

3.启动Zookeeper服务

*在 initLimit * tickTime的时间内启动集群中的所有zk节点。

Zookeeper分布式服务协调组件

4.查看Zookeeper的状态

Zookeeper分布式服务协调组件

Zookeeper分布式服务协调组件

Zookeeper分布式服务协调组件

注意事项

*搭建Zookeeper集群时务必遵循2n+1个节点,因为根据Zookeeper的工作原理,只要有大于一半的节点存活,则Zookeeper集群就能够对外提供服务。

*搭建zk集群时需关闭每台zk服务器上的防火墙或者开放对应的端口,否则集群中的zk间无法进行通讯。

*zk集群在高负荷的工作时会产生大量的事务日志,如果日志长期不进行清理容易将分区中的空间占满最终导致zk服务无法运行,因此需要定期清理zk产生的事务日志 (可以配合Linux的crontab命令设置每天定时去执行清除日志文件的脚本)

3.6 Zookeeper客户端操作

Zookeeper分布式服务协调组件

3.7 Java中操作Zookeeper

可以通过Apache提供的Zookeeper API对zk进行操作 (提供基本功能),也可以使用Apache提供的Curator框架操作zk (提供更全面的功能)

*Curator框架除了基本的节点添加、删除、修改、查询,监听节点功能外,还提供了session超时重连、主从选举、分布式计算器、分布式锁等等适用于各种复杂的zk场景的API封装。

1.导入Maven依赖

<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.13</version>
<type>pom</type>
</dependency>

2.建立连接

ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException

connectString:zookeeper server列表,多个以逗号隔开。

sessionTimeout:指定连接Zookeeper的超时时间。

watcher:事件回调接口。

*ZooKeeper实例将从服务列表中选择一个server建立连接,若连接失败则选择另外一个server重新进行连接。

*ZooKeeper实例是通过异步的方式建立连接,当连接建立完毕后回调指定Watcher的process方法,因此程序为了保证同步建立连接,可以使用JAVA提供的CountDownLatch同步辅助类进行控制。

3.调用Zookeeper提供的API

//创建节点,指定节点路径、数据、节点的类型
public String create(final String path, byte data[], List<ACL> acl,CreateMode createMode) //获取子节点
public List<String> getChildren(final String path, boolean watch) //判断节点是否存在
public Stat exists(String path, boolean watch) //获取节点中的数据
public byte[] getData(String path, boolean watch, Stat stat) //设置或更新节点中的数据
public Stat setData(final String path, byte data[], int version) //删除节点
public void delete(final String path, int version)

*创建节点时需要指定节点的类型,Apache Zookeeper API中提供了CreateMode枚举类,用于指定节点的类型。

CreateMode.PERSISTENT:持久化节点
CreateMode.PERSISTENT_SEQUENTIAL:持久化节点并顺序编号
CreateMode.EPHEMERAL:临时节点
CreateMode.EPHEMERAL_SEQUENTIAL:临时节点并顺序编号

*当进行更新和删除操作时,需要传递版本号,其内部进行CAS判断,当且仅当传递的版本号与当前节点的版本号相同时,才进行操作(并非每个znode下包含多个版本的数据,传递版本号只是用来做CAS,默认值-1,表示不进行CAS判断)

*Apache Zookeeper API中有很多方法都支持Watcher类型参数,Watcher可用于监听事件的发生以及连接状态的改变,包括节点的创建、删除、节点中数据的改变、节点的子节点发生改变等事件,失去连接、异步连接、认证失败、只读连接、连接过期等连接状态。

完整示例

/**
* @Auther: ZHUANGHAOTANG
* @Date: 2018/11/12 14:55
* @Description:
*/
public class ZKUtils { /**
* 日志输出
*/
private static Logger logger = LoggerFactory.getLogger(ZKUtils.class); /**
* ZK服务列表
*/
private static final String URLS = "192.168.1.80:2181,192.168.1.81:2181,192.168.1.83:2181"; /**
* 连接Zookeeper的超时时长(单位:毫秒)
*/
private static final int SESSION_TIMEOUT = 3000; /**
* Zookeeper连接对象
*/
private static ZooKeeper zk = null; static {
try {
CountDownLatch countDownLatch = new CountDownLatch(1);//锁存器(同步辅助类)
zk = new ZooKeeper(URLS, SESSION_TIMEOUT, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (Event.KeeperState.SyncConnected == event.getState()) {
countDownLatch.countDown();//倒数器-1
}
}
});
countDownLatch.await();
} catch (Exception e) {
logger.info("Zookeeper获取连接失败,{}", e);
}
} /**
* 创建节点
*
* @param path
* @param data
* @param createMode
* @throws Exception
*/
public static void createPath(String path, String data, CreateMode createMode) throws Exception {
if (StringUtils.isBlank(path)) {
throw new Exception("path is null");
}
if (!exists(path)) {
zk.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, createMode);
}
} /**
* 获取子节点
*
* @param path
* @return
* @throws Exception
*/
public static List<String> getSubNode(String path) throws Exception {
if (StringUtils.isBlank(path)) {
throw new Exception("path is null");
}
return zk.getChildren(path, false);
} /**
* 判断节点是否存在
*
* @param path
* @return
* @throws Exception
*/
public static boolean exists(String path) throws Exception {
if (StringUtils.isBlank(path)) {
throw new Exception("path is null");
}
if (zk.exists(path, false) != null) {
return true;
}
return false;
} /**
* 获取节点中的数据
*
* @param path
* @return
* @throws Exception
*/
public static String getData(String path) throws Exception {
if (StringUtils.isBlank(path)) {
throw new Exception("path is null");
}
return new String(zk.getData(path, false, null));
} /**
* 更新节点中的数据
*
* @param path
* @param data
* @throws Exception
*/
public static void setData(String path, String data) throws Exception {
if (StringUtils.isBlank(path)) {
throw new Exception("path is null");
}
zk.setData(path, data.getBytes(), -1);
} /**
* 删除节点
*
* @param path
* @throws Exception
*/
public static void deletePath(String path) throws Exception {
if (StringUtils.isBlank(path)) {
throw new Exception("path is null");
}
zk.delete(path, -1);
} }

Zookeeper分布式服务协调组件的更多相关文章

  1. 【转】浅谈分布式服务协调技术 Zookeeper

    非常好介绍Zookeeper的文章, Google的三篇论文影响了很多很多人,也影响了很多很多系统.这三篇论文一直是分布式领域传阅的经典.根据MapReduce,于是我们有了Hadoop:根据GFS, ...

  2. Hadoop Zookeeper 分布式服务框架

    what is Zookeeper? 1,开源的分布式的,为分布式应用提供协调服务的Apache项目2,提供一个简单原语集合,以便于分布式应用可以在它之上构建更高层次的同步服务3,设计非常易于编程,它 ...

  3. zookeeper分布式服务中选主的应用

    通常zookeeper在分布式服务中作为注册中心,实际上它还可以办到很多事.比如分布式队列.分布式锁 由于公司服务中有很多定时任务,而这些定时任务由于一些历史原因暂时不能改造成框架调用 于是想到用zo ...

  4. 分布式服务协调技术zookeeper笔记

    本文主要学习ZooKeeper的体系结构.节点类型.节点监听.常用命令等基础知识,最后还学习了ZooKeeper的高可用集群的搭建与测试.希望能给想快速掌握ZooKeeper的同学有所帮助. ZooK ...

  5. ZooKeeper -- 分布式开源协调服务

    ZooKeeper是一个为分布式应用所设计的开源协调服务,适用于大型的分布式系统,可以提供统一命名服务.状态同步服务.集群管理.分布式应用配置项的管理等服务.ZooKeeper支持Java和C两种编程 ...

  6. Java学习之Dubbo&plus;ZooKeeper分布式服务Demo

    背景:在之前的一个<Java学习之SpringBoot整合SSM Demo>分享中说到搭建ZooKeeper和Dubbo分布式框架中遇到了一些技术问题没能成功,只分享了其中的一个中间产物, ...

  7. 分布式服务框架 Zookeeper — 管理分布式环境中的数据

    本节本来是要介绍ZooKeeper的实现原理,但是ZooKeeper的原理比较复杂,它涉及到了paxos算法.Zab协议.通信协议等相关知识,理解起来比较抽象所以还需要借助一些应用场景,来帮我们理解. ...

  8. 分布式服务框架Zookeeper

    协议介绍 zookeeper协议分为两种模式 崩溃恢复模式和消息广播模式 崩溃恢复协议是在集群中所选举的leader 宕机或者关闭 等现象出现 follower重新进行选举出新的leader 同时集群 ...

  9. 分布式服务框架 Zookeeper -- 管理分布式环境中的数据

    转自:http://www.ibm.com/developerworks/cn/opensource/os-cn-zookeeper/index.html Zookeeper 分布式服务框架是 Apa ...

随机推荐

  1. java retention注解

    Retention注解 Retention(保留)注解说明,这种类型的注解会被保留到那个阶段. 有三个值:1.RetentionPolicy.SOURCE —— 这种类型的Annotations只在源 ...

  2. kindeditor-4&period;1&period;10 结合 Asp&period;Net MVC 添加图片功能

    KindEditor是一套开源的HTML可视化编辑器,现在我要结合Asp.Net MVC4 上传图片功能,做相应的配置和修改, 其实网上也有人写过类似的文章了,我写出来是以防以后使用的时候出现这样的问 ...

  3. java&period;lang&period;NoClassDefFoundError&colon; com&sol;opensymphony&sol;xwork2&sol;util&sol;TextUtils

    java.lang.NoSuchMethodError: com.opensymphony.xwork2.ActionContext.get(Ljava/lang/String;)Ljava/lang ...

  4. leetCode 70&period;Climbing Stairs (爬楼梯) 解题思路和方法

    Climbing Stairs  You are climbing a stair case. It takes n steps to reach to the top. Each time you ...

  5. call&lpar;&rpar;与apply&lpar;&rpar;区别

    一.方法的定义 call方法: 语法:call(thisObj,Object)定义:调用一个对象的一个方法,以另一个对象替换当前对象.说明:call 方法可以用来代替另一个对象调用一个方法.call ...

  6. c3p0获取连接Connection后的Close&lpar;&rpar;---释疑

    论题: java c3p0获取连接Connnection 之后, 调用 con.close( ) 是否真的关闭了物理连接 ? 简答: c3p0采用连接池, 目的就是提前预置一定数量的连接, 在使用时候 ...

  7. maven自定义jar到本地仓库

    Apache Maven为项目构建提供了绝佳的解决方案,其本地仓库中缓存了远程代理仓库或*仓库中的资源,从而提高网络资源使用效率,很好很强大!  但是并非所有资源都可以根据GroupId.Artif ...

  8. &lbrack;数据清洗&rsqb;- Pandas 清洗&OpenCurlyDoubleQuote;脏”数据(三)

    预览数据 这次我们使用 Artworks.csv ,我们选取 100 行数据来完成本次内容.具体步骤: 导入 Pandas 读取 csv 数据到 DataFrame(要确保数据已经下载到指定路径) D ...

  9. dojo API中英文缩写的意思

    dojo API中英文缩写的意思 1.A-Array(数组) 2.B-Boolean(布尔类型) 3.C-Constructor(构造器) 4.D-Date(日期) 5.{}-DomNode(节点) ...

  10. 2019-04-29 EasyWeb下配置Atomikos&plus;SQLServer分布式数据源

    初次尝试: 配置Mysql时候使用的是Atomikos+DruidXADataSource,所以觉得配置SQLServer应该也是仅仅配置配置就够了,于是引入JDBC驱动依赖后,配置了文件 sprin ...