Zookeeper 系列(三)Zookeeper API
本节首先介绍 Zookeeper 的 Shell 命令,再对 Java 操作 Zookeeper 的三种方式进行讲解,本节先介绍 Zookeeper 的原生 API。
- Zookeeper API:Zookeeper 原生 api
- ZKClient API
- Curator API
一、Shell 命令
启动 Zookeeper 服务之后,输入以下命令,连接到 Zookeeper 服务:
zkCli.sh -server localhost:2181
注意: window 下直接 zkCli 启动,不要带参数,否则会报错,详见zookeeper启动抛出NumberFormatException 异常。
连接成功之后,输入 help 之后,屏幕会输出可用的 Zookeeper 命令,如下所示:
ZooKeeper -server host:port cmd args
stat path [watch]
set path data [version]
ls path [watch]
delquota [-n|-b] path
ls2 path [watch]
setAcl path acl
setquota -n|-b val path
history
redo cmdno
printwatches on|off
delete path [version]
sync path
listquota path
rmr path
get path [watch]
create [-s] [-e] path data acl
addauth scheme auth
quit
getAcl path
close
connect host:port
(1) 查询 ls
[zk: localhost:2181(CONNECTED) 0] ls /
[zookeeper]
[zk: localhost:2181(CONNECTED) 1] ls /zookeeper
[quota]
(2) 创建新的 Znode 节点 create
[zk: localhost:2181(CONNECTED) 7] create /date 2018-04-05
Created /date
[zk: localhost:2181(CONNECTED) 8] ls /
[date, zookeeper]
(3) 获取 Znode 节点 get
[zk: localhost:2181(CONNECTED) 9] get /date
2018-04-05
cZxid = 0x7
ctime = Thu Apr 05 14:21:20 CST 2018
mZxid = 0x7
mtime = Thu Apr 05 14:21:20 CST 2018
pZxid = 0x7
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 10
numChildren = 0
(4) 修改 Znode 节点 set
set /date 2018-04-06
(4) 删除 Znode 节点 delete/rmr
delete /date # 不能删除非空目录
rmr /date # 递归删除
二、Zookeeper API
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.0-alpha</version>
</dependency>
示例:
package com.github.binarylei.zookeeper;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
* @author: leigang
* @version: 2018-04-05
*/
public class ZookeeperBase {
/** zookeeper 地址,多个用 , 隔开 */
static final String CONNECT_ADDR = "127.0.0.1";
/** session 超时时间,单位:ms */
static final int SESSION_OUTTIME = 5000;
/** 阻塞程序执行,用于等待zookeeper 连接成功,发送成功信号 */
static final CountDownLatch connectedSemaphore = new CountDownLatch(1);
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
ZooKeeper zooKeeper = new ZooKeeper(CONNECT_ADDR, SESSION_OUTTIME, new Watcher() { // (1)
@Override
public void process(WatchedEvent event) {
// 获取事件的状态
Event.KeeperState state = event.getState();
Event.EventType type = event.getType();
// 如果是建立连接
if (state == Event.KeeperState.SyncConnected) {
if (type == Event.EventType.None) {
// 如果连接建立成功,则发送信号,让后续阻塞程序向下执行
connectedSemaphore.countDown();
System.out.println("zookeeper 连接建立");
}
}
}
});
// 进行阻塞,等待与 Zookeeper 的连接建立完成
connectedSemaphore.await();
System.out.println("============================");
//1. 创建节点
zooKeeper.create( // (2)
"/testRoot", // 节点路径,不允许递归创建节点
"testRoot".getBytes(), // 节点内容
ZooDefs.Ids.OPEN_ACL_UNSAFE, // 节点权限,一般情况下不用关注
CreateMode.PERSISTENT); // 节点类型
//2. 获取节点
byte[] data = zooKeeper.getData("/testRoot", false, null);
System.out.println("获取节点:" + new String(data));
//3. 获取子节点
List<String> nodes = zooKeeper.getChildren("/", false);
for (String node : nodes) {
System.out.println("获取" + node + "子节点:" +
new String(zooKeeper.getData("/" + node, false, null)));
}
//4. 修改节点的值
Stat stat = zooKeeper.setData("/testRoot", "111".getBytes(), -1);
//5. 判断节点是否存在
System.out.println("节点是否存在:" + zooKeeper.exists("/testRoot", false));
//6. 删除节点,不支持递归删除
zooKeeper.delete("/testRoot", -1, new AsyncCallback.VoidCallback() {
@Override
public void processResult(int rc, String path, Object ctx) {
System.out.println("响应码:" + rc);
System.out.println("路径:" + path);
System.out.println("上下文:" + ctx);
}
}, 1);
zooKeeper.close();
}
}
(1) 创建会话方法 :客户端可以通过创建一个 Zookeeper 实例来连接 Zookeeper 服务器。ZooKeeper 构造方法 有 4 个,参数说明如下:
- connectstring :连接服务器列表,已","分割。
- sessiontimeout :心跳检测时间周期期(秒)
- wather :事件处理通知器。
- canbereadonly :标识当前会话是否支持只读。
- session 和 sessionpasswd :提供连接 zookeeper 的 session 和密码,通过这俩个确定唯一台客户端,目的是可以提供重复会话。
注意: Zookeeper 客户端和服务器端会话的建立是一个异步的过程 ,程序方法在处理完客户端初始化后立即返回,也就是说程序继续往下执行代码,这样,大多数情况下我们并没有真正构建好一个可用会话,在会话的生命周期处于 "SyncConnected" 时才算真正建立完毕。解决方案是使用 CountDownLatch 阻塞,起到连接建立完成。
(2) 创建节点 :
create(final String path, byte data[], List<ACL> acl, CreateMode createMode)
- path :节点路径,/nodeName。不允许递归创建节点。
- data :节点内容,要求类型是字节数组。
- acl :节点权限,一般使用 ZooDefs.Ids.OPEN_ACL_UNSAFE 即可。
- createMode :节点类型。PERSISTENT(持久节点)、PERSISTENT_SEQUENTIAL(持久顺序节点)、EPHEMERAL(临时节点)、EPHEMERAL_SEQUENTIAL(临时顺序节点)。
(3) 获取节点 :
getData(String path, boolean watch, Stat stat)
(4) 获取子节点 :
getChildren(String path, boolean watch)
(5) 修改节点的值 :
# version = -1 表示全部的历史版本,一般使用 -1 即可
setData(final String path, byte data[], int version)
(6) 判断节点是否存在 :
exists(String path, boolean watch)
(7) 删除节点 :
delete(final String path, int version)
(8) 所有的节点都有同步和异步区分,以delete为例 :
// 参数分别为:路径,版本(-1即可),回调函数,上下文环境
zooKeeper.delete("/testRoot", -1, new AsyncCallback.VoidCallback() {
@Override
public void processResult(int rc, String path, Object ctx) {
System.out.println("响应码:" + rc);
System.out.println("路径:" + path);
System.out.println("上下文:" + ctx);
}
}, 1);
注册一个异步回调函数,要实现 AsyncCallback.VoidCallback 接口,重写 processResult(int rc, String path, Object ctx) 方法,当节点创建完毕后执行此方法。
- rc :为服务端响应码 0(调用成功)、-4(端口连接)、-110(指定节点存在)、-112(会话已经过期)。
- path :接口调用时传入 API 的数据节点的路径参数
- ctx :为调用接口传入 API 的 ctx 值
三、Zookeeper Watcher
3.1 watcher 的概念
Zookeeper Watcher 事件触发机制详见 Watch 触发器。当 watch 监视的数据发生变化时,通知设置了该 watch 的 client,即 watcher。
同样,其 watcher 是监听数据发送了某些变化,那就一定会有对应的事件类型,和状态类型。
(1) 事件类型(znode节点相关):
- EventType.None :客户端连接成功
- EventType.NodeCreated :节点创建
- EventType.NodeDataChanged :节点变更
- EventType.NodeChildrenChanged :子节点变量
- EventType.NodeDeleted :节点删除
(2) 状态类型(客户端状态):
- KeeperState.Disconnected :客户端连接断开
- KeeperState.SyncConnected :客户端连接成功
- KeeperState.AuthFailed :客户端认证失败
- KeeperState.Expired :客户端连接过期
3.2 watcher 的特性
Watcher 的特性:一次性、客户端串行执行、轻量。
(1) 一次性
对于 ZK 的 watcher,你只需要记住一点: zookeeper 有 watch事件,是一次性触发的,当 watch 监视的数据发生变化时,通知设置了该 watch 的 client,即 watcher,由于 zookeeper 的监控都是一次性的所以每次必须设置监控。
(2) 客户端串行执行
客户端 Watcher 回调的过程是一个串行同步的过程,这为我们保证了顺序,同时需要开发人员注意一点,千万不要因为一个 Watcher 的处理逻辑影响了整个客户端的 Watcher 回调。
(3) 轻量
Watched Event 是 Zookeeper 整个 Watcher 通知机制的最小通知单元,整个结构只包含三部分:通知状态、事件类型和节点路径。也就是说 Watcher 通知非常的简单,只会告诉客户端发生了事件而不会告知其具体内容,需要客户自己去进行获取,比如 NodeDataChanged 事件, Zookeeper 只会通知客户端指定节点的数据发生了变更,而不会直接提供具体的数据内容。
我们通过一个示例,详细学习下 Watcher 的概念和其目的。
3.3 Watcher 示例
package com.github.binarylei.zookeeper.watcher;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooKeeper;
/**
* @author: leigang
* @version: 2018-04-05
*/
public class ConnectionWatcher implements Watcher, Closeable {
private static final int SESSION_TIMEOUT = 5000;
/** 定义原子变量,用于记录watcher数 */
private AtomicInteger seq = new AtomicInteger();
protected ZooKeeper zk;
private CountDownLatch connectedSignal = new CountDownLatch(1);
public void connect(String host) {
this.close();
try {
this.zk = new ZooKeeper(host, SESSION_TIMEOUT, this);
this.connectedSignal.await();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void process(WatchedEvent event) {
if (event == null) {
return;
}
// 连接状态
KeeperState keeperState = event.getState();
// 事件类型
EventType eventType = event.getType();
// 受影响的path
String path = event.getPath();
String logPrefix = "【Watcher-" + seq.incrementAndGet() + "】";
if(keeperState == KeeperState.SyncConnected) {
// 成功连接上服务器
if (eventType == EventType.None) {
System.out.println(logPrefix + "成功连接上服务器");
this.connectedSignal.countDown();
}
// 节点创建
else if (eventType == EventType.NodeCreated) {
System.out.println(logPrefix + "节点创建:" + path);
}
// 节点数据更新
else if (eventType == EventType.NodeDataChanged) {
System.out.println(logPrefix + "节点数据更新:"+ path);
}
// 节点删除
else if (eventType == EventType.NodeDeleted) {
System.out.println(logPrefix + "节点删除:" + path);
}
} else if (keeperState == KeeperState.Disconnected) {
System.out.println(logPrefix + "连接断开");
} else if (keeperState == KeeperState.AuthFailed) {
System.out.println(logPrefix + "权限认证失败");
} else if (keeperState == KeeperState.Expired) {
System.out.println(logPrefix + "连接过期");
}
}
// 关闭连接
public void close() {
if (this.zk != null) {
try {
this.zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public ZooKeeper getZk() {
return zk;
}
public void setZk(ZooKeeper zk) {
this.zk = zk;
}
}
public class ConnectionWatcherTest {
@Test
public void test() throws KeeperException, InterruptedException {
ConnectionWatcher zkWatcher = new ConnectionWatcher();
zkWatcher.connect("127.0.0.1:2181");
ZooKeeper zk = zkWatcher.getZk();
zk.create("/date", String.valueOf(System.currentTimeMillis()).getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
//1. 注册watch事件
zk.exists("/date", true);
zk.setData("/date", String.valueOf(System.currentTimeMillis()).getBytes(), -1);
//2. watch的触发是一次性的,要想再次触发必须重新注册
zk.exists("/date", true);
zk.setData("/date", String.valueOf(System.currentTimeMillis()).getBytes(), -1);
zk.delete("/date", -1);
zkWatcher.close();
}
}
测试结果如下:
【Watcher-1】成功连接上服务器
【Watcher-2】节点数据更新:/date
# 当第二个 zk.exists("/date", true); 注释后,【Watcher-3】就不会触发了
【Watcher-3】节点数据更新:/date
四、Zookeeper 安全机制
ACL(Access Control List), Zookeeper 作为分布式协调框架,其内部存储的都是一些关乎分布式系统运行时状态的元数据,尤其是设计到分布式锁、 Master 选举和协调等应用场景。我们需要有效地保障 Zookeeper 中的数据安全,Zookeeper 提供一套完善的 ACL 权限控制机酮来保障数据的安全。
Zookeeper 提供了三种模式。权限模式、授权对象、权限。
(1) 权限模式: Scheme,开发人员最多使用的如下四种权限模式:
- IP :ip 模式通过 ip 地址粒度来进行控制权限,例如配置了 192.168.1.107 即表示权限控都针对这个 ip 地址的,同时也支持按网段分配,比如 192.168.1.*
- Diges :digest 是最常用的权限控制模式 ,也更符合我们对权限控制的认识,其类似于 "username: password" 形式的权限标识进行权限配置。zK 会对形成的权限标识先后进行俩次编码处理,分别是 SHA-1 加密算法、BASE64 编码。
- World :World 是一直最开放的权限控制模式。这种模式可以看做为特殊的 Digest。他仅仅是一个标识而已。
- Super :超级用户模式_在超级用户模式下可以对 ZK 任意进行操作。
(2) 权限对象:指的是权限赋予的用户或者一个指定的实体,例如 ip 地址或机器等。在不的模式下,授权对象是不同的。这种模式和权限对象一一对应。
(3) 权限:权限就是指那些通过权限检测后可以被允许执行的操作,在 ZK 中,对数据的操作权限分为以下五大类:
CREATE、DELETE、READ、WRITE、ADMIN
4.1 代码示例
// 修改connect连接,在实例化ZooKeeper后,添加认证信息
public class ZookeeperAuth implements Watcher, Closeable {
public void connect(String host) {
connect(host, null);
}
// 添加认证
public void connect(String host, String password) {
this.close();
try {
this.zk = new ZooKeeper(host, SESSION_TIMEOUT, this);
if (password != null) {
this.zk.addAuthInfo("digest", password.getBytes());
}
this.connectedSignal.await();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
测试代码如下:
public class ZookeeperAuthTest{
@Test
public void test1() throws KeeperException, InterruptedException { // (1)
ZookeeperAuth zkAuth = new ZookeeperAuth();
zkAuth.connect("127.0.0.1", "123456");
ZooKeeper zk = zkAuth.getZk();
zk.create("/testAuth", "test".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT);
zkAuth.close();
}
@Test
public void test2() throws KeeperException, InterruptedException { // (2)
ZookeeperAuth zkAuth = new ZookeeperAuth();
zkAuth.connect("127.0.0.1:2181", "1234568");
ZooKeeper zk = zkAuth.getZk();
byte[] data = zk.getData("/testAuth", false, null);
System.out.println(new String(data));
zkAuth.close();
}
}
(1) Client-1 使用密码 123456 创建一个连接,并创建一个 znode 节点 /testAuth,注意创建节点时权限使用 ZooDefs.Ids.CREATOR_ALL_ACL
(2) Client-2 使用密码 1234568 创建一个连接,并打算修改节点 /testAuth ,结果出现权限不足的错误。
org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth for /testAuth
at org.apache.zookeeper.KeeperException.create(KeeperException.java:117)
at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:1611)
at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:1640)
at com.github.binarylei.zookeeper.auth.ZookeeperAuthTest.test2(ZookeeperAuthTest.java:33)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)