ZooKeeper 学习 (四) ZooKeeper Java客户端API使用

时间:2022-11-06 08:25:05

1.创建会话(new Zookeeper())

有四种构造方法,我们以其中一种为例

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;

/**
*
* @author tanlk
* @date 2017年8月2日 下午11:24:57
*/
public class ZookeeperUsageSimple implements Watcher{

public static CountDownLatch c = new CountDownLatch(1);

public static void main(String[] args) throws IOException {
ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new ZookeeperUsageSimple());
System.out.println(zooKeeper.getState());
try {
c.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("zooKeeper连接成功");

}

public void process(WatchedEvent event) {
System.out.println("接收到WatchedEvent:" + event);
//收到服务端发来的SyncConnected事件之后,接触等待阻塞
if (KeeperState.SyncConnected == event.getState()) {
c.countDown();
}
}

}

2.创建节点(create)

分同步和异步方式创建节点,代码分别如下:

import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

/**
* ZooKeeper API 创建节点,使用同步接口
* @author tanlk
* @date 2017年8月2日 下午11:38:16
*/
public class ZkCreateSync implements Watcher{

public static CountDownLatch c = new CountDownLatch(1);

/**
* 临时节点和本次会话有关,会话结束(main方法执行完),节点删除
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new ZkCreateSync());
c.await();

//创建临时节点
String path1 = zooKeeper.create("/test", "测试1".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

System.out.println("create success :" +path1);

//创建临时有序节点
String path2 = zooKeeper.create("/test", "测试1".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

System.out.println("create success :" +path2);
}

public void process(WatchedEvent event) {
if (KeeperState.SyncConnected == event.getState()) {
c.countDown();
}
}

}

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

/**
* ZooKeeper API 创建节点,使用异步接口
* @author tanlk
* @date 2017年8月3日 下午9:23:55
*/
public class ZkCreateASync implements Watcher{

public static CountDownLatch c = new CountDownLatch(1);

/**
* 临时节点和本次会话有关,会话结束(main方法执行完),节点删除
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new ZkCreateASync());
c.await();

//创建临时节点
zooKeeper.create("/test", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL,new IStringCallback(),"I am context.");


//创建临时有序节点
zooKeeper.create("/test", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,new IStringCallback(),"I am context.");

TimeUnit.SECONDS.sleep(100);
}

public void process(WatchedEvent event) {
if (KeeperState.SyncConnected == event.getState()) {
c.countDown();
}
}

static class IStringCallback implements AsyncCallback.StringCallback{

public void processResult(int rc, String path, Object ctx, String name) {
System.out.println("create path result:" + rc +"," + path + "," + ctx + "," +name);
}

}
}


3.删除节点(delete)

同样分同步和异步两种方式,代码在4读取数据中有用到


4.读取数据(getChildren getData)

getChildren 获取子节点信息, 有8个接口可供使用,我们看其中一个代码示例

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

/**
* getChildren读取数据
* 程序打印结果如下:
* [c1]
*重新获取子节点:[c1, c2]
* @author tanlk
* @date 2017年8月3日 下午9:59:05
*/
public class ZkGetChildrenSync implements Watcher{
private static CountDownLatch c = new CountDownLatch(1);

private static ZooKeeper zooKeeper = null;

public static void main(String[] args) throws Exception {
String path = "/get_child";


zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new ZkGetChildrenSync());

Stat stat = zooKeeper.exists(path, false);

//如果stat有结果返回,删除此节点
if (stat != null) {
zooKeeper.delete(path, 0);
}
//创建持节点
zooKeeper.create(path, "test".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

//创建临时节点
zooKeeper.create(path+"/c1", "test".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

//getChildren(String path, boolean watch) watch设置为true,就会默认的Watcher
//通过子节点变更的事件通知,我们可以重新再次拉取数据,保持数据的更新
List<String> childrenList = zooKeeper.getChildren(path, true);

System.out.println(childrenList);

zooKeeper.create(path+"/c2", "test".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

TimeUnit.SECONDS.sleep(100000);

}

public void process(WatchedEvent event) {

if (KeeperState.SyncConnected == event.getState()) {
//初始连接
if (EventType.None == event.getType() || null == event.getPath()) {
c.countDown();
}else if (event.getType() == EventType.NodeChildrenChanged) { //子节点有变动
try {
System.out.println("重新获取子节点:" + zooKeeper.getChildren(event.getPath(), true));
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}

}

}


getData 获取当前节点的信息,代码示例如下:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import com.alibaba.fastjson.JSON;

/**
* 通过getData获取这个节点的内容
* @author tanlk
* @date 2017年8月3日 下午10:20:13
*/
public class ZkGetDataSync implements Watcher{

public static CountDownLatch c = new CountDownLatch(1);

public static ZooKeeper zk;

public static Stat stat = new Stat();

public static void main(String[] args) throws Exception {
String path = "/get_data";
zk = new ZooKeeper("127.0.0.1:2181", 5000, new ZkGetDataSync());
c.await();

//创建临时节点
zk.create(path, "hello".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

System.out.println(new String(zk.getData(path, true, stat)));
System.out.println(JSON.toJSONString(stat));

//更新数据setData(final String path, byte data[], int version)
//更新数据有个version,可以说是从CAS衍生来的,如果对数据节点的更新操作没有原子性要求,可以使用-1
//CAS:对于值V,每次更新前都会比对其值是否是预期值A,只有符合预期,才会将V原子化地更新到新值B
zk.setData(path, "hahahahah".getBytes(), -1);

TimeUnit.SECONDS.sleep(10000);
}

public void process(WatchedEvent event) {
if (KeeperState.SyncConnected == event.getState()) {
if (EventType.None == event.getType() || null == event.getPath()) {
c.countDown();
}else if (event.getType() == EventType.NodeDataChanged) {
try {
System.out.println("重新获取该节点数据:" + new String(zk.getData(event.getPath(),true, stat)));
System.out.println(JSON.toJSONString(stat));
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

}

更新数据(setData)

在getData示例中有体现

更新数据方法中可以传入version,可以说是从CAS衍生来的,如果对数据节点的更新操作没有原子性要求,可以使用-1
CAS:对于值V,每次更新前都会比对其值是否是预期值A,只有符合预期,才会将V原子化地更新到新值B


检测节点是否存在(exists)

在以上代码示例也有体现