zookeeper源码分析之四服务端(单机)处理请求流程

时间:2022-05-22 15:28:43

上文:

zookeeper源码分析之一服务端启动过程

中,我们介绍了zookeeper服务器的启动过程,其中单机是ZookeeperServer启动,集群使用QuorumPeer启动,那么这次我们分析各自一下消息处理过程:

前文可以看到在

1.在单机情况下NettyServerCnxnFactory中启动ZookeeperServer来处理消息:

    public synchronized void startup() {
if (sessionTracker == null) {
createSessionTracker();
}
startSessionTracker();
setupRequestProcessors(); registerJMX(); state = State.RUNNING;
notifyAll();
}

消息处理器的调用如下:

    protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
RequestProcessor syncProcessor = new SyncRequestProcessor(this,
finalProcessor);
((SyncRequestProcessor)syncProcessor).start();
firstProcessor = new PrepRequestProcessor(this, syncProcessor);
((PrepRequestProcessor)firstProcessor).start();
}

我们看到启动两个消息处理器来处理请求:第一个同步消息处理器预消息服务器,最后一个同步请求处理器和异步请求处理器。

  1.1  第一个消息服务器处理器预消息服务器PrepRequestProcessor

  

 @Override
public void run() {
try {
while (true) {
Request request = submittedRequests.take();
long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
if (request.type == OpCode.ping) {
traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
}
if (LOG.isTraceEnabled()) {
ZooTrace.logRequest(LOG, traceMask, 'P', request, "");
}
if (Request.requestOfDeath == request) {
break;
}
pRequest(request);
}
} catch (RequestProcessorException e) {
if (e.getCause() instanceof XidRolloverException) {
LOG.info(e.getCause().getMessage());
}
handleException(this.getName(), e);
} catch (Exception e) {
handleException(this.getName(), e);
}
LOG.info("PrepRequestProcessor exited loop!");
}

可以看到,while(true)是一个一直循环处理的过程,其中红色的部分为处理的主体。

/**
* This method will be called inside the ProcessRequestThread, which is a
* singleton, so there will be a single thread calling this code.
*
* @param request
*/
protected void pRequest(Request request) throws RequestProcessorException {
// LOG.info("Prep>>> cxid = " + request.cxid + " type = " +
// request.type + " id = 0x" + Long.toHexString(request.sessionId));
request.setHdr(null);
request.setTxn(null); try {
switch (request.type) {
case OpCode.createContainer:
case OpCode.create:
case OpCode.create2:
CreateRequest create2Request = new CreateRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true);
break;
case OpCode.deleteContainer:
case OpCode.delete:
DeleteRequest deleteRequest = new DeleteRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest, true);
break;
case OpCode.setData:
SetDataRequest setDataRequest = new SetDataRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest, true);
break;
case OpCode.reconfig:
ReconfigRequest reconfigRequest = new ReconfigRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, reconfigRequest);
pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest, true);
break;
case OpCode.setACL:
SetACLRequest setAclRequest = new SetACLRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest, true);
break;
case OpCode.check:
CheckVersionRequest checkRequest = new CheckVersionRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest, true);
break;
case OpCode.multi:
MultiTransactionRecord multiRequest = new MultiTransactionRecord();
try {
ByteBufferInputStream.byteBuffer2Record(request.request, multiRequest);
} catch(IOException e) {
request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getNextZxid(),
Time.currentWallTime(), OpCode.multi));
throw e;
}
List<Txn> txns = new ArrayList<Txn>();
//Each op in a multi-op must have the same zxid!
long zxid = zks.getNextZxid();
KeeperException ke = null; //Store off current pending change records in case we need to rollback
Map<String, ChangeRecord> pendingChanges = getPendingChanges(multiRequest); for(Op op: multiRequest) {
Record subrequest = op.toRequestRecord();
int type;
Record txn; /* If we've already failed one of the ops, don't bother
* trying the rest as we know it's going to fail and it
* would be confusing in the logfiles.
*/
if (ke != null) {
type = OpCode.error;
txn = new ErrorTxn(Code.RUNTIMEINCONSISTENCY.intValue());
} /* Prep the request and convert to a Txn */
else {
try {
pRequest2Txn(op.getType(), zxid, request, subrequest, false);
type = request.getHdr().getType();
txn = request.getTxn();
} catch (KeeperException e) {
ke = e;
type = OpCode.error;
txn = new ErrorTxn(e.code().intValue()); LOG.info("Got user-level KeeperException when processing "
+ request.toString() + " aborting remaining multi ops."
+ " Error Path:" + e.getPath()
+ " Error:" + e.getMessage()); request.setException(e); /* Rollback change records from failed multi-op */
rollbackPendingChanges(zxid, pendingChanges);
}
} //FIXME: I don't want to have to serialize it here and then
// immediately deserialize in next processor. But I'm
// not sure how else to get the txn stored into our list.
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
txn.serialize(boa, "request") ;
ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray()); txns.add(new Txn(type, bb.array()));
} request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
Time.currentWallTime(), request.type));
request.setTxn(new MultiTxn(txns)); break; //create/close session don't require request record
case OpCode.createSession:
case OpCode.closeSession:
if (!request.isLocalSession()) {
pRequest2Txn(request.type, zks.getNextZxid(), request,
null, true);
}
break; //All the rest don't need to create a Txn - just verify session
case OpCode.sync:
case OpCode.exists:
case OpCode.getData:
case OpCode.getACL:
case OpCode.getChildren:
case OpCode.getChildren2:
case OpCode.ping:
case OpCode.setWatches:
case OpCode.checkWatches:
case OpCode.removeWatches:
zks.sessionTracker.checkSession(request.sessionId,
request.getOwner());
break;
default:
LOG.warn("unknown type " + request.type);
break;
}
} catch (KeeperException e) {
if (request.getHdr() != null) {
request.getHdr().setType(OpCode.error);
request.setTxn(new ErrorTxn(e.code().intValue()));
}
LOG.info("Got user-level KeeperException when processing "
+ request.toString()
+ " Error Path:" + e.getPath()
+ " Error:" + e.getMessage());
request.setException(e);
} catch (Exception e) {
// log at error level as we are returning a marshalling
// error to the user
LOG.error("Failed to process " + request, e); StringBuilder sb = new StringBuilder();
ByteBuffer bb = request.request;
if(bb != null){
bb.rewind();
while (bb.hasRemaining()) {
sb.append(Integer.toHexString(bb.get() & 0xff));
}
} else {
sb.append("request buffer is null");
} LOG.error("Dumping request buffer: 0x" + sb.toString());
if (request.getHdr() != null) {
request.getHdr().setType(OpCode.error);
request.setTxn(new ErrorTxn(Code.MARSHALLINGERROR.intValue()));
}
}
request.zxid = zks.getZxid();
nextProcessor.processRequest(request);
}

排除异常的逻辑,该方法是处理不同类型的request,根据type选择一个处理分支,ProcessRequestThread内部调用该方法,它是单例的,因此只有一个单线程调用此代码。以create请求为例(红色部分),了解工作机制:

                CreateRequest createRequest = (CreateRequest)record;
if (deserialize) {
ByteBufferInputStream.byteBuffer2Record(request.request, createRequest);
}
CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags());
validateCreateRequest(createMode, request);
String path = createRequest.getPath();
String parentPath = validatePathForCreate(path, request.sessionId); List<ACL> listACL = fixupACL(path, request.authInfo, createRequest.getAcl());
ChangeRecord parentRecord = getRecordForPath(parentPath); checkACL(zks, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo);
int parentCVersion = parentRecord.stat.getCversion();
if (createMode.isSequential()) {
path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
}
validatePath(path, request.sessionId);
try {
if (getRecordForPath(path) != null) {
throw new KeeperException.NodeExistsException(path);
}
} catch (KeeperException.NoNodeException e) {
// ignore this one
}
boolean ephemeralParent = (parentRecord.stat.getEphemeralOwner() != 0) &&
(parentRecord.stat.getEphemeralOwner() != DataTree.CONTAINER_EPHEMERAL_OWNER);
if (ephemeralParent) {
throw new KeeperException.NoChildrenForEphemeralsException(path);
}
int newCversion = parentRecord.stat.getCversion()+1;
if (type == OpCode.createContainer) {
request.setTxn(new CreateContainerTxn(path, createRequest.getData(), listACL, newCversion));
} else {
request.setTxn(new CreateTxn(path, createRequest.getData(), listACL, createMode.isEphemeral(),
newCversion));
}
StatPersisted s = new StatPersisted();
if (createMode.isEphemeral()) {
s.setEphemeralOwner(request.sessionId);
}
parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
parentRecord.childCount++;
parentRecord.stat.setCversion(newCversion);
addChangeRecord(parentRecord);
addChangeRecord(new ChangeRecord(request.getHdr().getZxid(), path, s, 0, listACL));
break;

调用方法,处理变化:

    private void addChangeRecord(ChangeRecord c) {
synchronized (zks.outstandingChanges) {
zks.outstandingChanges.add(c);
zks.outstandingChangesForPath.put(c.path, c);
}
}

继续向下

    private void addChangeRecord(ChangeRecord c) {
synchronized (zks.outstandingChanges) {
zks.outstandingChanges.add(c);
zks.outstandingChangesForPath.put(c.path, c);
}
}

其中:outstandingChanges 是一组ChangeRecord,outstandingChangesForPath是map的ChangeRecord,如下定义:

final List<ChangeRecord> outstandingChanges = new ArrayList<ChangeRecord>();
// this data structure must be accessed under the outstandingChanges lock
final HashMap<String, ChangeRecord> outstandingChangesForPath =
new HashMap<String, ChangeRecord>();

ChangeRecord是一个数据结构,方便PrepRP和FinalRp共享信息。

        ChangeRecord(long zxid, String path, StatPersisted stat, int childCount,
List<ACL> acl) {
this.zxid = zxid;
this.path = path;
this.stat = stat;
this.childCount = childCount;
this.acl = acl;
}

  1.2 先看一下同步请求处理器FinalRequestProcessor,这个请求处理器实际上应用到一个请求的所有事务,针对任何查询提供服务。它通常处于请求处理的最后(不会有下一个消息处理器),故此得名。 它是如何处理请求呢?

public void processRequest(Request request) {
if (LOG.isDebugEnabled()) {
LOG.debug("Processing request:: " + request);
}
// request.addRQRec(">final");
long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
if (request.type == OpCode.ping) {
traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
}
if (LOG.isTraceEnabled()) {
ZooTrace.logRequest(LOG, traceMask, 'E', request, "");
}
ProcessTxnResult rc = null;
synchronized (zks.outstandingChanges) {
// Need to process local session requests
rc = zks.processTxn(request); // request.hdr is set for write requests, which are the only ones
// that add to outstandingChanges.
if (request.getHdr() != null) {
TxnHeader hdr = request.getHdr();
Record txn = request.getTxn();
long zxid = hdr.getZxid();
while (!zks.outstandingChanges.isEmpty()
&& zks.outstandingChanges.get(0).zxid <= zxid) {
ChangeRecord cr = zks.outstandingChanges.remove(0);
if (cr.zxid < zxid) {
LOG.warn("Zxid outstanding " + cr.zxid
+ " is less than current " + zxid);
}
if (zks.outstandingChangesForPath.get(cr.path) == cr) {
zks.outstandingChangesForPath.remove(cr.path);
}
}
} // do not add non quorum packets to the queue.
if (request.isQuorum()) {
zks.getZKDatabase().addCommittedProposal(request);
}
} // ZOOKEEPER-558:
// In some cases the server does not close the connection (e.g., closeconn buffer
// was not being queued — ZOOKEEPER-558) properly. This happens, for example,
// when the client closes the connection. The server should still close the session, though.
// Calling closeSession() after losing the cnxn, results in the client close session response being dropped.
if (request.type == OpCode.closeSession && connClosedByClient(request)) {
// We need to check if we can close the session id.
// Sometimes the corresponding ServerCnxnFactory could be null because
// we are just playing diffs from the leader.
if (closeSession(zks.serverCnxnFactory, request.sessionId) ||
closeSession(zks.secureServerCnxnFactory, request.sessionId)) {
return;
}
} if (request.cnxn == null) {
return;
}
ServerCnxn cnxn = request.cnxn; String lastOp = "NA";
zks.decInProcess();
Code err = Code.OK;
Record rsp = null;
try {
if (request.getHdr() != null && request.getHdr().getType() == OpCode.error) {
/*
* When local session upgrading is disabled, leader will
* reject the ephemeral node creation due to session expire.
* However, if this is the follower that issue the request,
* it will have the correct error code, so we should use that
* and report to user
*/
if (request.getException() != null) {
throw request.getException();
} else {
throw KeeperException.create(KeeperException.Code
.get(((ErrorTxn) request.getTxn()).getErr()));
}
} KeeperException ke = request.getException();
if (ke != null && request.type != OpCode.multi) {
throw ke;
} if (LOG.isDebugEnabled()) {
LOG.debug("{}",request);
}
switch (request.type) {
case OpCode.ping: {
zks.serverStats().updateLatency(request.createTime); lastOp = "PING";
cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp,
request.createTime, Time.currentElapsedTime()); cnxn.sendResponse(new ReplyHeader(-2,
zks.getZKDatabase().getDataTreeLastProcessedZxid(), 0), null, "response");
return;
}
case OpCode.createSession: {
zks.serverStats().updateLatency(request.createTime); lastOp = "SESS";
cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp,
request.createTime, Time.currentElapsedTime()); zks.finishSessionInit(request.cnxn, true);
return;
}
case OpCode.multi: {
lastOp = "MULT";
rsp = new MultiResponse() ; for (ProcessTxnResult subTxnResult : rc.multiResult) { OpResult subResult ; switch (subTxnResult.type) {
case OpCode.check:
subResult = new CheckResult();
break;
case OpCode.create:
subResult = new CreateResult(subTxnResult.path);
break;
case OpCode.create2:
case OpCode.createContainer:
subResult = new CreateResult(subTxnResult.path, subTxnResult.stat);
break;
case OpCode.delete:
case OpCode.deleteContainer:
subResult = new DeleteResult();
break;
case OpCode.setData:
subResult = new SetDataResult(subTxnResult.stat);
break;
case OpCode.error:
subResult = new ErrorResult(subTxnResult.err) ;
break;
default:
throw new IOException("Invalid type of op");
} ((MultiResponse)rsp).add(subResult);
} break;
}
case OpCode.create: {
lastOp = "CREA";
rsp = new CreateResponse(rc.path);
err = Code.get(rc.err);
break;
}
case OpCode.create2:
case OpCode.createContainer: {
lastOp = "CREA";
rsp = new Create2Response(rc.path, rc.stat);
err = Code.get(rc.err);
break;
}
case OpCode.delete:
case OpCode.deleteContainer: {
lastOp = "DELE";
err = Code.get(rc.err);
break;
}
case OpCode.setData: {
lastOp = "SETD";
rsp = new SetDataResponse(rc.stat);
err = Code.get(rc.err);
break;
}
case OpCode.reconfig: {
lastOp = "RECO";
rsp = new GetDataResponse(((QuorumZooKeeperServer)zks).self.getQuorumVerifier().toString().getBytes(), rc.stat);
err = Code.get(rc.err);
break;
}
case OpCode.setACL: {
lastOp = "SETA";
rsp = new SetACLResponse(rc.stat);
err = Code.get(rc.err);
break;
}
case OpCode.closeSession: {
lastOp = "CLOS";
err = Code.get(rc.err);
break;
}
case OpCode.sync: {
lastOp = "SYNC";
SyncRequest syncRequest = new SyncRequest();
ByteBufferInputStream.byteBuffer2Record(request.request,
syncRequest);
rsp = new SyncResponse(syncRequest.getPath());
break;
}
case OpCode.check: {
lastOp = "CHEC";
rsp = new SetDataResponse(rc.stat);
err = Code.get(rc.err);
break;
}
case OpCode.exists: {
lastOp = "EXIS";
// TODO we need to figure out the security requirement for this!
ExistsRequest existsRequest = new ExistsRequest();
ByteBufferInputStream.byteBuffer2Record(request.request,
existsRequest);
String path = existsRequest.getPath();
if (path.indexOf('\0') != -1) {
throw new KeeperException.BadArgumentsException();
}
Stat stat = zks.getZKDatabase().statNode(path, existsRequest
.getWatch() ? cnxn : null);
rsp = new ExistsResponse(stat);
break;
}
case OpCode.getData: {
lastOp = "GETD";
GetDataRequest getDataRequest = new GetDataRequest();
ByteBufferInputStream.byteBuffer2Record(request.request,
getDataRequest);
DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
if (n == null) {
throw new KeeperException.NoNodeException();
}
Long aclL;
synchronized(n) {
aclL = n.acl;
}
PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclL),
ZooDefs.Perms.READ,
request.authInfo);
Stat stat = new Stat();
byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
getDataRequest.getWatch() ? cnxn : null);
rsp = new GetDataResponse(b, stat);
break;
}
case OpCode.setWatches: {
lastOp = "SETW";
SetWatches setWatches = new SetWatches();
// XXX We really should NOT need this!!!!
request.request.rewind();
ByteBufferInputStream.byteBuffer2Record(request.request, setWatches);
long relativeZxid = setWatches.getRelativeZxid();
zks.getZKDatabase().setWatches(relativeZxid,
setWatches.getDataWatches(),
setWatches.getExistWatches(),
setWatches.getChildWatches(), cnxn);
break;
}
case OpCode.getACL: {
lastOp = "GETA";
GetACLRequest getACLRequest = new GetACLRequest();
ByteBufferInputStream.byteBuffer2Record(request.request,
getACLRequest);
Stat stat = new Stat();
List<ACL> acl =
zks.getZKDatabase().getACL(getACLRequest.getPath(), stat);
rsp = new GetACLResponse(acl, stat);
break;
}
case OpCode.getChildren: {
lastOp = "GETC";
GetChildrenRequest getChildrenRequest = new GetChildrenRequest();
ByteBufferInputStream.byteBuffer2Record(request.request,
getChildrenRequest);
DataNode n = zks.getZKDatabase().getNode(getChildrenRequest.getPath());
if (n == null) {
throw new KeeperException.NoNodeException();
}
Long aclG;
synchronized(n) {
aclG = n.acl; }
PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclG),
ZooDefs.Perms.READ,
request.authInfo);
List<String> children = zks.getZKDatabase().getChildren(
getChildrenRequest.getPath(), null, getChildrenRequest
.getWatch() ? cnxn : null);
rsp = new GetChildrenResponse(children);
break;
}
case OpCode.getChildren2: {
lastOp = "GETC";
GetChildren2Request getChildren2Request = new GetChildren2Request();
ByteBufferInputStream.byteBuffer2Record(request.request,
getChildren2Request);
Stat stat = new Stat();
DataNode n = zks.getZKDatabase().getNode(getChildren2Request.getPath());
if (n == null) {
throw new KeeperException.NoNodeException();
}
Long aclG;
synchronized(n) {
aclG = n.acl;
}
PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclG),
ZooDefs.Perms.READ,
request.authInfo);
List<String> children = zks.getZKDatabase().getChildren(
getChildren2Request.getPath(), stat, getChildren2Request
.getWatch() ? cnxn : null);
rsp = new GetChildren2Response(children, stat);
break;
}
case OpCode.checkWatches: {
lastOp = "CHKW";
CheckWatchesRequest checkWatches = new CheckWatchesRequest();
ByteBufferInputStream.byteBuffer2Record(request.request,
checkWatches);
WatcherType type = WatcherType.fromInt(checkWatches.getType());
boolean containsWatcher = zks.getZKDatabase().containsWatcher(
checkWatches.getPath(), type, cnxn);
if (!containsWatcher) {
String msg = String.format(Locale.ENGLISH, "%s (type: %s)",
new Object[] { checkWatches.getPath(), type });
throw new KeeperException.NoWatcherException(msg);
}
break;
}
case OpCode.removeWatches: {
lastOp = "REMW";
RemoveWatchesRequest removeWatches = new RemoveWatchesRequest();
ByteBufferInputStream.byteBuffer2Record(request.request,
removeWatches);
WatcherType type = WatcherType.fromInt(removeWatches.getType());
boolean removed = zks.getZKDatabase().removeWatch(
removeWatches.getPath(), type, cnxn);
if (!removed) {
String msg = String.format(Locale.ENGLISH, "%s (type: %s)",
new Object[] { removeWatches.getPath(), type });
throw new KeeperException.NoWatcherException(msg);
}
break;
}
}
} catch (SessionMovedException e) {
// session moved is a connection level error, we need to tear
// down the connection otw ZOOKEEPER-710 might happen
// ie client on slow follower starts to renew session, fails
// before this completes, then tries the fast follower (leader)
// and is successful, however the initial renew is then
// successfully fwd/processed by the leader and as a result
// the client and leader disagree on where the client is most
// recently attached (and therefore invalid SESSION MOVED generated)
cnxn.sendCloseSession();
return;
} catch (KeeperException e) {
err = e.code();
} catch (Exception e) {
// log at error level as we are returning a marshalling
// error to the user
LOG.error("Failed to process " + request, e);
StringBuilder sb = new StringBuilder();
ByteBuffer bb = request.request;
bb.rewind();
while (bb.hasRemaining()) {
sb.append(Integer.toHexString(bb.get() & 0xff));
}
LOG.error("Dumping request buffer: 0x" + sb.toString());
err = Code.MARSHALLINGERROR;
} long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();
ReplyHeader hdr =
new ReplyHeader(request.cxid, lastZxid, err.intValue()); zks.serverStats().updateLatency(request.createTime);
cnxn.updateStatsForResponse(request.cxid, lastZxid, lastOp,
request.createTime, Time.currentElapsedTime()); try {
cnxn.sendResponse(hdr, rsp, "response");
if (request.type == OpCode.closeSession) {
cnxn.sendCloseSession();
}
} catch (IOException e) {
LOG.error("FIXMSG",e);
}
}

  第一步,根据共享的outstandingChanges,

先处理事务后处理session:

private ProcessTxnResult processTxn(Request request, TxnHeader hdr,
Record txn) {
ProcessTxnResult rc;
int opCode = request != null ? request.type : hdr.getType();
long sessionId = request != null ? request.sessionId : hdr.getClientId();
if (hdr != null) {
rc = getZKDatabase().processTxn(hdr, txn);
} else {
rc = new ProcessTxnResult();
}
if (opCode == OpCode.createSession) {
if (hdr != null && txn instanceof CreateSessionTxn) {
CreateSessionTxn cst = (CreateSessionTxn) txn;
sessionTracker.addGlobalSession(sessionId, cst.getTimeOut());
} else if (request != null && request.isLocalSession()) {
request.request.rewind();
int timeout = request.request.getInt();
request.request.rewind();
sessionTracker.addSession(request.sessionId, timeout);
} else {
LOG.warn("*****>>>>> Got "
+ txn.getClass() + " "
+ txn.toString());
}
} else if (opCode == OpCode.closeSession) {
sessionTracker.removeSession(sessionId);
}
return rc;
}

处理事务,本地和数据库的不同分支, DataTree创建节点

                    CreateTxn createTxn = (CreateTxn) txn;
rc.path = createTxn.getPath();
createNode(
createTxn.getPath(),
createTxn.getData(),
createTxn.getAcl(),
createTxn.getEphemeral() ? header.getClientId() : 0,
createTxn.getParentCVersion(),
header.getZxid(), header.getTime(), null);
break;

新增一个节点的逻辑是:

 /**
* Add a new node to the DataTree.
* @param path
* Path for the new node.
* @param data
* Data to store in the node.
* @param acl
* Node acls
* @param ephemeralOwner
* the session id that owns this node. -1 indicates this is not
* an ephemeral node.
* @param zxid
* Transaction ID
* @param time
* @param outputStat
* A Stat object to store Stat output results into.
* @throws NodeExistsException
* @throws NoNodeException
* @throws KeeperException
*/
public void createNode(final String path, byte data[], List<ACL> acl,
long ephemeralOwner, int parentCVersion, long zxid, long time, Stat outputStat)
throws KeeperException.NoNodeException,
KeeperException.NodeExistsException {
int lastSlash = path.lastIndexOf('/');
String parentName = path.substring(0, lastSlash);
String childName = path.substring(lastSlash + 1);
StatPersisted stat = new StatPersisted();
stat.setCtime(time);
stat.setMtime(time);
stat.setCzxid(zxid);
stat.setMzxid(zxid);
stat.setPzxid(zxid);
stat.setVersion(0);
stat.setAversion(0);
stat.setEphemeralOwner(ephemeralOwner);
DataNode parent = nodes.get(parentName);
if (parent == null) {
throw new KeeperException.NoNodeException();
}
synchronized (parent) {
Set<String> children = parent.getChildren();
if (children != null && children.contains(childName)) {
throw new KeeperException.NodeExistsException();
} if (parentCVersion == -1) {
parentCVersion = parent.stat.getCversion();
parentCVersion++;
}
parent.stat.setCversion(parentCVersion);
parent.stat.setPzxid(zxid);
Long longval = convertAcls(acl);
DataNode child = new DataNode(data, longval, stat);
parent.addChild(childName);
nodes.put(path, child);
if (ephemeralOwner == CONTAINER_EPHEMERAL_OWNER) {
containers.add(path);
} else if (ephemeralOwner != 0) {
HashSet<String> list = ephemerals.get(ephemeralOwner);
if (list == null) {
list = new HashSet<String>();
ephemerals.put(ephemeralOwner, list);
}
synchronized (list) {
list.add(path);
}
}
if (outputStat != null) {
child.copyStat(outputStat);
}
}
// now check if its one of the zookeeper node child
if (parentName.startsWith(quotaZookeeper)) {
// now check if its the limit node
if (Quotas.limitNode.equals(childName)) {
// this is the limit node
// get the parent and add it to the trie
pTrie.addPath(parentName.substring(quotaZookeeper.length()));
}
if (Quotas.statNode.equals(childName)) {
updateQuotaForPath(parentName
.substring(quotaZookeeper.length()));
}
}
// also check to update the quotas for this node
String lastPrefix = getMaxPrefixWithQuota(path);
if(lastPrefix != null) {
// ok we have some match and need to update
updateCount(lastPrefix, 1);
updateBytes(lastPrefix, data == null ? 0 : data.length);
}
dataWatches.triggerWatch(path, Event.EventType.NodeCreated);
childWatches.triggerWatch(parentName.equals("") ? "/" : parentName,
Event.EventType.NodeChildrenChanged);
}

最后的逻辑是触发创建节点和子节点改变事件。

    Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
WatchedEvent e = new WatchedEvent(type,
KeeperState.SyncConnected, path);
HashSet<Watcher> watchers;
synchronized (this) {
watchers = watchTable.remove(path);
if (watchers == null || watchers.isEmpty()) {
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG,
ZooTrace.EVENT_DELIVERY_TRACE_MASK,
"No watchers for " + path);
}
return null;
}
for (Watcher w : watchers) {
HashSet<String> paths = watch2Paths.get(w);
if (paths != null) {
paths.remove(path);
}
}
}
for (Watcher w : watchers) {
if (supress != null && supress.contains(w)) {
continue;
}
w.process(e);
}
return watchers;
}

WatcherManager调用定义的watcher进行事件处理。

  1.3. 再看异步消息处理器SyncRequestProcessor

@Override
public void run() {
try {
int logCount = 0; // we do this in an attempt to ensure that not all of the servers
// in the ensemble take a snapshot at the same time
int randRoll = r.nextInt(snapCount/2);
while (true) {
Request si = null;
if (toFlush.isEmpty()) {
si = queuedRequests.take();
} else {
si = queuedRequests.poll();
if (si == null) {
flush(toFlush);
continue;
}
}
if (si == requestOfDeath) {
break;
}
if (si != null) {
// track the number of records written to the log
if (zks.getZKDatabase().append(si)) {
logCount++;
if (logCount > (snapCount / 2 + randRoll)) {
randRoll = r.nextInt(snapCount/2);
// roll the log
zks.getZKDatabase().rollLog();
// take a snapshot
if (snapInProcess != null && snapInProcess.isAlive()) {
LOG.warn("Too busy to snap, skipping");
} else {
snapInProcess = new ZooKeeperThread("Snapshot Thread") {
public void run() {
try {
zks.takeSnapshot();
} catch(Exception e) {
LOG.warn("Unexpected exception", e);
}
}
};
snapInProcess.start();
}
logCount = 0;
}
} else if (toFlush.isEmpty()) {
// optimization for read heavy workloads
// iff this is a read, and there are no pending
// flushes (writes), then just pass this to the next
// processor
if (nextProcessor != null) {
nextProcessor.processRequest(si);
if (nextProcessor instanceof Flushable) {
((Flushable)nextProcessor).flush();
}
}
continue;
}
toFlush.add(si);
if (toFlush.size() > 1000) {
flush(toFlush);
}
}
}
} catch (Throwable t) {
handleException(this.getName(), t);
} finally{
running = false;
}
LOG.info("SyncRequestProcessor exited!");
}

  异步处理日志和快照,启动ZooKeeperThread线程来生成快照。

    public void takeSnapshot(){
try {
txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts());
} catch (IOException e) {
LOG.error("Severe unrecoverable error, exiting", e);
// This is a severe error that we cannot recover from,
// so we need to exit
System.exit(10);
}
}

FileTxnSnapLog是个工具类,帮助处理txtlog和snapshot。

 /**
* save the datatree and the sessions into a snapshot
* @param dataTree the datatree to be serialized onto disk
* @param sessionsWithTimeouts the sesssion timeouts to be
* serialized onto disk
* @throws IOException
*/
public void save(DataTree dataTree,
ConcurrentHashMap<Long, Integer> sessionsWithTimeouts)
throws IOException {
long lastZxid = dataTree.lastProcessedZxid;
File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid));
LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(lastZxid),
snapshotFile);
snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile); }

持久化为文件

    /**
* serialize the datatree and session into the file snapshot
* @param dt the datatree to be serialized
* @param sessions the sessions to be serialized
* @param snapShot the file to store snapshot into
*/
public synchronized void serialize(DataTree dt, Map<Long, Integer> sessions, File snapShot)
throws IOException {
if (!close) {
OutputStream sessOS = new BufferedOutputStream(new FileOutputStream(snapShot));
CheckedOutputStream crcOut = new CheckedOutputStream(sessOS, new Adler32());
//CheckedOutputStream cout = new CheckedOutputStream()
OutputArchive oa = BinaryOutputArchive.getArchive(crcOut);
FileHeader header = new FileHeader(SNAP_MAGIC, VERSION, dbId);
serialize(dt,sessions,oa, header);
long val = crcOut.getChecksum().getValue();
oa.writeLong(val, "val");
oa.writeString("/", "path");
sessOS.flush();
crcOut.close();
sessOS.close();
}
}

至此,整个流程已经走完。

  2. 集群情况下

 集群情况和单机略有不同,集群中使用QuorumPeer来启动ServerCnxnFactory,绑定本地地址

    @Override
public void start() {
LOG.info("binding to port " + localAddress);
parentChannel = bootstrap.bind(localAddress);
}

限于篇幅,后面的逻辑将在下篇中详细描述。

 

小结

  从上面的代码流程中,我们可以看出服务器处理请求要么通过Noi要不通过框架Netty来处理请求,请求通过先通过PrepRequestProcessor接收请求,并进行包装,然后请求类型的不同,设置同享数据;然后通过SyncRequestProcessor来序列化快照和事务日志,并根据命令类型改变db的内容,在日志和快照没有写入前不会进行下一个消息处理器;最后调用FinalRequestProcessor来作为消息处理器的终结者,发送响应消息,并触发watcher的处理程序 。