ZooKeeper Source Code Analysis (5): How the Server (Cluster Leader) Processes Requests

Date: 2023-12-16 14:38:08

The leader is implemented by LeaderZooKeeperServer, which indirectly extends the standard ZooKeeperServer. It defines the path a request travels once it reaches the leader:

LeaderRequestProcessor -> PrepRequestProcessor -> ProposalRequestProcessor -> CommitProcessor -> Leader.ToBeAppliedRequestProcessor -> FinalRequestProcessor

The details are in setupRequestProcessors():

@Override
protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor toBeAppliedProcessor =
        new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
    commitProcessor = new CommitProcessor(toBeAppliedProcessor,
            Long.toString(getServerId()), false,
            getZooKeeperServerListener());
    commitProcessor.start();
    ProposalRequestProcessor proposalProcessor =
        new ProposalRequestProcessor(this, commitProcessor);
    proposalProcessor.initialize();
    prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
    prepRequestProcessor.start();
    firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);
    setupContainerManager();
}
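Note that the chain is wired back to front: FinalRequestProcessor is constructed first, each processor receives its downstream neighbor as a constructor argument, and LeaderRequestProcessor ends up as firstProcessor at the head of the chain.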

Let's analyze step by step what each of these request processors (RPs) does. PrepRequestProcessor and FinalRequestProcessor were already analyzed in the previous article:

ZooKeeper Source Code Analysis (4): How the Server (Standalone) Processes Requests

So let's move on to the remaining RPs.

1. ProposalRequestProcessor

This RP passes every request on to the next processor in the chain (CommitProcessor); in addition, for write transactions it starts a proposal round and hands the request to SyncRequestProcessor, whose downstream AckRequestProcessor acknowledges the proposal. See the code:

public void processRequest(Request request) throws RequestProcessorException {
    /* In the following IF-THEN-ELSE block, we process syncs on the leader.
     * If the sync is coming from a follower, then the follower
     * handler adds it to syncHandler. Otherwise, if it is a client of
     * the leader that issued the sync command, then syncHandler won't
     * contain the handler. In this case, we add it to syncHandler, and
     * call processRequest on the next processor.
     */
    if (request instanceof LearnerSyncRequest) {
        zks.getLeader().processSync((LearnerSyncRequest) request);
    } else {
        nextProcessor.processRequest(request);
        if (request.getHdr() != null) {
            // We need to sync and get consensus on any transactions
            try {
                zks.getLeader().propose(request);
            } catch (XidRolloverException e) {
                throw new RequestProcessorException(e.getMessage(), e);
            }
            syncProcessor.processRequest(request);
        }
    }
}
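Note the fork for writes: every request immediately continues down the chain via nextProcessor (the CommitProcessor, where it will wait for its commit), while a request carrying a transaction header is additionally proposed to the quorum and handed to SyncRequestProcessor, which logs it to disk; SyncRequestProcessor's downstream AckRequestProcessor then registers the leader's own ACK for the proposal, as shown next.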

SyncRequestProcessor was analyzed in the previous article, so we won't repeat that here. What, then, does AckRequestProcessor do?

AckRequestProcessor simply forwards the request it receives to the leader as an ACK. Here is the code:

/**
 * Forward the request as an ACK to the leader
 */
public void processRequest(Request request) {
    QuorumPeer self = leader.self;
    if (self != null) {
        leader.processAck(self.getId(), request.zxid, null);
    } else {
        LOG.error("Null QuorumPeer");
    }
}

The leader processes the ACK as follows:

/**
 * Keep a count of acks that are received by the leader for a particular
 * proposal
 *
 * @param zxid, the zxid of the proposal sent out
 * @param sid, the id of the server that sent the ack
 * @param followerAddr
 */
synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {
    if (!allowedToCommit) return; // last op committed was a leader change - from now on
                                  // the new leader should commit
    if (LOG.isTraceEnabled()) {
        LOG.trace("Ack zxid: 0x{}", Long.toHexString(zxid));
        for (Proposal p : outstandingProposals.values()) {
            long packetZxid = p.packet.getZxid();
            LOG.trace("outstanding proposal: 0x{}",
                    Long.toHexString(packetZxid));
        }
        LOG.trace("outstanding proposals all");
    }

    if ((zxid & 0xffffffffL) == 0) {
        /*
         * We no longer process NEWLEADER ack with this method. However,
         * the learner sends an ack back to the leader after it gets
         * UPTODATE, so we just ignore the message.
         */
        return;
    }

    if (outstandingProposals.size() == 0) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("outstanding is 0");
        }
        return;
    }

    if (lastCommitted >= zxid) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("proposal has already been committed, pzxid: 0x{} zxid: 0x{}",
                    Long.toHexString(lastCommitted), Long.toHexString(zxid));
        }
        // The proposal has already been committed
        return;
    }
    Proposal p = outstandingProposals.get(zxid);
    if (p == null) {
        LOG.warn("Trying to commit future proposal: zxid 0x{} from {}",
                Long.toHexString(zxid), followerAddr);
        return;
    }

    p.addAck(sid);

    boolean hasCommitted = tryToCommit(p, zxid, followerAddr);

    // If p is a reconfiguration, multiple other operations may be ready to be committed,
    // since operations wait for different sets of acks.
    // Currently we only permit one outstanding reconfiguration at a time
    // such that the reconfiguration and subsequent outstanding ops proposed while the reconfig is
    // pending all wait for a quorum of old and new config, so its not possible to get enough acks
    // for an operation without getting enough acks for preceding ops. But in the future if multiple
    // concurrent reconfigs are allowed, this can happen and then we need to check whether some pending
    // ops may already have enough acks and can be committed, which is what this code does.
    if (hasCommitted && p.request != null && p.request.getHdr().getType() == OpCode.reconfig) {
        long curZxid = zxid;
        while (allowedToCommit && hasCommitted && p != null) {
            curZxid++;
            p = outstandingProposals.get(curZxid);
            if (p != null) hasCommitted = tryToCommit(p, curZxid, null);
        }
    }
}

processAck ends by calling tryToCommit; once a proposal has gathered all the quorums it needs, tryToCommit commits it and finally hands the request to CommitProcessor:

/**
 * @return True if committed, otherwise false.
 * @param a proposal p
 **/
synchronized public boolean tryToCommit(Proposal p, long zxid, SocketAddress followerAddr) {
    // make sure that ops are committed in order. With reconfigurations it is now possible
    // that different operations wait for different sets of acks, and we still want to enforce
    // that they are committed in order. Currently we only permit one outstanding reconfiguration
    // such that the reconfiguration and subsequent outstanding ops proposed while the reconfig is
    // pending all wait for a quorum of old and new config, so its not possible to get enough acks
    // for an operation without getting enough acks for preceding ops. But in the future if multiple
    // concurrent reconfigs are allowed, this can happen.
    if (outstandingProposals.containsKey(zxid - 1)) return false;

    // getting a quorum from all necessary configurations
    if (!p.hasAllQuorums()) {
        return false;
    }

    // commit proposals in order
    if (zxid != lastCommitted + 1) {
        LOG.warn("Commiting zxid 0x" + Long.toHexString(zxid)
                + " from " + followerAddr + " not first!");
        LOG.warn("First is " + (lastCommitted + 1));
    }

    // in order to be committed, a proposal must be accepted by a quorum
    outstandingProposals.remove(zxid);

    if (p.request != null) {
        toBeApplied.add(p);
    }

    if (p.request == null) {
        LOG.warn("Going to commmit null: " + p);
    } else if (p.request.getHdr().getType() == OpCode.reconfig) {
        LOG.debug("Committing a reconfiguration! " + outstandingProposals.size());

        // if this server is voter in new config with the same quorum address,
        // then it will remain the leader
        // otherwise an up-to-date follower will be designated as leader. This saves
        // leader election time, unless the designated leader fails
        Long designatedLeader = getDesignatedLeader(p, zxid);

        QuorumVerifier newQV = p.qvAcksetPairs.get(p.qvAcksetPairs.size() - 1).getQuorumVerifier();

        self.processReconfig(newQV, designatedLeader, zk.getZxid(), true);

        if (designatedLeader != self.getId()) {
            allowedToCommit = false;
        }

        // we're sending the designated leader, and if the leader is changing the followers are
        // responsible for closing the connection - this way we are sure that at least a majority of them
        // receive the commit message.
        commitAndActivate(zxid, designatedLeader);
        informAndActivate(p, designatedLeader);
    } else {
        commit(zxid);
        inform(p);
    }
    zk.commitProcessor.commit(p.request);
    if (pendingSyncs.containsKey(zxid)) {
        for (LearnerSyncRequest r : pendingSyncs.remove(zxid)) {
            sendSync(r);
        }
    }
    return true;
}
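To make the ACK bookkeeping concrete, here is a minimal, self-contained sketch, a hypothetical ProposalAckTracker class rather than ZooKeeper code, of what p.addAck(sid) and p.hasAllQuorums() amount to in the common case of a single simple-majority quorum (the real Proposal/QuorumVerifier additionally handles joint old/new configurations during reconfig):

import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration: track ACKs for one proposal and report
// when a simple majority of the ensemble has acknowledged it.
class ProposalAckTracker {
    private final Set<Long> ackedServerIds = new HashSet<>();
    private final int ensembleSize;

    ProposalAckTracker(int ensembleSize) {
        this.ensembleSize = ensembleSize;
    }

    // One call per ACK, analogous to p.addAck(sid); duplicates collapse in the set.
    void addAck(long serverId) {
        ackedServerIds.add(serverId);
    }

    // Analogous to p.hasAllQuorums() when only one majority quorum is involved.
    boolean hasQuorum() {
        return ackedServerIds.size() > ensembleSize / 2;
    }

    public static void main(String[] args) {
        ProposalAckTracker p = new ProposalAckTracker(5); // 5-node ensemble
        p.addAck(1L); // the leader ACKs its own proposal via AckRequestProcessor
        p.addAck(2L);
        System.out.println(p.hasQuorum()); // false: 2 of 5
        p.addAck(3L);
        System.out.println(p.hasQuorum()); // true: 3 of 5 is a majority
    }
}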

Back in tryToCommit, the first step of the commit path is to send a COMMIT packet to every member of the quorum:

/**
 * Create a commit packet and send it to all the members of the quorum
 *
 * @param zxid
 */
public void commit(long zxid) {
    synchronized (this) {
        lastCommitted = zxid;
    }
    QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);
    sendPacket(qp);
}

The packet is sent like this:

/**
 * send a packet to all the followers ready to follow
 *
 * @param qp
 *            the packet to be sent
 */
void sendPacket(QuorumPacket qp) {
    synchronized (forwardingFollowers) {
        for (LearnerHandler f : forwardingFollowers) {
            f.queuePacket(qp);
        }
    }
}

The second step is to inform the observers:

/**
 * Create an inform packet and send it to all observers.
 * @param zxid
 * @param proposal
 */
public void inform(Proposal proposal) {
    QuorumPacket qp = new QuorumPacket(Leader.INFORM, proposal.request.zxid,
            proposal.packet.getData(), null);
    sendObserverPacket(qp);
}

Sending to the observers works as follows:

/**
 * send a packet to all observers
 */
void sendObserverPacket(QuorumPacket qp) {
    for (LearnerHandler f : getObservingLearners()) {
        f.queuePacket(qp);
    }
}

The third step is the call that hands the request over to CommitProcessor:

 zk.commitProcessor.commit(p.request);

2. CommitProcessor

CommitProcessor is multi-threaded. Communication between threads happens through queues, atomics, and wait/notifyAll synchronization. CommitProcessor acts as a gateway that admits requests into the rest of the processing pipeline: at any instant it allows many read requests but at most one write request to be in flight, which preserves the order of writes within the transaction stream.

One commit-processing main thread monitors the request queues and dispatches requests to the worker threads. Dispatch is keyed on sessionId, so read and write requests of a given session normally go to the same worker thread, which guarantees they run in order.

0 to N worker threads run the remainder of the request pipeline on each request. If configured with 0 worker threads, the main commit thread runs the pipeline directly.

The typical (default) sizing is one commit-processing thread plus one worker thread per core, e.g. 32 worker threads on a 32-core machine.
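As a concrete illustration of this dispatch rule (again not ZooKeeper code), the snippet below reproduces the mod arithmetic that WorkerService.schedule, shown further down, uses to pin a sessionId to one worker:

// Illustration only: how a sessionId is pinned to a single worker thread.
// With the same sessionId always mapping to the same worker, per-session
// ordering is preserved without any extra locking.
public class WorkerMappingDemo {
    static int workerFor(long sessionId, int size) {
        // map negative ids as well into [0, size-1]
        return ((int) (sessionId % size) + size) % size;
    }

    public static void main(String[] args) {
        long sessionId = 0x1655a03d2e960000L; // a hypothetical session id
        System.out.println(workerFor(sessionId, 32)); // same worker on every call
        System.out.println(workerFor(-7L, 32));       // negative ids stay in range: 25
    }
}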

The constraints on this multi-threading are:

  Request processing within a single session must be sequential.

  Write requests must be processed in zxid order.

  There must be no race conditions between writes within a session; such a race could change when a read request in another session triggers a watch.

The current implementation satisfies the third constraint simply by not processing any read requests while a write request is in flight.

@Override
public void run() {
    Request request;
    try {
        while (!stopped) {
            synchronized (this) {
                while (
                    !stopped &&
                    ((queuedRequests.isEmpty() || isWaitingForCommit() || isProcessingCommit()) &&
                     (committedRequests.isEmpty() || isProcessingRequest()))) {
                    wait();
                }
            }

            /*
             * Processing queuedRequests: Process the next requests until we
             * find one for which we need to wait for a commit. We cannot
             * process a read request while we are processing write request.
             */
            while (!stopped && !isWaitingForCommit() &&
                   !isProcessingCommit() &&
                   (request = queuedRequests.poll()) != null) {
                if (needCommit(request)) {
                    nextPending.set(request);
                } else {
                    sendToNextProcessor(request);
                }
            }

            /*
             * Processing committedRequests: check and see if the commit
             * came in for the pending request. We can only commit a
             * request when there is no other request being processed.
             */
            processCommitted();
        }
    } catch (Throwable e) {
        handleException(this.getName(), e);
    }
    LOG.info("CommitProcessor exited loop!");
}
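Note how the dequeue loop stops as soon as needCommit(request) parks a write in nextPending: isWaitingForCommit() then returns true, so no further requests from queuedRequests, reads included, are dispatched until that write's commit arrives. This is precisely the "one write at a time" gate described above.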

The commit-matching logic, processCommitted, looks like this:

/*
 * Separated this method from the main run loop
 * for test purposes (ZOOKEEPER-1863)
 */
protected void processCommitted() {
    Request request;

    if (!stopped && !isProcessingRequest() &&
            (committedRequests.peek() != null)) {

        /*
         * ZOOKEEPER-1863: continue only if there is no new request
         * waiting in queuedRequests or it is waiting for a
         * commit.
         */
        if (!isWaitingForCommit() && !queuedRequests.isEmpty()) {
            return;
        }
        request = committedRequests.poll();

        /*
         * We match with nextPending so that we can move to the
         * next request when it is committed. We also want to
         * use nextPending because it has the cnxn member set
         * properly.
         */
        Request pending = nextPending.get();
        if (pending != null &&
                pending.sessionId == request.sessionId &&
                pending.cxid == request.cxid) {
            // we want to send our version of the request.
            // the pointer to the connection in the request
            pending.setHdr(request.getHdr());
            pending.setTxn(request.getTxn());
            pending.zxid = request.zxid;
            // Set currentlyCommitting so we will block until this
            // completes. Cleared by CommitWorkRequest after
            // nextProcessor returns.
            currentlyCommitting.set(pending);
            nextPending.set(null);
            sendToNextProcessor(pending);
        } else {
            // this request came from someone else so just
            // send the commit packet
            currentlyCommitting.set(request);
            sendToNextProcessor(request);
        }
    }
}

Dispatching to the worker pool:

/**
 * Schedule final request processing; if a worker thread pool is not being
 * used, processing is done directly by this thread.
 */
private void sendToNextProcessor(Request request) {
    numRequestsProcessing.incrementAndGet();
    workerPool.schedule(new CommitWorkRequest(request), request.sessionId);
}

The real scheduling logic lives in WorkerService.schedule:

/**
 * Schedule work to be done by the thread assigned to this id. Thread
 * assignment is a single mod operation on the number of threads. If a
 * worker thread pool is not being used, work is done directly by
 * this thread.
 */
public void schedule(WorkRequest workRequest, long id) {
    if (stopped) {
        workRequest.cleanup();
        return;
    }

    ScheduledWorkRequest scheduledWorkRequest =
        new ScheduledWorkRequest(workRequest);

    // If we have a worker thread pool, use that; otherwise, do the work
    // directly.
    int size = workers.size();
    if (size > 0) {
        try {
            // make sure to map negative ids as well to [0, size-1]
            int workerNum = ((int) (id % size) + size) % size;
            ExecutorService worker = workers.get(workerNum);
            worker.execute(scheduledWorkRequest);
        } catch (RejectedExecutionException e) {
            LOG.warn("ExecutorService rejected execution", e);
            workRequest.cleanup();
        }
    } else {
        // When there is no worker thread pool, do the work directly
        // and wait for its completion
        scheduledWorkRequest.start();
        try {
            scheduledWorkRequest.join();
        } catch (InterruptedException e) {
            LOG.warn("Unexpected exception", e);
            Thread.currentThread().interrupt();
        }
    }
}

The worker's run method:

@Override
public void run() {
    try {
        // Check if stopped while request was on queue
        if (stopped) {
            workRequest.cleanup();
            return;
        }
        workRequest.doWork();
    } catch (Exception e) {
        LOG.warn("Unexpected exception", e);
        workRequest.cleanup();
    }
}

which invokes CommitWorkRequest.doWork:

public void doWork() throws RequestProcessorException {
    try {
        nextProcessor.processRequest(request);
    } finally {
        // If this request is the commit request that was blocking
        // the processor, clear.
        currentlyCommitting.compareAndSet(request, null);

        /*
         * Decrement outstanding request count. The processor may be
         * blocked at the moment because it is waiting for the pipeline
         * to drain. In that case, wake it up if there are pending
         * requests.
         */
        if (numRequestsProcessing.decrementAndGet() == 0) {
            if (!queuedRequests.isEmpty() ||
                    !committedRequests.isEmpty()) {
                wakeup();
            }
        }
    }
}

This passes the request on to the next RP: Leader.ToBeAppliedRequestProcessor.

3. Leader.ToBeAppliedRequestProcessor

Leader.ToBeAppliedRequestProcessor does nothing more than maintain the toBeApplied list.

/**
 * This request processor simply maintains the toBeApplied list. For
 * this to work next must be a FinalRequestProcessor and
 * FinalRequestProcessor.processRequest MUST process the request
 * synchronously!
 *
 * @param next
 *            a reference to the FinalRequestProcessor
 */
ToBeAppliedRequestProcessor(RequestProcessor next, Leader leader) {
    if (!(next instanceof FinalRequestProcessor)) {
        throw new RuntimeException(ToBeAppliedRequestProcessor.class
                .getName()
                + " must be connected to "
                + FinalRequestProcessor.class.getName()
                + " not "
                + next.getClass().getName());
    }
    this.leader = leader;
    this.next = next;
}

/*
 * (non-Javadoc)
 *
 * @see org.apache.zookeeper.server.RequestProcessor#processRequest(org.apache.zookeeper.server.Request)
 */
public void processRequest(Request request) throws RequestProcessorException {
    next.processRequest(request);

    // The only requests that should be on toBeApplied are write
    // requests, for which we will have a hdr. We can't simply use
    // request.zxid here because that is set on read requests to equal
    // the zxid of the last write op.
    if (request.getHdr() != null) {
        long zxid = request.getHdr().getZxid();
        Iterator<Proposal> iter = leader.toBeApplied.iterator();
        if (iter.hasNext()) {
            Proposal p = iter.next();
            if (p.request != null && p.request.zxid == zxid) {
                iter.remove();
                return;
            }
        }
        LOG.error("Committed request not found on toBeApplied: "
                + request);
    }
}
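Why does the toBeApplied list exist at all? Proposals sit on it between commit and application to the in-memory database; as far as I can tell, a learner that starts syncing with the leader during that window is also forwarded these committed-but-not-yet-applied proposals (see Leader.startForwarding), which is why entries are only removed here, after FinalRequestProcessor has synchronously applied the request.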

4. FinalRequestProcessor was covered in the previous article, so it is not repeated here.

Summary: as the analysis above shows, a request on the leader flows through LeaderRequestProcessor -> PrepRequestProcessor -> ProposalRequestProcessor -> CommitProcessor -> Leader.ToBeAppliedRequestProcessor -> FinalRequestProcessor.

PrepRequestProcessor receives the request and wraps it into a transaction, setting up shared state according to the request type; ProposalRequestProcessor is chiefly responsible for proposing writes to the followers and informing the observers; CommitProcessor gates committed requests and dispatches them to worker threads; Leader.ToBeAppliedRequestProcessor simply maintains the toBeApplied list; and FinalRequestProcessor terminates the pipeline, sending the response and triggering any watchers.