package com.ilike.testCurator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.retry.RetryUntilElapsed;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.Test;
/**
* zookeeper的开源客户端curator的使用
* 创建,删除,修改,读取节点, 获取子节点,判断节点是否存在,监听节点变化,监听子节点变化
*
* @author 桑伟东
*
*/
public class TestCurator {
// curator 的操作连接对象
final CuratorFramework client2;
// 创建线程池
ExecutorService es = Executors.newFixedThreadPool(5);
{
/**
* 重试策略1(使用Curator自带的重试策略)
* 第一个参数为基本重试时间,指的是第一次重试的间隔时间为1秒,之后会越来越久,但是最大重试次数为三次 第二个参数为最大重试次数
*/
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
/**
* 重试策略2(使用Curator自带的重试策略) 第一个参数为共重试的次数 第二个参数为两次重试之间的时间间隔
*/
RetryPolicy retryPolicy2 = new RetryNTimes(3, 1000);
/**
* 重试策略3(使用Curator自带的重试策略) 第一个参数为从开始重试到结束重试不能超过这么长时间 第二个参数为两次重试之间的时间间隔
*/
RetryPolicy retryPolicy3 = new RetryUntilElapsed(5000, 1000);
// 第一种连接方式 最后一个参数为连接失败后的重试策略
CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.111.129:2181", 5000, 5000, retryPolicy);
// 第二种连接方式
client2 = CuratorFrameworkFactory.builder().connectString("192.168.111.129:2181").sessionTimeoutMs(5000)
.connectionTimeoutMs(5000).retryPolicy(retryPolicy3).build();
client2.start();
}
/**
* 创建节点
*
* @throws Exception
*/
@Test
public void testCreateNode() throws Exception {
client2.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/node_20/node_20_1",
"7899".getBytes());
Thread.sleep(Integer.MAX_VALUE);
}
/**
* 删除节点
*
* @throws Exception
*/
@Test
public void testDelNode() throws Exception {
// 删除单节点
// client2.delete().withVersion(-1).forPath("/node_20/node_20_1");
// 删除带有子节点的节点
client2.delete().guaranteed().deletingChildrenIfNeeded().withVersion(-1).forPath("/node_20");
Thread.sleep(Integer.MAX_VALUE);
}
/**
* 获取子节点
*
* @throws Exception
*/
@Test
public void testGetChild() throws Exception {
// 获取子节点
List<String> childs = client2.getChildren().forPath("/");
System.out.println(childs);
}
/**
* 获取节点数据
*
* @throws Exception
*/
@Test
public void testGetData() throws Exception {
// 获取节点的数据内容
byte[] bs = client2.getData().forPath("/node_4");
System.out.println(new String(bs));
// 获取节点的数据及其状态信息
Stat stat = new Stat();
byte[] bs2 = client2.getData().storingStatIn(stat).forPath("/node_4");
System.out.println(new String(bs2));
System.out.println(stat);
}
/**
* 判断节点是否存在
*
* @throws Exception
*/
@Test
public void testNodeExists() throws Exception {
// 判断节点是否存在(同步调用),返回zookeeper的stat对象,如果节点不存在,返回null
// Stat stat=client2.checkExists().forPath("/node_40");
// System.out.println(stat);
// 判断节点是否存在(异步调用)
client2.checkExists().inBackground(new BackgroundCallback() {
/**
* 异步调用会增加系统开销,如果在系统中存在大量的异步调用,就会非常耗费系统资源,因此需要创建线程池
*/
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
// TODO Auto-generated method stub
// 获取事件的类型
CuratorEventType type = event.getType();
// 获取操作的返回码
int code = event.getResultCode();// 执行成功,得到0,否则就是非0
// 获取异步调用的上下文
Object context = event.getContext();
// 获取触发该事件的节点路径
String path = event.getPath();
// 获取子节点的列表
List<String> childs = event.getChildren();
// 获取节点的数据
byte[] bs = event.getData();
String data = new String(bs);
StringBuilder sb = new StringBuilder();
sb.append("type:" + type).append("\n");
sb.append("code:" + code).append("\n");
sb.append("context:" + context).append("\n");
sb.append("path:" + path).append("\n");
sb.append("childs:" + childs).append("\n");
sb.append("data:" + data).append("\n");
System.out.println(sb.toString());
}
}, "异步调用判断节点是否存在", es).forPath("/node_4");
Thread.sleep(Integer.MAX_VALUE);
}
/**
* 修改节点数据
*
* @throws Exception
*/
@Test
public void testUpdateData() throws Exception {
Stat stat = new Stat();
client2.getData().storingStatIn(stat).forPath("/node_4");
client2.setData().withVersion(stat.getVersion()).forPath("/node_4", "sda89ss".getBytes());
}
/**
* 监听节点数据的变化
*
* @throws Exception
*/
@Test
public void testNodeListener() throws Exception {
// 删除单节点
// client2.delete().withVersion(-1).forPath("/node_20/node_20_1");
// 删除带有子节点的节点
client2.delete().guaranteed().deletingChildrenIfNeeded().withVersion(-1).forPath("/node_20");
Thread.sleep(Integer.MAX_VALUE);
// 修改节点数据
Stat stat = new Stat();
client2.getData().storingStatIn(stat).forPath("/node_4");
client2.setData().withVersion(stat.getVersion()).forPath("/node_4", "sda89ss".getBytes());
}
/**
* 监听子节点的变化
*
* @throws Exception
*/
@Test
public void testChildListener() throws Exception {
// 监听子节点,第三个参数为监听到节点变化时,是否获取子节点内容
final PathChildrenCache cache = new PathChildrenCache(client2, "/node_4", true);
// 开启监听
cache.start();
// 注册监听器
cache.getListenable().addListener(new PathChildrenCacheListener() {
public void childEvent(CuratorFramework clent, PathChildrenCacheEvent event) throws Exception {
// TODO Auto-generated method stub
switch (event.getType()) {
case CHILD_ADDED:
System.out.println("增加了子节点:" + event.getData());
break;
case CHILD_UPDATED:
System.out.println("子节点的数据内容发生改变:" + event.getData());
break;
case CHILD_REMOVED:
System.out.println("子节点被删除:" + event.getData());
break;
default:
break;
}
}
});
Thread.sleep(Integer.MAX_VALUE);
}
}