Zookeeper 系列(五)Curator API

时间:2022-08-25 17:14:50

Zookeeper 系列(五)Curator API

一、Curator 使用

Curator 框架中使用链式编程风格,易读性更强,使用工程方法创建连接对象使用。

(1) CuratorFrameworkFactory :俩个静态工厂方法(参数不同)来实现

  • 参数1: connectString,连接串
  • 参数2: retryPolicy,重试连接策略。有四种实现分别为:ExponentialBackoffRetry、RetryNTimes. RetryOneTimes、RetryUntilElapsed
  • 参数3: sessionTimeoutMs 会话超时时间默认为 6000oms
  • 参数4: connectionTimeOutms 连接超时时间,默认为 15000ms

注意:对于 retrypolicy 策略通过一个接口来让用户自定义实现。

(2) create :创建节点,可选链式项

(3) delete :删除节点,可选链式项

(4) getdata、setdata :读取和修改数据

(5) 异步绑定回调方法 :比如创建节点时绑定一个回调函数,该回调函数可以输出服务器的状态码以及服务器事件类型。还可以加入一个线程池进行优化操作。

(6) getchildren :读取子节点方法

(7) checkexists :判断节点是否存在方法

示例

(1) 环境准备

<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>

(2) Curator 操作 Zookeeper

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.junit.Test; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; /**
* @author: leigang
* @version: 2018-04-06
*/
public class ZkCuratorBase { private static final String CONNECT_ADDR = "127.0.0.1:2181";
private static final int SEESION_OUTTIME = 5 * 1000; @Test
public void test() throws Exception {
//1. 重试策略:初试时间为1s,重试10次
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 10); //2. 通过工厂创建连接
CuratorFramework cf = CuratorFrameworkFactory.builder()
.connectString(CONNECT_ADDR)
.sessionTimeoutMs(SEESION_OUTTIME)
.retryPolicy(retryPolicy)
.build(); //3. 开启连接
cf.start(); //4. 创建
cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
.forPath("/curator", "curator".getBytes()); //5. 获取
byte[] data = cf.getData().forPath("/curator");
System.out.println("/curator:" + new String(data)); //6. 修改
cf.setData().forPath("/curator", "curator-xxx".getBytes()); //7. 递归删除
cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/curator"); //8. 回调函数
ExecutorService pool = Executors.newCachedThreadPool();
cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
.inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework cf, CuratorEvent event) throws Exception {
System.out.println("code:" + event.getResultCode());
System.out.println("type:" + event.getType());
System.out.println("线程为:" + Thread.currentThread().getName());
}
}, pool)
.forPath("/curator", "curator-test".getBytes()); System.out.println("主线程为:" + Thread.currentThread().getName()); Thread.sleep(100 * 1000);
cf.close();
}
}

二、Curator Watcher

  • NodeCacheListener :监听节点的新增、修改操作
  • PathChildrenCacheListener :监听子节点的新增增、修改、别除操作

(1) 环境准备

<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>

(2) NodeCacheListener

// 节点本身监听
public class ZkCuratorWatcher1 { private static final String CONNECT_ADDR = "127.0.0.1:2181";
private static final int SEESION_OUTTIME = 5 * 1000; @Test
public void test() throws Exception {
//1. 重试策略:初试时间为1s,重试10次
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 10); //2. 通过工厂创建连接
CuratorFramework cf = CuratorFrameworkFactory.builder()
.connectString(CONNECT_ADDR)
.sessionTimeoutMs(SEESION_OUTTIME)
.retryPolicy(retryPolicy)
.build(); //3. 开启连接
cf.start(); //4. 建立一个cache缓存
NodeCache cache = new NodeCache(cf, "/curator", false);
cache.start(true); cache.getListenable().addListener(new NodeCacheListener() {
//触发事件为创建、更新、删除节点
@Override
public void nodeChanged() throws Exception {
ChildData data = cache.getCurrentData();
if (data != null) {
System.out.println("路径为:" + cache.getCurrentData().getPath());
System.out.println("数据为:" + new String(cache.getCurrentData().getData()));
System.out.println("状态为:" + cache.getCurrentData().getStat());
} else {
System.out.println("删除节点");
}
System.out.println("================================================");
}
}); Thread.sleep(1000);
cf.create().forPath("/curator", "123".getBytes()); Thread.sleep(1000);
cf.setData().forPath("/curator", "12344".getBytes()); Thread.sleep(1000);
cf.delete().forPath("/curator");
Thread.sleep(10 * 1000); cf.close();
}
}

(3) PathChildrenCacheListener

// 子节点监听
public class ZkCuratorWatcher2 { private static final String CONNECT_ADDR = "127.0.0.1:2181";
private static final int SEESION_OUTTIME = 5 * 1000; @Test
public void test() throws Exception {
//1. 重试策略:初试时间为1s,重试10次
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 10); //2. 通过工厂创建连接
CuratorFramework cf = CuratorFrameworkFactory.builder()
.connectString(CONNECT_ADDR)
.sessionTimeoutMs(SEESION_OUTTIME)
.retryPolicy(retryPolicy)
.build(); //3. 开启连接
cf.start(); //4. 建立一个 PathChildrenCache 缓存,第三个参数为是否接受节点数据内容,如果为 false 则不接受
PathChildrenCache cache = new PathChildrenCache(cf, "/curator", true);
//5. 在初始化的时候就进行缓存监听
cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); cache.getListenable().addListener(new PathChildrenCacheListener() {
// 监听子节点变量,新建、修改、删除
@Override
public void childEvent(CuratorFramework cf, PathChildrenCacheEvent event) throws Exception {
switch (event.getType()) {
case CHILD_ADDED:
System.out.println("CHILD_ADDED:" + event.getData().getPath());
break;
case CHILD_UPDATED:
System.out.println("CHILD_UPDATED:" + event.getData().getPath());
break;
case CHILD_REMOVED:
System.out.println("CHILD_REMOVED:" + event.getData().getPath());
break;
default:
break;
}
}
}); cf.create().creatingParentsIfNeeded().forPath("/curator/c1", "c1".getBytes());
cf.create().creatingParentsIfNeeded().forPath("/curator/c2", "c2".getBytes()); Thread.sleep(1000);
cf.setData().forPath("/curator/c1", "c1-update".getBytes()); cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/curator"); cf.close();
}
}

三、Curator 场景应用

(一)分布式锁

在分布式场景中,我们为了保证数据的一性,经常在程序运行的某一个点需要进行同步操作(java 可提供 synchronized 或者 Reentrantlock 实现)比如我们看一个小示例,这个示例会出现分布式不同步的问题:因为我们之前所说的是在高并发下访问一个程序,现在我们则是在高并发下访问多个服务器节点(分布式)。

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock; /**
* ReentrantLock
* @author: leigang
* @version: 2018-04-06
*/
public class Lock1 { private static ReentrantLock reentrantLock = new ReentrantLock();
private static int count = 10; public static void genarNo() {
try {
reentrantLock.lock();
count--;
System.out.println(count);
} finally {
reentrantLock.unlock();
}
} public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
countDownLatch.await();
genarNo();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
} Thread.sleep(1000);
countDownLatch.countDown();
}
}

我们使用 Curator 基于 zookeeper 的特性提供的分布式锁来处理分布式场景的数据一致性,zookeeper 本身的分布式是有写问题的,这里强烈推荐使用 Curator 的分布式锁!

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock; /**
* 由 Zookeeper 实现的分布式锁
* @author: leigang
* @version: 2018-04-06
*/
public class Lock2 { private static final String CONNECT_ADDR = "127.0.0.1:2181";
private static final int SEESION_OUTTIME = 5 * 1000; public static CuratorFramework createCuratorFramework() {
//1. 重试策略:初试时间为1s,重试10次
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 10); //2. 通过工厂创建连接
CuratorFramework cf = CuratorFrameworkFactory.builder()
.connectString(CONNECT_ADDR)
.sessionTimeoutMs(SEESION_OUTTIME)
.retryPolicy(retryPolicy)
.build(); return cf;
} public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
@Override
public void run() {
CuratorFramework cf = createCuratorFramework();
cf.start();
// 由 Zookeeper 实现的分布式锁
InterProcessMutex lock = new InterProcessMutex(cf, "/curator");
//ReentrantLock reentrantLock = new ReentrantLock(); try {
countDownLatch.await();
lock.acquire();
//reentrantLock.lock();
genarNo();
System.out.println(Thread.currentThread().getName() + "执行业务逻辑...");
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
// 释放
lock.release();
//reentrantLock.unlock();
} catch (Exception e) {
;
}
}
}
}).start();
} Thread.sleep(1000);
countDownLatch.countDown();
} private static int count = 10; public static void genarNo() {
System.out.println(--count);
}
}

(二)分布式计数器

分布式计数器功能一说到分布式计数器,你可能脑海里想到了 AtomicInteger 这种经典的方式如果针对于一个 JVM 的场景当然没有问题,但是我们现在是分布式场景下,就需要利用 Curator 框架的 DistributedAtomiclnteger 了。

public class ZkCuratorAtomicInteger {

    private static final String CONNECT_ADDR = "127.0.0.1:2181";
private static final int SEESION_OUTTIME = 5 * 1000; @Test
public void test() throws Exception {
//1. 重试策略:初试时间为1s,重试10次
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 10); //2. 通过工厂创建连接
CuratorFramework cf = CuratorFrameworkFactory.builder()
.connectString(CONNECT_ADDR)
.sessionTimeoutMs(SEESION_OUTTIME)
.retryPolicy(retryPolicy)
.build(); //3. 开启连接
cf.start(); //4. 使用 DistributedAtomicInteger
DistributedAtomicInteger atomicInteger = new DistributedAtomicInteger(cf, "/curator",
new RetryNTimes(3, 1000)); atomicInteger.forceSet(0);
atomicInteger.increment();
AtomicValue<Integer> value = atomicInteger.get(); System.out.println(value.succeeded());
System.out.println(value.postValue()); //最新值????????
System.out.println(value.preValue()); //原始值????????
}
}

(三)Barrier

public class ZkCuratorBarrier1 {

    private static final String CONNECT_ADDR = "127.0.0.1:2181";
private static final int SEESION_OUTTIME = 5 * 1000; public static CuratorFramework createCuratorFramework() {
//1. 重试策略:初试时间为1s,重试10次
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 10); //2. 通过工厂创建连接
CuratorFramework cf = CuratorFrameworkFactory.builder()
.connectString(CONNECT_ADDR)
.sessionTimeoutMs(SEESION_OUTTIME)
.retryPolicy(retryPolicy)
.build(); return cf;
} @Test
public void test() throws InterruptedException {
for (int i = 0; i < 5; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
CuratorFramework cf = createCuratorFramework();
cf.start(); DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(cf, "/barrier", 5);
Thread.sleep(1000 * (new Random().nextInt(3)));
System.out.println(Thread.currentThread().getName() + "已经准备"); barrier.enter();
System.out.println("同时开始运行...");
Thread.sleep(1000 * (new Random().nextInt(3)));
System.out.println(Thread.currentThread().getName() + "运行完毕");
barrier.leave();
System.out.println("同时退出运行...");
} catch (Exception e) {
e.printStackTrace();
}
}
}, "t" + i).start(); } Thread.sleep(100 * 1000);
}
}

运行结果:

t1已经准备
t3已经准备
t2已经准备
t4已经准备
同时开始运行...
同时开始运行...
同时开始运行...
同时开始运行...
同时开始运行...
t3运行完毕
t4运行完毕
t0运行完毕
t2运行完毕
t1运行完毕
同时退出运行...
同时退出运行...
同时退出运行...
同时退出运行...
同时退出运行...

(四)Cluster

(1) Zookeeper 监听器,用于监听子节点的变化:

public class ZkCuratorWatcher {

    private static final String CONNECT_ADDR = "127.0.0.1:2181";
private static final int SEESION_OUTTIME = 5 * 1000; public ZkCuratorWatcher() {
try {
//1. 重试策略:初试时间为1s,重试10次
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 10); //2. 通过工厂创建连接
CuratorFramework cf = CuratorFrameworkFactory.builder()
.connectString(CONNECT_ADDR)
.sessionTimeoutMs(SEESION_OUTTIME)
.retryPolicy(retryPolicy)
.build(); //3. 开启连接
cf.start(); //4. 创建根节点
if (cf.checkExists().forPath("/cluster") == null) {
cf.create().withMode(CreateMode.PERSISTENT).forPath("/cluster", "cluster".getBytes());
} //5. 建立一个 PathChildrenCache 缓存,第三个参数为是否接受节点数据内容,如果为 false 则不接受
PathChildrenCache cache = new PathChildrenCache(cf, "/cluster", true);
//6. 在初始化的时候就进行缓存监听
cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); cache.getListenable().addListener(new PathChildrenCacheListener() {
// 监听子节点变量,新建、修改、删除
@Override
public void childEvent(CuratorFramework cf, PathChildrenCacheEvent event) throws Exception {
switch (event.getType()) {
case CHILD_ADDED:
System.out.println("CHILD_ADDED:" + event.getData().getPath());
break;
case CHILD_UPDATED:
System.out.println("CHILD_UPDATED:" + event.getData().getPath());
break;
case CHILD_REMOVED:
System.out.println("CHILD_REMOVED:" + event.getData().getPath());
break;
default:
break;
}
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
}

(2) 启动两个客户端:

public class Client1 {
@Test
public void test() throws InterruptedException {
ZkCuratorWatcher zkCuratorWatcher = new ZkCuratorWatcher();
System.out.println(this.getClass().getSimpleName() + " start...");
Thread.sleep(Integer.MAX_VALUE);
}
} public class Client2 {
@Test
public void test() throws InterruptedException {
ZkCuratorWatcher zkCuratorWatcher = new ZkCuratorWatcher();
System.out.println(this.getClass().getSimpleName() + " start...");
Thread.sleep(Integer.MAX_VALUE);
}
}

(3) 第三个客户端用于修改 Zookeeper 的节点,观察 Client1 和 Client2 是否监听到了节点的改变:

public class Test {

    private static final String CONNECT_ADDR = "127.0.0.1:2181";
private static final int SEESION_OUTTIME = 5 * 1000; @org.junit.Test
public void test() throws Exception {
//1. 重试策略:初试时间为1s,重试10次
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 10); //2. 通过工厂创建连接
CuratorFramework cf = CuratorFrameworkFactory.builder()
.connectString(CONNECT_ADDR)
.sessionTimeoutMs(SEESION_OUTTIME)
.retryPolicy(retryPolicy)
.build(); //3. 开启连接
cf.start(); cf.create().creatingParentsIfNeeded().forPath("/cluster/c1", "c1".getBytes());
cf.create().creatingParentsIfNeeded().forPath("/cluster/c2", "c2".getBytes()); Thread.sleep(1000);
cf.setData().forPath("/cluster/c1", "c1-update".getBytes()); cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/cluster/c1");
cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/cluster/c2"); cf.close();
}
}

(4) 运行结果,Client1 和 Client2 下都有输出如下结果,说明都监听到了 /cluster 下子节点的添加、修改和删除:

CHILD_ADDED:/cluster/c1
CHILD_ADDED:/cluster/c2
CHILD_UPDATED:/cluster/c1
CHILD_REMOVED:/cluster/c1
CHILD_REMOVED:/cluster/c2

(5) 下面做另外一个测试,在 Zookeeper 下手动创建 /cluster 节点后,再启动 Client1:

[zk: localhost:2181(CONNECTED) 3] ls /cluster
[]
[zk: localhost:2181(CONNECTED) 4] create /cluster/c1 c1
[zk: localhost:2181(CONNECTED) 6] ls /cluster
[c1]

运行 Client1 结果如下:

Client1 start...
CHILD_ADDED:/cluster/c1

以上测试说明 Curator 会自动将触发 CHILD_ADDED 事件,工作中可以用来自注册,不需要手动查询来注册。