package bjsxt.curator.base; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.curator.RetryLoop; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooKeeper.States; import org.apache.zookeeper.data.Stat; public class CuratorBase { /** zookeeper地点 */ // static final String CONNECT_ADDR = "192.168.1.171:2181,192.168.1.172:2181,192.168.1.173:2181"; static final String CONNECT_ADDR = "115.159.41.97:2181"; /** session超不时间 */ static final int SESSION_OUTTIME = 5000;//ms public static void main(String[] args) throws Exception { // RetryLoop 无限重试 一直重试欠好, //1 重试计谋:初试时间为1s 重试10次 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); //2 通过工厂创建连接 CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString(CONNECT_ADDR) .sessionTimeoutMs(SESSION_OUTTIME) .retryPolicy(retryPolicy) // .namespace("super") .build(); //3 开启连接 cf.start(); // System.out.println(States.CONNECTED); // System.out.println(cf.getState()); // 新加、删除 //4 成立节点 指定节点类型(不加withMode默认为长期类型节点)、路径、数据内容 cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c1","c1内容".getBytes()); //5 删除节点 cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/super"); // 读取、改削 //创建节点 cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c1","c1内容".getBytes()); cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c2","c2内容".getBytes()); //读取节点 String ret1 = new String(cf.getData().forPath("/super/c2")); System.out.println(ret1); //改削节点 cf.setData().forPath("/super/c2", "改削c2内容".getBytes()); String ret2 = new String(cf.getData().forPath("/super/c2")); System.out.println(ret2); // 绑定回调函数 ExecutorService pool = Executors.newCachedThreadPool(); cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) .inBackground(new BackgroundCallback() { @Override public void processResult(CuratorFramework cf, CuratorEvent ce) throws Exception { System.out.println("code:" + ce.getResultCode()); System.out.println("type:" + ce.getType()); System.out.println("线程为:" + Thread.currentThread().getName()); } }, pool) .forPath("/super/c3","c3内容".getBytes()); Thread.sleep(Integer.MAX_VALUE); // 读取子节点getChildren要领 和 判断节点是否存在checkExists要领 List<String> list = cf.getChildren().forPath("/super"); for(String p : list){ System.out.println(p); } Stat stat = cf.checkExists().forPath("/super/c3"); System.out.println(stat); Thread.sleep(2000); cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/super"); //cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/super"); } }
,