服务端处理Watcher
上面主要讲解了客户端注册Watcher的过程,并且已经了解了最终客户端并不会将Watcher对象真正传递到服务端。那么服务端究竟是如何完成客户端的Watcher注册,又是如何来处理这个Watcher的呢?后面我们围绕这两个问题讲解。
ServerCnxn存储
我们首先看下服务端接收Watcher并将其存储起来的过程,如图7-9所示是ZooKeeper服务端处理Watcher的序列图。
图7-9 服务端处理Watcher的时序图
从图7-9我们可以看出,服务端收到来自客户端的请求后,在FinalRequestProcessor.processRequest()中会判断当前请求是否需要注册Watcher:
case OpCode.getData:{从getData请求的处理逻辑中,我们可以看到,当getDataRequest.getWatch()为true的时候,ZooKeeper就认为当前客户端请求需要进行Watcher注册,于是就会将当前的ServerCnxn对象和数据节点路径传入getData方法中去。那么为什么要传入ServerCnxn呢?ServerCnxn是一个ZooKeeper客户端和服务器之间的连接接口,代表了一个客户端和服务器的连接。ServerCnxn接口的默认实现是NIOServerCnxn,同时从3.4.0版本开始,引入了基于Netty的实现:NettyServerCnxn。无论采用哪种实现方式,都实现了Watcher的process接口,因此我们可以把ServerCnxn看作是一个Watcher对象。数据节点的节点路径和ServerCnxn最终会被存储在WatcherManager的watchTable和watch2Paths中。
...
byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat, getDataRequest.getWatch()?cnxn:null);
rsp = new GetDataResponse(b,stat);
break;
}
- WatchManager是ZooKeeper服务端Watcher的管理者,其内部管理的watchTable和watch2Pashs两个存储结构,分别从两个维度对Watcher进行存储。
- watchTable是从数据节点路径的粒度来托管Watcher
- watch2Paths是从Watcher的粒度来控制事件触发需要触发的数据节点。
同时,WatchManager还负责Watcher事件的触发,并移除那些已经被触发的Watcher。注意,WatchManager只是一个统称,在服务端,DataTree中会托管两个WatchManager,分别是dataWatches和childWatches,分别对应数据变更Watcher和子节点变更Watcher。在本例中,因为是getData接口,因此会被存储在dataWatches中。
Watcher触发
在上面的讲解中,我们了解了对于标记了Watcher注册的请求,ZooKeeper会将其对应的ServerCnxn存储到WatchManager中,下面我们来看看服务端是如何触发Watcher的。
NodeDataChanged事件的触发条件是“Watcher监听的对应数据节点的数据内容发生变更”,其具体实现如下:
public Stat setData(String path, byte data[] ,int version,long zxid,long time)throws KeeperException.NoNodeException{
Stat s = new Stat();
DataNode n = nodes.get(path);
if(n == null){
throw new KeeperException.NoNodeException();
}
byte lastdata[] = null;
synchronized(n){
lastdata = n.data;
n.data = data;
n.stat.setMtime(time);
n.stat.setMzxid(zxid);
n.stat.setVersion(version);
n.copyStat(s);
}
//...
dataWatches.triggerWatch(path,EventType.NodeDataChanged);
return s;
}
在对指定节点进行数据更新后,通过调用WatchManager的triggerWatch方法来触发相关的事件:
public Set<Watcher> triggerWatch(String path, EventType type){无论是dataWatches何时childWatches管理器,Watcher的触发逻辑都是一致的,基本步骤如下。
return triggerWatch(path,tyep,null);
}
public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress){
WatchedEvent e = new WatchedEvent(type,KeeperState.SyncConnected, path);
HashSet<Watcher> watchers;
synchronized(this){
watchers = watchTable.remove(path);
//...
//如果不存在Watcher,直接返回
for(Watcher w :watchers){
HashSet<String> paths = watch2Paths.get(w);
if(paths != null){
paths.remove(path);
}
}
}
for(Watcher w : watches){
if(supress != null && supress.contains(w)){
continue;
}
w.process(e);
}
return watches;
}
1.封装WatchedEvent。
首先通知状态(KeeperState)、事件类型(EventType)以及节点路径(Path)封装成一个WatchedEvent对象。
2.查询Watcher。
根据数据节点的节点路径从watchTable中取出对应的Watcher。如果没有找到Watcher,说明没有任何客户端在该数据节点上注册过Watcher,直接退出。而如果找到了这个Watcher,会将其提取出来,同时会直接从watchTable和watch2Paths中将其删除——从这里我们也可以看出,Watcher在服务端是一次性的,即触发一次就失效了。
3.调用process方法来触发Watcher。
在这一步中,会逐个依次地调用从步骤2中找出的所有Water的process方法。那么这里的process方法究竟做了什么呢?在上文中我们已经提到,对于需要注册Watcher的请求,ZooKeeper会把当前请求对应的ServerCnxn作为一个Watcher进行存储,因此,这里的process方法,事实上就是ServerCnxn的对应方法:
public class NIOServerCnxn extends ServerCnxn{
//...
synchronized public void process(WatchedEvent evetn){
ReplyHeader h = new ReplyHeader(-1,-1L,0);
//...
//Convert WatchedEvent to a type that can be sent over the wire
WatcherEvent e = event.getWarpper();
senResponse(h,e,"notification");
}
}
从上面的代码片段中,我们可以看出在process方法中,主要逻辑如下。
- 在请求头中标记“-1”,表明当前是一个通知。
- 将WawtchedEvent包装成WatcherEvent,以便进行网络传输序列化。
- 向客户端发送该通知。
从以上几个步骤中可以看出,ServerCnxn的process方法中的逻辑非常简单,本质上并不是处理客户端Watcher真正的业务逻辑,而是把当前客户端连接的ServerCnxn对象来实现对客户端的WatchedEvent传递,真正的客户端Watcher回调与业务逻辑执行都在客户端。
客户端回调Watcher
上面我们已经讲解到是如何进行Watcher触发的,并且知道了最终服务端会通过使用ServerCnxn对应的TCP连接来向客户端发送一个WatcherEvent事件,下面我们来看看客户端是如何处理这个事件的。
SendThread接收事件通知
首先我们来看下ZooKeeper客户端是如何接收这个客户端事件通知的:
class SendThread extends Thread(
//...
void readResponse(ByteBuffer incomingBuffer) throws IOException{
//...
if(replyHdr.getXid() == -1){
//-1 means notification
//...
WatcherEvent event = new WatcherEvent();
event.deserialize(bbia,"response");
//convert from a server path to a client path
if(chrootPath != null){
String serverPath = event.getPath();
if(serverPath.compareTo(chrootPath)==0)
event.setPath("/");
else if(serverPath.length()>chrootPath.length())
event.setPath(serverPath.subString(chrootPath.length()));
//...
}
WatchedEvent we = new WatchedEvent(event);
//...
evetnThread.queueEvent(we);
return;
}
})
对于一个来自服务端的响应,客户端都是由SendThread。readResponse(ByteBuffer incomingBuffer)方法来统一进行处理的,如果响应头replyHdr中标识了XID为-1,表明这是一个通知类型的响应。
EventThread处理事件通知
在上文中,我们讲到SendThread接收到服务端的通知事件后,会通过调用EventThread.queueEvent方法将事件传给EventThread线程,其逻辑如下:
public void queueEvent(WatchedEvent event) {
if(event.getType() == EventType.None && sessionState == event.getState()){
return;
}
}
sessionState = event.getState();
//materialize the watchers based on the event
WatcherSetEventPair pair = new WatcherSetEventPair(watcher.materialize(event.getState(),event.getType(),event.getPaht()),event);
//queue the pair(watch set & event) for later processing
waitingEvents.add(pair);
}queueEvent方法首先会根据该通知事件,从ZKWatchManager中取出所有相关的Watcher:
public Set<Watcher> materialize(Watcher.Event.KeeperState state,Watcher.Event.EventType type,String clientPath){
Set<Watcher> result = new HashSet<Watcher>();
switch(type){
//...
case NodeDataChanged:
case NodeCreated;
synchronized(dataWatches){
addTo(dataWatches.remove(clientPath),result);
}
synchronized(existWatches){
addTo(existWatches.remove(clientPath),result);
}
break;
//...
return result;
}
final private void addTo(Set<Watcher> from ,Set<Watcher> to){
if(from != null){
to.addAll(from);
}
}
}客户端在识别出事件类型EventType后,会从相应的Watcher存储(即dataWatches,existWatches或childWatches中的一个或多个,本例中就是从dataWatches和existWatches两个存储中获取)中去除对应的Watcher。注意,此处使用的是remove接口,因此也表明了客户端的Watcher机制同样也是一次性的,即一旦被触发后,该Watcher就失效了。
获取到相关的额所有Watcher后,会将其放入waitingEvents这个队列中去。WaitingEvents是一个待处理Watcher队列,EventThread的run方法会不断对该队列进行处理:
public void run(){
try{
isRunning = true;
while(true){
Object event = waitingEvents.take();
if(event == eventOfDeath){
wasKilled = true;
}else{
processEvent(event);
}
//...
}
}
}
private void processEvent(Object event){
try{
if(evemt instanceof WatcherSetEventPair){
//each watcher will process the event
WatcherSetEventPair pair = (WatcherSetEventPair)event;
for(Watcher watcher : pair.watchers){
\ try{
watcher.process(pair.event);
}catch(Throwable t){
//....
}
}
}
}
从上面的代码片段中我们可以看出,EventThread线程每次都会从waitingEvents队列中取出一个Watcher,并进行串行同步处理。注意,此处processEvent方法中的Watcher才是之前客户端真正注册的Watcher,调用其process方法就可以实现Watcher的回调了。
Watcher特性总结
到目前为止,相信大家已经了解了ZooKeeper中Watcher机制的相关接口定义以及Watcher的各类事件。同时,我们以ZooKeeper节点的数据内容获取接口为例,从ZooKeeper客户端进行Watcher注册、服务端处理Watcher以及客户端回调Wathcer三方面分阶段讲解了ZooKeeper的Watcher工作机制。
通过上面的内容的讲解,我们不难发现ZooKeeperWatcher具有如下特性。
一次性
从上面的介绍中可以看到,无论是服务端还是客户端,一旦一个Watcher被触发,ZooKeeper都会将其从相应的存储中移除。因此,开发人员在Watcher的使用上要记住的一点是需要反复注册。这样的设计有效地减轻了服务端的压力。试想,如果注册一个Watcher之后一直有效,那么,针对那些更新非常频繁的节点,服务端会不断地向客户端发送事件通知,这无论对于网络还是服务端性能的影响都非常大。
客户端串行执行
客户端Watcher回调的过程是一个串行同步的过程,这为我们保证了顺序,同时,需要开发人员注意的一点是,千万不要因为一个Watcher的处理逻辑影响了整个客户端的Watcher回调。
轻量
WatchedEvent是ZooKeeper整个Watcher通知机制的最小通知单元,这个数据结构中只包含三部分内容:通知状态、事件类型和节点路径。也就是说,Watcher通知非常简单,只会告诉客户端发生了事件,而不会说明事件的具体内容。,例如针对NodeDataChanged事件,ZooKeeper的Watcher只会通知客户端指定数据节点的数据内容发生了变更,而对于原始数据以及变更后的新数据都无法从这个事件中直接获取到,而是需要客户端主动重新去获取数据——这也是ZooKeeper的Watcher机制的一个非常重要的特性。
另外,客户端向服务端注册Watcher的时候,并不会把客户端真实的Watcher对象传递到服务端,仅仅只是在客户端请求中使用boolean类型属性进行了标记,同时服务端也仅仅只是保存了当前连接的ServerCnxn对象。
如此轻量的Watcher机制设计,在网络开销和服务端内存开销上都是非常廉价的。