Zookeeper(四)Java客户端API使用

时间:2022-11-06 08:24:53



ZooKeeper作为一个分布式服务框架,主要用来解决分布式数据一致性问题,它提供了简单的分布式原语,并且对多种编程语言提供了API。下面我们重点来看下ZooKeeper的Java客户端API使用方式。在此之前,请在博文视点网站(http://www.broadview.com.cn)的“下载专区”下载源代码:ZooKeeper、本届的所有样例代码都在book.chapter05包中。

1.创建会话

客户端可以通过创建一个Zookeeper(org.apache.zookeeper.ZooKeeper)实例来连接ZooKeeper服务器。ZooKeeper的4种构造方法如下。

ZooKeeper的构造方法:

ZooKeeper(String connectString, int sesssionTimeout, Watcher watcher);

ZooKeeper(String connectString,int sessionTimeout,Watcher watcher,boolean canBeReadOnly);

ZooKeeper(String connectString,int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd);

ZooKeeper(String connectString,int sessionTimeout,Watcher watcher,long sessionId,byte[] sessionPasswd,boolean canBeReadOnly);

使用任意一个构造方法都可以顺利完成与ZooKeeper服务器的会话(Session)创建,下面列出了对每个参数的说明。

清单5-1

参数名 说明
connectString 指ZooKeeper服务器列表,由英文状态逗号host:port字符串组成,每一个都代表一台ZooKeeper机器,例如,192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181,这样就为客户端指定了三台服务器地址。另外,也可以在connectString中设置客户端连接上ZooKeeper后的根目录,方法是在host:port字符串之后添加上这个根目录,例如:192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181/zk-book,这样就指定了该客户端连接上ZooKeeper服务器之后,所有对ZooKeeper的操作,都会基于这个根目录。例如,客户端对/foo/bar的操作,都会指向节点/zk-book/foo/bar——这个目录也叫Chroot,即客户端隔离命名空间,关于ZooKeeper中Chrooot的用法和左右,将在后面详细解释
 sessionTimeout 指会话的超时时间,是一个以“毫秒”为单位的整型值。在ZooKeeper中有会话的概念,在一个会话周期内,ZooKeeper客户端和服务端之间会通过心跳检测机制来维持会话的有效性,一旦在sessionTimeout时间内没有进行有效的心跳检测,会话就会失效。关于ZooKeeper的会话和心跳检测后面做详细解释
watcher 在4.1.3节中我们已经提到了ZooKeeper中Watcher的概念。ZooKeeper允许客户端在构造方法中传入一个接口Watcher(org.apache.zookeeper.Watcher)的实现类对象来作为默认的Watcher事件通知处理器。当然,该参数可以设置为null以表明不需要设置默认的Watcher处理器。关于ZooKeeper的Watcher机制和实现原理后面会详细解释
canBeReadOnly 这是一个boolean类型的参数,用于标识当前会话是否支持“read-only”模式。默认情况下,在ZooKeeper集群中,一个机器如果和集群中过半以上机器失去了网络连接,那么这个机器将不再处理客户端请求(包括读写请求)。但是在某些使用场景下,当ZooKeeper服务器发生此类故障的时候,我们还是希望ZooKeeper服务器能够提供读服务(当然写服务肯定无法提供)——这就是ZooKeeper的“read-only”模式。
sessionId和sessionPasswd 分别代表会话ID和会话秘钥。这两个参数能够唯一确定一个会话,同时客户端使用这两个参数实现客户端会话复用,从而达到恢复会话的效果,具体使用方法是,第一次连接上ZooKeeper服务器时,通过调用ZooKeeper对象实现的一下两个接口,即可获得当前会话的ID和秘钥:
long getSessionId(); byte[] getSessionPasswd();
获取到这两个参数值后,就可以在下次创建ZooKeeper对象实例的时候传入构造方法了
注意,ZooKeeper客户端和服务端会话的建立是一个异步的过程,也就是说在程序中,构造方法会在处理客户端初始化工作后立即返回,在大多说情况下,此时并名优真正建立好一个可用的会话,在会话的生命周期中处于“CONNECTING”的状态。

当该会话真正创建完毕后,ZooKeeper服务端会向会话对应客户端发送一个事件通知,以告知客户端,客户端只有在获取这个通知之后,才算真正建立了会话。

该构造方法内部实现了与ZooKeeper服务器之间的TCP连接创建,负责维护客户端会话的生命周期,现在暂不对这些细节做更多详解,关于ZooKeeper客户端与服务端之间的连接创建过程及其内部原理,将在后面详细介绍。


创建一个最基本的ZooKeeper会话实例

Package book.chapter05.$5_3_1
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.WatcherEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
//Chapter:5.3.1 Java API->创建连接->创建一个最基本的ZooKeeper会话实例
public class ZooKeeper_Constructor_Usage_Simple implements Watcher{
private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
public static void main(String[] args) throws Exception{
ZooKeeper zookeeper = new ZooKeeper("domain1.book.zookeeper:2181",5000,//
new ZooKeeper_Constructor_Usage_simple());
System.out.println(zookeeper.getState());
try{
connectedSemaphore.await();
}catch(InterruptedException e){}
System.out.println("ZooKeeper session established.");
}
public void process(WatchedEvent event){
System.out.println("Receive watched event:"+event);
if(KeeperState.SyncConnected == event.getState()){
connectedSemaphore.countDown();
}
}
}
运行程序,输出结果如下:

CONNECTING

Receive watched event:WatchedEvent state:SyncConnected type:None path:null ZooKeeper session established.

在上面这个程序片段中,我们使用第一种构造方法(ZooKeeper(String connectString,int sessionTimeout,Watcher watcher))来实例化了一个ZooKeeper对象,从而建立了会话。

另外,ZooKeeper_Constructor_Usage类实现了Watcher接口,重写了process方法,该方法负责处理来自ZooKeeper服务端的Watcher通知,在收到服务端发来的SyncConnected事件后,解除主程序在CountDownLatch上的等待阻塞。至此,客户端会话创建完毕。

创建一个复用sessionId和sessionPasswd的ZooKeeper对象实例

在清单5-1中列出的ZooKeeper客户端构造方法中,我们看到ZooKeeper构造方法允许传入sessionId和sessionPasswd——客户端传入sessionId和sessionPasswd的目的是为了复用会话,以维持之前会话的有效性。清单5-3是一个复用sessionId和sessionPasswd来创建ZooKeeper对象实例的示例。

package book.chapter;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
//创建一个最基本的ZooKeeper对象实例,复用sessionId和sessionPasswd
public class ZooKeeper_Constructor_Usage_With_SID_PASSWD implements Watcher {
private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
public static void main(String[] args) {
ZooKeeper zookeeper = new ZooKeeper("domain1.book.zookeeper:2181",5000,//
new ZooKeeper_Constructor_Usage_With_SID_PASSWD());
connectedSemaphore.await();
long sessionId = zookeeper.getSessionId();
byte[] passwd = zookeeper.getSessionPasswd();

//Use illegal sessionId and sessionPassWd
zookeeper = new ZooKeeper("domain1.book.zookeeper:2181",5000,//
new ZooKeeper_Constructor_Usage_With_SID_PASSWD(),//
1l,//
"test".getBytes());
//Use correct sessionId and sessionPassWd
zookeeper = new ZooKeeper("domain1.book.zookeeper:2181",5000,//
new ZooKeeper_Constructor_Usage_With_SID_PASSWD(),//
sessionId,//
passwd);
Thread.sleep(Integer.MAX_VALUE);
}
public void process(WatchedEvent event){
System.out.println("Receive watched event:"+event);
if(KeeperState.SyncConnected == event.getState()){
connectedSemaphore.countDown();
}
}

}

运行程序,输出结果如下:

Receive watched event:WatchedEvent state:SyncConnected type:None path:null

Receive watched event:WatchedEvent state:Expired tyep:None path:null

Receive watched event:WatchedEvent state:SyncConnected type:None path:null

从上面这个示例程序和结果输出中,我们可以看出,第一次使用了错误的sessionId和sessionPasswd来创建ZooKeeper客户端的实例,结果客户端接收到了服务端的Expired事件通知;而第二次则使用正确的sessionId和sessionPasswd来创建客户端的实例,结果连接成功。


2.创建节点

客户端可以通过ZooKeeper的API来创建一个数据节点,有如下两个接口:

String create(final String path,byte data[],List<ACL> acl,CreateMode createMode)

void create(final String path,byte date[],List<ACL> acl,CreateMode createMode,StringCallback cb,Object ctx)

这两个接口分别以同步和异步方式创建节点,API方法的参数说明如表5-3所示。

表5-3 ZooKeeper create API方法参数说明

参数名 说明
path 需要创建的数据节点的节点路径,录入,/zk-book/foo
data[] 一个字节数组,是节点创建后的初始内容
acl 节点的ACL策略
createMode 节点类型,是一个枚举类型,通常有4种可选的节点类型:
  • 持久(PERSISTENT)
  • 持久顺序(PERSISTENT_SEQUENTIAL)
  • 临时(EPHEMERAL)
  • 临时顺序(EPHEMERAL_SEQUENTIAL)
关于ZNode的节点特性,将在后面做详解介绍
cb 注册一个异步回调函数。开发人员需要实现StringCallBack接口,主要是对下面这个方法的重写:
void processResult(int rc,String path,Object ctx,String name);
当服务端节点创建完毕后,ZooKeeper客户端就会自动调用这个方法,这样就可以处理相关的业务逻辑了
ctx 用于传递一个对象,可以在毁掉方法执行的时候使用,通常是一个上下文(Context)信息
需要注意几点,无论是同步还是异步接口,ZooKeeper都不支持递归创建,即无法再父节点不存在的情况下创建一个子节点。另外,如果一个节点已经存在了,那么创建同名节点的时候,会抛出NodeExistException异常。

目前ZooKeeper的节点内容只支持字节数组(byte[])类型,也就是说,ZooKeeper不负责为节点内容进行序列化,开发人员需要自己使用序列化工具将节点内容进行序列化和反序列化。对于字符串,可以简单地使用“String”.getBytes()来生成一个字节数组;对于其他复杂对象,可以使用Hessian或是Kryo等专门的序列化工具来进行序列化。

关于权限控制,如果你的应用场景没有太高的权限要求,那么可以不关注这个参数,只需要在acl参数中传入参数Ids.OPEN_ACL_UNSAFE,这就表明之后对这个节点的任何操作都不受权限控制。关于ZooKeeper的权限控制,将在后面做纤细介绍。

由于创建,删除节点等操作代码与介绍篇幅较多,下一节里我们一起探讨