为了更好的实现java操作zookeeper服务器。后来出现Curator框架,非常强大,目前已经是Apache的*项目,有丰富的操作,,例如:session超时重连,主从选举。分布式计数器,分布式锁,等等适用于各种复杂的zookeeper场景api封装
maven依赖
<dependency>
<groupld>org.apache.curator</groupld>
<artifactld>curator-framewprk</artifactld>
<version>2.4.2</version>
</dependency>
Curator 框架中使用链式变成风格,易读性更强,使用工程方法创建连接对象。
1.使用CuratorFrameworkFactory 的两个静态工厂方法(参数不同) 来实现:
参数1:connectString ,连接串
参数2:retryPolicy 重试连接策略,有四种实现,分别为:ExponentialBackoffRetry, RetryNtimes,RetryOneTimes ,RetryUntilElapsed
参数3:sessionTimeoutMs 会话超时时间,默认为60 000ms
参数4:connectionTimeoutMs 连接超时时间,默认为15 000ms
注意,对于RetryPolicy策略通过一个接口来让用户自定义实现
2.创建节点create方法,可选链式:
creatingParentslfNeeded、withMode、forPath、withACL等
3、删除节点delete方法,可选链式项
deletingClildrenlfNeeded、guaranteed、withVersion 、forPath等
4、读取和修改数据getData、setData方法
5、异步绑定回调方法,比如创建节点时绑定一个回调函数,该回调函数可以输出服务器状态码以及服务器事件类型,还可以加入一个线程池进行优化操作,
6、读取子节点方法getChildren
7、判断节点是否存在方法checkExists
1 package bjsxt.curator.base; 2 3 import java.util.List; 4 5 import org.apache.curator.RetryPolicy; 6 import org.apache.curator.framework.CuratorFramework; 7 import org.apache.curator.framework.CuratorFrameworkFactory; 8 import org.apache.curator.retry.ExponentialBackoffRetry; 9 import org.apache.zookeeper.ZooKeeper.States; 10 import org.apache.zookeeper.data.Stat; 11 12 public class CuratorBase { 13 14 /** zookeeper地址 */ 15 static final String CONNECT_ADDR = "192.168.2.2:2181"; 16 /** session超时时间 */ 17 static final int SESSION_OUTTIME = 5000;// ms 18 19 public static void main(String[] args) throws Exception { 20 21 // 1 重试策略:初试时间为1s 重试10次 22 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); 23 // 2 通过工厂创建连接 24 CuratorFramework cf = CuratorFrameworkFactory.builder() 25 .connectString(CONNECT_ADDR).sessionTimeoutMs(SESSION_OUTTIME) 26 .retryPolicy(retryPolicy) 27 // .namespace("super") 28 .build(); 29 // 3 开启连接 30 cf.start(); 31 32 System.out.println(States.CONNECTED); 33 System.out.println(cf.getState()); 34 35 // 新加、删除 36 37 // 4 建立节点 指定节点类型(不加withMode默认为持久类型节点)、路径、数据内容 38 /** 39 * cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) 40 * .forPath("/super/c1", "c1内容".getBytes()); 41 */ 42 // 5 删除节点 43 /** 44 * cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/super") 45 * ; 46 */ 47 // 读取、修改 48 49 // 创建节点 50 /** 51 * cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) 52 * .forPath("/super/c1", "c1内容".getBytes()); 53 * cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) 54 * .forPath("/super/c2", "c2内容".getBytes()); 55 */ 56 // 读取节点 57 /** 58 * String ret1 = new String(cf.getData().forPath("/super/c2")); 59 * System.out.println(ret1); // 修改节点 cf.setData().forPath("/super/c2", 60 * "修改c2内容".getBytes()); String ret2 = new 61 * String(cf.getData().forPath("/super/c2")); System.out.println(ret2); 62 */ 63 64 // 绑定回调函数 65 66 /** 67 * ExecutorService pool = Executors.newCachedThreadPool(); 68 * cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) 69 * .inBackground(new BackgroundCallback() { 70 * 71 * @Override public void processResult(CuratorFramework cf, CuratorEvent 72 * ce) throws Exception { System.out.println("code:" + 73 * ce.getResultCode()); System.out.println("type:" + 74 * ce.getType()); System.out.println("线程为:" + 75 * Thread.currentThread().getName()); } }, 76 * pool).forPath("/super/c3", "c3内容".getBytes()); 77 * Thread.sleep(Integer.MAX_VALUE); 78 */ 79 80 // 读取子节点getChildren方法 和 判断节点是否存在checkExists方法 81 82 List<String> list = cf.getChildren().forPath("/super"); 83 for (String p : list) { 84 System.out.println(p); 85 } 86 87 Stat stat = cf.checkExists().forPath("/super/c3"); 88 System.out.println(stat); 89 90 Thread.sleep(2000); 91 cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/super"); 92 93 // cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/super"); 94 95 } 96 }