Preface
In earlier posts we walked through the server- and client-side source of ZooKeeper:
zookeeper client startup source-code analysis
zookeeper server startup source-code analysis
Now let's analyze what happens when a node is created.
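Before diving into the source, here is a minimal, self-contained sketch of what the client-side API usage looks like. The connect string, session timeout, and path are illustrative values, not anything mandated by the source below.

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class CreateNodeDemo {
    public static void main(String[] args) throws Exception {
        // Illustrative connect string and session timeout
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 30000, event -> { });
        // Synchronous create; the returned path matters for sequential nodes,
        // where the server appends a counter suffix to the requested path
        String created = zk.create("/demo", "hello".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println("created: " + created);
        zk.close();
    }
}

The asynchronous overload analyzed below takes a callback (Create2Callback) and an optional TTL instead of returning the created path directly.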
The client's create-node request
We start from the source of the ZooKeeper.create method, where the client-side create request is assembled.
public void create(
final String path,
byte[] data,
List<ACL> acl,
CreateMode createMode,
Create2Callback cb,
Object ctx,
long ttl) {
//path is the node path supplied by the user
final String clientPath = path;
PathUtils.validatePath(clientPath, createMode.isSequential());
EphemeralType.validateTTL(createMode, ttl);
//if a chrootPath is configured, combine chrootPath and clientPath into the node's final path on the server
final String serverPath = prependChroot(clientPath);
//build the request header
RequestHeader h = new RequestHeader();
//set the header's type; different types denote different requests, and the server later uses this type to decide how to process the request
setCreateHeader(createMode, h);
//create the reply header object
ReplyHeader r = new ReplyHeader();
//create the response object
Create2Response response = new Create2Response();
//build the request body (analyzed below)
Record record = makeCreateRecord(createMode, serverPath, data, acl, ttl);
//send the request out through the client connection object
cnxn.queuePacket(h, r, record, response, cb, clientPath, serverPath, ctx, null);
}
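A quick note on prependChroot: it is what makes chroot-ed connections transparent to application code. A hedged illustration (the connect string and paths here are made up):

// If the client was constructed with a chroot suffix on the connect string:
//     new ZooKeeper("127.0.0.1:2181/app", 30000, watcher)
// then for a user-supplied clientPath of "/node", prependChroot yields the
// serverPath actually sent to the server:
//     "/app" + "/node"  ->  "/app/node"
// With no chroot configured, serverPath is simply clientPath.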
makeCreateRecord
Builds the request body for a create-node request.
private Record makeCreateRecord(CreateMode createMode, String serverPath, byte[] data, List<ACL> acl, long ttl) {
Record record;
if (createMode.isTTL()) {
//logic for creating a TTL-type node
CreateTTLRequest request = new CreateTTLRequest();
request.setData(data);
request.setFlags(createMode.toFlag());
request.setPath(serverPath);
request.setAcl(acl);
request.setTtl(ttl);
record = request;
} else {
//CreateRequest represents an ordinary create-node request
CreateRequest request = new CreateRequest();
//set the node data
request.setData(data);
//set the node type (persistent, sequential, container, ...)
request.setFlags(createMode.toFlag());
//set the node path
request.setPath(serverPath);
//set the node's ACL
request.setAcl(acl);
record = request;
}
return record;
}
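The flags value produced by createMode.toFlag() is a small integer encoding of the node type. A hedged mapping, as defined by org.apache.zookeeper.CreateMode in recent versions (double-check against your release):

// PERSISTENT                      -> 0
// EPHEMERAL                       -> 1
// PERSISTENT_SEQUENTIAL           -> 2
// EPHEMERAL_SEQUENTIAL            -> 3
// CONTAINER                       -> 4
// PERSISTENT_WITH_TTL             -> 5
// PERSISTENT_SEQUENTIAL_WITH_TTL  -> 6
// The server reverses this with CreateMode.fromFlag(flags) in pRequest2TxnCreate.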
Next, let's look at how the client sends the request to the server. Note that create calls a nine-argument overload of queuePacket, which forwards to the full version below with a null watchDeregistration.
cnxn.queuePacket
public Packet queuePacket(
RequestHeader h,
ReplyHeader r,
Record request,
Record response,
AsyncCallback cb,
String clientPath,
String serverPath,
Object ctx,
WatchRegistration watchRegistration,
WatchDeregistration watchDeregistration) {
Packet packet = null;
// Note that we do not generate the Xid for the packet yet. It is
// generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
// where the packet is actually sent.
//wrap the create request in a Packet
packet = new Packet(h, r, request, response, watchRegistration);
packet.cb = cb;
packet.ctx = ctx;
packet.clientPath = clientPath;
packet.serverPath = serverPath;
packet.watchDeregistration = watchDeregistration;
// The synchronized block here is for two purpose:
// 1. synchronize with the final cleanup() in SendThread.run() to avoid race
// 2. synchronized against each packet. So if a closeSession packet is added,
// later packet will be notified.
synchronized (state) {
if (!state.isAlive() || closing) {
//if the connection between client and server is already closed, notify the caller accordingly
conLossPacket(packet);
} else {
// If the client is asking to close the session then
// mark as closing
if (h.getType() == OpCode.closeSession) {
closing = true;
}
//put the packet into outgoingQueue to wait to be sent to the server
outgoingQueue.add(packet);
}
}
//wake up sendThread so it resumes draining outgoingQueue
sendThread.getClientCnxnSocket().packetAdded();
return packet;
}
When sendThread wakes up from selector.select it performs doIO; there the create-request packet is assigned its xid, serialized into a ByteBuffer, and written to the server through the socket.
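For reference, this is the frame the client ends up writing; a hedged sketch reconstructed from the serialization code (the 4-byte length prefix is confirmed below by the server's readLength/packetReceived logic):

// On-wire frame (big-endian, jute-serialized):
//   +-----------+------------------------------------------+
//   | int len   | payload: RequestHeader ++ request record |
//   +-----------+------------------------------------------+
//   RequestHeader = { int xid; int type }   (type = create/create2/createTTL, ...)
//   record        = e.g. CreateRequest { path, data, acl, flags }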
[Figure: client_process.png — the client-side request send flow]
Server-side processing
The server-side processing flow is as follows.
We already covered SelectorThread.handleIO in the ZooKeeper server startup analysis, so here we walk through the remaining steps of the flow, one piece of source at a time.
IOWorkRequest.doWork
When a client request reaches the server, SelectorThread picks up the corresponding selectionKey. The key is wrapped in an IOWorkRequest and submitted to the io worker thread pool, which assigns a thread to run that IOWorkRequest's doWork method. Let's look at doWork first.
public void doWork() throws InterruptedException {
if (!key.isValid()) {
//if the selectionKey is no longer valid, clean it up
selectorThread.cleanupSelectionKey(key);
return;
}
//for read/write I/O events, run doIO
if (key.isReadable() || key.isWritable()) {
cnxn.doIO(key);
// Check if we shutdown or doIO() closed this connection
if (stopped) {
//if the server has stopped, close() tears down the underlying socket, the application-level session, and so on
cnxn.close(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
return;
}
if (!key.isValid()) {
selectorThread.cleanupSelectionKey(key);
return;
}
//refresh the connection's latest activity state; the ConnectionManager uses this
touchCnxn(cnxn);
}
// Mark this connection as once again ready for selection
//mark the connection as selectable again
cnxn.enableSelectable();
// Push an update request on the queue to resume selecting
// on the current set of interest ops, which may have changed
// as a result of the I/O operations we just performed.
//wake up selectorThread so it can get back to work
if (!selectorThread.addInterestOpsUpdateRequest(key)) {
cnxn.close(ServerCnxn.DisconnectReason.CONNECTION_MODE_CHANGED);
}
}
NIOServerCnxn.doIO
void doIO(SelectionKey k) throws InterruptedException {
try {
if (!isSocketOpen()) {
LOG.warn("trying to do i/o on a null socket for session: 0x{}", Long.toHexString(sessionId));
return;
}
if (k.isReadable()) {
//handle the read event
int rc = sock.read(incomingBuffer);
if (rc < 0) {
handleFailedRead();
}
//incomingBuffer has been filled
if (incomingBuffer.remaining() == 0) {
boolean isPayload;
if (incomingBuffer == lenBuffer) { // start of next request
//at this point incomingBuffer holds the length of the request
incomingBuffer.flip();
isPayload = readLength(k);
incomingBuffer.clear();
} else {
// continuation
isPayload = true;
}
if (isPayload) { // not the case for 4letterword
//read the request payload
readPayload();
} else {
// four letter words take care
// need not do anything else
return;
}
}
}
if (k.isWritable()) {
//handle the write event; we'll cover this in detail when the response is returned to the client
handleWrite(k);
if (!initialized && !getReadInterest() && !getWriteInterest()) {
throw new CloseRequestException("responded to info probe", DisconnectReason.INFO_PROBE);
}
}
} catch (CancelledKeyException e) {
LOG.warn("CancelledKeyException causing close of session: 0x{}", Long.toHexString(sessionId));
LOG.debug("CancelledKeyException stack trace", e);
close(DisconnectReason.CANCELLED_KEY_EXCEPTION);
} catch (CloseRequestException e) {
// expecting close to log session closure
close();
} catch (EndOfStreamException e) {
LOG.warn("Unexpected exception", e);
// expecting close to log session closure
close(e.getReason());
} catch (ClientCnxnLimitException e) {
// Common case exception, print at debug level
ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
LOG.warn("Closing session 0x{}", Long.toHexString(sessionId), e);
close(DisconnectReason.CLIENT_CNX_LIMIT);
} catch (IOException e) {
LOG.warn("Close of session 0x{}", Long.toHexString(sessionId), e);
close(DisconnectReason.IO_EXCEPTION);
}
}
NIOServerCnxn.readPayload
Reads the request payload from the socket.
private void readPayload() throws IOException, InterruptedException, ClientCnxnLimitException {
if (incomingBuffer.remaining() != 0) { // have we read length bytes?
//incomingBuffer is not full yet, keep reading
int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
if (rc < 0) {
handleFailedRead();
}
}
if (incomingBuffer.remaining() == 0) { // have we read length bytes?
incomingBuffer.flip();
packetReceived(4 + incomingBuffer.remaining());
if (!initialized) {
//if the connection is not yet initialized, this must be a session-creation request
readConnectRequest();
} else {
//read the request body
readRequest();
}
//reset lenBuffer and incomingBuffer in preparation for the next read
lenBuffer.clear();
incomingBuffer = lenBuffer;
}
}
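The lenBuffer/incomingBuffer handoff above is a small two-state reader; a hedged restatement of the state machine:

// State 1: incomingBuffer == lenBuffer (capacity 4) -> reading the frame length.
//   When full, readLength() parses the int and (aside from the four-letter-word
//   check) allocates incomingBuffer = ByteBuffer.allocate(len) for the payload.
// State 2: incomingBuffer != lenBuffer -> reading the payload itself.
//   When full, deserialize the request, then reset:
//   lenBuffer.clear(); incomingBuffer = lenBuffer;   // back to state 1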
Next comes the actual processing of the request message.
NIOServerCnxn.readRequest delegates directly to ZooKeeperServer.processPacket.
ZooKeeperServer.processPacket
Converts the request byte stream into a Request object.
public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
// We have the request, now process and setup for next
//wrap incomingBuffer in an input stream
InputStream bais = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
//first deserialize the request header
RequestHeader h = new RequestHeader();
h.deserialize(bia, "header");
// Need to increase the outstanding request count first, otherwise
// there might be a race condition that it enabled recv after
// processing request and then disabled when check throttling.
//
// Be aware that we're actually checking the global outstanding
// request before this request.
//
// It's fine if the IOException thrown before we decrease the count
// in cnxn, since it will close the cnxn anyway.
//the request pipeline can throttle traffic; here we increment the count of in-flight requests and check whether throttling should kick in
cnxn.incrOutstandingAndCheckThrottle(h);
// Through the magic of byte buffers, txn will not be
// pointing
// to the start of the txn
incomingBuffer = incomingBuffer.slice();
//branch on the request type carried in the header
if (h.getType() == OpCode.auth) {
LOG.info("got auth packet {}", cnxn.getRemoteSocketAddress());
AuthPacket authPacket = new AuthPacket();
//deserialize the body into an auth packet
ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket);
String scheme = authPacket.getScheme();
//look up the server-side authentication provider for this scheme
ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme);
Code authReturn = KeeperException.Code.AUTHFAILED;
if (ap != null) {
try {
// handleAuthentication may close the connection, to allow the client to choose
// a different server to connect to.
//perform authentication
authReturn = ap.handleAuthentication(
new ServerAuthenticationProvider.ServerObjs(this, cnxn),
authPacket.getAuth());
} catch (RuntimeException e) {
LOG.warn("Caught runtime exception from AuthenticationProvider: {}", scheme, e);
authReturn = KeeperException.Code.AUTHFAILED;
}
}
if (authReturn == KeeperException.Code.OK) {
//authentication succeeded; return the success result to the client
LOG.debug("Authentication succeeded for scheme: {}", scheme);
LOG.info("auth success {}", cnxn.getRemoteSocketAddress());
ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
cnxn.sendResponse(rh, null, null);
} else {
if (ap == null) {
LOG.warn(
"No authentication provider for scheme: {} has {}",
scheme,
ProviderRegistry.listProviders());
} else {
LOG.warn("Authentication failed for scheme: {}", scheme);
}
// send a response...
ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.AUTHFAILED.intValue());
cnxn.sendResponse(rh, null, null);
// ... and close connection
//authentication failed; send the closeConn buffer to close the connection
cnxn.sendBuffer(ServerCnxnFactory.closeConn);
//after a failed authentication the connection no longer accepts any client requests
cnxn.disableRecv();
}
return;
} else if (h.getType() == OpCode.sasl) {
processSasl(incomingBuffer, cnxn, h);
} else {
if (shouldRequireClientSaslAuth() && !hasCnxSASLAuthenticated(cnxn)) {
ReplyHeader replyHeader = new ReplyHeader(h.getXid(), 0, Code.SESSIONCLOSEDREQUIRESASLAUTH.intValue());
cnxn.sendResponse(replyHeader, null, "response");
cnxn.sendCloseSession();
cnxn.disableRecv();
} else {
//build the Request object
Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo());
int length = incomingBuffer.limit();
//if the request is particularly large (a user-configurable threshold defines what counts as large)
if (isLargeRequest(length)) {
// checkRequestSize will throw IOException if request is rejected
//based on the configured maximum tolerable request volume (100 KB), decide whether this large request must be rejected
checkRequestSizeWhenMessageReceived(length);
si.setLargeRequestSize(length);
}
//set the request owner
si.setOwner(ServerCnxn.me);
//submit the request to the RequestThrottler
submitRequest(si);
}
}
}
After processPacket, the request object enters the request-processor chain. For standalone ZooKeeper, every transactional request passes through this chain; the ZooKeeper server startup analysis covered it briefly.
Let's now analyze the processors on the chain one by one.
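For orientation, the standalone server wires the chain up roughly like this (the shape of ZooKeeperServer.setupRequestProcessors; near-verbatim from the source this article follows, but treat it as a sketch since details vary across versions). Note that RequestThrottler sits in front of firstProcessor: submitRequest enqueues into the throttler, which later calls submitRequestNow.

protected void setupRequestProcessors() {
    // Built back to front; requests flow Prep -> Sync -> Final
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
    ((SyncRequestProcessor) syncProcessor).start();
    firstProcessor = new PrepRequestProcessor(this, syncProcessor);
    ((PrepRequestProcessor) firstProcessor).start();
}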
RequestThrottler
As the first processor on the chain, its job is to cap the number of requests ZooKeeper processes at once, throttling traffic so that the server does not become overloaded. It runs as a separate thread, so let's look at its run method.
public void run() {
try {
while (true) {
if (killed) {
break;
}
//take a request from the submitted-requests queue
Request request = submittedRequests.take();
if (Request.requestOfDeath == request) {
break;
}
if (request.mustDrop()) {
continue;
}
// Throttling is disabled when maxRequests = 0
if (maxRequests > 0) {
while (!killed) {
if (dropStaleRequests && request.isStale()) {
// Note: this will close the connection
dropRequest(request);
ServerMetrics.getMetrics().STALE_REQUESTS_DROPPED.add(1);
request = null;
break;
}
//under the throttling limit; proceed
if (zks.getInProcess() < maxRequests) {
break;
}
//if the number of requests currently being processed on the chain exceeds maxRequests, RequestThrottler sleeps for a while
throttleSleep(stallTime);
}
}
//the thread was asked to stop working, so exit
if (killed) {
break;
}
// A dropped stale request will be null
if (request != null) {
if (request.isStale()) {
ServerMetrics.getMetrics().STALE_REQUESTS.add(1);
}
//hand the request to the next processor on the chain, firstProcessor (PrepRequestProcessor)
zks.submitRequestNow(request);
}
}
} catch (InterruptedException e) {
LOG.error("Unexpected interruption", e);
}
int dropped = drainQueue();
LOG.info("RequestThrottler shutdown. Dropped {} requests", dropped);
}
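The knobs used above are exposed as system properties; the names below are taken from the 3.6-era RequestThrottler and should be verified against your version:

// -Dzookeeper.request_throttle_max_requests=0    // maxRequests; 0 disables throttling
// -Dzookeeper.request_throttle_stall_time=100    // stallTime: sleep (ms) when saturated
// -Dzookeeper.request_throttle_drop_stale=true   // dropStaleRequests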
PrepRequestProcessor
As the first processor on the chain that does real business logic, what happens inside it?
This processor also runs as a separate thread, so let's look at its run method.
public void run() {
LOG.info(String.format("PrepRequestProcessor (sid:%d) started, reconfigEnabled=%s", zks.getServerId(), zks.reconfigEnabled));
try {
while (true) {
ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_SIZE.add(submittedRequests.size());
//take a request
Request request = submittedRequests.take();
ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_TIME
.add(Time.currentElapsedTime() - request.prepQueueStartTime);
long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
if (request.type == OpCode.ping) {
traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
}
if (LOG.isTraceEnabled()) {
ZooTrace.logRequest(LOG, traceMask, 'P', request, "");
}
if (Request.requestOfDeath == request) {
break;
}
request.prepStartTime = Time.currentElapsedTime();
//pRequest is the core of PrepRequestProcessor's logic
pRequest(request);
}
} catch (Exception e) {
handleException(this.getName(), e);
}
LOG.info("PrepRequestProcessor exited loop!");
}
pRequest
pRequest is long; its main job is to convert the request into a concrete typed request based on the request header. We focus on the create-node path.
protected void pRequest(Request request) throws RequestProcessorException {
// LOG.info("Prep>>> cxid = " + request.cxid + " type = " +
// request.type + " id = 0x" + Long.toHexString(request.sessionId));
request.setHdr(null);
request.setTxn(null);
try {
//branch on the request type
switch (request.type) {
case OpCode.createContainer:
case OpCode.create:
case OpCode.create2:
//build the create-node request record
CreateRequest create2Request = new CreateRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true);
break;
case OpCode.createTTL:
CreateTTLRequest createTtlRequest = new CreateTTLRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest, true);
break;
case OpCode.deleteContainer:
case OpCode.delete:
DeleteRequest deleteRequest = new DeleteRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest, true);
break;
case OpCode.setData:
SetDataRequest setDataRequest = new SetDataRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest, true);
break;
case OpCode.reconfig:
ReconfigRequest reconfigRequest = new ReconfigRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, reconfigRequest);
pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest, true);
break;
case OpCode.setACL:
SetACLRequest setAclRequest = new SetACLRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest, true);
break;
case OpCode.check:
CheckVersionRequest checkRequest = new CheckVersionRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest, true);
break;
case OpCode.multi:
MultiOperationRecord multiRequest = new MultiOperationRecord();
try {
ByteBufferInputStream.byteBuffer2Record(request.request, multiRequest);
} catch (IOException e) {
request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getNextZxid(), Time.currentWallTime(), OpCode.multi));
throw e;
}
List<Txn> txns = new ArrayList<Txn>();
//Each op in a multi-op must have the same zxid!
long zxid = zks.getNextZxid();
KeeperException ke = null;
//Store off current pending change records in case we need to rollback
Map<String, ChangeRecord> pendingChanges = getPendingChanges(multiRequest);
request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
Time.currentWallTime(), request.type));
for (Op op : multiRequest) {
Record subrequest = op.toRequestRecord();
int type;
Record txn;
/* If we've already failed one of the ops, don't bother
* trying the rest as we know it's going to fail and it
* would be confusing in the logfiles.
*/
if (ke != null) {
type = OpCode.error;
txn = new ErrorTxn(Code.RUNTIMEINCONSISTENCY.intValue());
} else {
/* Prep the request and convert to a Txn */
try {
pRequest2Txn(op.getType(), zxid, request, subrequest, false);
type = op.getType();
txn = request.getTxn();
} catch (KeeperException e) {
ke = e;
type = OpCode.error;
txn = new ErrorTxn(e.code().intValue());
if (e.code().intValue() > Code.APIERROR.intValue()) {
LOG.info("Got user-level KeeperException when processing {} aborting"
+ " remaining multi ops. Error Path:{} Error:{}",
request.toString(),
e.getPath(),
e.getMessage());
}
request.setException(e);
/* Rollback change records from failed multi-op */
rollbackPendingChanges(zxid, pendingChanges);
}
}
// TODO: I don't want to have to serialize it here and then
// immediately deserialize in next processor. But I'm
// not sure how else to get the txn stored into our list.
try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
txn.serialize(boa, "request");
ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
txns.add(new Txn(type, bb.array()));
}
}
request.setTxn(new MultiTxn(txns));
if (digestEnabled) {
setTxnDigest(request);
}
break;
//create/close session don't require request record
case OpCode.createSession:
case OpCode.closeSession:
if (!request.isLocalSession()) {
pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
}
break;
//All the rest don't need to create a Txn - just verify session
case OpCode.sync:
case OpCode.exists:
case OpCode.getData:
case OpCode.getACL:
case OpCode.getChildren:
case OpCode.getAllChildrenNumber:
case OpCode.getChildren2:
case OpCode.ping:
case OpCode.setWatches:
case OpCode.setWatches2:
case OpCode.checkWatches:
case OpCode.removeWatches:
case OpCode.getEphemerals:
case OpCode.multiRead:
case OpCode.addWatch:
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
break;
default:
LOG.warn("unknown type {}", request.type);
break;
}
} catch (KeeperException e) {
if (request.getHdr() != null) {
request.getHdr().setType(OpCode.error);
request.setTxn(new ErrorTxn(e.code().intValue()));
}
if (e.code().intValue() > Code.APIERROR.intValue()) {
LOG.info(
"Got user-level KeeperException when processing {} Error Path:{} Error:{}",
request.toString(),
e.getPath(),
e.getMessage());
}
request.setException(e);
} catch (Exception e) {
// log at error level as we are returning a marshalling
// error to the user
LOG.error("Failed to process {}", request, e);
StringBuilder sb = new StringBuilder();
ByteBuffer bb = request.request;
if (bb != null) {
bb.rewind();
while (bb.hasRemaining()) {
sb.append(Integer.toHexString(bb.get() & 0xff));
}
} else {
sb.append("request buffer is null");
}
LOG.error("Dumping request buffer: 0x{}", sb.toString());
if (request.getHdr() != null) {
request.getHdr().setType(OpCode.error);
request.setTxn(new ErrorTxn(Code.MARSHALLINGERROR.intValue()));
}
}
//don't forget this part: set the request's zxid
request.zxid = zks.getZxid();
ServerMetrics.getMetrics().PREP_PROCESS_TIME.add(Time.currentElapsedTime() - request.prepStartTime);
//hand off to the next processor, SyncRequestProcessor
nextProcessor.processRequest(request);
}
Now let's look at how, for node creation, the request is converted into a concrete create request. One thing omitted above: the concrete request record, once deserialized, is kept alongside the Request object.
pRequest2Txn → pRequest2TxnCreate
pRequest2Txn branches on the request type. The code is long, so we won't walk through all of it; once you understand pRequest2TxnCreate, the rest of pRequest2Txn follows naturally. pRequest2TxnCreate is the method that handles create-node requests.
private void pRequest2TxnCreate(int type, Request request, Record record, boolean deserialize) throws IOException, KeeperException {
if (deserialize) {
//deserialize the ByteBuffer into the concrete request record
ByteBufferInputStream.byteBuffer2Record(request.request, record);
}
int flags;
String path;
List<ACL> acl;
byte[] data;
long ttl;
if (type == OpCode.createTTL) {
//a createTTL-type request
CreateTTLRequest createTtlRequest = (CreateTTLRequest) record;
flags = createTtlRequest.getFlags();
path = createTtlRequest.getPath();
acl = createTtlRequest.getAcl();
data = createTtlRequest.getData();
ttl = createTtlRequest.getTtl();
} else {
//an ordinary create-node request
CreateRequest createRequest = (CreateRequest) record;
//below we pull out the create parameters (flags: node type, path: node path, acl: access control, data: node value)
flags = createRequest.getFlags();
path = createRequest.getPath();
acl = createRequest.getAcl();
data = createRequest.getData();
ttl = -1;
}
CreateMode createMode = CreateMode.fromFlag(flags);
validateCreateRequest(path, createMode, request, ttl);
//derive the parent node's path
String parentPath = validatePathForCreate(path, request.sessionId);
//resolve the ACL list
List<ACL> listACL = fixupACL(path, request.authInfo, acl);
//fetch the parent's state. ChangeRecord is the object the pipeline uses to track node state; after the transaction completes, the ChangeRecords of the node and its parent are cleaned up
ChangeRecord parentRecord = getRecordForPath(parentPath);
//check whether the parent's ACL allows creating this child
zks.checkACL(request.cnxn, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo, path, listACL);
//fetch the parent's cversion; the parent's cversion is the key to how ZooKeeper implements sequential nodes
int parentCVersion = parentRecord.stat.getCversion();
if (createMode.isSequential()) {
//for sequential nodes, derive the final path name from the cversion
path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
}
validatePath(path, request.sessionId);
try {
//look up the ChangeRecord for the new node's path. Normally the node does not exist and NoNodeException is thrown, meaning the create may proceed;
//if the node already exists on the server, NodeExistsException is thrown to tell the user the node cannot be created twice
if (getRecordForPath(path) != null) {
throw new KeeperException.NodeExistsException(path);
}
} catch (KeeperException.NoNodeException e) {
// ignore this one
}
boolean ephemeralParent = EphemeralType.get(parentRecord.stat.getEphemeralOwner()) == EphemeralType.NORMAL;
//an ephemeral parent cannot have children; throw the corresponding exception back to the client
if (ephemeralParent) {
throw new KeeperException.NoChildrenForEphemeralsException(path);
}
//compute the parent's new cversion
int newCversion = parentRecord.stat.getCversion() + 1;
if (type == OpCode.createContainer) {
request.setTxn(new CreateContainerTxn(path, data, listACL, newCversion));
} else if (type == OpCode.createTTL) {
request.setTxn(new CreateTTLTxn(path, data, listACL, newCversion, ttl));
} else {
//build the create txn body: path, data, ACL, ephemeral flag, cversion
request.setTxn(new CreateTxn(path, data, listACL, createMode.isEphemeral(), newCversion));
}
TxnHeader hdr = request.getHdr();
long ephemeralOwner = 0;
if (createMode.isContainer()) {
ephemeralOwner = EphemeralType.CONTAINER_EPHEMERAL_OWNER;
} else if (createMode.isTTL()) {
ephemeralOwner = EphemeralType.TTL.toEphemeralOwner(ttl);
} else if (createMode.isEphemeral()) {
//for an ephemeral node, set ephemeralOwner = sessionId
ephemeralOwner = request.sessionId;
}
//build the node's stat
StatPersisted s = DataTree.createStat(hdr.getZxid(), hdr.getTime(), ephemeralOwner);
parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
//update the parent's child count
parentRecord.childCount++;
//update the parent's cversion
parentRecord.stat.setCversion(newCversion);
//update the parent's pzxid (the zxid of the latest child change)
parentRecord.stat.setPzxid(request.getHdr().getZxid());
parentRecord.precalculatedDigest = precalculateDigest(
DigestOpCode.UPDATE, parentPath, parentRecord.data, parentRecord.stat);
//add the parent's change record to the outstandingChanges queue
addChangeRecord(parentRecord);
//build the change record for the new node itself
ChangeRecord nodeRecord = new ChangeRecord(
request.getHdr().getZxid(), path, s, 0, listACL);
//set the node's data
nodeRecord.data = data;
nodeRecord.precalculatedDigest = precalculateDigest(
DigestOpCode.ADD, path, nodeRecord.data, s);
//produce the digest
setTxnDigest(request, nodeRecord.precalculatedDigest);
//add the node's change record to the outstandingChanges queue
addChangeRecord(nodeRecord);
}
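One concrete consequence of the cversion-based naming above: a sequential node's final name is the requested path plus a zero-padded, ten-digit counter. A small worked example (the path and cversion value are made up):

// If the client asked for "/locks/lock-" and the parent's cversion is 7:
String path = "/locks/lock-" + String.format(java.util.Locale.ENGLISH, "%010d", 7);
// path is now "/locks/lock-0000000007"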
To briefly sum up what PrepRequestProcessor does for a create-node request:
It deserializes the user's request into the corresponding create record; from the child's path it looks up the parent's information, updates the parent's state (child count and pzxid), and adds it to the outstandingChanges queue; it builds the new node's state and adds that to outstandingChanges as well. The request is then passed on to the next processor, SyncRequestProcessor.
SyncRequestProcessor
SyncRequestProcessor runs as a separate thread; let's analyze its run method.
public void run() {
try {
// we do this in an attempt to ensure that not all of the servers
// in the ensemble take a snapshot at the same time
//to keep all of the ensemble's servers from snapshotting at the same time, SyncRequestProcessor randomizes randRoll and randSize via resetSnapshotStats
//randRoll and randSize feed the condition that decides when to snapshot
resetSnapshotStats();
//initialize lastFlushTime
lastFlushTime = Time.currentElapsedTime();
while (true) {
ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_SIZE.add(queuedRequests.size());
//pollTime bounds how long we wait for the next request before flushing
long pollTime = Math.min(zks.getMaxWriteQueuePollTime(), getRemainingDelay());
//poll a request from the queue; if nothing arrives within pollTime, return null
Request si = queuedRequests.poll(pollTime, TimeUnit.MILLISECONDS);
if (si == null) {
//the code's own comment below explains the logic well
/* We timed out looking for more writes to batch, go ahead and flush immediately */
flush();
//after the flush, switch to a blocking take on queuedRequests: wait until a request arrives
si = queuedRequests.take();
}
if (si == REQUEST_OF_DEATH) {
//received the poison-pill request that stops this thread
break;
}
long startProcessTime = Time.currentElapsedTime();
ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(startProcessTime - si.syncQueueStartTime);
// track the number of records written to the log
//the transaction-log handling here is analyzed in detail below
if (zks.getZKDatabase().append(si)) {
if (shouldSnapshot()) {
resetSnapshotStats();
// roll the log
//roll the log and flush it to disk
zks.getZKDatabase().rollLog();
// take a snapshot
if (!snapThreadMutex.tryAcquire()) {
LOG.warn("Too busy to snap, skipping");
} else {
//start a thread to take the snapshot
new ZooKeeperThread("Snapshot Thread") {
public void run() {
try {
zks.takeSnapshot();
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
} finally {
snapThreadMutex.release();
}
}
}.start();
}
}
} else if (toFlush.isEmpty()) {
// optimization for read heavy workloads
// iff this is a read, and there are no pending
// flushes (writes), then just pass this to the next
// processor
if (nextProcessor != null) {
nextProcessor.processRequest(si);
if (nextProcessor instanceof Flushable) {
((Flushable) nextProcessor).flush();
}
}
continue;
}
//add the request to the flush queue
toFlush.add(si);
if (shouldFlush()) {
//flush is analyzed in detail below
flush();
}
ServerMetrics.getMetrics().SYNC_PROCESS_TIME.add(Time.currentElapsedTime() - startProcessTime);
}
} catch (Throwable t) {
handleException(this.getName(), t);
}
LOG.info("SyncRequestProcessor exited!");
}
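For completeness, the randomized snapshot trigger referenced above looks roughly like this in SyncRequestProcessor (near-verbatim from the 3.6-era source; field names may differ in other versions):

private void resetSnapshotStats() {
    // Random slack so ensemble members don't all snapshot at the same moment
    randRoll = ThreadLocalRandom.current().nextLong(snapCount / 2);
    randSize = Math.abs(ThreadLocalRandom.current().nextLong() % (snapSizeInBytes / 2));
}

private boolean shouldSnapshot() {
    // Snapshot once the txn count or the log size passes half the limit plus the slack
    int logCount = zks.getZKDatabase().getTxnCount();
    long logSize = zks.getZKDatabase().getTxnSize();
    return (logCount > (snapCount / 2 + randRoll))
            || (snapSizeInBytes > 0 && logSize > (snapSizeInBytes / 2 + randSize));
}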
zkDatabase.append(si)
What happens inside zkDatabase.append(si)?
public boolean append(Request si) throws IOException {
//bump the in-memory transaction count
txnCount.incrementAndGet();
return this.snapLog.append(si);
}
snapLog.append()
FileTxnSnapLog.append hands the request to the log handler object, FileTxnLog.
public boolean append(Request si) throws IOException {
return txnLog.append(si.getHdr(), si.getTxn(), si.getTxnDigest());
}
FileTxnLog.append
After a bit of indirection we finally arrive at the code that actually writes the log. The logic below appends the request's transaction to the transaction log.
public synchronized boolean append(TxnHeader hdr, Record txn, TxnDigest digest) throws IOException {
if (hdr == null) {
return false;
}
if (hdr.getZxid() <= lastZxidSeen) {
LOG.warn(
"Current zxid {} is <= {} for {}",
hdr.getZxid(),
lastZxidSeen,
hdr.getType());
} else {
//record this request's zxid as lastZxidSeen
lastZxidSeen = hdr.getZxid();
}
if (logStream == null) {
LOG.info("Creating new log file: {}", Util.makeLogName(hdr.getZxid()));
//if there is no log output stream yet, create one; the log file's name combines logDir with the current request's zxid
logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
//create the file output stream
fos = new FileOutputStream(logFileWrite);
logStream = new BufferedOutputStream(fos);
oa = BinaryOutputArchive.getArchive(logStream);
FileHeader fhdr = new FileHeader(TXNLOG_MAGIC, VERSION, dbId);
//serialize the file header into the log stream
fhdr.serialize(oa, "fileheader");
// Make sure that the magic number is written before padding.
//flush the header to the file
logStream.flush();
//record the current size in filePadding, preparing to pre-allocate disk space for the log file below
filePadding.setCurrentSize(fos.getChannel().position());
//add the output stream to streamsToFlush, where it waits to be flushed to disk at an appropriate time
streamsToFlush.add(fos);
}
//pre-allocate disk space for the new log file (64 MB); see https://www.jianshu.com/p/f10ffc0ff861 for a detailed discussion
filePadding.padFile(fos.getChannel());
//marshal the request into binary form
byte[] buf = Util.marshallTxnEntry(hdr, txn, digest);
if (buf == null || buf.length == 0) {
throw new IOException("Faulty serialization for header " + "and txn");
}
Checksum crc = makeChecksumAlgorithm();
//compute the CRC for this transaction, then write it to the log
crc.update(buf, 0, buf.length);
oa.writeLong(crc.getValue(), "txnEntryCRC");
//write the transaction bytes into the log file
Util.writeTxnBytes(oa, buf);
return true;
}
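Putting append together, a transaction log file ends up with roughly this layout (a hedged reconstruction from the code above; the end-of-record byte comes from Util.writeTxnBytes):

// FileHeader : magic TXNLOG_MAGIC ("ZKLG"), int version, long dbId
// ...zero padding pre-allocated by filePadding.padFile (64 MB chunks)...
// per txn    : long  crc      -- checksum over the marshalled bytes
//              bytes txnEntry -- length-prefixed TxnHeader ++ txn record (++ digest)
//              byte  0x42     -- 'B', end-of-record marker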
At this point the transaction's log entry has been written to the file output stream, but the data may still be sitting in memory rather than on disk. Next we analyze flush.
SyncRequestProcessor.flush
private void flush() throws IOException, RequestProcessorException {
if (this.toFlush.isEmpty()) {
return;
}
ServerMetrics.getMetrics().BATCH_SIZE.add(toFlush.size());
long flushStartTime = Time.currentElapsedTime();
//entry point for committing the transaction log to disk; analyzed separately below
zks.getZKDatabase().commit();
ServerMetrics.getMetrics().SYNC_PROCESSOR_FLUSH_TIME.add(Time.currentElapsedTime() - flushStartTime);
if (this.nextProcessor == null) {
this.toFlush.clear();
} else {
while (!this.toFlush.isEmpty()) {
//take a request from toFlush
final Request i = this.toFlush.remove();
long latency = Time.currentElapsedTime() - i.syncQueueStartTime;
ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_AND_FLUSH_TIME.add(latency);
//hand the request to the next processor
this.nextProcessor.processRequest(i);
}
if (this.nextProcessor instanceof Flushable) {
((Flushable) this.nextProcessor).flush();
}
lastFlushTime = Time.currentElapsedTime();
}
}
FileTxnLog.commit
ZKDatabase.commit() ultimately calls FileTxnLog.commit to flush the transaction log to disk.
/**
* commit the logs. make sure that everything hits the
* disk
*/
//as the javadoc says: commit the logs and make sure everything hits the disk
public synchronized void commit() throws IOException {
if (logStream != null) {
logStream.flush();
}
//iterate over the file output streams
for (FileOutputStream log : streamsToFlush) {
//flush each stream
log.flush();
if (forceSync) {
long startSyncNS = System.nanoTime();
FileChannel channel = log.getChannel();
channel.force(false);
syncElapsedMS = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startSyncNS);
if (syncElapsedMS > fsyncWarningThresholdMS) {
if (serverStats != null) {
serverStats.incrementFsyncThresholdExceedCount();
}
LOG.warn(
"fsync-ing the write ahead log in {} took {}ms which will adversely effect operation latency."
+ "File size is {} bytes. See the ZooKeeper troubleshooting guide",
Thread.currentThread().getName(),
syncElapsedMS,
channel.size());
}
ServerMetrics.getMetrics().FSYNC_TIME.add(syncElapsedMS);
}
}
while (streamsToFlush.size() > 1) {
streamsToFlush.poll().close();
}
// Roll the log file if we exceed the size limit
if (txnLogSizeLimit > 0) {
long logSize = getCurrentLogSize();
if (logSize > txnLogSizeLimit) {
LOG.debug("Log size limit reached: {}", logSize);
rollLog();
}
}
}
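The forceSync flag tested above is controlled by a system property (name per FileTxnLog; verify for your version). Disabling it trades durability for latency, since the fsync (channel.force) is skipped:

// -Dzookeeper.forceSync=yes   // default; fsync the log on every commit
// -Dzookeeper.forceSync=no    // skip channel.force(false); data may be lost on power failure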
ZooKeeperServer.takeSnapshot
Having analyzed how the transaction log is handled, we now turn to how snapshots are generated. takeSnapshot calls FileTxnSnapLog.save.
public void save(
DataTree dataTree,
ConcurrentHashMap<Long, Integer> sessionsWithTimeouts,
boolean syncSnap) throws IOException {
long lastZxid = dataTree.lastProcessedZxid;
//derive the new snapshot's file name from the latest processed zxid
File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid));
LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(lastZxid), snapshotFile);
try {
//serialize the server's in-memory database into the snapshot file; we won't analyze snapLog.serialize here, see the discussion at https://www.jianshu.com/p/f10ffc0ff861
snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile, syncSnap);
} catch (IOException e) {
if (snapshotFile.length() == 0) {
/* This may be caused by a full disk. In such a case, the server
* will get stuck in a loop where it tries to write a snapshot
* out to disk, and ends up creating an empty file instead.
* Doing so will eventually result in valid snapshots being
* removed during cleanup. */
if (snapshotFile.delete()) {
LOG.info("Deleted empty snapshot file: {}", snapshotFile.getAbsolutePath());
} else {
LOG.warn("Could not delete empty snapshot file: {}", snapshotFile.getAbsolutePath());
}
} else {
/* Something else went wrong when writing the snapshot out to
* disk. If this snapshot file is invalid, when restarting,
* ZooKeeper will skip it, and find the last known good snapshot
* instead. */
}
throw e;
}
}
FinalRequestProcessor
SyncRequestProcessor's handling of the log and snapshot made the transaction durable. Now let's see what happens inside FinalRequestProcessor. It is not a separate thread: its processRequest method is called by SyncRequestProcessor. The method is enormous, but it is simply per-request-type handling logic.
public void processRequest(Request request) {
LOG.debug("Processing request:: {}", request);
// request.addRQRec(">final");
long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
if (request.type == OpCode.ping) {
traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
}
if (LOG.isTraceEnabled()) {
ZooTrace.logRequest(LOG, traceMask, 'E', request, "");
}
//apply the request's transaction to the in-memory database; analyzed in detail below
ProcessTxnResult rc = zks.processTxn(request);
// ZOOKEEPER-558:
// In some cases the server does not close the connection (e.g., closeconn buffer
// was not being queued — ZOOKEEPER-558) properly. This happens, for example,
// when the client closes the connection. The server should still close the session, though.
// Calling closeSession() after losing the cnxn, results in the client close session response being dropped.
if (request.type == OpCode.closeSession && connClosedByClient(request)) {
// We need to check if we can close the session id.
// Sometimes the corresponding ServerCnxnFactory could be null because
// we are just playing diffs from the leader.
if (closeSession(zks.serverCnxnFactory, request.sessionId)
|| closeSession(zks.secureServerCnxnFactory, request.sessionId)) {
return;
}
}
if (request.getHdr() != null) {
/*
* Request header is created only by the leader, so this must be
* a quorum request. Since we're comparing timestamps across hosts,
* this metric may be incorrect. However, it's still a very useful
* metric to track in the happy case. If there is clock drift,
* the latency can go negative. Note: headers use wall time, not
* CLOCK_MONOTONIC.
*/
long propagationLatency = Time.currentWallTime() - request.getHdr().getTime();
if (propagationLatency >= 0) {
ServerMetrics.getMetrics().PROPAGATION_LATENCY.add(propagationLatency);
}
}
if (request.cnxn == null) {
return;
}
ServerCnxn cnxn = request.cnxn;
long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();
String lastOp = "NA";
// Notify ZooKeeperServer that the request has finished so that it can
// update any request accounting/throttling limits
zks.decInProcess();
zks.requestFinished(request);
Code err = Code.OK;
Record rsp = null;
String path = null;
try {
if (request.getHdr() != null && request.getHdr().getType() == OpCode.error) {
AuditHelper.addAuditLog(request, rc, true);
/*
* When local session upgrading is disabled, leader will
* reject the ephemeral node creation due to session expire.
* However, if this is the follower that issue the request,
* it will have the correct error code, so we should use that
* and report to user
*/
if (request.getException() != null) {
throw request.getException();
} else {
throw KeeperException.create(KeeperException.Code.get(((ErrorTxn) request.getTxn()).getErr()));
}
}
KeeperException ke = request.getException();
if (ke instanceof SessionMovedException) {
throw ke;
}
if (ke != null && request.type != OpCode.multi) {
throw ke;
}
LOG.debug("{}", request);
if (request.isStale()) {
ServerMetrics.getMetrics().STALE_REPLIES.add(1);
}
AuditHelper.addAuditLog(request, rc);
switch (request.type) {
case OpCode.ping: {
lastOp = "PING";
updateStats(request, lastOp, lastZxid);
cnxn.sendResponse(new ReplyHeader(ClientCnxn.PING_XID, lastZxid, 0), null, "response");
return;
}
case OpCode.createSession: {
lastOp = "SESS";
updateStats(request, lastOp, lastZxid);
zks.finishSessionInit(request.cnxn, true);
return;
}
case OpCode.multi: {
lastOp = "MULT";
rsp = new MultiResponse();
for (ProcessTxnResult subTxnResult : rc.multiResult) {
OpResult subResult;
switch (subTxnResult.type) {
case OpCode.check:
subResult = new CheckResult();
break;
case OpCode.create:
subResult = new CreateResult(subTxnResult.path);
break;
case OpCode.create2:
case OpCode.createTTL:
case OpCode.createContainer:
subResult = new CreateResult(subTxnResult.path, subTxnResult.stat);
break;
case OpCode.delete:
case OpCode.deleteContainer:
subResult = new DeleteResult();
break;
case OpCode.setData:
subResult = new SetDataResult(subTxnResult.stat);
break;
case OpCode.error:
subResult = new ErrorResult(subTxnResult.err);
if (subTxnResult.err == Code.SESSIONMOVED.intValue()) {
throw new SessionMovedException();
}
break;
default:
throw new IOException("Invalid type of op");
}
((MultiResponse) rsp).add(subResult);
}
break;
}
case OpCode.multiRead: {
lastOp = "MLTR";
MultiOperationRecord multiReadRecord = new MultiOperationRecord();
ByteBufferInputStream.byteBuffer2Record(request.request, multiReadRecord);
rsp = new MultiResponse();
OpResult subResult;
for (Op readOp : multiReadRecord) {
try {
Record rec;
switch (readOp.getType()) {
case OpCode.getChildren:
rec = handleGetChildrenRequest(readOp.toRequestRecord(), cnxn, request.authInfo);
subResult = new GetChildrenResult(((GetChildrenResponse) rec).getChildren());
break;
case OpCode.getData:
rec = handleGetDataRequest(readOp.toRequestRecord(), cnxn, request.authInfo);
GetDataResponse gdr = (GetDataResponse) rec;
subResult = new GetDataResult(gdr.getData(), gdr.getStat());
break;
default:
throw new IOException("Invalid type of readOp");
}
} catch (KeeperException e) {
subResult = new ErrorResult(e.code().intValue());
}
((MultiResponse) rsp).add(subResult);
}
break;
}
case OpCode.create: {
lastOp = "CREA";
rsp = new CreateResponse(rc.path);
err = Code.get(rc.err);
requestPathMetricsCollector.registerRequest(request.type, rc.path);
break;
}
case OpCode.create2:
case OpCode.createTTL:
case OpCode.createContainer: {
lastOp = "CREA";
rsp = new Create2Response(rc.path, rc.stat);
err = Code.get(rc.err);
requestPathMetricsCollector.registerRequest(request.type, rc.path);
break;
}
case OpCode.delete:
case OpCode.deleteContainer: {
lastOp = "DELE";
err = Code.get(rc.err);
requestPathMetricsCollector.registerRequest(request.type, rc.path);
break;
}
case OpCode.setData: {
lastOp = "SETD";
rsp = new SetDataResponse(rc.stat);
err = Code.get(rc.err);
requestPathMetricsCollector.registerRequest(request.type, rc.path);
break;
}
case OpCode.reconfig: {
lastOp = "RECO";
rsp = new GetDataResponse(
((QuorumZooKeeperServer) zks).self.getQuorumVerifier().toString().getBytes(),
rc.stat);
err = Code.get(rc.err);
break;
}
case OpCode.setACL: {
lastOp = "SETA";
rsp = new SetACLResponse(rc.stat);
err = Code.get(rc.err);
requestPathMetricsCollector.registerRequest(request.type, rc.path);
break;
}
case OpCode.closeSession: {
lastOp = "CLOS";
err = Code.get(rc.err);
break;
}
case OpCode.sync: {
lastOp = "SYNC";
SyncRequest syncRequest = new SyncRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, syncRequest);
rsp = new SyncResponse(syncRequest.getPath());
requestPathMetricsCollector.registerRequest(request.type, syncRequest.getPath());
break;
}
case OpCode.check: {
lastOp = "CHEC";
rsp = new SetDataResponse(rc.stat);
err = Code.get(rc.err);
break;
}
case OpCode.exists: {
lastOp = "EXIS";
// TODO we need to figure out the security requirement for this!
ExistsRequest existsRequest = new ExistsRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, existsRequest);
path = existsRequest.getPath();
if (path.indexOf('\0') != -1) {
throw new KeeperException.BadArgumentsException();
}
Stat stat = zks.getZKDatabase().statNode(path, existsRequest.getWatch() ? cnxn : null);
rsp = new ExistsResponse(stat);
requestPathMetricsCollector.registerRequest(request.type, path);
break;
}
case OpCode.getData: {
lastOp = "GETD";
GetDataRequest getDataRequest = new GetDataRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, getDataRequest);
path = getDataRequest.getPath();
rsp = handleGetDataRequest(getDataRequest, cnxn, request.authInfo);
requestPathMetricsCollector.registerRequest(request.type, path);
break;
}
case OpCode.setWatches: {
lastOp = "SETW";
SetWatches setWatches = new SetWatches();
// TODO we really should not need this
request.request.rewind();
ByteBufferInputStream.byteBuffer2Record(request.request, setWatches);
long relativeZxid = setWatches.getRelativeZxid();
zks.getZKDatabase()
.setWatches(
relativeZxid,
setWatches.getDataWatches(),
setWatches.getExistWatches(),
setWatches.getChildWatches(),
Collections.emptyList(),
Collections.emptyList(),
cnxn);
break;
}
case OpCode.setWatches2: {
lastOp = "STW2";
SetWatches2 setWatches = new SetWatches2();
// TODO we really should not need this
request.request.rewind();
ByteBufferInputStream.byteBuffer2Record(request.request, setWatches);
long relativeZxid = setWatches.getRelativeZxid();
zks.getZKDatabase().setWatches(relativeZxid,
setWatches.getDataWatches(),
setWatches.getExistWatches(),
setWatches.getChildWatches(),
setWatches.getPersistentWatches(),
setWatches.getPersistentRecursiveWatches(),
cnxn);
break;
}
case OpCode.addWatch: {
lastOp = "ADDW";
AddWatchRequest addWatcherRequest = new AddWatchRequest();
ByteBufferInputStream.byteBuffer2Record(request.request,
addWatcherRequest);
zks.getZKDatabase().addWatch(addWatcherRequest.getPath(), cnxn, addWatcherRequest.getMode());
rsp = new ErrorResponse(0);
break;
}
case OpCode.getACL: {
lastOp = "GETA";
GetACLRequest getACLRequest = new GetACLRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, getACLRequest);
path = getACLRequest.getPath();
DataNode n = zks.getZKDatabase().getNode(path);
if (n == null) {
throw new KeeperException.NoNodeException();
}
zks.checkACL(
request.cnxn,
zks.getZKDatabase().aclForNode(n),
ZooDefs.Perms.READ | ZooDefs.Perms.ADMIN, request.authInfo, path,
null);
Stat stat = new Stat();
List<ACL> acl = zks.getZKDatabase().getACL(path, stat);
requestPathMetricsCollector.registerRequest(request.type, getACLRequest.getPath());
try {
zks.checkACL(
request.cnxn,
zks.getZKDatabase().aclForNode(n),
ZooDefs.Perms.ADMIN,
request.authInfo,
path,
null);
rsp = new GetACLResponse(acl, stat);
} catch (KeeperException.NoAuthException e) {
List<ACL> acl1 = new ArrayList<ACL>(acl.size());
for (ACL a : acl) {
if ("digest".equals(a.getId().getScheme())) {
Id id = a.getId();
Id id1 = new Id(id.getScheme(), id.getId().replaceAll(":.*", ":x"));
acl1.add(new ACL(a.getPerms(), id1));
} else {
acl1.add(a);
}
}
rsp = new GetACLResponse(acl1, stat);
}
break;
}
case OpCode.getChildren: {
lastOp = "GETC";
GetChildrenRequest getChildrenRequest = new GetChildrenRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, getChildrenRequest);
path = getChildrenRequest.getPath();
rsp = handleGetChildrenRequest(getChildrenRequest, cnxn, request.authInfo);
requestPathMetricsCollector.registerRequest(request.type, path);
break;
}
case OpCode.getAllChildrenNumber: {
lastOp = "GETACN";
GetAllChildrenNumberRequest getAllChildrenNumberRequest = new GetAllChildrenNumberRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, getAllChildrenNumberRequest);
path = getAllChildrenNumberRequest.getPath();
DataNode n = zks.getZKDatabase().getNode(path);
if (n == null) {
throw new KeeperException.NoNodeException();
}
zks.checkACL(
request.cnxn,
zks.getZKDatabase().aclForNode(n),
ZooDefs.Perms.READ,
request.authInfo,
path,
null);
int number = zks.getZKDatabase().getAllChildrenNumber(path);
rsp = new GetAllChildrenNumberResponse(number);
break;
}
case OpCode.getChildren2: {
lastOp = "GETC";
GetChildren2Request getChildren2Request = new GetChildren2Request();
ByteBufferInputStream.byteBuffer2Record(request.request, getChildren2Request);
Stat stat = new Stat();
path = getChildren2Request.getPath();
DataNode n = zks.getZKDatabase().getNode(path);
if (n == null) {
throw new KeeperException.NoNodeException();
}
zks.checkACL(
request.cnxn,
zks.getZKDatabase().aclForNode(n),
ZooDefs.Perms.READ,
request.authInfo, path,
null);
List<String> children = zks.getZKDatabase()
.getChildren(path, stat, getChildren2Request.getWatch() ? cnxn : null);
rsp = new GetChildren2Response(children, stat);
requestPathMetricsCollector.registerRequest(request.type, path);
break;
}
case OpCode.checkWatches: {
lastOp = "CHKW";
CheckWatchesRequest checkWatches = new CheckWatchesRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, checkWatches);
WatcherType type = WatcherType.fromInt(checkWatches.getType());
path = checkWatches.getPath();
boolean containsWatcher = zks.getZKDatabase().containsWatcher(path, type, cnxn);
if (!containsWatcher) {
String msg = String.format(Locale.ENGLISH, "%s (type: %s)", path, type);
throw new KeeperException.NoWatcherException(msg);
}
requestPathMetricsCollector.registerRequest(request.type, checkWatches.getPath());
break;
}
case OpCode.removeWatches: {
lastOp = "REMW";
RemoveWatchesRequest removeWatches = new RemoveWatchesRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, removeWatches);
WatcherType type = WatcherType.fromInt(removeWatches.getType());
path = removeWatches.getPath();
boolean removed = zks.getZKDatabase().removeWatch(path, type, cnxn);
if (!removed) {
String msg = String.format(Locale.ENGLISH, "%s (type: %s)", path, type);
throw new KeeperException.NoWatcherException(msg);
}
requestPathMetricsCollector.registerRequest(request.type, removeWatches.getPath());
break;
}
case OpCode.getEphemerals: {
lastOp = "GETE";
GetEphemeralsRequest getEphemerals = new GetEphemeralsRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, getEphemerals);
String prefixPath = getEphemerals.getPrefixPath();
Set<String> allEphems = zks.getZKDatabase().getDataTree().getEphemerals(request.sessionId);
List<String> ephemerals = new ArrayList<>();
if (StringUtils.isBlank(prefixPath) || "/".equals(prefixPath.trim())) {
ephemerals.addAll(allEphems);
} else {
for (String p : allEphems) {
if (p.startsWith(prefixPath)) {
ephemerals.add(p);
}
}
}
rsp = new GetEphemeralsResponse(ephemerals);
break;
}
}
} catch (SessionMovedException e) {
// session moved is a connection level error, we need to tear
// down the connection otw ZOOKEEPER-710 might happen
// ie client on slow follower starts to renew session, fails
// before this completes, then tries the fast follower (leader)
// and is successful, however the initial renew is then
// successfully fwd/processed by the leader and as a result
// the client and leader disagree on where the client is most
// recently attached (and therefore invalid SESSION MOVED generated)
cnxn.sendCloseSession();
return;
} catch (KeeperException e) {
err = e.code();
} catch (Exception e) {
// log at error level as we are returning a marshalling
// error to the user
LOG.error("Failed to process {}", request, e);
StringBuilder sb = new StringBuilder();
ByteBuffer bb = request.request;
bb.rewind();
while (bb.hasRemaining()) {
sb.append(Integer.toHexString(bb.get() & 0xff));
}
LOG.error("Dumping request buffer: 0x{}", sb.toString());
err = Code.MARSHALLINGERROR;
}
ReplyHeader hdr = new ReplyHeader(request.cxid, lastZxid, err.intValue());
updateStats(request, lastOp, lastZxid);
try {
//after all the cases above, execution lands here; the logic below sends the operation's result back to the client, varying with the operation type
if (path == null || rsp == null) {
cnxn.sendResponse(hdr, rsp, "response");
} else {
int opCode = request.type;
Stat stat = null;
// Serialized read and get children responses could be cached by the connection
// object. Cache entries are identified by their path and last modified zxid,
// so these values are passed along with the response.
switch (opCode) {
case OpCode.getData : {
GetDataResponse getDataResponse = (GetDataResponse) rsp;
stat = getDataResponse.getStat();
cnxn.sendResponse(hdr, rsp, "response", path, stat, opCode);
break;
}
case OpCode.getChildren2 : {
GetChildren2Response getChildren2Response = (GetChildren2Response) rsp;
stat = getChildren2Response.getStat();
cnxn.sendResponse(hdr, rsp, "response", path, stat, opCode);
break;
}
default:
cnxn.sendResponse(hdr, rsp, "response");
}
}
if (request.type == OpCode.closeSession) {
cnxn.sendCloseSession();
}
} catch (IOException e) {
LOG.error("FIXMSG", e);
}
}
Now let's see how the create request gets written into ZooKeeper's in-memory database. This is a long chain of method calls, which we'll analyze one step at a time.
ZooKeeperServer.processTxn
public ProcessTxnResult processTxn(Request request) {
TxnHeader hdr = request.getHdr();
processTxnForSessionEvents(request, hdr, request.getTxn());
final boolean writeRequest = (hdr != null);
final boolean quorumRequest = request.isQuorum();
// return fast w/o synchronization when we get a read
if (!writeRequest && !quorumRequest) {
return new ProcessTxnResult();
}
synchronized (outstandingChanges) {
//apply the transaction to the in-memory database
ProcessTxnResult rc = processTxnInDB(hdr, request.getTxn(), request.getTxnDigest());
// request.hdr is set for write requests, which are the only ones
// that add to outstandingChanges.
if (writeRequest) {
long zxid = hdr.getZxid();
//below, the ChangeRecords for this zxid are removed from outstandingChanges
while (!outstandingChanges.isEmpty()
&& outstandingChanges.peek().zxid <= zxid) {
ChangeRecord cr = outstandingChanges.remove();
ServerMetrics.getMetrics().OUTSTANDING_CHANGES_REMOVED.add(1);
if (cr.zxid < zxid) {
LOG.warn(
"Zxid outstanding 0x{} is less than current 0x{}",
Long.toHexString(cr.zxid),
Long.toHexString(zxid));
}
if (outstandingChangesForPath.get(cr.path) == cr) {
outstandingChangesForPath.remove(cr.path);
}
}
}
// do not add non quorum packets to the queue.
if (quorumRequest) {
getZKDatabase().addCommittedProposal(request);
}
return rc;
}
}
DataTree.processTxn
Applying the transaction to the database ultimately ends up in DataTree.processTxn.
public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn) {
ProcessTxnResult rc = new ProcessTxnResult();
try {
rc.clientId = header.getClientId();
rc.cxid = header.getCxid();
rc.zxid = header.getZxid();
rc.type = header.getType();
rc.err = 0;
rc.multiResult = null;
//there are many cases below, one per operation type; we only analyze create
switch (header.getType()) {
case OpCode.create:
CreateTxn createTxn = (CreateTxn) txn;
rc.path = createTxn.getPath();
//perform the create in the in-memory database
createNode(
createTxn.getPath(),
createTxn.getData(),
createTxn.getAcl(),
createTxn.getEphemeral() ? header.getClientId() : 0,
createTxn.getParentCVersion(),
header.getZxid(),
header.getTime(),
null);
break;
case OpCode.create2:
CreateTxn create2Txn = (CreateTxn) txn;
rc.path = create2Txn.getPath();
Stat stat = new Stat();
createNode(
create2Txn.getPath(),
create2Txn.getData(),
create2Txn.getAcl(),
create2Txn.getEphemeral() ? header.getClientId() : 0,
create2Txn.getParentCVersion(),
header.getZxid(),
header.getTime(),
stat);
rc.stat = stat;
break;
// ... other cases omitted ...
/*
* Things we can only update after the whole txn is applied to data
* tree.
*
* If we update the lastProcessedZxid with the first sub txn in multi
* and there is a snapshot in progress, it's possible that the zxid
* associated with the snapshot only include partial of the multi op.
*
* When loading snapshot, it will only load the txns after the zxid
* associated with snapshot file, which could cause data inconsistency
* due to missing sub txns.
*
* To avoid this, we only update the lastProcessedZxid when the whole
* multi-op txn is applied to DataTree.
*/
if (!isSubTxn) {
/*
* A snapshot might be in progress while we are modifying the data
* tree. If we set lastProcessedZxid prior to making corresponding
* change to the tree, then the zxid associated with the snapshot
* file will be ahead of its contents. Thus, while restoring from
* the snapshot, the restore method will not apply the transaction
* for zxid associated with the snapshot file, since the restore
* method assumes that transaction to be present in the snapshot.
*
* To avoid this, we first apply the transaction and then modify
* lastProcessedZxid. During restore, we correctly handle the
* case where the snapshot contains data ahead of the zxid associated
* with the file.
*/
//update lastProcessedZxid
if (rc.zxid > lastProcessedZxid) {
lastProcessedZxid = rc.zxid;
}
if (digestFromLoadedSnapshot != null) {
compareSnapshotDigests(rc.zxid);
} else {
// only start recording digest when we're not in fuzzy state
logZxidDigest(rc.zxid, getTreeDigest());
}
}
return rc;
}
createNode
public void createNode(final String path, byte[] data, List<ACL> acl, long ephemeralOwner, int parentCVersion, long zxid, long time, Stat outputStat) throws KeeperException.NoNodeException, KeeperException.NodeExistsException {
int lastSlash = path.lastIndexOf('/');
//the parent node's name
String parentName = path.substring(0, lastSlash);
//the child node's name
String childName = path.substring(lastSlash + 1);
//build the new node's stat
StatPersisted stat = createStat(zxid, time, ephemeralOwner);
//look up the parent node in the in-memory database
DataNode parent = nodes.get(parentName);
if (parent == null) {
throw new KeeperException.NoNodeException();
}
synchronized (parent) {
// Add the ACL to ACL cache first, to avoid the ACL not being
// created race condition during fuzzy snapshot sync.
//
// This is the simplest fix, which may add ACL reference count
// again if it's already counted in in the ACL map of fuzzy
// snapshot, which might also happen for deleteNode txn, but
// at least it won't cause the ACL not exist issue.
//
// Later we can audit and delete all non-referenced ACLs from
// ACL map when loading the snapshot/txns from disk, like what
// we did for the global sessions.
Long longval = aclCache.convertAcls(acl);
Set<String> children = parent.getChildren();
//check whether the node being added already exists
if (children.contains(childName)) {
throw new KeeperException.NodeExistsException();
}
//this mainly updates the server-side digest bookkeeping
nodes.preChange(parentName, parent);
if (parentCVersion == -1) {
parentCVersion = parent.stat.getCversion();
parentCVersion++;
}
// There is possibility that we'll replay txns for a node which
// was created and then deleted in the fuzzy range, and it's not
// exist in the snapshot, so replay the creation might revert the
// cversion and pzxid, need to check and only update when it's
// larger.
//set the parent's cversion and pzxid
if (parentCVersion > parent.stat.getCversion()) {
parent.stat.setCversion(parentCVersion);
parent.stat.setPzxid(zxid);
}
//create the child DataNode
DataNode child = new DataNode(data, longval, stat);
//register the child under its parent
parent.addChild(childName);
//fold the parent back into the digest computation
nodes.postChange(parentName, parent);
nodeDataSize.addAndGet(getNodeSize(path, child.data));
//put the child into the in-memory database; the child is now stored, and the parent's state in the database is fully updated
nodes.put(path, child);
EphemeralType ephemeralType = EphemeralType.get(ephemeralOwner);
if (ephemeralType == EphemeralType.CONTAINER) {
//container nodes are tracked in containers
containers.add(path);
} else if (ephemeralType == EphemeralType.TTL) {
//TTL nodes are tracked in ttls
ttls.add(path);
} else if (ephemeralOwner != 0) {
//ephemeral nodes are tracked in the per-session ephemerals map
HashSet<String> list = ephemerals.get(ephemeralOwner);
if (list == null) {
list = new HashSet<String>();
ephemerals.put(ephemeralOwner, list);
}
synchronized (list) {
list.add(path);
}
}
if (outputStat != null) {
child.copyStat(outputStat);
}
}
// now check if its one of the zookeeper node child
//below is the quota-handling logic, which we won't analyze
if (parentName.startsWith(quotaZookeeper)) {
// now check if its the limit node
if (Quotas.limitNode.equals(childName)) {
// this is the limit node
// get the parent and add it to the trie
pTrie.addPath(parentName.substring(quotaZookeeper.length()));
}
if (Quotas.statNode.equals(childName)) {
updateQuotaForPath(parentName.substring(quotaZookeeper.length()));
}
}
// also check to update the quotas for this node
String lastPrefix = getMaxPrefixWithQuota(path);
long bytes = data == null ? 0 : data.length;
if (lastPrefix != null) {
// ok we have some match and need to update
updateCountBytes(lastPrefix, bytes, 1);
}
updateWriteStat(path, bytes);
//fire the NodeCreated event
dataWatches.triggerWatch(path, Event.EventType.NodeCreated);
//fire the NodeChildrenChanged event on the parent
childWatches.triggerWatch(parentName.equals("") ? "/" : parentName, Event.EventType.NodeChildrenChanged);
}
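Since createNode finishes by firing NodeCreated on the new path and NodeChildrenChanged on its parent, here is a minimal hedged client sketch that observes the first of those events (the connect string and paths are illustrative, and the parent "/demo" is assumed to already exist):

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class WatchDemo {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 30000, event -> { });
        // One-shot exists-watch: fires when "/demo/child" gets created
        zk.exists("/demo/child", event ->
                System.out.println(event.getType() + " on " + event.getPath()));
        zk.create("/demo/child", new byte[0],
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        Thread.sleep(1000); // give the watch callback a moment to fire
        zk.close();
    }
}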
That covers the client- and server-side handling of node creation.
Thanks for reading to the end. The next article picks up from this code and analyzes how watcher triggering works.