Watcher 的基本流程
分为3个步骤:客户端注册watcher,服务端处理watcher和客户端回调watcher。
客户端注册的3种方式:getData、exists、getChildren
ZooKeeper API 的初始化过程
设置默认watcher
org.apache.zookeeper.ZooKeeper#ZooKeeper()
watchManager.defaultWatcher = watcher;
设置通信和事件的处理
SendThread:客户端和服务端的数据通信
EventThread:客户端回调watcher的通知
org.apache.zookeeper.ClientCnxn#ClientCnxn()
sendThread = new SendThread(clientCnxnSocket);
eventThread = new EventThread();
watcher注册
客户端发送请求
加入队列代码
Packet org.apache.zookeeper.ClientCnxn#queuePacket
Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
Record response, AsyncCallback cb, String clientPath,
String serverPath, Object ctx, WatchRegistration watchRegistration)
{
Packet packet = null;
// Note that we do not generate the Xid for the packet yet. It is
// generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
// where the packet is actually sent.
synchronized (outgoingQueue) {
packet = new Packet(h, r, request, response, watchRegistration);
packet.cb = cb;
packet.ctx = ctx;
packet.clientPath = clientPath;
packet.serverPath = serverPath;
if (!state.isAlive() || closing) {
conLossPacket(packet);
} else {
// If the client is asking to close the session then
// mark as closing
if (h.getType() == OpCode.closeSession) {
closing = true;
}
outgoingQueue.add(packet);
}
}
sendThread.getClientCnxnSocket().wakeupCnxn();
return packet;
}
向服务端发送消息
org.apache.zookeeper.ClientCnxnSocketNIO#doIO
Packet p = findSendablePacket(outgoingQueue,
cnxn.sendThread.clientTunneledAuthenticationInProgress());
if (p != null) {
updateLastSend();
// If we already started writing p, p.bb will already exist
if (p.bb == null) {
if ((p.requestHeader != null) &&
(p.requestHeader.getType() != OpCode.ping) &&
(p.requestHeader.getType() != OpCode.auth)) {
p.requestHeader.setXid(cnxn.getXid());
}
p.createBB();
}
sock.write(p.bb);
if (!p.bb.hasRemaining()) {
sentCount++;
outgoingQueue.removeFirstOccurrence(p);
if (p.requestHeader != null
&& p.requestHeader.getType() != OpCode.ping
&& p.requestHeader.getType() != OpCode.auth) {
synchronized (pendingQueue) {
pendingQueue.add(p);
}
}
}
}
服务端接收请求
服务端有一个 NIOServerCnxn 类,用来处理客户端发送过来的请求
org.apache.zookeeper.server.ZooKeeperServer#processPacket
Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),
h.getType(), incomingBuffer, cnxn.getAuthInfo());
si.setOwner(ServerCnxn.me);
submitRequest(si);
org.apache.zookeeper.server.ZooKeeperServer#submitRequest
firstProcessor.processRequest(si);
请求链
PrepRequestProcessor --> SyncRequestProcessor --> FinalRequestProcessor
处理逻辑是队列中增加元素,异步来处理元素。
ExistsRequest existsRequest = new ExistsRequest();
ByteBufferInputStream.byteBuffer2Record(request.request,
existsRequest);
String path = existsRequest.getPath();
if (path.indexOf('\0') != -1) {
throw new KeeperException.BadArgumentsException();
}
Stat stat = zks.getZKDatabase().statNode(path, existsRequest
.getWatch() ? cnxn : null);
rsp = new ExistsResponse(stat);
客户端接收响应请求
org.apache.zookeeper.ClientCnxnSocketNIO#doIO
接收服务端请求
org.apache.zookeeper.ClientCnxn.SendThread#readResponse
将响应信息变成Packet
org.apache.zookeeper.ClientCnxn#finishPacket
1 将watcher注册到map中
private final Map<String, Set<Watcher>> dataWatches =
new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> existWatches =
new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> childWatches =
new HashMap<String, Set<Watcher>>();
2 将Packet唤醒
事件触发
zookeeper.setData(“/mic”, “1”.getByte(),-1) ;
服务端的事件响应
org.apache.zookeeper.server.DataTree#setData
调用修改数据方法
org.apache.zookeeper.server.WatchManager#triggerWatch
调用对应的watcher
org.apache.zookeeper.server.NIOServerCnxn#process
向客户端发送notification响应
客户端处理事件响应
org.apache.zookeeper.ClientCnxn.SendThread#readResponse
解析响应包
org.apache.zookeeper.ClientCnxn.EventThread#queueEvent
向waitingEvents(等待队列中)增加WatcherSetEventPair元素
org.apache.zookeeper.ZooKeeper.ZKWatchManager#materialize
根据路径从Map<String, Set<Watcher>> dataWatches中获取注册的watcher,并去除。
org.apache.zookeeper.ClientCnxn.EventThread#run
从waitingEvents等待队列中取出元素,调用watcher的实现类
服务端接收数据请求
org.apache.zookeeper.server.NIOServerCnxnFactory#run
从客户端中收到请求
org.apache.zookeeper.server.NIOServerCnxn#doIO
处理io
org.apache.zookeeper.server.NIOServerCnxn#readPayload
读取请求负载
org.apache.zookeeper.server.NIOServerCnxn#readRequest
处理请求
org.apache.zookeeper.server.ZooKeeperServer#processPacket
处理请求包
网友评论