How ZooKeeper Works: Watcher

Author: 剑道_7ffc | Published 2020-05-22 07:40

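Basic Watcher flow

The flow has three steps: the client registers a watcher, the server processes the watcher, and the client invokes the watcher callback.
A client can register a watcher through three calls: getData, exists, and getChildren.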
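
The three steps can be sketched with a toy in-memory model. This is not ZooKeeper code: `ToyWatchFlow`, its methods, and the use of `Consumer<String>` as the watcher type are assumptions made for illustration. The sketch also shows that a watch fires once and is then gone, which is how standard ZooKeeper watches behave.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Toy in-memory model of the three-step watcher flow; not ZooKeeper code.
class ToyWatchFlow {
    private final Map<String, String> data = new HashMap<>();
    private final Map<String, Set<Consumer<String>>> watches = new HashMap<>();

    // Step 1: a read that also registers a watcher
    // (stands in for getData / exists / getChildren).
    String getData(String path, Consumer<String> watcher) {
        if (watcher != null) {
            watches.computeIfAbsent(path, p -> new HashSet<>()).add(watcher);
        }
        return data.get(path);
    }

    // Steps 2 and 3: a write removes the watchers for the path,
    // then calls each of them back with the path.
    void setData(String path, String value) {
        data.put(path, value);
        Set<Consumer<String>> fired = watches.remove(path);
        if (fired != null) {
            for (Consumer<String> w : fired) {
                w.accept(path);
            }
        }
    }
}

class ToyWatchFlowDemo {
    static List<String> run() {
        ToyWatchFlow zk = new ToyWatchFlow();
        List<String> events = new ArrayList<>();
        zk.getData("/mic", events::add); // register the watcher
        zk.setData("/mic", "1");         // fires the watcher
        zk.setData("/mic", "2");         // no watcher left, nothing fires
        return events;
    }

    public static void main(String[] args) {
        System.out.println(ToyWatchFlowDemo.run()); // prints [/mic]
    }
}
```

To keep receiving notifications, the callback has to register a new watch, typically by issuing the read again.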

ZooKeeper API initialization

Setting the default watcher

org.apache.zookeeper.ZooKeeper#ZooKeeper()

watchManager.defaultWatcher = watcher;
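
The default watcher matters because the boolean overloads of the read calls (for example `exists(path, true)`) fall back to it. A minimal sketch of that fallback follows; `ToyClient` and `resolveWatcher` are hypothetical names, not part of the ZooKeeper API.

```java
import java.util.function.Consumer;

// Simplified model of the default-watcher fallback; names are illustrative.
class ToyClient {
    private final Consumer<String> defaultWatcher;

    ToyClient(Consumer<String> defaultWatcher) {
        // Mirrors watchManager.defaultWatcher = watcher in the constructor.
        this.defaultWatcher = defaultWatcher;
    }

    // watch == true selects the default watcher; watch == false registers nothing.
    Consumer<String> resolveWatcher(boolean watch) {
        return watch ? defaultWatcher : null;
    }
}
```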
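Setting up communication and event handling

SendThread: handles data communication between the client and the server
EventThread: delivers watcher notifications to the client's callbacks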
org.apache.zookeeper.ClientCnxn#ClientCnxn()

        sendThread = new SendThread(clientCnxnSocket);
        eventThread = new EventThread();

Watcher registration

Client sends the request

Code that enqueues the packet

org.apache.zookeeper.ClientCnxn#queuePacket

Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
        Record response, AsyncCallback cb, String clientPath,
        String serverPath, Object ctx, WatchRegistration watchRegistration)
{
    Packet packet = null;

    // Note that we do not generate the Xid for the packet yet. It is
    // generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
    // where the packet is actually sent.
    synchronized (outgoingQueue) {
        packet = new Packet(h, r, request, response, watchRegistration);
        packet.cb = cb;
        packet.ctx = ctx;
        packet.clientPath = clientPath;
        packet.serverPath = serverPath;
        if (!state.isAlive() || closing) {
            conLossPacket(packet);
        } else {
            // If the client is asking to close the session then
            // mark as closing
            if (h.getType() == OpCode.closeSession) {
                closing = true;
            }
            outgoingQueue.add(packet);
        }
    }
    sendThread.getClientCnxnSocket().wakeupCnxn();
    return packet;
}
Sending the message to the server

org.apache.zookeeper.ClientCnxnSocketNIO#doIO

Packet p = findSendablePacket(outgoingQueue,
        cnxn.sendThread.clientTunneledAuthenticationInProgress());

if (p != null) {
    updateLastSend();
    // If we already started writing p, p.bb will already exist
    if (p.bb == null) {
        if ((p.requestHeader != null) &&
                (p.requestHeader.getType() != OpCode.ping) &&
                (p.requestHeader.getType() != OpCode.auth)) {
            p.requestHeader.setXid(cnxn.getXid());
        }
        p.createBB();
    }
    sock.write(p.bb);
    if (!p.bb.hasRemaining()) {
        sentCount++;
        outgoingQueue.removeFirstOccurrence(p);
        if (p.requestHeader != null
                && p.requestHeader.getType() != OpCode.ping
                && p.requestHeader.getType() != OpCode.auth) {
            synchronized (pendingQueue) {
                pendingQueue.add(p);
            }
        }
    }
}
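
The hand-off between the two queues can be modelled in a few lines. This is a simplified sketch, not the real client: `ToyPacket` and `ToySender` are hypothetical, request types are plain strings, and the network write is omitted. It keeps the two points visible in the code above: the xid is assigned at send time, and ping and auth packets get no xid and are not tracked in pendingQueue.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of the outgoing/pending hand-off; types and fields are illustrative.
class ToyPacket {
    final String type; // e.g. "exists", "ping", "auth"
    int xid = 0;       // 0 means "not assigned yet"

    ToyPacket(String type) {
        this.type = type;
    }
}

class ToySender {
    final Deque<ToyPacket> outgoingQueue = new ArrayDeque<>();
    final Deque<ToyPacket> pendingQueue = new ArrayDeque<>();
    private int nextXid = 1;

    void queue(ToyPacket p) {
        outgoingQueue.add(p);
    }

    // "Sends" the head of outgoingQueue; returns false when the queue is empty.
    boolean sendOne() {
        ToyPacket p = outgoingQueue.poll();
        if (p == null) {
            return false;
        }
        boolean tracked = !p.type.equals("ping") && !p.type.equals("auth");
        if (tracked) {
            p.xid = nextXid++;   // assigned at send time, not at enqueue time
            pendingQueue.add(p); // kept until the matching response arrives
        }
        return true;
    }
}
```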

Server receives the request

On the server side, the NIOServerCnxn class handles the requests that clients send.

org.apache.zookeeper.server.ZooKeeperServer#processPacket
Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),
  h.getType(), incomingBuffer, cnxn.getAuthInfo());
si.setOwner(ServerCnxn.me);
submitRequest(si);
org.apache.zookeeper.server.ZooKeeperServer#submitRequest
firstProcessor.processRequest(si);
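Request processor chain

PrepRequestProcessor --> SyncRequestProcessor --> FinalRequestProcessor
PrepRequestProcessor and SyncRequestProcessor each add the request to a queue and process it asynchronously on their own thread; FinalRequestProcessor then handles it.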
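
The enqueue-then-process pattern of the chain can be sketched as follows. The classes here (`ToyProcessor`, `QueuedStage`, `ToyChainDemo`) are hypothetical and only model the shape of the chain: two queued stages that run on their own threads, followed by a final stage that runs inline. Requests are plain strings that collect the name of each stage they pass through.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

interface ToyProcessor {
    void processRequest(String request);
}

// A stage that enqueues the request and handles it on its own thread.
class QueuedStage extends Thread implements ToyProcessor {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final String label;
    private final ToyProcessor next;

    QueuedStage(String label, ToyProcessor next) {
        this.label = label;
        this.next = next;
        setDaemon(true);
    }

    @Override
    public void processRequest(String request) {
        queue.add(request); // returns immediately; the work happens in run()
    }

    @Override
    public void run() {
        try {
            while (true) {
                String request = queue.take();
                next.processRequest(request + " > " + label);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class ToyChainDemo {
    static String run() {
        List<String> results = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        // The final stage runs inline on the thread of the stage before it.
        ToyProcessor fin = r -> {
            results.add(r + " > final");
            done.countDown();
        };
        QueuedStage sync = new QueuedStage("sync", fin);
        QueuedStage prep = new QueuedStage("prep", sync);
        sync.start();
        prep.start();
        prep.processRequest("exists /mic");
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("chain did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return results.get(0);
    }
}
```

Calling `ToyChainDemo.run()` returns the request with the stage labels appended in chain order.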

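org.apache.zookeeper.server.FinalRequestProcessor#processRequest (exists branch)

    ExistsRequest existsRequest = new ExistsRequest();
    // Deserialize the request body into existsRequest
    ByteBufferInputStream.byteBuffer2Record(request.request,
            existsRequest);
    String path = existsRequest.getPath();
    if (path.indexOf('\0') != -1) {
        throw new KeeperException.BadArgumentsException();
    }
    // When the client asked for a watch, pass the connection (cnxn) as the
    // watcher so that the server records it for this path; otherwise pass null
    Stat stat = zks.getZKDatabase().statNode(path, existsRequest
            .getWatch() ? cnxn : null);
    rsp = new ExistsResponse(stat);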

Client receives the response

org.apache.zookeeper.ClientCnxnSocketNIO#doIO

Receives the response from the server

org.apache.zookeeper.ClientCnxn.SendThread#readResponse

Matches the response to its Packet in pendingQueue and fills it in

org.apache.zookeeper.ClientCnxn#finishPacket

1. Registers the watcher in the matching map
private final Map<String, Set<Watcher>> dataWatches =
        new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> existWatches =
        new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> childWatches =
        new HashMap<String, Set<Watcher>>();
2. Wakes up the thread waiting on the Packet
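
The two steps of finishPacket can be sketched like this. The classes are hypothetical and watchers are plain `Runnable`s. The sketch assumes that only error code 0 counts as success; the real client's rule for when to register differs per call type, so treat that condition as a simplification.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy stand-in for a request that is waiting for its response.
class ToyPendingPacket {
    final String path;
    final Runnable watcher; // null when no watch was requested
    boolean finished = false;
    int err = 0;

    ToyPendingPacket(String path, Runnable watcher) {
        this.path = path;
        this.watcher = watcher;
    }
}

class ToyFinisher {
    final Map<String, Set<Runnable>> dataWatches = new HashMap<>();

    // Called by the I/O thread once the response for p has been read.
    void finishPacket(ToyPendingPacket p, int err) {
        // 1. Register the watcher, but only when the server reported success
        //    (assumption: err == 0 is the only success case in this sketch).
        if (p.watcher != null && err == 0) {
            synchronized (dataWatches) {
                dataWatches.computeIfAbsent(p.path, k -> new HashSet<>()).add(p.watcher);
            }
        }
        // 2. Wake up the caller blocked in waitFor().
        synchronized (p) {
            p.err = err;
            p.finished = true;
            p.notifyAll();
        }
    }

    // Called by the application thread after queueing a synchronous request.
    // Returns false if the thread was interrupted while waiting.
    boolean waitFor(ToyPendingPacket p) {
        synchronized (p) {
            try {
                while (!p.finished) {
                    p.wait();
                }
                return true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

Registering on the client only after a successful response means a failed request never leaves a stale watcher behind in the maps.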

Triggering the event

zookeeper.setData("/mic", "1".getBytes(), -1);

Server-side event response

org.apache.zookeeper.server.DataTree#setData

Modifies the node data

org.apache.zookeeper.server.WatchManager#triggerWatch

Invokes the watchers registered for the path

org.apache.zookeeper.server.NIOServerCnxn#process

Sends the notification response to the client
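
On the server, what gets stored as the "watcher" is the client connection, not the client's callback object. A minimal sketch of such a watch table follows; `ToyConnection` and `ToyServerWatchManager` are hypothetical stand-ins, and "sending" a notification is reduced to appending a string to a list.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Stand-in for a client connection: it only records the notifications "sent" to it.
class ToyConnection {
    final List<String> sent = new ArrayList<>();

    void process(String eventType, String path) {
        sent.add(eventType + " " + path);
    }
}

class ToyServerWatchManager {
    private final Map<String, Set<ToyConnection>> watchTable = new HashMap<>();

    // Called on a read with watch == true (compare statNode(path, cnxn)).
    synchronized void addWatch(String path, ToyConnection cnxn) {
        watchTable.computeIfAbsent(path, p -> new HashSet<>()).add(cnxn);
    }

    // Removes every connection watching the path, then notifies each one.
    Set<ToyConnection> triggerWatch(String path, String eventType) {
        Set<ToyConnection> watchers;
        synchronized (this) {
            watchers = watchTable.remove(path);
        }
        if (watchers == null) {
            return Collections.emptySet();
        }
        for (ToyConnection c : watchers) {
            c.process(eventType, path);
        }
        return watchers;
    }
}
```

Because the entry is removed before notifying, a second change to the same path triggers nothing unless the client has registered again.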

Client handles the event response

org.apache.zookeeper.ClientCnxn.SendThread#readResponse

Parses the response packet

org.apache.zookeeper.ClientCnxn.EventThread#queueEvent

Adds a WatcherSetEventPair element to waitingEvents (the waiting queue)

org.apache.zookeeper.ZooKeeper.ZKWatchManager#materialize

Looks up the watchers registered for the path in Map<String, Set<Watcher>> dataWatches and removes them.

org.apache.zookeeper.ClientCnxn.EventThread#run

Takes elements from the waitingEvents queue and calls the watcher implementation
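
The client-side path (materialize, queueEvent, then the event thread's run loop) can be sketched as below. `ToyEventDispatcher` and `ToyEventDemo` are hypothetical; only a single data-watch map is modelled and events carry nothing but the path.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class ToyEventDispatcher {
    // One queued notification: the watchers to call and the path they watched.
    static final class Pair {
        final Set<Consumer<String>> watchers;
        final String path;

        Pair(Set<Consumer<String>> watchers, String path) {
            this.watchers = watchers;
            this.path = path;
        }
    }

    private final Map<String, Set<Consumer<String>>> dataWatches = new HashMap<>();
    private final BlockingQueue<Pair> waitingEvents = new LinkedBlockingQueue<>();

    void register(String path, Consumer<String> watcher) {
        synchronized (dataWatches) {
            dataWatches.computeIfAbsent(path, p -> new HashSet<>()).add(watcher);
        }
    }

    // Looks up and removes the watchers for the path.
    Set<Consumer<String>> materialize(String path) {
        synchronized (dataWatches) {
            Set<Consumer<String>> found = dataWatches.remove(path);
            if (found == null) {
                return Collections.emptySet();
            }
            return found;
        }
    }

    // Called by the I/O thread when a notification arrives.
    void queueEvent(String path) {
        waitingEvents.add(new Pair(materialize(path), path));
    }

    // Event-thread loop: take one pair at a time and invoke its watchers.
    void startEventThread() {
        Thread t = new Thread(() -> {
            try {
                while (true) {
                    Pair pair = waitingEvents.take();
                    for (Consumer<String> w : pair.watchers) {
                        w.accept(pair.path);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "toy-event-thread");
        t.setDaemon(true);
        t.start();
    }
}

class ToyEventDemo {
    static List<String> run() {
        ToyEventDispatcher d = new ToyEventDispatcher();
        List<String> calls = new CopyOnWriteArrayList<>();
        CountDownLatch called = new CountDownLatch(1);
        d.register("/mic", p -> {
            calls.add(p);
            called.countDown();
        });
        d.startEventThread();
        d.queueEvent("/mic"); // watcher is removed from the map and queued
        d.queueEvent("/mic"); // nothing registered any more, so no callback
        try {
            if (!called.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("watcher was not called in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return calls;
    }
}
```

Running callbacks on a separate thread keeps a slow watcher from blocking the thread that reads from the socket.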

Server receives the data request

org.apache.zookeeper.server.NIOServerCnxnFactory#run

Receives requests from clients

org.apache.zookeeper.server.NIOServerCnxn#doIO

Handles the I/O

org.apache.zookeeper.server.NIOServerCnxn#readPayload

Reads the request payload

org.apache.zookeeper.server.NIOServerCnxn#readRequest

Handles the request

org.apache.zookeeper.server.ZooKeeperServer#processPacket

Processes the request packet
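
The doIO / readPayload split exists because a request can arrive in several pieces. The sketch below illustrates that with a length-prefixed frame reader. It assumes a 4-byte big-endian length followed by the payload, which matches my understanding of ZooKeeper's framing but should be checked against the source. `ToyFrameReader` is hypothetical, and payloads are treated as UTF-8 text purely for readability.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class ToyFrameReader {
    private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
    private ByteBuffer payload = null; // null while the length is still being read
    final List<String> requests = new ArrayList<>();

    // Feed bytes as they arrive; each completed frame is appended to requests.
    void feed(ByteBuffer in) {
        while (in.hasRemaining()) {
            if (payload == null) {
                copy(in, lenBuffer);
                if (lenBuffer.hasRemaining()) {
                    return; // length prefix not complete yet
                }
                lenBuffer.flip();
                int len = lenBuffer.getInt();
                lenBuffer.clear();
                if (len < 0) {
                    throw new IllegalArgumentException("negative frame length: " + len);
                }
                payload = ByteBuffer.allocate(len);
            }
            copy(in, payload);
            if (payload.hasRemaining()) {
                return; // wait for the rest of the payload
            }
            payload.flip();
            requests.add(StandardCharsets.UTF_8.decode(payload).toString());
            payload = null; // next bytes start a new frame
        }
    }

    // Copies as many bytes as both buffers allow.
    private static void copy(ByteBuffer from, ByteBuffer to) {
        while (from.hasRemaining() && to.hasRemaining()) {
            to.put(from.get());
        }
    }

    // Builds a frame: 4-byte length followed by the UTF-8 body.
    static ByteBuffer frame(String body) {
        byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
        ByteBuffer out = ByteBuffer.allocate(4 + bytes.length);
        out.putInt(bytes.length).put(bytes);
        out.flip();
        return out;
    }
}
```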

Processing diagram

image.png

Processing flow in cluster mode

image.png


Article link: https://www.haomeiwen.com/subject/crowohtx.html