ZooKeeper源码分析:Quorum请求的整个流程

时间:2021-04-26 08:24:41

Quorum请求是转发给Leader处理,并且需要得一个Follower Quorum确认的请求。这些请求包括:

1)znode的写操作(OpCode.create,OpCode.delete,OpCode.setData,OpCode.setACL)

2)Session的创建和关闭操作(OpCode.createSession和OpCode.closeSession)

3)OpCode.multi操作。

本博文分析了Client, Follower和Leader协同完成Quorum请求的过程。另外需注意的是OpCode.sync请求也需要转发给Leader, 但不需要得到一个Follower Quorum确认。本文也会提到OpCode.sync操作。

 

数据结构

Request类型对象:Server内部传递的数据结构。

属性 说明
sessionId  会话ID
cxid 客户端事务ID
type 操作类型, 如OpCode.setData
request 请求Record对象,如SetDataRequest
cnxn Server和Client端的连接对象
hdr 请求事务头TxnHeader
txn 请求事务体Record,如OpCode.setData请求,则是SetDataTxn类型对象
zxid ZooKeeper事务ID
authInfo 认证信息
createTime 创建时间
owner 所有者
e 处理过程中的异常

QuorumPacket类型对象:用于ZooKeeper服务器之间传递的数据包。

属性 说明
type QuorumPacket类型,如Leader.REQUEST和Leader.ACK等
zxid ZooKeeper事务ID
data 数据包的数据:
在Leader.REQUEST中,数据依次如下:
Request.sessionId
Request.cxid
Request.type
Request.request
在Leader.PROPOSAL中,数据依次如下:
Request.hdr
Request.txn
在Leader.ACK中,为null
在Leader.COMMIT中,为null
authinfo 认证信息

 

Quorum请求流程

假设拓扑结构如下图,Client A和Follower A建立连接。

ZooKeeper源码分析:Quorum请求的整个流程

 

数据流程图如下。在图中,连接线说明前的数字表示事件发的生时序,主时序是直接使用一个数字表示,并且数字越小表示越早发生(如1 Client Request是在2 Request之前发生)。对于和主时序并发的操作使用主时序序号后加上一个括号括起来的数字表示,如7(1)-n Request指和7 Request是并发的。7(1)-n中n表示以7(1)开头的操作时序。

ZooKeeper源码分析:Quorum请求的整个流程

 

我们从数据流程图中Step 1讲起:Client A 发起一个Quorum请求给Follower A。

【Client A, Step 1】Client A调用Quorum请求对应的方法:

如调用Zookeeper的构造函数,会发起OpCode.createSession请求,

如调用Zookeeper.setData方法,会发起OpCode.setData操作。

最终会调用ClientCnxn.submitRequest方法将请求放入outgoingQueue队列中,并阻塞等待Follower A反馈。而ClientCnxn.SendThread线程会从outgoingQueue中取出请求,并发送给Follower A。

下面代码Zookeeper.setData方法: Client A构建对象发送给Follower A

    public   Stat setData(   final   String path,   byte   data[],   int   version)
         throws   KeeperException, InterruptedException
    {
         final   String clientPath = path;
        PathUtils. validatePath(clientPath);

        //通过传入的path构造完整serverPath
         final   String serverPath = prependChroot(clientPath);

        //构造一个Request头
        RequestHeader h =   new   RequestHeader();
        //设置类型为setData
        h.setType(ZooDefs.OpCode. setData );
        //构造一个SetData请求体
        SetDataRequest request =   new   SetDataRequest();
        //设置需要修改node的serverPath
        request.setPath(serverPath);
        //设置需要修改的node的data
        request.setData(data);
        //设置需要修改的node的version
        request.setVersion(version);
       
        //构建SetDataResponse对象
        SetDataResponse response =   new   SetDataResponse();

        //提交请求,并等待返回结果
        ReplyHeader r =   cnxn .submitRequest(h, request, response,   null );
        //如果r.getErr()不能0,则表示有错误,抛出异常
         if   (r.getErr() != 0) {
             throw   KeeperException.create(KeeperException.Code. get(r.getErr()),
                    clientPath);
        }
         return   response.getStat();
    }

【Follower A, Step 2,3】Follower A的NIOServerCnxn类接到了Client A的请求,会调用ZookeeperServer.processPacket方法。该方法会构建一个Request对象,并调用第一个处理器FollowerRequestProcessor的processRequest方法。该方法将Request对象放入FollowerRequestProcessor.queuedRequests队列中。FollowerRequestProcessor处理器线程会循环从FollowerRequestProcessor.queuedRequests队列中取出Request对象,并继续下面步骤:

1)调用下一个处理器CommitProcessor的processRequest方法。该方法将Request对象放入CommitProcessor.queuedRequests队列中;

2)通过Request.type判断Request类型。若发现是一个Quorum请求,会直接调用Learner.request(request)方法。该方法将Request对象封装成一个Leader.Request的Quorum数据包,并发送给Leader。

OpCode.sync操作也将调用Learner.request方法将请求转发给Leader,但在这之前会先将Request对象加入到pendingSyncs队列中。

FollowerRequestProcessor的run方法如下:

     public   void  run() {
         try  {
             while  (!finished ) {
                 //从queuedRequests队列中取出Request对象
                Request request = queuedRequests .take();
                 if  (LOG .isTraceEnabled()) {
                    ZooTrace. logRequest( LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK ,
                             'F'  , request,  ""   );
                }
                 //当request是Request.requestOfDeath,一个poison pill, 就退出while循环,
                //并结束FollowerRequestProcessor线程
                 if  (request == Request.requestOfDeath) {
                     break  ;
                }

                 //我们在提交这个request到leader之前,把这个request传递到下一个处理器。
                 //这样我们就准备好从Leader那得到Response
                nextProcessor.processRequest(request);

                //只有Quorum操作和sync操作才会调用Follower.request方法, 转发Leader.REQUEST数据包给Leader
                //sync操作和 Quorum操作有一些不同,
                //我们需要保持跟踪这个sync操作对于的Follower已经挂起,所有我们将它加入pendingSyncs队列中。
                 switch  (request.type ) {
                 case  OpCode.sync:
                     //将OpCode.sync放入pendingSyncs队列中
                    zks.pendingSyncs .add(request);
                    zks.getFollower().request(request);
                     break  ;
                 case  OpCode.create:
                 case  OpCode.delete:
                 case  OpCode.setData:
                 case  OpCode.setACL:
                 case  OpCode.createSession:
                 case  OpCode.closeSession:
                 case  OpCode.multi:
                    //Quorum请求,直接调用Folloer.request方法
                    zks.getFollower().request(request);
                     break  ;
                }
            }
        }  catch  (Exception e) {
            LOG.error(  "Unexpected exception causing exit"  , e);
        }
        LOG.info(  "FollowerRequestProcessor exited loop!"  );
    }

【Leader A, Step 4】Leader A的LearnerHandler线程会循环读取从Learner获得的Quorum数据包。如果数据包是Learner.REQUEST类型,则会解析Quorum数据包的内容,检查操作类型。
如果操作类型不是OpCode.sync, 则会构造Request对象。并调用ZooKeeperServer.submitRequest方法(和上面Follower接收到请求所使用的submitRequest方法是同一个方法),并最终会调用第一个处理器PrepRequestProcessor的submitRequest方法,将Request对象放入PrepRequestProcessor.submittedRequests队列中。

如果操作类型是OpCode.sync, 会构造Request类型的子类LearnerSyncRequest对象,并同样调用PrepRequestProcessor的submitRequest方法。

LearnerHandler.run方法中对Leader.REQUEST数据包的处理代码如下:

    public   void  run () {
        ......
         case  Leader.REQUEST :
           bb = ByteBuffer. wrap(qp .getData());
            //从QuorumPacket中读取sesssionId
           sessionId = bb.getLong();
            //从QuorumPacket中读取 cxid
           cxid = bb.getInt();
            //从QuorumPacket中读取操作类型
           type = bb.getInt();
           bb = bb.slice();
           Request si;
            //如果操作Code的类型是OpCode.sync,则构造LearnerSyncRequest对象
            if  (type == OpCode.sync){
               si =  new  LearnerSyncRequest(   this  , sessionId, cxid, type , bb, qp.getAuthinfo());
           }
             //如果操作Code的类型不是OpCode.sync, 则构造Request对象
            else  {
               si =  new  Request(   null  , sessionId, cxid, type , bb, qp.getAuthinfo());
           }

            //设置owner
           si.setOwner(   this  );
            //提交请求
           leader.zk .submitRequest(si);
            break  ;
       ......
   }

PrepRequestProcessor处理器线程会从PrepRequestProcessor.submittedRequests队列中取出Request对象,并根据Request类型构建TxnHeader和Record对象,然后分别赋给Request.hdr和Request.txn。之后会调用下一个处理器ProposalRequestProcessor的processRequest方法,将Request对象传递给处理器ProposalRequestProcessor。(如果发现有异常会则会创建一个错误Record类型对象)

PrepRequestProcessor的run方法如下:

     public   void  run() {
         try  {
             while  ( true   ) {
                 //从submittedRequests队列中取去第一个request对象
                Request request = submittedRequests .take();
                 long  traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
                 //如果是OpCode.ping操作,则将traceMask设置成ZooTrace. CLIENT_PING_TRACE_MASK
                 if  (request.type == OpCode.ping) {
                    traceMask = ZooTrace. CLIENT_PING_TRACE_MASK;
                }
                 if  (LOG .isTraceEnabled()) {
                    ZooTrace. logRequest( LOG, traceMask,  'P'   , request,  ""  );
                }
                 //如果request是一个requestOfDeath, 则退出while循环。
                 if  (Request.requestOfDeath == request) {
                     break  ;
                }
                 //处理请求
                pRequest(request);
            }
        }  catch  (InterruptedException e) {
            LOG.error(  "Unexpected interruption"  , e);
        }  catch  (RequestProcessorException e) {
             if  (e.getCause()  instanceof  XidRolloverException) {
                LOG.info(e.getCause().getMessage());
            }
            LOG.error(  "Unexpected exception"  , e);
        }  catch  (Exception e) {
            LOG.error(  "Unexpected exception"  , e);
        }
        LOG.info(  "PrepRequestProcessor exited loop!"  );
    }  
     

PrepRequestProcessor的pRequest2Txn方法,该方法会在pRequest方法中调用,构建TxnHeader和Record对象。下面是关于OpCode.setData请求的代码:

    protected   void  pRequest2Txn(  int  type,  long  zxid, Request request, Record record,  boolean  deserialize)
         throws  KeeperException, IOException, RequestProcessorException
    {
        request.hdr =  new  TxnHeader(request.sessionId , request.cxid, zxid,
                                    zks.getTime(), type);

         switch  (type) {
            .....
             case  OpCode.setData:
                 //检查session
                zks.sessionTracker .checkSession(request.sessionId, request.getOwner());
                 //将record转成SetDataRequest类型
                SetDataRequest setDataRequest = ( SetDataRequest)record;
                 if  (deserialize)
                     //将Request.reques数据反序列化成setDataRequest对象
                    ByteBufferInputStream.byteBuffer2Record(request. request, setDataRequest);
                 //获取需要需要修改的znode的path
                path = setDataRequest.getPath();
                 //获取内存数据中获取path对于的znode信息
                nodeRecord = getRecordForPath( path);
                 //检查对 znode是否有写权限
                checkACL( zks, nodeRecord .acl , ZooDefs.Perms.WRITE,
                        request.authInfo);
                 //获取客户端设置的版本号
                version = setDataRequest.getVersion();
                 //获取节点当前版本号
                 int  currentVersion = nodeRecord.stat.getVersion();
                 //如果客户端设置的版本号不是-1,且不等于当前版本号,则抛出KeeperException.BadVersionException异常
                 if  (version != -1 && version != currentVersion) {
                     throw   new   KeeperException .BadVersionException(path);
                }
                 //version等于当前版本加1
                version = currentVersion + 1;
                 //构建SetDataTxn对象,并赋给request.txn
                request. txn =  new  SetDataTxn( path, setDataRequest.getData(), version);
                 //拷贝nodeRecord
                nodeRecord = nodeRecord.duplicate(request.hdr.getZxid());
                 //将nodeRecord的当前版本号设置为version
                nodeRecord.stat.setVersion( version);
                 //将nodeRecord放入outstandingChanges
                //path和nodeRecord map放入outstandingChangesForPath
                addChangeRecord( nodeRecord);
                 break  ;
           ......
        }
    }

【Leader A, Step 5,6】处理器ProposalRequestProcessor会先判断Request对象是否是LearnerSyncRequest类型。

如果不是LearnerSyncRequest类型(也就是Quorum请求),会按如下步骤执行:

1)调用下一个处理器CommitProcessor的processRequest方法,将Request对象放入CommitProcessor.queuedRequests队列中;

2)将proposal发送到所有的Follower;

3)调用SyncRequestProcessor处理器的processRequest方法。该方法会将请求放入SyncRequestProcessor.queuedRequests队列中。(【Leader A, Step 7(1)】SyncRequestProcessor线程会记录Log, 然后传递给SendAckRequestProcessor。SendAckRequestProcessor会发送一个Leader.ACK的Quorum数据包给自己)

如果是LearnerSyncRequest类型,说明该请求是OpCode.sync操作,则会直接调用Leader.processSync方法。

ProposalRequestProcessor的processRequest方法如下:

     public   void  processRequest(Request request)  throws  RequestPrzocessorException {
         //如果是sync操作,则调用Leader.processSync方法
         if  (request  instanceof  LearnerSyncRequest){
            zks.getLeader().processSync(( LearnerSyncRequest)request);
        }
         //如果不是sync操作
         else  {
            //传递到下一个处理器
            nextProcessor.processRequest(request);
             if  (request.hdr !=  null ) {
                 // We need to sync and get consensus on any transactions
                 try  {
                     //发送proposal给所有的follower
                    zks.getLeader().propose(request);
                }  catch  (XidRolloverException e) {
                     throw   new   RequestProcessorException (e.getMessage(), e);
                }
                 //调用SyncRequestProcessor处理器的processRequest方法
                syncProcessor.processRequest(request);
            }
        }
    }
Leader的propose方法如下:
     /**
     * 创建Proposal,并发送给所有的members
     *  @param  request
     *  @return  the proposal that is queued to send to all the members
     */
     public  Proposal propose(Request request)  throws  XidRolloverException {

         //解决 rollover的问题,所有低32位重置表示一个新的leader选择。强制重新选择Leader。
         //See ZOOKEEPER- 1277
         if  ((request.zxid & 0xffffffffL) == 0xffffffffL) {
            String msg =
                     "zxid lower 32 bits have rolled over, forcing re-election, and therefore new epoch start" ;
            shutdown(msg);
             throw   new   XidRolloverException (msg);
        }
     
         //将request.hdr和request.txn序列化到boa
        ByteArrayOutputStream baos =  new  ByteArrayOutputStream();
        BinaryOutputArchive boa = BinaryOutputArchive. getArchive(baos);
         try  {
            request.hdr.serialize(boa,  "hdr"  );
             if  (request.txn !=  null ) {
                request. txn.serialize(boa,  "txn"  );
            }
            baos.close();
        }  catch  (IOException e) {
            LOG.warn(  "This really should be impossible"  , e);
        }
         //构造Leader.PROPOSAL的QuorumPacket
        QuorumPacket pp =  new  QuorumPacket(Leader.PROPOSAL, request.zxid,
                baos.toByteArray(),  null  );
         //构造Proposal对象
        Proposal p =  new  Proposal();
        p.packet = pp;
        p.request = request;
         synchronized  ( this   ) {
             if  (LOG .isDebugEnabled()) {
                LOG.debug(  "Proposing:: "  + request);
            }

             //获得packet的 zxid, 并放入outstandingProposals 未完成Proposal Map中
            lastProposed = p.packet.getZxid();
             //将p加入到outstandingProposals Map中
            outstandingProposals.put( lastProposed , p);
             //发送给所有的Follower
            sendPacket(pp);
        }
         return  p;
    }    

Follower.processPacket方法如下:

     /**
     * 检查在qp中接收到的packet, 并根据它的内容进行分发
     *  @param  qp
     *  @throws  IOException
     */
     protected   void  processPacket(QuorumPacket qp)  throws  IOException{
         switch  (qp.getType()) {
         case  Leader.PING:
            ping(qp);
             break  ;
         case  Leader.PROPOSAL:
            TxnHeader hdr =  new  TxnHeader();
             //从数据包 qp中反序列化出 txn
            Record txn = SerializeUtils . deserializeTxn(qp.getData(), hdr);
             if  (hdr.getZxid() != lastQueued + 1) {
                LOG.warn(  "Got zxid 0x"
                        + Long. toHexString(hdr.getZxid())
                        +  " expected 0x"
                        + Long. toHexString(lastQueued + 1));
            }
            lastQueued = hdr.getZxid();
            fzk.logRequest(hdr, txn);
             break  ;
         case  Leader.COMMIT:
            fzk.commit(qp.getZxid());
             break  ;
         case  Leader.UPTODATE:
            LOG.error(  "Received an UPTODATE message after Follower started" );
             break  ;
         case  Leader.REVALIDATE:
            revalidate(qp);
             break  ;
         case  Leader.SYNC:
            fzk.sync();
             break  ;
        }
        }    

FollowerZooKeeperServer的logRequest方法如下:

     public   void  logRequest(TxnHeader hdr, Record txn) {
         //构建Request对象
        Request request =  new  Request(  null , hdr.getClientId(), hdr.getCxid(),
                hdr.getType(),  null  ,  null   );
        request.hdr = hdr;
        request.txn = txn;
        request.zxid = hdr.getZxid();
         //如果request.zxid的低32为不全为0, 则加入pendingTxns队列中
         if  ((request.zxid & 0xffffffffL) != 0) {
            pendingTxns.add(request);
        }
         //调用SyncRequestProcessor处理这个request
        syncProcessor.processRequest(request);
        }  

【All Followers, Step 8】处理器SyncRequestProcessor的功能和Leader的SyncRequestProcessor一样,将请求记录到日志中,然后将Request请求传递给下一个处理器。不过Follower的下一个处理器是SendAckRequestProcessor。该处理器会构建一个Leader.ACK的Quorum数据包,并发送给Leader。

SendAckRequestProcessor的processRequest方法如下:

     public   void  processRequest(Request si) {
         if  (si.type != OpCode.sync){
             //构建Leader.ACK Quorum
            QuorumPacket qp =  new  QuorumPacket(Leader.ACK, si.hdr.getZxid(),  null   ,
                 null  );
             try  {
                 //将Leader.ACK Quorum数据包发送给Leader
                learner.writePacket(qp,  false );
            }  catch  (IOException e) {
                LOG.warn(  "Closing connection to leader, exception during packet send" , e);
                 try  {
                     if  (!learner .sock .isClosed()) {
                        learner.sock .close();
                    }
                }  catch  (IOException e1) {
                     // Nothing to do, we are shutting things down, so an exception here is irrelevant
                    LOG.debug(  "Ignoring error closing the connection"  , e1);
                }
            }
        }
    }

【Leader A, Step 9】LearnerHandler线程循环读取从Learner那获得的Quorum数据包。当发现是从Follower传输过来的Leader.ACK类型数据包,则会调用Leader.processAck方法进行处理。在Leader.processAck方法中,若已经有一个Follower Quorom发送了Leader.ACK数据包,则会执行下列三步骤:

1)调用Leader.commit方法,发送Leader.COMMIT类型Quorum数据包给所有 Follower;

2)调用Leader.inform 方法,通知所有的Observer;

3)调用处理器CommitRequestProcessor.commit 方法,将Request对象放到CommitRequestProcessor.committedRequests队列中。(【Leader A, Step 10(1)-1,10(1)-2】CommitProcessor线程会从CommitRequestProcessor.committedRequests队列中取出提交的Request对象,发现是和nextPending是一致的,然后提交的Request对象内容替换nextPending的内容,并将nextPending放入到toProcess队列中。下一次循环会从toProcess队列中取出nextPending,然后调用下一个处理器Leader.ToBeAppliedRequestProcessor的processRequest方法。该方法会调用下一个处理器FinalRequestProcessor的processRequest方法。FinalRequestProcessor.processRequest方法并根据Request对象中的操作更新内存中Session信息或者znode数据。)

Leader的processAck方法如下:

     /**
     * 保存某个proposal接收到的Ack数量
     *
     *  @param  zxid
     *                被发送的proposal的zxid
     *  @param  followerAddr
     */
     synchronized   public   void   processAck(  long  sid,  long  zxid, SocketAddress followerAddr) {
         if  (LOG .isTraceEnabled()) {
            LOG.trace(  "Ack zxid: 0x{}"  , Long.toHexString (zxid));
             for  (Proposal p : outstandingProposals .values()) {
                 long  packetZxid = p.packet.getZxid();
                LOG.trace(  "outstanding proposal: 0x{}"  ,
                        Long. toHexString(packetZxid));
            }
            LOG.trace(  "outstanding proposals all"  );
        }

         //如果 zxid的低32位都是0, 则直接return
         if  ((zxid & 0xffffffffL) == 0) {
             /*
             * We no longer process NEWLEADER ack by this method. However,
             * the learner sends ack back to the leader after it gets UPTODATE
             * so we just ignore the message.
             */
             return  ;
        }
         //如果没有未完成的proposal, 则直接return
         if  (outstandingProposals .size() == 0) {
             if  (LOG .isDebugEnabled()) {
                LOG.debug(  "outstanding is 0"  );
            }
             return  ;
        }
         //如果最近提交的proposal的 zxidack 的proposal的zxid大,说明 ack的proposal已经提交了, 则直接return
         if  (lastCommitted >= zxid) {
             if  (LOG .isDebugEnabled()) {
                LOG.debug(  "proposal has already been committed, pzxid: 0x{} zxid: 0x{}" ,
                        Long. toHexString( lastCommitted), Long.toHexString(zxid));
            }
             // The proposal has already been committed
             return  ;
        }
         //根据 zxid取出proposal对象
        Proposal p = outstandingProposals .get(zxid);
         //如果在未完成列表outstandingProposal中没有找到 zxid对于的proposal, 则说明该 zxid对于的Proposal还没有处理。
         if  (p ==  null ) {
            LOG.warn(  "Trying to commit future proposal: zxid 0x{} from {}" ,
                    Long. toHexString(zxid), followerAddr );
             return  ;
        }
        //将发送 ack的Follower的sid放入Proposal.ackSet集合中
        p. ackSet.add(sid);
         if  (LOG .isDebugEnabled()) {
            LOG.debug(  "Count for zxid: 0x{} is {}"  ,
                    Long. toHexString(zxid), p.ackSet.size());
        }
         //如果ackSet集合中已经包含了一个 Quorum
         if  (self .getQuorumVerifier().containsQuorum(p.ackSet)){
             if  (zxid != lastCommitted +1) {
                LOG.warn(  "Commiting zxid 0x{} from {} not first!"  ,
                        Long. toHexString(zxid), followerAddr );
                LOG.warn(  "First is 0x{}"  , Long.toHexString (lastCommitted + 1));
            }
             //从outstandingProposals中删除掉这个 zxid对于的proposal对象
            outstandingProposals.remove(zxid);
             //如果p.request不等于null, 则将这个proposal放入toBeApplied列表中
             if  (p.request !=  null ) {
                toBeApplied.add(p);
            }

             if  (p.request ==  null ) {
                LOG.warn(  "Going to commmit null request for proposal: {}" , p);
            }
             //发送Leader.COMMIT 包给所有的Follower
            commit(zxid);
             //通知所有的Observer
            inform(p);
             //调用处理器CommitProcessor的commit方法
            zk. commitProcessor.commit(p.request );
             //如果有sync等着等待这个commit的 zxid,发送Leader.SYNC数据包给对应的Follower
             if  (pendingSyncs .containsKey(zxid)){
                 for  (LearnerSyncRequest r: pendingSyncs .remove(zxid)) {
                    sendSync(r);
                }
            }
        }
    }

【All Follower, Step 10】Follower.followLeader方法会循环读取从Leader的传输过来的Quorum数据包,并调用Follower.processPacket方法。该方法会根据数据的内容来分发。当发现是Leader.COMMIT类型的Quorum数据包,则会根据Quorum数据包的内容构造一个Request对象,并调用FollowerZooKeeperServer.commit方法。该方法最终会调用处理器CommitRequestProcessor.commit方法,将Request对象放到CommitRequestProcessor.committedRequests队列中。

FollowerZooKeeperServer.commit方法如下:

    /**
     *当接收到一个COMMIT消息,这个方法会被调用。该方法会将COMMIT消息
     *中的zxid和pendingTxns队列中的第一个对象的zxid进行匹配。如何相同,则
     *传递给处理器CommitProcessor进行commit
     *  @param  zxid  -   must correspond to the head of pendingTxns if it exists
     */
     public   void  commit(  long  zxid ) {
         if  (pendingTxns .size() == 0) {
            LOG.warn(  "Committing "  + Long. toHexString (zxid)
                    +  " without seeing txn"  );
             return  ;
        }
         //取出pendingTxns第一个元素的 zxid
         long  firstElementZxid = pendingTxns .element().zxid;
         //如果第一个元素的 zxid不等于COMMIT消息中的 zxid, 则退出程序
         if  (firstElementZxid != zxid) {
            LOG.error(  "Committing zxid 0x"  + Long. toHexString (zxid)
                    +  " but next pending txn 0x"
                    + Long. toHexString(firstElementZxid));
            System. exit(12);
        }
         //pendingTxns取出,并删除第一个元素
        Request request = pendingTxns .remove();
         //将从pendingTxns队列中取出的第一个 reqeust对象传递给CommitProcessor处理器进行commit
        commitProcessor.commit(request);
    }

【All Follower, Step 11】处理器CommitProcessor线程会处理提交的Request对象。

如果是Follower A, nextPending对象是和提交Request对象是一致的,所以将提交Request对象内容替换nextPending中的内容,并放入toProcess队列中。在下一个循环会从toProcess队列中取出并传递到下一个迭代器FinalRequestProcessor中。(和Leader中的CommitProcessor线程处理逻辑是一样的)

如果不是Follower A, 则可能有下面两种情况:

1)queuedRequest队列为empty且nextPending为null, 也就是这个Follower没有自己转发的request正在处理;

2)nextPending不为null, 也就是有转发的request正在处理。但nextPending对象一定和提交的Request对象是不一致的。

不管是哪一种,都会直接将提交的Request对象加入到toProcess队列中。处理器CommitProcessor线程会从中取出并传递到下一个迭代器FinalRequestProcessor中。

CommitProcessor.run方法如下:

     public   void  run() {
         try  {
            Request nextPending =  null ;
             while  (!finished ) {
                 int  len = toProcess .size();
                 for  ( int   i = 0; i < len; i++) {
                    nextProcessor.processRequest( toProcess .get(i));
                }
                 //当将所有的request传递到下一个处理器FinalRequestProcessor后,将toProcess清空
                toProcess.clear();
                 synchronized  ( this   ) {
                     //如果queuedRequests队列为空,或者nextPending为null, 或者committedRequest队列为控股,则等待。
                     if  ((queuedRequests .size() == 0 || nextPending !=  null  )
                            && committedRequests.size() == 0) {
                        wait();
                         continue  ;
                    }
                    //第一步,检查这个commit是否为了pending request而来
                    //如果commit request到来,但是queuedRequests为空,或者nextPending为null
                     if  ((queuedRequests .size() == 0 || nextPending !=  null  )
                            && committedRequests.size() > 0) {
                        Request r = committedRequests .remove();
                         /*
                         * We match with nextPending so that we can move to the
                         * next request when it is committed. We also want to
                         * use nextPending because it has the cnxn member set
                         * properly.
                         */
                         //如果nextPending不等于null,
                         if  (nextPending !=  null
                                && nextPending. sessionId == r.sessionId
                                && nextPending. cxid == r.cxid ) {
                             // we want to send our version of the request.
                             // the pointer to the connection in the request
                            nextPending.hdr = r. hdr;
                            nextPending. txn = r.txn ;
                            nextPending. zxid = r.zxid ;
                            toProcess.add(nextPending);
                            nextPending =  null  ;
                        }  else  {
                             // this request came from someone else so just
                             // send the commit packet
                            //如果这个请求来自于其他人,则直接加入到toProcess中
                            //sync请求,或者不是Follower发起的请求
                            toProcess.add(r);
                        }
                    }
                }

                 //如果我们还没有匹配上pending request, 则返回继续等待
                 if  (nextPending !=  null ) {
                     continue  ;
                }

                 synchronized  ( this   ) {
                     //处理queuedRequests中下一个请求
                     while  (nextPending ==  null  && queuedRequests.size() > 0) {
                         //从queuedRequests中取出第一个,并将其从队列中删除
                        Request request = queuedRequests .remove();
                         switch  (request.type ) {
                         case  OpCode.create:
                         case  OpCode.delete:
                         case  OpCode.setData:
                         case  OpCode.multi:
                         case  OpCode.setACL:
                         case  OpCode.createSession:
                         case  OpCode.closeSession:
                             //如果不是OpCode.sync操作,则将request对象赋予nextPending
                            nextPending = request;
                             break  ;
                         case  OpCode.sync:
                             if  (matchSyncs ) {
                                nextPending = request;
                            }
                             //如果matchSyncs等于false, 则直接加入到toProcess, 不等待Commit
                             else  {
                                toProcess.add(request);
                            }
                             break  ;
                         default  :
                            toProcess.add(request);
                        }
                    }
                }
            }
        }  catch  (InterruptedException e) {
            LOG.warn(  "Interrupted exception while waiting"  , e);
        }  catch  (Throwable e) {
            LOG.error(  "Unexpected exception causing CommitProcessor to exit" , e);
        }
        LOG.info(  "CommitProcessor exited loop!"  );
    }

 

【All Follower, Step 12】处理器FinalRequestProcessor更新内存中Session信息或者znode数据。

对于Follower A,将会构建Reponse,并返回Response给Client A;

对于其它的Follower, 不需要返回Response给客户端,直接返回。

FinalRequestProcessor.processRequest方法如下。其中构造Response部分,只给出了SetData请求相关的代码。

     public   void  processRequest(Request request) {
         if  (LOG .isDebugEnabled()) {
            LOG.debug(  "Processing request:: "  + request);
        }
         // request.addRQRec(">final");
         long  traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
         if  (request.type == OpCode.ping) {
            traceMask = ZooTrace. SERVER_PING_TRACE_MASK;
        }
         if  (LOG .isTraceEnabled()) {
            ZooTrace. logRequest( LOG, traceMask,  'E'   , request,  ""  );
        }
        ProcessTxnResult rc =  null   ;
         synchronized  (zks.outstandingChanges ) {
            //循环从outstandingChanges中取出小于等于request.zxid的ChangeRecord,并删除
             while  (!zks .outstandingChanges .isEmpty()
                    && zks.outstandingChanges .get(0).zxid <= request.zxid) {
                ChangeRecord cr = zks.outstandingChanges .remove(0);
                 if  (cr.zxid < request.zxid) {
                    LOG.warn(   "Zxid outstanding "
                            + cr. zxid
                            +  " is less than current "  + request.zxid );
                }
                 if  (zks .outstandingChangesForPath .get(cr.path) == cr) {
                    zks.outstandingChangesForPath .remove(cr.path);
                }
            }
  
             //如果request.hdr不等于null, 则在内存 Datatree中处理这个请求
             if  (request.hdr !=  null ) {
               TxnHeader hdr = request. hdr;
               Record txn = request. txn;
  
               rc = zks.processTxn(hdr, txn);
            }
             //检测这个request的类型是否是需要 Quorum Ack requrest
             //如果是,加入到committedProposal中
             if  (Request. isQuorum(request.type)) {
                zks.getZKDatabase().addCommittedProposal(request);
            }
        }
  
         if  (request.hdr !=  null  && request.hdr.getType() == OpCode.closeSession ) {
            ServerCnxnFactory scxn = zks.getServerCnxnFactory();
             if  (scxn !=  null  && request.cnxn ==  null  ) {
                scxn.closeSession(request. sessionId);
                 return  ;
            }
        }
         //如果request的 cnxn为null, 则直接return
         if  (request.cnxn ==  null ) {
             return  ;
        }
  
         //下面是构造response
        ServerCnxn cnxn = request. cnxn;
  
        String lastOp =  "NA"  ;
        zks.decInProcess();
        Code err = Code . OK;
        Record rsp =  null ;
         boolean  closeSession =  false ;
         try  {
             if  (request.hdr !=  null  && request.hdr.getType() == OpCode.error) {
                 throw  KeeperException.create( KeeperException.Code. get( (
                        (ErrorTxn) request. txn) .getErr()));
            }
  
            KeeperException ke = request.getException();
             if  (ke !=  null  && request.type != OpCode. multi) {
                 throw  ke;
            }
  
             if  (LOG .isDebugEnabled()) {
                LOG.debug(  "{}"  ,request);
            }
             switch  (request.type ) {
            ......
             case  OpCode.setData: {
                lastOp =  "SETD"  ;
                 //构建SetDataResponse
                rsp =  new  SetDataResponse( rc.stat);
                err = Code. get(rc .err);
                 break  ;
            }
            ......
        }  catch  (SessionMovedException e) {
            cnxn.sendCloseSession();
             return  ;
        }  catch  (KeeperException e) {
             //如果有KeeperException,则设置err
            err = e.code();
        }  catch  (Exception e) {
             // log at error level as we are returning a marshalling
             // error to the user
            LOG.error(   "Failed to process "  + request, e);
            StringBuilder sb =  new  StringBuilder();
            ByteBuffer bb = request. request;
            bb.rewind();
             while  (bb.hasRemaining()) {
                sb.append(Integer. toHexString(bb.get() & 0xff));
            }
            LOG.error(   "Dumping request buffer: 0x"   + sb.toString());
            err = Code. MARSHALLINGERROR;
        }
         //读取最后 zxid
         long  lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();
        ReplyHeader hdr =
             new  ReplyHeader(request. cxid, lastZxid, err.intValue());
  
        zks.serverStats().updateLatency(request.createTime);
        cnxn.updateStatsForResponse(request. cxid, lastZxid, lastOp,
                    request. createTime, System.currentTimeMillis());
  
         try  {
             //发送Response给客户端
            cnxn.sendResponse(hdr, rsp,  "response"  );
             if  (closeSession) {
                cnxn.sendCloseSession();
            }
        }  catch  (IOException e) {
            LOG.error(   "FIXMSG"  ,e);
        }
     }