ZooKeeper Client Source Code Analysis

Date: 2023-01-11 08:33:04

1. Starting from the ZooKeeper constructor:

    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
            long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)
        throws IOException
    {
        LOG.info("Initiating client connection, connectString=" + connectString
                + " sessionTimeout=" + sessionTimeout
                + " watcher=" + watcher
                + " sessionId=" + Long.toHexString(sessionId)
                + " sessionPasswd="
                + (sessionPasswd == null ? "<null>" : "<hidden>"));

        watchManager.defaultWatcher = watcher;

        ConnectStringParser connectStringParser = new ConnectStringParser(
                connectString);
        HostProvider hostProvider = new StaticHostProvider(
                connectStringParser.getServerAddresses());
        cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
                hostProvider, sessionTimeout, this, watchManager,
                getClientCnxnSocket(), sessionId, sessionPasswd, canBeReadOnly);
        cnxn.seenRwServerBefore = true; // since user has provided sessionId
        cnxn.start();
    }

 

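--The constructor mainly does four things:

(1) It stores the watcher passed as a parameter in the ZooKeeper object's watchManager member. The ZKWatchManager class mainly serves as a container for managing watchers, as its member variables show: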

    private final Map<String, Set<Watcher>> dataWatches =
        new HashMap<String, Set<Watcher>>();
    private final Map<String, Set<Watcher>> existWatches =
        new HashMap<String, Set<Watcher>>();
    private final Map<String, Set<Watcher>> childWatches =
        new HashMap<String, Set<Watcher>>();
    private volatile Watcher defaultWatcher;

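It contains three map containers, holding the data, exist, and child watchers respectively, plus a separate defaultWatcher. The difference is that the first three are one-shot: a watcher registered there fires once and is then removed. The defaultWatcher stays in effect for the whole lifetime of the ZooKeeper object and is passed in through the constructor.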
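To make the one-shot versus lifetime distinction concrete, here is a minimal sketch of such a container. It is a simplified stand-in, not the real ZKWatchManager: the names WatchRegistrySketch, Listener, addDataWatch, and takeDataWatches are invented for illustration, and only the data map is modeled.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Simplified stand-in for a watcher container; NOT the real ZKWatchManager.
class WatchRegistrySketch {
    // Hypothetical callback type standing in for Watcher.
    interface Listener {
        void onEvent(String path);
    }

    private final Map<String, Set<Listener>> dataWatches = new HashMap<>();
    private volatile Listener defaultListener;

    WatchRegistrySketch(Listener defaultListener) {
        this.defaultListener = defaultListener;
    }

    synchronized void addDataWatch(String path, Listener listener) {
        dataWatches.computeIfAbsent(path, k -> new HashSet<>()).add(listener);
    }

    // Removes and returns the listeners for a path, so each fires at most once.
    synchronized Set<Listener> takeDataWatches(String path) {
        Set<Listener> listeners = dataWatches.remove(path);
        return listeners == null ? Collections.<Listener>emptySet() : listeners;
    }

    // The default listener is never removed by an event.
    Listener defaultListener() {
        return defaultListener;
    }

    public static void main(String[] args) {
        WatchRegistrySketch registry =
                new WatchRegistrySketch(p -> System.out.println("default listener: " + p));
        registry.addDataWatch("/a", p -> System.out.println("data watch fired: " + p));
        registry.takeDataWatches("/a").forEach(l -> l.onEvent("/a")); // fires once
        registry.takeDataWatches("/a").forEach(l -> l.onEvent("/a")); // nothing left to fire
        registry.defaultListener().onEvent("/a");                     // still registered
    }
}
```

The point of the sketch is only that looking up the watchers for a path also removes them, which is why a client has to register a watch again if it wants further notifications.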

(2) It creates a ClientCnxn object, which carries out the actual client-side work. Here is the source of its constructor:

    this.zooKeeper = zooKeeper;
    this.watcher = watcher;
    this.sessionId = sessionId;
    this.sessionPasswd = sessionPasswd;
    this.sessionTimeout = sessionTimeout;
    this.hostProvider = hostProvider;
    this.chrootPath = chrootPath;

    connectTimeout = sessionTimeout / hostProvider.size();
    readTimeout = sessionTimeout * 2 / 3;
    readOnly = canBeReadOnly;

    sendThread = new SendThread(clientCnxnSocket);
    eventThread = new EventThread();

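--As shown, it first assigns the user-supplied parameters to its own members, all of which are important configuration items, and then creates two very important threads, sendThread and eventThread. The former handles I/O with the server (sending requests and receiving replies); the latter runs a loop that takes events off a queue and invokes the watchers' callback methods.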
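The two derived timeouts are easy to miss in the constructor, so here is the arithmetic worked through on its own. The class and method names are invented for this sketch; only the two formulas come from the constructor above.

```java
// Worked example of the two timeout formulas from the ClientCnxn constructor.
class TimeoutSketch {
    // connectTimeout = sessionTimeout / hostProvider.size()
    static int connectTimeout(int sessionTimeoutMs, int hostCount) {
        return sessionTimeoutMs / hostCount;
    }

    // readTimeout = sessionTimeout * 2 / 3
    static int readTimeout(int sessionTimeoutMs) {
        return sessionTimeoutMs * 2 / 3;
    }

    public static void main(String[] args) {
        int sessionTimeoutMs = 30000; // example value
        int hostCount = 3;            // example: three servers in the connect string
        System.out.println("connectTimeout = " + connectTimeout(sessionTimeoutMs, hostCount)); // 10000
        System.out.println("readTimeout    = " + readTimeout(sessionTimeoutMs));               // 20000
    }
}
```

With a 30-second session and three servers, each connection attempt gets 10 seconds, and a connected client treats 20 seconds of silence from the server as a timeout.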

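(3) One important constructor argument is getClientCnxnSocket(), which returns a ClientCnxnSocket object. This object is critical: it is the low-level class that performs network I/O with the server. Here is its source: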

    String clientCnxnSocketName = System
            .getProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);
    if (clientCnxnSocketName == null) {
        clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
    }
    try {
        return (ClientCnxnSocket) Class.forName(clientCnxnSocketName)
                .newInstance();
    } catch (Exception e) {
        IOException ioe = new IOException("Couldn't instantiate "
                + clientCnxnSocketName);
        ioe.initCause(e);
        throw ioe;
    }

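--Note the Class.forName(clientCnxnSocketName).newInstance() call. This is a less common way of creating an object: invoking the constructor through the Class object. Why does this work? Because the Class object is the entry point to all information about a class's methods and members, so with it you can reach them. Why not simply call the ClientCnxnSocket constructor directly? So that the class can be chosen and loaded dynamically at run time and an instance built from it. By default the object actually created here is a ClientCnxnSocketNIO, a subclass of ClientCnxnSocket. This technique has a fancy name: reflection.

Now let's see what the ClientCnxnSocketNIO constructor does: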
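The same pattern can be tried in isolation. The sketch below uses invented stand-in classes (TransportSketch, NioTransportSketch, OtherTransportSketch) and a made-up property name, sketch.transport.impl; none of these exist in ZooKeeper. It also uses getDeclaredConstructor().newInstance(), since Class.newInstance() has been deprecated since Java 9, and wraps failures in an unchecked exception for brevity where the original wraps them in an IOException.

```java
// Hypothetical stand-ins for ClientCnxnSocket and its implementations.
abstract class TransportSketch {
    abstract String name();
}

class NioTransportSketch extends TransportSketch {
    String name() { return "nio"; }
}

class OtherTransportSketch extends TransportSketch {
    String name() { return "other"; }
}

class ReflectiveFactorySketch {
    // Hypothetical property name, chosen for this sketch only.
    static final String IMPL_PROPERTY = "sketch.transport.impl";

    static TransportSketch create() {
        // Read the implementation class name from a system property,
        // falling back to a default implementation when it is not set.
        String className = System.getProperty(IMPL_PROPERTY);
        if (className == null) {
            className = NioTransportSketch.class.getName();
        }
        try {
            // Load the class by name and call its no-argument constructor.
            return (TransportSketch) Class.forName(className)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Couldn't instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(create().name()); // default implementation
        System.setProperty(IMPL_PROPERTY, OtherTransportSketch.class.getName());
        System.out.println(create().name()); // implementation selected at run time
    }
}
```

The design benefit is that the caller depends only on the abstract type; which concrete class gets built can be changed with a -D option at launch, without recompiling.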

    private final Selector selector = Selector.open();
    private SelectionKey sockKey;

    ClientCnxnSocketNIO() throws IOException {
        super();
    }

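--Why paste the two member variable definitions in front of the constructor as well as the constructor body? Because a complete construction also includes the initialization of non-static members. So the key action in this step is the field initializer Selector.open(), which creates the NIO selector later used to wait for connect, read, and write events on the socket channel; it does not open a connection by itself. I am not familiar with socket programming, so for details see this blog post: http://blog.csdn.net/u010412719/article/details/52809669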
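The claim that field initializers are part of construction can be checked with a few lines of plain Java. InitOrderSketch and its open() helper are invented for this illustration; open() merely records that it ran and stands in for Selector.open().

```java
import java.util.ArrayList;
import java.util.List;

// Shows that instance field initializers run before the constructor body.
class InitOrderSketch {
    static final List<String> STEPS = new ArrayList<>();

    // Field initializer: executed as part of construction, right after super().
    private final String selector = open();

    InitOrderSketch() {
        super();
        STEPS.add("constructor body");
    }

    // Hypothetical helper standing in for Selector.open().
    private static String open() {
        STEPS.add("field initializer");
        return "selector";
    }

    public static void main(String[] args) {
        new InitOrderSketch();
        System.out.println(STEPS); // [field initializer, constructor body]
    }
}
```

So even though the ClientCnxnSocketNIO constructor body contains only super(), creating the object still opens the selector.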

The ClientCnxnSocketNIO object created this way is then passed as the argument to the SendThread constructor. Here is what that constructor does:

    SendThread(ClientCnxnSocket clientCnxnSocket) {
        super(makeThreadName("-SendThread()"));
        state = States.CONNECTING;
        this.clientCnxnSocket = clientCnxnSocket;
        setDaemon(true);
    }

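--It does not do much either: it sets the state to CONNECTING, assigns clientCnxnSocket to the SendThread's clientCnxnSocket member, and marks the thread as a daemon. In other words, this network I/O class that talks to the server ends up being used inside the SendThread.

(4) It starts the sendThread and eventThread created earlier. The source is simple: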

    sendThread.start();
    eventThread.start();

--That seems to conclude the analysis of the ZooKeeper constructor.

 

2. What the sendThread does after it starts

    public void run() {
        clientCnxnSocket.introduce(this, sessionId);
        clientCnxnSocket.updateNow();
        clientCnxnSocket.updateLastSendAndHeard();
        int to;
        long lastPingRwServer = System.currentTimeMillis();
        final int MAX_SEND_PING_INTERVAL = 10000; // 10 seconds
        while (state.isAlive()) {
            try {
                if (!clientCnxnSocket.isConnected()) {
                    if (!isFirstConnect) {
                        try {
                            Thread.sleep(r.nextInt(1000));
                        } catch (InterruptedException e) {
                            LOG.warn("Unexpected exception", e);
                        }
                    }
                    // don't re-establish connection if we are closing
                    if (closing || !state.isAlive()) {
                        break;
                    }
                    startConnect();
                    clientCnxnSocket.updateLastSendAndHeard();
                }

                if (state.isConnected()) {
                    // determine whether we need to send an AuthFailed event.
                    if (zooKeeperSaslClient != null) {
                        boolean sendAuthEvent = false;
                        if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
                            try {
                                zooKeeperSaslClient.initialize(ClientCnxn.this);
                            } catch (SaslException e) {
                                LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
                                state = States.AUTH_FAILED;
                                sendAuthEvent = true;
                            }
                        }
                        KeeperState authState = zooKeeperSaslClient.getKeeperState();
                        if (authState != null) {
                            if (authState == KeeperState.AuthFailed) {
                                // An authentication error occurred during authentication with the Zookeeper Server.
                                state = States.AUTH_FAILED;
                                sendAuthEvent = true;
                            } else {
                                if (authState == KeeperState.SaslAuthenticated) {
                                    sendAuthEvent = true;
                                }
                            }
                        }

                        if (sendAuthEvent == true) {
                            eventThread.queueEvent(new WatchedEvent(
                                    Watcher.Event.EventType.None,
                                    authState, null));
                        }
                    }
                    to = readTimeout - clientCnxnSocket.getIdleRecv();
                } else {
                    to = connectTimeout - clientCnxnSocket.getIdleRecv();
                }

                if (to <= 0) {
                    String warnInfo;
                    warnInfo = "Client session timed out, have not heard from server in "
                        + clientCnxnSocket.getIdleRecv()
                        + "ms"
                        + " for sessionid 0x"
                        + Long.toHexString(sessionId);
                    LOG.warn(warnInfo);
                    throw new SessionTimeoutException(warnInfo);
                }
                if (state.isConnected()) {
                    // 1000 (1 second) is to prevent race condition missing to send the second ping
                    // also make sure not to send too many pings when readTimeout is small
                    int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() -
                            ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
                    // send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
                    if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
                        sendPing();
                        clientCnxnSocket.updateLastSend();
                    } else {
                        if (timeToNextPing < to) {
                            to = timeToNextPing;
                        }
                    }
                }

                // If we are in read-only mode, seek for read/write server
                if (state == States.CONNECTEDREADONLY) {
                    long now = System.currentTimeMillis();
                    int idlePingRwServer = (int) (now - lastPingRwServer);
                    if (idlePingRwServer >= pingRwTimeout) {
                        lastPingRwServer = now;
                        idlePingRwServer = 0;
                        pingRwTimeout =
                            Math.min(2 * pingRwTimeout, maxPingRwTimeout);
                        pingRwServer();
                    }
                    to = Math.min(to, pingRwTimeout - idlePingRwServer);
                }

                clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
            } catch (Throwable e) {
                if (closing) {
                    if (LOG.isDebugEnabled()) {
                        // closing so this is expected
                        LOG.debug("An exception was thrown while closing send thread for session 0x"
                                + Long.toHexString(getSessionId())
                                + " : " + e.getMessage());
                    }
                    break;
                } else {
                    // this is ugly, you have a better way speak up
                    if (e instanceof SessionExpiredException) {
                        LOG.info(e.getMessage() + ", closing socket connection");
                    } else if (e instanceof SessionTimeoutException) {
                        LOG.info(e.getMessage() + RETRY_CONN_MSG);
                    } else if (e instanceof EndOfStreamException) {
                        LOG.info(e.getMessage() + RETRY_CONN_MSG);
                    } else if (e instanceof RWServerFoundException) {
                        LOG.info(e.getMessage());
                    } else {
                        LOG.warn(
                                "Session 0x"
                                        + Long.toHexString(getSessionId())
                                        + " for server "
                                        + clientCnxnSocket.getRemoteSocketAddress()
                                        + ", unexpected error"
                                        + RETRY_CONN_MSG, e);
                    }
                    cleanup();
                    if (state.isAlive()) {
                        eventThread.queueEvent(new WatchedEvent(
                                Event.EventType.None,
                                Event.KeeperState.Disconnected,
                                null));
                    }
                    clientCnxnSocket.updateNow();
                    clientCnxnSocket.updateLastSendAndHeard();
                }
            }
        }
        cleanup();
        clientCnxnSocket.close();
        if (state.isAlive()) {
            eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
                    Event.KeeperState.Disconnected, null));
        }
        ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
                "SendThread exited loop for session: 0x"
                        + Long.toHexString(getSessionId()));
    }
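One detail of the run() loop that is worth working through by hand is the ping schedule. The sketch below copies only the timeToNextPing expression and the ping condition from run(); the class and method names, and the sample values in main, are invented for illustration.

```java
// Worked example of the ping scheduling arithmetic from SendThread.run().
class PingScheduleSketch {
    static final int MAX_SEND_PING_INTERVAL = 10000; // 10 seconds, as in run()

    // Time left before the next ping is due, given how long nothing has been sent.
    static int timeToNextPing(int readTimeout, int idleSend) {
        return readTimeout / 2 - idleSend - ((idleSend > 1000) ? 1000 : 0);
    }

    // A ping is sent when it is due, or when nothing was sent for too long.
    static boolean shouldPing(int readTimeout, int idleSend) {
        return timeToNextPing(readTimeout, idleSend) <= 0
                || idleSend > MAX_SEND_PING_INTERVAL;
    }

    public static void main(String[] args) {
        // readTimeout = 20000 ms, so the base ping interval is 10000 ms.
        System.out.println(timeToNextPing(20000, 500));  // 9500: not due yet
        System.out.println(timeToNextPing(20000, 9000)); // 0: due, one second early
        System.out.println(shouldPing(20000, 9000));     // true
        // With a large readTimeout, the 10-second cap triggers the ping instead.
        System.out.println(timeToNextPing(40000, 10001)); // 8999
        System.out.println(shouldPing(40000, 10001));     // true
    }
}
```

In other words, an idle client pings roughly every readTimeout / 2 minus one second, and never goes more than about 10 seconds without sending something.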

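--Note two parts of run(). If the socket is not yet connected to a server, startConnect() is called, which eventually calls SocketChannel.connect(SocketAddress remote), and the related state is then updated. After that, every iteration of the loop calls clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this). Here is the source of doTransport: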

    void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
                     ClientCnxn cnxn)
            throws IOException, InterruptedException {
        selector.select(waitTimeOut);
        Set<SelectionKey> selected;
        synchronized (this) {
            selected = selector.selectedKeys();
        }
        // Everything below and until we get back to the select is
        // non blocking, so time is effectively a constant. That is
        // Why we just have to do this once, here
        updateNow();
        for (SelectionKey k : selected) {
            SocketChannel sc = ((SocketChannel) k.channel());
            if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
                if (sc.finishConnect()) {
                    updateLastSendAndHeard();
                    sendThread.primeConnection();
                }
            } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                doIO(pendingQueue, outgoingQueue, cnxn);
            }
        }
        if (sendThread.getZkState().isConnected()) {
            synchronized(outgoingQueue) {
                if (findSendablePacket(outgoingQueue,
                        cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
                    enableWrite();
                }
            }
        }
        selected.clear();
    }
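For readers who, like the author, are new to Java NIO, the select / OP_CONNECT / finishConnect sequence in doTransport can be tried on its own. The sketch below is not ZooKeeper code: NonBlockingConnectSketch, connect, and selfCheck are invented names, and selfCheck assumes a loopback listener can be opened on the local machine.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Minimal non-blocking connect driven by a Selector, in the style of doTransport.
class NonBlockingConnectSketch {
    static boolean connect(InetSocketAddress address, long timeoutMs) throws IOException {
        try (Selector selector = Selector.open();
             SocketChannel channel = SocketChannel.open()) {
            channel.configureBlocking(false);
            SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT);
            if (channel.connect(address)) {
                return true; // the connection completed immediately
            }
            long deadline = System.currentTimeMillis() + timeoutMs;
            long remaining;
            while ((remaining = deadline - System.currentTimeMillis()) > 0) {
                selector.select(remaining); // wait for readiness or timeout
                boolean ready = key.isValid() && key.isConnectable();
                selector.selectedKeys().clear();
                if (ready && channel.finishConnect()) {
                    return true; // same step as sc.finishConnect() in doTransport
                }
            }
            return false; // timed out
        }
    }

    // Opens a throwaway listener on the loopback interface and connects to it.
    static boolean selfCheck() {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            return connect((InetSocketAddress) server.getLocalAddress(), 2000);
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("connected: " + selfCheck());
    }
}
```

The pattern is the same as in doTransport: connect() on a non-blocking channel usually returns before the connection is established, the selector reports OP_CONNECT when it is ready, and finishConnect() completes it.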

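--Note two calls: doIO(pendingQueue, outgoingQueue, cnxn) performs the network reads and writes, and findSendablePacket(outgoingQueue, cnxn.sendThread.clientTunneledAuthenticationInProgress()) checks whether outgoingQueue holds a request that can be sent; if so, enableWrite() registers interest in write events so that doIO sends it to the server. Here is an excerpt of doIO: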

    if (sockKey.isReadable()) {
        int rc = sock.read(incomingBuffer);
        ...
        sendThread.readResponse(incomingBuffer);
    }
    if (sockKey.isWritable()) {
        synchronized(outgoingQueue) {
            Packet p = findSendablePacket(outgoingQueue,
                    cnxn.sendThread.clientTunneledAuthenticationInProgress());
            ...
            sock.write(p.bb);
            ...
            outgoingQueue.removeFirstOccurrence(p);
            ...
            pendingQueue.add(p);
        }
    }

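--If there is a readable event, the data is read into the buffer and readResponse is called. If there is a writable event, the packet is sent to the server over the socket, removed from outgoingQueue, and added to pendingQueue. Next, an excerpt of readResponse: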

    WatchedEvent we = new WatchedEvent(event);
    eventThread.queueEvent( we );

--For a watch notification, it mainly constructs a WatchedEvent object and adds it to the waitingEvents queue for the eventThread to consume.
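The queue hand-off in this section can be sketched without any networking. The class below is an invented illustration, not ZooKeeper code: PacketQueuesSketch, onWritable, and onReadable are hypothetical names, and it assumes that replies arrive in request order so that each reply belongs to the oldest pending packet, which the excerpts above do not show.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of how a request moves from outgoingQueue to pendingQueue.
class PacketQueuesSketch {
    // Hypothetical, stripped-down packet: a request id plus its eventual reply.
    static final class Packet {
        final int xid;
        String reply;

        Packet(int xid) {
            this.xid = xid;
        }
    }

    final Deque<Packet> outgoingQueue = new ArrayDeque<>();
    final Deque<Packet> pendingQueue = new ArrayDeque<>();

    // Socket writable: "send" the first outgoing packet, then park it as pending.
    Packet onWritable() {
        Packet p = outgoingQueue.pollFirst();
        if (p != null) {
            pendingQueue.addLast(p);
        }
        return p;
    }

    // Socket readable: ASSUMPTION of this sketch: the reply belongs to the oldest pending packet.
    Packet onReadable(String reply) {
        Packet p = pendingQueue.pollFirst();
        if (p != null) {
            p.reply = reply;
        }
        return p;
    }

    public static void main(String[] args) {
        PacketQueuesSketch queues = new PacketQueuesSketch();
        queues.outgoingQueue.add(new Packet(1));
        queues.outgoingQueue.add(new Packet(2));
        queues.onWritable();                      // packet 1 is sent and becomes pending
        Packet answered = queues.onReadable("ok");
        System.out.println(answered.xid + " -> " + answered.reply);
        System.out.println("still outgoing: " + queues.outgoingQueue.size());
    }
}
```

Keeping sent-but-unanswered packets in a separate queue is what lets the client match each reply to its request and fail the outstanding ones if the connection drops.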


3. What the eventThread does after it starts

    public void run() {
        try {
            isRunning = true;
            while (true) {
                Object event = waitingEvents.take();
                if (event == eventOfDeath) {
                    wasKilled = true;
                } else {
                    processEvent(event);
                }
                if (wasKilled)
                    synchronized (waitingEvents) {
                        if (waitingEvents.isEmpty()) {
                            isRunning = false;
                            break;
                        }
                    }
            }
        } catch (InterruptedException e) {
            LOG.error("Event thread exiting due to interruption", e);
        }

        LOG.info("EventThread shut down for session: 0x{}",
                Long.toHexString(getSessionId()));
    }
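The loop above is a blocking-queue consumer that shuts down on a sentinel object (eventOfDeath) but only after the queue has been drained. Here is a minimal standalone sketch of that pattern; EventLoopSketch and its members are invented names, and "processing" an event is reduced to recording it in a list.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Blocking-queue consumer that stops on a sentinel, after draining the queue.
class EventLoopSketch {
    private static final Object EVENT_OF_DEATH = new Object(); // sentinel, compared by identity
    private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<>();
    final List<Object> processed = Collections.synchronizedList(new ArrayList<>());

    void queueEvent(Object event) {
        waitingEvents.add(event);
    }

    void queueEventOfDeath() {
        waitingEvents.add(EVENT_OF_DEATH);
    }

    void runLoop() {
        boolean wasKilled = false;
        try {
            while (true) {
                Object event = waitingEvents.take(); // blocks until an event is available
                if (event == EVENT_OF_DEATH) {
                    wasKilled = true;
                } else {
                    processed.add(event);            // stands in for processEvent(event)
                }
                if (wasKilled && waitingEvents.isEmpty()) {
                    break;                           // exit only once everything is handled
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        EventLoopSketch loop = new EventLoopSketch();
        loop.queueEvent("a");
        loop.queueEventOfDeath();
        loop.queueEvent("b"); // queued after the sentinel, still processed
        loop.runLoop();
        System.out.println(loop.processed); // [a, b]
    }
}
```

Because the exit check looks at the queue rather than stopping at the sentinel, events that were queued behind it are still delivered before the thread ends.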

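--Note the loop: the thread repeatedly takes event objects from the waitingEvents queue (the producer is the sendThread) and calls processEvent to handle each one. Here is the source of processEvent: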

    private void processEvent(Object event) {
        try {
            if (event instanceof WatcherSetEventPair) {
                // each watcher will process the event
                WatcherSetEventPair pair = (WatcherSetEventPair) event;
                for (Watcher watcher : pair.watchers) {
                    try {
                        watcher.process(pair.event);
                    } catch (Throwable t) {
                        LOG.error("Error while calling watcher ", t);
                    }
                }
            } else {
                Packet p = (Packet) event;
                int rc = 0;
                String clientPath = p.clientPath;
                if (p.replyHeader.getErr() != 0) {
                    rc = p.replyHeader.getErr();
                }
                if (p.cb == null) {
                    LOG.warn("Somehow a null cb got to EventThread!");
                } else if (p.response instanceof ExistsResponse
                        || p.response instanceof SetDataResponse
                        || p.response instanceof SetACLResponse) {
                    StatCallback cb = (StatCallback) p.cb;
                    if (rc == 0) {
                        if (p.response instanceof ExistsResponse) {
                            cb.processResult(rc, clientPath, p.ctx,
                                    ((ExistsResponse) p.response)
                                            .getStat());
                        } else if (p.response instanceof SetDataResponse) {
                            cb.processResult(rc, clientPath, p.ctx,
                                    ((SetDataResponse) p.response)
                                            .getStat());
                        } else if (p.response instanceof SetACLResponse) {
                            cb.processResult(rc, clientPath, p.ctx,
                                    ((SetACLResponse) p.response)
                                            .getStat());
                        }
                    } else {
                        cb.processResult(rc, clientPath, p.ctx, null);
                    }
                } else if (p.response instanceof GetDataResponse) {
                    DataCallback cb = (DataCallback) p.cb;
                    GetDataResponse rsp = (GetDataResponse) p.response;
                    if (rc == 0) {
                        cb.processResult(rc, clientPath, p.ctx, rsp
                                .getData(), rsp.getStat());
                    } else {
                        cb.processResult(rc, clientPath, p.ctx, null,
                                null);
                    }
                } else if (p.response instanceof GetACLResponse) {
                    ACLCallback cb = (ACLCallback) p.cb;
                    GetACLResponse rsp = (GetACLResponse) p.response;
                    if (rc == 0) {
                        cb.processResult(rc, clientPath, p.ctx, rsp
                                .getAcl(), rsp.getStat());
                    } else {
                        cb.processResult(rc, clientPath, p.ctx, null,
                                null);
                    }
                } else if (p.response instanceof GetChildrenResponse) {
                    ChildrenCallback cb = (ChildrenCallback) p.cb;
                    GetChildrenResponse rsp = (GetChildrenResponse) p.response;
                    if (rc == 0) {
                        cb.processResult(rc, clientPath, p.ctx, rsp
                                .getChildren());
                    } else {
                        cb.processResult(rc, clientPath, p.ctx, null);
                    }
                } else if (p.response instanceof GetChildren2Response) {
                    Children2Callback cb = (Children2Callback) p.cb;
                    GetChildren2Response rsp = (GetChildren2Response) p.response;
                    if (rc == 0) {
                        cb.processResult(rc, clientPath, p.ctx, rsp
                                .getChildren(), rsp.getStat());
                    } else {
                        cb.processResult(rc, clientPath, p.ctx, null, null);
                    }
                } else if (p.response instanceof CreateResponse) {
                    StringCallback cb = (StringCallback) p.cb;
                    CreateResponse rsp = (CreateResponse) p.response;
                    if (rc == 0) {
                        cb.processResult(rc, clientPath, p.ctx,
                                (chrootPath == null
                                        ? rsp.getPath()
                                        : rsp.getPath()
                                  .substring(chrootPath.length())));
                    } else {
                        cb.processResult(rc, clientPath, p.ctx, null);
                    }
                } else if (p.response instanceof MultiResponse) {
                    MultiCallback cb = (MultiCallback) p.cb;
                    MultiResponse rsp = (MultiResponse) p.response;
                    if (rc == 0) {
                        List<OpResult> results = rsp.getResultList();
                        int newRc = rc;
                        for (OpResult result : results) {
                            if (result instanceof ErrorResult
                                    && KeeperException.Code.OK.intValue()
                                        != (newRc = ((ErrorResult) result).getErr())) {
                                break;
                            }
                        }
                        cb.processResult(newRc, clientPath, p.ctx, results);
                    } else {
                        cb.processResult(rc, clientPath, p.ctx, null);
                    }
                } else if (p.cb instanceof VoidCallback) {
                    VoidCallback cb = (VoidCallback) p.cb;
                    cb.processResult(rc, clientPath, p.ctx);
                }
            }
        } catch (Throwable t) {
            LOG.error("Caught unexpected throwable", t);
        }
    }

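--Events are handled in two broad categories. One is the familiar watcher event, handled by calling the process method we override. The other is the less familiar AsyncCallback event; for these, the processResult method of the matching AsyncCallback sub-interface is chosen according to the contents of the reply packet. I have not used the latter, so here are some notes quoted from the web:

Differences between Watcher and AsyncCallback

Watcher: a Watcher listens for node and session state changes. For example, if getData sets a watcher on data node a, then when the data of a changes, the client receives a NodeDataChanged notification and the watcher callback is invoked.

AsyncCallback: an AsyncCallback handles the result when the ZooKeeper API is used asynchronously. For example, the synchronous version of getData is byte[] getData(String path, boolean watch, Stat stat), and the asynchronous version is void getData(String path, Watcher watcher, AsyncCallback.DataCallback cb, Object ctx). The former returns the result directly; the latter delivers it through the AsyncCallback.

--So AsyncCallback is the callback used for asynchronous calls.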
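The difference between the two call styles can be seen in a small self-contained model. This is not the ZooKeeper API: AsyncCallbackSketch, its in-memory node map, and its DataCallback interface are invented stand-ins, and the result codes OK and NO_NODE are values chosen for the sketch. The single daemon thread plays the role of the eventThread.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Toy client contrasting a synchronous call with a callback-based one.
class AsyncCallbackSketch {
    // Hypothetical callback type, modeled loosely on AsyncCallback.DataCallback.
    interface DataCallback {
        void processResult(int rc, String path, Object ctx, byte[] data);
    }

    static final int OK = 0;         // illustrative result codes for this sketch
    static final int NO_NODE = -101;

    private final Map<String, byte[]> nodes = new ConcurrentHashMap<>();
    private final ExecutorService eventThread = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "event-thread-sketch");
        t.setDaemon(true); // like SendThread/EventThread, does not keep the JVM alive
        return t;
    });

    void put(String path, byte[] data) {
        nodes.put(path, data);
    }

    // Synchronous style: the caller gets the result as the return value.
    byte[] getData(String path) {
        return nodes.get(path);
    }

    // Asynchronous style: returns at once; the result arrives via the callback.
    void getData(String path, DataCallback cb, Object ctx) {
        eventThread.execute(() -> {
            byte[] data = nodes.get(path);
            cb.processResult(data == null ? NO_NODE : OK, path, ctx, data);
        });
    }

    public static void main(String[] args) {
        AsyncCallbackSketch client = new AsyncCallbackSketch();
        client.put("/a", "hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(client.getData("/a"), StandardCharsets.UTF_8));

        CompletableFuture<String> done = new CompletableFuture<>();
        client.getData("/a",
                (rc, path, ctx, data) -> done.complete(ctx + " rc=" + rc),
                "request-1");
        System.out.println(done.join()); // waits for the callback to run
    }
}
```

The ctx argument is simply handed back to the callback, which is how a caller with many requests in flight can tell which one a given result belongs to.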

 

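--That completes the analysis of the ZooKeeper client source code. Since I am not familiar with socket programming, that part is not explained very clearly; I need to study it properly later.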