转载请注明:@ni掌柜
本文重点围绕ZooKeeper的Watcher,介绍通知的状态类型和事件类型,以及这些事件通知的触发条件。
1、浅谈Watcher接口
在ZooKeeper中,接口类Watcher定义了事件通知相关的逻辑,包含了KeeperState和EventType两个枚举类,分别代表通知状态和事件类型。还有一个比较重要的接口方法:
- abstract public void process(WatchedEvent event);
这个方法用于处理事件通知,每个实现类都应该自己实现合适的处理逻辑。参数WatchedEvent类封装了上面提到的两个枚举类,以及触发事件对应的ZK节点path,当然,这个path不一定每次通知都有,例如会话建立,会话失效或连接断开等通知类型,就不是针对某一个单独path的。
2、如何注册Watcher上面已经提到,Watcher接口已经提供了基本的回调方法用于处理来自服务器的通知。因此,我们只要在合适的地方实现这个接口,并传给服务器即可。下面来看看哪些是合适的地方: A、构造方法上面这个是ZooKeeper的一个构造方法,与ZK创建连接的时候会用到这个。这里我们重点关注第三个参数:Watcher,很显然在,这个就是一个注册Watcher的地方,传入的参数就是开发者自己Watcher接口实现。需要注意的是,这个地方注册的Watcher实现,会成为当前ZK会话的默认Watcher实现。也就是说,其它地方如果也想注册一个Watcher,那么是可以默认使用这个实现的。具体下面会涉及到。B、API的读写接口中
- ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
- public Stat exists(String path, boolean watch)throws KeeperException, InterruptedException
- public List<String> getChildren(String path, boolean watch)throws KeeperException,InterruptedException
- public byte[] getData(String path,boolean watch,Stat stat)throws KeeperException,InterruptedException
- public void register(Watcher watcher)
3、通知的状态类型与事件类型
在Watcher接口类中,已经定义了所有的状态类型和事件类型,这里把各个状态和事件类型之间的关系整理一下。
3.1状态:KeeperState.Disconnected(0)
此时客户端处于断开连接状态,和ZK集群都没有建立连接。
3.1.1事件:EventType.None(-1)
触发条件:一般是在与服务器断开连接的时候,客户端会收到这个事件。
3.2状态:KeeperState. SyncConnected(3)
3.2.1事件:EventType.None(-1)
触发条件:客户端与服务器成功建立会话之后,会收到这个通知。
3.2.2事件:EventType. NodeCreated (1)
触发条件:所关注的节点被创建。
3.2.3事件:EventType. NodeDeleted (2)
触发条件:所关注的节点被删除。
3.2.4事件:EventType. NodeDataChanged (3)
触发条件:所关注的节点的内容有更新。注意,这个地方说的内容是指数据的版本号dataVersion。因此,即使使用相同的数据内容来更新,还是会收到这个事件通知的。无论如何,调用了更新接口,就一定会更新dataVersion的。
3.2.5事件:EventType. NodeChildrenChanged (4)
触发条件:所关注的节点的子节点有变化。这里说的变化是指子节点的个数和组成,具体到子节点内容的变化是不会通知的。
3.3状态 KeeperState. AuthFailed(4)
3.3.1事件:EventType.None(-1)
3.4状态 KeeperState. Expired(-112)
3.4.1事件:EventType.None(-1)
具体代码如下:
package com.taobao.taokeeper.research.watcher;
import java.util.List;import java.util.concurrent.CountDownLatch;import java.util.concurrent.atomic.AtomicInteger;
import org.apache.log4j.PropertyConfigurator;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.Watcher.Event.EventType;import org.apache.zookeeper.Watcher.Event.KeeperState;import org.apache.zookeeper.ZooDefs.Ids;import org.apache.zookeeper.ZooKeeper;import org.apache.zookeeper.data.Stat;import org.slf4j.Logger;import org.slf4j.LoggerFactory;
import common.toolkit.java.util.ObjectUtil;import common.toolkit.java.util.ThreadUtil;
/** * 《ZooKeeper 事件类型详解》 * @author nileader/nileader@gmail.com * */public class AllZooKeeperWatcher implements Watcher{
private static final Logger LOG = LoggerFactory.getLogger( NodeDataChangedEvent.class );AtomicInteger seq = new AtomicInteger();private static final int SESSION_TIMEOUT = 10000;private static final String CONNECTION_STRING = "test.zookeeper.connection_string:2181," + "test.zookeeper.connection_string2:2181," + "test.zookeeper.connection_string3:2181";private static final String ZK_PATH = "/nileader";private static final String CHILDREN_PATH = "/nileader/ch";private static final String LOG_PREFIX_OF_MAIN = "【Main】";
private ZooKeeper zk = null;
private CountDownLatch connectedSemaphore = new CountDownLatch( 1 );
/** * 创建ZK连接 * @param connectString ZK服务器地址列表 * @param sessionTimeout Session超时时间 */public void createConnection( String connectString, int sessionTimeout ) {this.releaseConnection();try {zk = new ZooKeeper( connectString, sessionTimeout,this );LOG.info( LOG_PREFIX_OF_MAIN + "开始连接ZK服务器" );connectedSemaphore.await();} catch ( Exception e ) {}}
/** * 关闭ZK连接 */public void releaseConnection() {if ( !ObjectUtil.isBlank( this.zk ) ) {try {this.zk.close();} catch ( InterruptedException e ) {}}}
/** * 创建节点 * @param path 节点path * @param data 初始数据内容 * @return */public boolean createPath( String path, String data ) {try {this.zk.exists( path, true );LOG.info( LOG_PREFIX_OF_MAIN + "节点创建成功, Path: "+ this.zk.create( path, // data.getBytes(), // Ids.OPEN_ACL_UNSAFE, // CreateMode.PERSISTENT )+ ", content: " + data );} catch ( Exception e ) {}return true;}
/** * 读取指定节点数据内容 * @param path 节点path * @return */public String readData( String path, boolean needWatch ) {try {return new String( this.zk.getData( path, needWatch, null ) );} catch ( Exception e ) {return "";}}
/** * 更新指定节点数据内容 * @param path 节点path * @param data 数据内容 * @return */public boolean writeData( String path, String data ) {try {LOG.info( LOG_PREFIX_OF_MAIN + "更新数据成功,path:" + path + ", stat: " + this.zk.setData( path, data.getBytes(), -1 ) );} catch ( Exception e ) {}return false;}
/** * 删除指定节点 * @param path 节点path */public void deleteNode( String path ) {try {this.zk.delete( path, -1 );LOG.info( LOG_PREFIX_OF_MAIN + "删除节点成功,path:" + path );} catch ( Exception e ) {//TODO}}
/** * 删除指定节点 * @param path 节点path */public Stat exists( String path, boolean needWatch ) {try {return this.zk.exists( path, needWatch );} catch ( Exception e ) {return null;}}
/** * 获取子节点 * @param path 节点path */private List<String> getChildren( String path, boolean needWatch ) {try {return this.zk.getChildren( path, needWatch );} catch ( Exception e ) {return null;}}
public void deleteAllTestPath(){this.deleteNode( CHILDREN_PATH );this.deleteNode( ZK_PATH );}
public static void main( String[] args ) {
PropertyConfigurator.configure("src/main/resources/log4j.properties");
AllZooKeeperWatcher sample = new AllZooKeeperWatcher();sample.createConnection( CONNECTION_STRING, SESSION_TIMEOUT );//清理节点sample.deleteAllTestPath();if ( sample.createPath( ZK_PATH, System.currentTimeMillis()+"" ) ) {ThreadUtil.sleep( 3000 );//读取数据sample.readData( ZK_PATH, true );//读取子节点sample.getChildren( ZK_PATH, true );
//更新数据sample.writeData( ZK_PATH, System.currentTimeMillis()+"" );ThreadUtil.sleep( 3000 );//创建子节点sample.createPath( CHILDREN_PATH, System.currentTimeMillis()+"" );}ThreadUtil.sleep( 3000 );//清理节点sample.deleteAllTestPath();ThreadUtil.sleep( 3000 );sample.releaseConnection();}
/** * 收到来自Server的Watcher通知后的处理。 */@Overridepublic void process( WatchedEvent event ) {
ThreadUtil.sleep( 200 );if ( ObjectUtil.isBlank( event ) ) {return;}// 连接状态KeeperState keeperState = event.getState();// 事件类型EventType eventType = event.getType();// 受影响的pathString path = event.getPath();String logPrefix = "【Watcher-" + this.seq.incrementAndGet() + "】";
LOG.info( logPrefix + "收到Watcher通知" );LOG.info( logPrefix + "连接状态:\t" + keeperState.toString() );LOG.info( logPrefix + "事件类型:\t" + eventType.toString() );
if ( KeeperState.SyncConnected == keeperState ) {// 成功连接上ZK服务器if ( EventType.None == eventType ) {LOG.info( logPrefix + "成功连接上ZK服务器" );connectedSemaphore.countDown();} else if ( EventType.NodeCreated == eventType ) {LOG.info( logPrefix + "节点创建" );this.exists( path, true );} else if ( EventType.NodeDataChanged == eventType ) {LOG.info( logPrefix + "节点数据更新" );LOG.info( logPrefix + "数据内容: " + this.readData( ZK_PATH, true ) );} else if ( EventType.NodeChildrenChanged == eventType ) {LOG.info( logPrefix + "子节点变更" );LOG.info( logPrefix + "子节点列表:" + this.getChildren( ZK_PATH, true ) );} else if ( EventType.NodeDeleted == eventType ) {LOG.info( logPrefix + "节点 " + path + " 被删除" );}
} else if ( KeeperState.Disconnected == keeperState ) {LOG.info( logPrefix + "与ZK服务器断开连接" );} else if ( KeeperState.AuthFailed == keeperState ) {LOG.info( logPrefix + "权限检查失败" );} else if ( KeeperState.Expired == keeperState ) {LOG.info( logPrefix + "会话失效" );}
LOG.info( "--------------------------------------------" );
}
}
4、程序实例
这里有一个可以用来演示“触发事件通知”和“如何处理这些事件通知”的程序AllZooKeeperWatcher.java。
在这里:https://github.com/alibaba/taokeeper/blob/master/taokeeper-research/src/main/java/com/taobao/taokeeper/research/watcher/AllZooKeeperWatcher.java
运行结果如下:
- 2012-08-05 06:35:23,779 - 【Main】开始连接ZK服务器
- 2012-08-05 06:35:24,196 - 【Watcher-1】收到Watcher通知
- 2012-08-05 06:35:24,196 - 【Watcher-1】连接状态: SyncConnected
- 2012-08-05 06:35:24,196 - 【Watcher-1】事件类型: None
- 2012-08-05 06:35:24,196 - 【Watcher-1】成功连接上ZK服务器
- 2012-08-05 06:35:24,196 - --------------------------------------------
- 2012-08-05 06:35:24,354 - 【Main】节点创建成功, Path: /nileader, content: 1353337464279
- 2012-08-05 06:35:24,554 - 【Watcher-2】收到Watcher通知
- 2012-08-05 06:35:24,554 - 【Watcher-2】连接状态: SyncConnected
- 2012-08-05 06:35:24,554 - 【Watcher-2】事件类型: NodeCreated
- 2012-08-05 06:35:24,554 - 【Watcher-2】节点创建
- 2012-08-05 06:35:24,582 - --------------------------------------------
- 2012-08-05 06:35:27,471 - 【Main】更新数据成功,path:/nileader,
- 2012-08-05 06:35:27,667 - 【Watcher-3】收到Watcher通知
- 2012-08-05 06:35:27,667 - 【Watcher-3】连接状态: SyncConnected
- 2012-08-05 06:35:27,667 - 【Watcher-3】事件类型: NodeDataChanged
- 2012-08-05 06:35:27,667 - 【Watcher-3】节点数据更新
- 2012-08-05 06:35:27,696 - 【Watcher-3】数据内容: 1353337467434
- 2012-08-05 06:35:27,696 - --------------------------------------------
- 2012-08-05 06:35:30,534 - 【Main】节点创建成功, Path: /nileader/ch, content: 1353337470471
- 2012-08-05 06:35:30,728 - 【Watcher-4】收到Watcher通知
- 2012-08-05 06:35:30,728 - 【Watcher-4】连接状态: SyncConnected
- 2012-08-05 06:35:30,728 - 【Watcher-4】事件类型: NodeCreated
- 2012-08-05 06:35:30,728 - 【Watcher-4】节点创建
- 2012-08-05 06:35:30,758 - --------------------------------------------
- 2012-08-05 06:35:30,958 - 【Watcher-5】收到Watcher通知
- 2012-08-05 06:35:30,958 - 【Watcher-5】连接状态: SyncConnected
- 2012-08-05 06:35:30,958 - 【Watcher-5】事件类型: NodeChildrenChanged
- 2012-08-05 06:35:30,958 - 【Watcher-5】子节点变更
- 2012-08-05 06:35:30,993 - 【Watcher-5】子节点列表:[ch]
- 2012-08-05 06:35:30,993 - --------------------------------------------
- 2012-08-05 06:35:33,618 - 【Main】删除节点成功,path:/nileader/ch
- 2012-08-05 06:35:33,756 - 【Main】删除节点成功,path:/nileader
- 2012-08-05 06:35:33,817 - 【Watcher-6】收到Watcher通知
- 2012-08-05 06:35:33,817 - 【Watcher-6】连接状态: SyncConnected
- 2012-08-05 06:35:33,817 - 【Watcher-6】事件类型: NodeDeleted
- 2012-08-05 06:35:33,817 - 【Watcher-6】节点 /nileader/ch 被删除
- 2012-08-05 06:35:33,817 - --------------------------------------------
- 2012-08-05 06:35:34,017 - 【Watcher-7】收到Watcher通知
- 2012-08-05 06:35:34,017 - 【Watcher-7】连接状态: SyncConnected
- 2012-08-05 06:35:34,017 - 【Watcher-7】事件类型: NodeChildrenChanged
- 2012-08-05 06:35:34,017 - 【Watcher-7】子节点变更
- 2012-08-05 06:35:34,109 - 【Watcher-7】子节点列表:null
- 2012-08-05 06:35:34,109 - --------------------------------------------
- 2012-08-05 06:35:34,309 - 【Watcher-8】收到Watcher通知
- 2012-08-05 06:35:34,309 - 【Watcher-8】连接状态: SyncConnected
- 2012-08-05 06:35:34,309 - 【Watcher-8】事件类型: NodeDeleted
- 2012-08-05 06:35:34,309 - 【Watcher-8】节点 /nileader 被删除
- 2012-08-05 06:35:34,309 - --------------------------------------------