Curator使用
Curator是Netflix公司一个开源的zookeeper客户端,在原生API接口上进行了包装,解决了很多ZooKeeper客户端非常底层的细节开发。同时内部实现了诸如Session超时重连,Watcher反复注册等功能,实现了Fluent风格的API接口,是使用最广泛的zookeeper客户端之一。
使用Curator需要依赖包:
guava-17.0.jar
zookeeper-3.4.6.jar
curator-framework-3.2.1.jar
创建连接:
public class CreateSession {
public static void main(String[] args) throws Throwable {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);//刚开始重试间隔为1秒,之后重试间隔逐渐增加,最多重试不超过三次
/*RetryPolicy retryPolicy1 = new RetryNTimes(3, 1000);//最大重试次数,和两次重试间隔时间
RetryPolicy retryPolicy2 = new RetryUntilElapsed(5000, 1000);//会一直重试直到达到规定时间,第一个参数整个重试不能超过时间,第二个参数重试间隔
//第一种方式
CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.0.3:2181", 5000,5000,retryPolicy);//最后一个参数重试策略
*/
//第二种方式
CuratorFramework client1 = CuratorFrameworkFactory.builder().connectString("192.168.0.3:2181")
.sessionTimeoutMs(5000)//会话超时时间
.connectionTimeoutMs(5000)//连接超时时间
.retryPolicy(retryPolicy)
.build();
client1.start();
String path = client1.create().creatingParentsIfNeeded()//若创建节点的父节点不存在会先创建父节点再创建子节点
.withMode(CreateMode.EPHEMERAL)//withMode节点类型,
.forPath("/curator/3","131".getBytes());
System.out.println(path);
List<String> list = client1.getChildren().forPath("/");
System.out.println(list);
//String re = new String(client1.getData().forPath("/curator/3"));//只获取数据内容
Stat stat = new Stat();
String re = new String(client1.getData().storingStatIn(stat)//在获取节点内容的同时把状态信息存入Stat对象
.forPath("/curator/3"));
System.out.println(re);
System.out.println(stat);
client1.delete().guaranteed()//保障机制,若未删除成功,只要会话有效会在后台一直尝试删除
.deletingChildrenIfNeeded()//若当前节点包含子节点
.withVersion(-1)//指定版本号
.forPath("/curator");
Thread.sleep(Integer.MAX_VALUE);
}
修改节点数据:
Stat stat = new Stat();
String re = new String(client1.getData().storingStatIn(stat)//在获取节点内容的同时把状态信息存入Stat对象
.forPath("/curator/3"));
System.out.println(re);
System.out.println(stat);
Thread.sleep(10000);
client1.setData().withVersion(stat.getVersion())//修改前获取一次节点数据得到版本信息
.forPath("/curator/3", "111".getBytes());
若线程在sleep时,在另一个客户端修改了该节点数据,会抛出异常:
org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /curator/3
异步调用:
ExecutorService es = Executors.newFixedThreadPool(5);//线程池
RetryPolicy retry = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.0.3:2181", 5000, 5000, retry);
client.start();
//Stat stat = client.checkExists().forPath("/node_1");//同步调用
client.checkExists().inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework curator, CuratorEvent event) throws Exception {//传入客户端对象和事件
System.out.println(event.getType());
int re = event.getResultCode();//执行成功为0
System.out.println(re);
String path = event.getPath();
System.out.println(path);
Stat stat = event.getStat();
System.out.println(stat);
}
},"123",es).forPath("/node_1");//把线程池es传给异步调用
List<String> list = client.getChildren().forPath("/");
System.out.println(list);
Thread.sleep(Integer.MAX_VALUE);
事件监听:
//节点监听
RetryPolicy retry = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.0.3:2181", 5000, 5000, retry);
client.start();
final NodeCache cache = new NodeCache(client,"/node_1");
cache.start();
cache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
byte[] res = cache.getCurrentData().getData();
System.out.println("data: " + new String(res));
}
});
Thread.sleep(Integer.MAX_VALUE);
//子节点监听
@SuppressWarnings("resource")
final PathChildrenCache cache = new PathChildrenCache(client,"/node_1",true);
cache.start();
cache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curator, PathChildrenCacheEvent event) throws Exception {
switch (event.getType()) {
case CHILD_ADDED:
System.out.println("add:" + event.getData());
break;
case CHILD_UPDATED:
System.out.println("update:" + event.getData());
break;
case CHILD_REMOVED:
System.out.println("remove:" + event.getData());
break;
default:
break;
}
}
});
ACL权限:
RetryPolicy retry = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.0.3:2181", 5000, 5000, retry);
client.start();
//ACL有IP授权和用户名密码访问的模式
ACL aclRoot = new ACL(Perms.ALL,new Id("digest",DigestAuthenticationProvider.generateDigest("root:root")));
List<ACL> aclList = new ArrayList<ACL>();
aclList.add(aclRoot);
String path = client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
.withACL(aclList)
.forPath("/node_3/node_ACL","2".getBytes());
System.out.println(path);
CuratorFramework client1 = CuratorFrameworkFactory.builder().connectString("192.168.0.3:2181")
.sessionTimeoutMs(5000)//会话超时时间
.connectionTimeoutMs(5000)//连接超时时间
.authorization("digest","root:root".getBytes())//权限访问
.retryPolicy(retry)
.build();
client1.start();
String re = new String(client1.getData().forPath("/node_3/node_ACL"));
System.out.println(re);