【ZooKeeper Java API基本操作-建立连接&增删改查+I操作-Curator Watch事件监听】

时间:2021-07-08 00:41:18

ZooKeeper Java API基本操作-建立连接&增删改查

一、常见的ZooKeeper Java API
二、Curator常用操作

  • 常用操作 ①建立连接 ②添加节点 ③查询节点 ④修改节点 ⑤删除节点


一、常见的ZooKeeper Java API

  • 原生Java API
  • ZkClient
  • Curator

二、Curator常用操作

  • 1、环境准备
    pom.xml


<!--curator-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>

<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
  • 2、常用操作

①建立连接

四个关键参数:
Ⅰ、connectString 连接字符串 zkServer 地址和端口:"192.168.253.128:2181"
Ⅱ、sessionTimeoutMs 会话超时时间,单位ms
Ⅲ、connectionTimeoutMs 连接超时时间,单位ms
Ⅳ、retryPolicy 重试策略


//①定义重试策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,10);
//②获取客户端连接对象
CuratorFramework client= CuratorFrameworkFactory.builder()
.connectString("192.168.253.128:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retryPolicy)
.namespace("stone").build();
//③开启连接
client.start();

设置namespace的作用,相当于每次API操作create /app的时候,系统会补上create /[namespace]/app。

②添加节点

Ⅰ、基本创建
forPath(String path)


//1.基本创建
//如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储
String path = client.create().forPath("/app1");
System.out.println(path);

此处返回值的path是不包含namespace的,但是通过./zkCli.sh上是能查询到包含namespace的新建节点。

【ZooKeeper Java API基本操作-建立连接&增删改查+I操作-Curator Watch事件监听】


另外需要注意的是,API默认会把客户端IP地址值存入到当前节点。

【ZooKeeper Java API基本操作-建立连接&增删改查+I操作-Curator Watch事件监听】

Ⅱ、创建节点 带有数据
forPath(String path, byte[] data)


//2.创建节点 带有数据
String path = client.create().forPath("/app2","hello world".getBytes());

Ⅲ、设置节点类型
withMode(CreateMode mode)
the default is CreateMode.PERSISTENT
常用的四种类型:
PERSISTENT
PERSISTENT_SEQUENTIAL
EPHEMERAL
EPHEMERAL_SEQUENTIAL
需要注意的是:临时节点的生命周期是一次会话内,会话结束临时节点也会被清除。


//3.设置节点类型
//默认类型:持久化
String path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3");

Ⅳ、创建多级节点
creatingParentContainersIfNeeded()


//4.创建多级节点 /aap1/p1
//creatingParentContainersIfNeeded()如果父节点不存在则创建节点
String path = client.create().creatingParentContainersIfNeeded().forPath("/app4/p1");

③查询节点

Ⅰ、查询数据:get
getData()


//1.查询数据:get
byte[] data = client.getData().forPath("/app1");
System.out.println(new String(data));

Ⅱ、查询子节点:ls
getChildren()


//2.查询子节点:ls
List<String> path = client.getChildren().forPath("/app4");
System.out.println(path);

Ⅲ、查询节点状态信息:ls -s
getData().storingStatIn(status)


Stat status = new Stat();
//3.查询节点状态信息:ls -s
byte[] data = client.getData().storingStatIn(status).forPath("/app1");
System.out.println(new String(data));

④修改节点

Ⅰ、基本修改数据:setData().forPath()


client.setData().forPath("/app1", "itcast".getBytes());

Ⅱ、根据版本修改:setData().withVersion().forPath()
多线程并发情形下需要使用此方法


Stat status = new Stat();
//3.查询节点状态信息:ls -s
byte[] data = client.getData().storingStatIn(status).forPath("/app1");

int version = status.getVersion();//查询版本信息
client.setData().withVersion(version).forPath("/app1", "itcast".getBytes());

⑤删除节点

Ⅰ、删除单个节点


//1.删除单个节点
client.delete().forPath("/app1");

Ⅱ、删除带有子节点的节点


//2.删除带有子节点的节点
client.delete().deletingChildrenIfNeeded().forPath("/app1");

Ⅲ、必须成功的删除


//3.必须成功的删除
client.delete().guaranteed().forPath("/app1");

Ⅳ、回调


//4.回调
client.delete().guaranteed().inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
System.out.println("I am deleted.");
System.out.println(event);
}
}).forPath("/app1");

回调方法中 CuratorEvent 是我们实际业务中回调需要使用到的类,其字段包含:


private final CuratorEventType type;
private final int resultCode;
private final String path;
private final String name;
private final List<String> children;
private final Object context;
private final Stat stat;
private final byte[] data;
private final WatchedEvent watchedEvent;
private final List<ACL> aclList;
private final List<CuratorTransactionResult> opResults;
//本次操作输出如下:
CuratorEventImpl{type=DELETE,
resultCode=0,
path='/app2',
name='null',
children=null,
context=null,
stat=null,
data=null,
watchedEvent=null,
aclList=null,
opResults=null}

三、结语

以上即为ZooKeeper Java API基本操作的全部内容




》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》

接下来讲解:

ZooKeeper Java API操作-Curator Watch事件监听

  • 一、Watch事件监听
  • 二、Curator三种Watcher NodeCache PathChildrenCache TreeCache


一、Watch事件监听

  • ZooKeeper 允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,ZooKeeper 服务端会将事件通知到感兴趣的客户端上去,该机制是 ZooKeeper 实现分布式协调服务的重要特性。
  • ZooKeeper 中引入了Watcher机制来实现了发布/订阅功能能,能够让多个订阅者同时监听某一个对象,当一个对象自身状态变化时,会通知所有订阅者。
  • ZooKeeper 原生支持通过注册Watcher来进行事件监听,但是其使用并不是特别方便,需要开发人员自己反复注册Watcher,比较繁琐。
  • Curator引入了 Cache 来实现对 ZooKeeper 服务端事件的监听。
    ZooKeeper提供了三种Watcher:
    NodeCache : 只是监听某一个特定的节点
    PathChildrenCache : 监控一个ZNode的子节点.
    TreeCache : 可以监控整个树上的所有节点,类似于PathChildrenCache和NodeCache的组合


二、Curator三种Watcher

0、建立连接


RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,10);
//第二种方式
CuratorFramework client= CuratorFrameworkFactory.builder()
.connectString("192.168.253.128:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retryPolicy)
.namespace("stone").build();
//开启连接
client.start();

1、NodeCache

  • 只是监听某一个特定的节点(增删改,注意:查询则无变化)
    ①创建NodeCache对象
    ②注册监听
    ③开启监听 如果设置为true,则开始监听时,加载缓存数据


//1.创建NodeCache对象
NodeCache nodeCache = new NodeCache(client,"/app1");
//2.注册监听
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("节点变化了~");

//获取修改节点后的数据
byte[] data = nodeCache.getCurrentData().getData();
System.out.println(new String(data));
}
});
//3.开启监听 如果设置为true,则开始监听时,加载缓存数据
nodeCache.start(true);

2、PathChildrenCache

  • 监听某个节点的所有子节点们
    ①创建监听对象
    ②绑定监听器
    ③开启监听 如果设置为true,则开始监听时,加载缓存数据


//1.创建监听对象
PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/app2",true);
//2.绑定监听器
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
System.out.println("子节点变化了");
System.out.println(event);
//监听子节点的数据变更,并且拿到变更后的数据
//1.获取类型
PathChildrenCacheEvent.Type type = event.getType();
//2.判断类型是否是UPDATE
if (type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
byte[] data = event.getData().getData();
System.out.println(new String(data));
}
}
});
//3.开启监听 如果设置为true,则开始监听时,加载缓存数据
pathChildrenCache.start();

此处需留意PathChildrenCacheEvent类,我们可以通过此类来获取自己需要的相应数据。

  • 常见事件类型:
    CHILD_ADDED
    CHILD_UPDATED
    CHILD_REMOVED

3、TreeCache

  • 监听某个节点自己和所有子节点们


//1.创建监听对象
TreeCache treeCache = new TreeCache(client,"/app1");
//2.注册监听
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
System.out.println("节点变化了");
System.out.println(event);
}
});
//3.开启
treeCache.start();
  • 常见事件类型: NODE_ADDED NODE_UPDATED NODE_REMOVED


三、结尾

以上即为Curator Watch事件监听的基础内容