1.创建会话(new Zookeeper())
有四种构造方法,我们以其中一种为例
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
/**
*
* @author tanlk
* @date 2017年8月2日 下午11:24:57
*/
public class ZookeeperUsageSimple implements Watcher{
public static CountDownLatch c = new CountDownLatch(1);
public static void main(String[] args) throws IOException {
ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new ZookeeperUsageSimple());
System.out.println(zooKeeper.getState());
try {
c.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("zooKeeper连接成功");
}
public void process(WatchedEvent event) {
System.out.println("接收到WatchedEvent:" + event);
//收到服务端发来的SyncConnected事件之后,接触等待阻塞
if (KeeperState.SyncConnected == event.getState()) {
c.countDown();
}
}
}
2.创建节点(create)
分同步和异步方式创建节点,代码分别如下:
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
/**
* ZooKeeper API 创建节点,使用同步接口
* @author tanlk
* @date 2017年8月2日 下午11:38:16
*/
public class ZkCreateSync implements Watcher{
public static CountDownLatch c = new CountDownLatch(1);
/**
* 临时节点和本次会话有关,会话结束(main方法执行完),节点删除
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new ZkCreateSync());
c.await();
//创建临时节点
String path1 = zooKeeper.create("/test", "测试1".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("create success :" +path1);
//创建临时有序节点
String path2 = zooKeeper.create("/test", "测试1".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("create success :" +path2);
}
public void process(WatchedEvent event) {
if (KeeperState.SyncConnected == event.getState()) {
c.countDown();
}
}
}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
/**
* ZooKeeper API 创建节点,使用异步接口
* @author tanlk
* @date 2017年8月3日 下午9:23:55
*/
public class ZkCreateASync implements Watcher{
public static CountDownLatch c = new CountDownLatch(1);
/**
* 临时节点和本次会话有关,会话结束(main方法执行完),节点删除
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new ZkCreateASync());
c.await();
//创建临时节点
zooKeeper.create("/test", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL,new IStringCallback(),"I am context.");
//创建临时有序节点
zooKeeper.create("/test", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,new IStringCallback(),"I am context.");
TimeUnit.SECONDS.sleep(100);
}
public void process(WatchedEvent event) {
if (KeeperState.SyncConnected == event.getState()) {
c.countDown();
}
}
static class IStringCallback implements AsyncCallback.StringCallback{
public void processResult(int rc, String path, Object ctx, String name) {
System.out.println("create path result:" + rc +"," + path + "," + ctx + "," +name);
}
}
}
3.删除节点(delete)
同样分同步和异步两种方式,代码在4读取数据中有用到
4.读取数据(getChildren getData)
getChildren 获取子节点信息, 有8个接口可供使用,我们看其中一个代码示例
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
/**
* getChildren读取数据
* 程序打印结果如下:
* [c1]
*重新获取子节点:[c1, c2]
* @author tanlk
* @date 2017年8月3日 下午9:59:05
*/
public class ZkGetChildrenSync implements Watcher{
private static CountDownLatch c = new CountDownLatch(1);
private static ZooKeeper zooKeeper = null;
public static void main(String[] args) throws Exception {
String path = "/get_child";
zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new ZkGetChildrenSync());
Stat stat = zooKeeper.exists(path, false);
//如果stat有结果返回,删除此节点
if (stat != null) {
zooKeeper.delete(path, 0);
}
//创建持节点
zooKeeper.create(path, "test".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
//创建临时节点
zooKeeper.create(path+"/c1", "test".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
//getChildren(String path, boolean watch) watch设置为true,就会默认的Watcher
//通过子节点变更的事件通知,我们可以重新再次拉取数据,保持数据的更新
List<String> childrenList = zooKeeper.getChildren(path, true);
System.out.println(childrenList);
zooKeeper.create(path+"/c2", "test".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
TimeUnit.SECONDS.sleep(100000);
}
public void process(WatchedEvent event) {
if (KeeperState.SyncConnected == event.getState()) {
//初始连接
if (EventType.None == event.getType() || null == event.getPath()) {
c.countDown();
}else if (event.getType() == EventType.NodeChildrenChanged) { //子节点有变动
try {
System.out.println("重新获取子节点:" + zooKeeper.getChildren(event.getPath(), true));
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
getData 获取当前节点的信息,代码示例如下:
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import com.alibaba.fastjson.JSON;
/**
* 通过getData获取这个节点的内容
* @author tanlk
* @date 2017年8月3日 下午10:20:13
*/
public class ZkGetDataSync implements Watcher{
public static CountDownLatch c = new CountDownLatch(1);
public static ZooKeeper zk;
public static Stat stat = new Stat();
public static void main(String[] args) throws Exception {
String path = "/get_data";
zk = new ZooKeeper("127.0.0.1:2181", 5000, new ZkGetDataSync());
c.await();
//创建临时节点
zk.create(path, "hello".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println(new String(zk.getData(path, true, stat)));
System.out.println(JSON.toJSONString(stat));
//更新数据setData(final String path, byte data[], int version)
//更新数据有个version,可以说是从CAS衍生来的,如果对数据节点的更新操作没有原子性要求,可以使用-1
//CAS:对于值V,每次更新前都会比对其值是否是预期值A,只有符合预期,才会将V原子化地更新到新值B
zk.setData(path, "hahahahah".getBytes(), -1);
TimeUnit.SECONDS.sleep(10000);
}
public void process(WatchedEvent event) {
if (KeeperState.SyncConnected == event.getState()) {
if (EventType.None == event.getType() || null == event.getPath()) {
c.countDown();
}else if (event.getType() == EventType.NodeDataChanged) {
try {
System.out.println("重新获取该节点数据:" + new String(zk.getData(event.getPath(),true, stat)));
System.out.println(JSON.toJSONString(stat));
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
更新数据(setData)
在getData示例中有体现
更新数据方法中可以传入version,可以说是从CAS衍生来的,如果对数据节点的更新操作没有原子性要求,可以使用-1
CAS:对于值V,每次更新前都会比对其值是否是预期值A,只有符合预期,才会将V原子化地更新到新值B
检测节点是否存在(exists)
在以上代码示例也有体现