curator是Netflix公司开源的一套ZooKeeper客户端,Curator解决了很多ZooKeeper客户端非常底层的细节开发工作。包括连接重连,反复注册Watcher等。实现了Fluent风格的API接口,目前已经为Apache的*项目,是全世界使用最广泛的ZooKeeper客户端之一
第一:maven依赖
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.8.0</version>
</dependency>
第二:测试的代码
package com.yeepay.sxf.testrurator; import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.CuratorFrameworkFactory.Builder;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CreateBuilder;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.framework.api.DeleteBuilder;
import org.apache.curator.framework.api.ExistsBuilder;
import org.apache.curator.framework.api.GetChildrenBuilder;
import org.apache.curator.framework.api.GetDataBuilder;
import org.apache.curator.framework.api.SetDataBuilder;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.retry.RetryUntilElapsed;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat; /**
* 测试Rurator客户端测试
* @author sxf
*
*/
public class TestRurator { //创建一个线程池,让异步调用的的回调方法,申请线程池中的资源
private static ExecutorService es=Executors.newFixedThreadPool(5);
public static void main(String[] args) throws Exception {
//重试策略
RetryPolicy retryPolicy =new ExponentialBackoffRetry(1000, 3); //客户端构建器
Builder builder=CuratorFrameworkFactory.builder(); builder.connectString("10.151.30.75:2181");//客户端链接地址:端口号
builder.sessionTimeoutMs(5000);//会话的超时时间
builder.connectionTimeoutMs(5000);//链接的超时时间
builder.retryPolicy(retryPolicy);//指定重试策略 //构建客户端
CuratorFramework client=builder.build();
client.start(); //测试创建节点
//testCreateNode(client); //测试删除节点
//testDeleteNode(client); //获取子节点列表
//getChildList(client); //获取节点中存取的值
//getNodeValue(client); //修改节点中存储数据的值
//updateNodeValue(client); //判断一个节点是否存在
//isExistsNode(client); //测试异步调用使用方法
//testSynCallBack(client); //测试节点数据变动和节点被创建的事件足册
//nodeValueChangeListern(client); //给某个节点注册监听其子节点的活动
nodeChildListernDothing(client);
Thread.sleep(Integer.MAX_VALUE);
} /**
* 建立客户端链接的第一种方法
* @throws InterruptedException
*/
public void testCreateSession() throws InterruptedException{
//重试策略可以实现该接口。通过自定义规则,进行重试
//重试链接的策略的接口
//(最开始时间间隔,最多重试次数) 随着次数递增,时间间隔还会增加
RetryPolicy retryPolicy =new ExponentialBackoffRetry(1000, 3);
//(最多重试次数,每次重试间隔的时间长度)
RetryPolicy retryPolicy2 =new RetryNTimes(5, 1000);
//(重试时间的总长度,每次重试的时间间隔) 这个基本可重试10次
RetryPolicy retryPolicy3 =new RetryUntilElapsed(10000, 1000); //(ip地址端口,会话超时时间,建立连接超时时间,重试链接的策略)
CuratorFramework client=CuratorFrameworkFactory.newClient("10.151.30.75:2181", 5000,5000,retryPolicy);
//启动链接
client.start(); Thread.sleep(Integer.MAX_VALUE); } /**
* 建立客户端链接的第二种方法
*/
public static void testCreateSession2(){
//重试策略
RetryPolicy retryPolicy =new ExponentialBackoffRetry(1000, 3); //客户端构建器
Builder builder=CuratorFrameworkFactory.builder(); builder.connectString("10.151.30.75:2181");//客户端链接地址:端口号
builder.sessionTimeoutMs(5000);//会话的超时时间
builder.connectionTimeoutMs(5000);//链接的超时时间
builder.retryPolicy(retryPolicy);//指定重试策略
builder.authorization(null);//给客户端添加权限AuthInfo 相关的知识在原生api里有讲过 // AuthInfo a=new AuthInfo("ip", "10.151.30.75:2181".getBytes());
// AuthInfo b=new AuthInfo("digest", "shangxiaofei:shangxiaofei".getBytes()); //构建客户端
CuratorFramework client=builder.build(); //建立链接
client.start(); } /**
* 测试创建节点(调用顺序)
* 两种创建方式都可以
* @throws Exception
*/
public static void testCreateNode(CuratorFramework client) throws Exception{
//创建节点构建器
// String path=client.create()
// .creatingParentsIfNeeded()//防止创建节点的父亲节点不存在,导致创建节点失败。如果父节点不存在,先创建父节点,再创建子节点
// .withMode(CreateMode.PERSISTENT)//指定节点的类型(持久节点,临时节点,持久节并顺序节点,临时并顺序节点)
// .forPath("/node_134", "xiaoshuai".getBytes());//节点路径,节点中存储的数据
//
CreateBuilder builder=client.create();
builder.creatingParentsIfNeeded();
builder.withMode(CreateMode.PERSISTENT);
builder.withACL(null);//添加这个节点的权限。相关内容,在原生zookeeper客户端有讲解。上上篇博客
/**
* //基于ip的权限,意味着这个ip的客户端对此节点有读取权限
* ACL ipacl=new ACL(Perms.READ, new Id("ip", "10.151.30.75"));
* //基于digest的权限,意味着只有这个用户名和密码的客户端才能读取和写的权限
* ACL digetacl=new ACL(Perms.READ|Perms.WRITE,new Id("digest",DigestAuthenticationProvider.generateDigest("shangxiaofei:shangxiaofei")));
*
* List<ACL> myaclAcls=new ArrayList<ACL>();
* myaclAcls.add(ipacl);
* myaclAcls.add(digetacl);
*/
String pathString=builder.forPath("/node_135/node_135_2", "zhengjiuzhonghua".getBytes()); System.out.println("TestRurator.testCreateNode()"+pathString);
} /**
* 测试删除节点(调用顺序不能乱)
* @param client
* @throws Exception
*/
public static void testDeleteNode(CuratorFramework client) throws Exception{
DeleteBuilder delteBuilder=client.delete();
delteBuilder.guaranteed();//保证节点必须删除,如果删除出现错误,则后台程序会不断去尝试删除。
delteBuilder.deletingChildrenIfNeeded();//如果存在子节点,先删除子节点
delteBuilder.withVersion(-1);//指定版本号
delteBuilder.forPath("/node_135");//指定路径
} /**
* 获取子节点列表
* @param client
* @throws Exception
*/
public static void getChildList(CuratorFramework client) throws Exception{
GetChildrenBuilder getChildBuilder=client.getChildren();
List<String> nodelist=getChildBuilder.forPath("/");
System.out.println("TestRurator.getChildList()"+nodelist); }
/**
* 获取节点中存取的值
* @throws Exception
*/
public static void getNodeValue(CuratorFramework client) throws Exception{ GetDataBuilder dataBuilder= client.getData(); Stat stat=new Stat();
dataBuilder.storingStatIn(stat);
byte[] bytes=dataBuilder.forPath("/node_135/node_135_2");
System.out.println("TestRurator.getNodeValue(data==>)"+new String(bytes));
System.out.println("TestRurator.getNodeValue()"+stat);
} /**
* 修改节点中的数据
* @param client
* @throws Exception
*/
public static void updateNodeValue(CuratorFramework client) throws Exception{
SetDataBuilder setDataBuilder=client.setData();
Stat stat=setDataBuilder.withVersion(-1).forPath("/node_135/node_135_2", "tianxiawushuang".getBytes());
System.out.println("TestRurator.updateNodeValue()"+stat);
} /**
* 判断一个节点是否存在
* @param client
* @throws Exception
*/
public static void isExistsNode(CuratorFramework client) throws Exception{
ExistsBuilder existsBuilder=client.checkExists();
Stat stat=existsBuilder.forPath("/node_135/node_135_2");
if(stat!=null){
System.out.println("TestRurator.isExistsNode()节点存在");
}else{
System.out.println("TestRurator.isExistsNode()节点不存在");
}
}
/**
* 以判断节点是否存在,讲解异步调用的使用
* @param client
* @throws Exception
*/
public static void testSynCallBack(CuratorFramework client) throws Exception{
ExistsBuilder existsBuilder=client.checkExists();
existsBuilder.inBackground(new myBalckCallBack(),"sxf",es).forPath("/node_135/node_135_2");
//System.out.println("TestRurator.testSynCallBack(异步调用返回)"+stat);
} /**
* 异步调用回调接口的实现类
* @author sxf
*
*/
private static class myBalckCallBack implements BackgroundCallback{ @Override
public void processResult(CuratorFramework client, CuratorEvent event)
throws Exception {
//回调类型
CuratorEventType curatorEventType=event.getType();
//节点路径
String path=event.getPath();
//节点列表
List<String> list=event.getChildren();
//获取上下文
Object context=event.getContext();
//获取返回碼 异步操作成功,返回0
int code=event.getResultCode();
//获取数据内容
byte[] data=event.getData(); StringBuffer sb=new StringBuffer();
sb.append("curatorEventType="+curatorEventType).append("\n");
sb.append("path="+path).append("\n");
sb.append("list="+list).append("\n");
sb.append("context="+context).append("\n");
sb.append("data="+new String(data)).append("\n"); System.out.println(sb.toString());
} } /**
*节点事件监听(节点被创建,节点中存储的数据被修改)
* @param client
* @throws Exception
*/
public static void nodeValueChangeListern(CuratorFramework client) throws Exception{
//注册监听
final NodeCache nodeCache =new NodeCache(client, "/node_135/node_135_2");;
//开启监听
nodeCache.start();
//添加监听器(实现指定接口的实例对象)
nodeCache.getListenable().addListener(new NodeCacheListener() { @Override
public void nodeChanged() throws Exception {
//获取当前节点的最近数据
byte[] bytes=nodeCache.getCurrentData().getData();
System.out.println(new String(bytes)); }
});
} /**
* 监听节点的子节点事件注册
* @param client
* @throws Exception
*/
public static void nodeChildListernDothing(CuratorFramework client) throws Exception{
//注册监听器
//第三个参数:系统在监听到子节点列表发生变化时同时获取子节点的数据内容
final PathChildrenCache cache=new PathChildrenCache(client, "/node_135", true);
//开启监听
cache.start();
//注册监听器
cache.getListenable().addListener(new PathChildrenCacheListener() { @Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
throws Exception {
Type type=event.getType();
switch (event.getType()) {
case CHILD_ADDED://添加子节点
System.out.println("新添加子节点中存储的值==>"+new String(event.getData().getData()));
break; case CHILD_UPDATED://子节点被修改
System.out.println("被修改的子节点中存储的值==>"+new String(event.getData().getData()));
break;
case CHILD_REMOVED://子节点被删除
System.out.println("被删除的字节点的路径==>"+event.getInitialData());
break; default:
break;
} }
});
} }