美文网首页
Zookeeper(四)-客户端-启动流程

Zookeeper(四)-客户端-启动流程

作者: 进击的蚂蚁zzzliu | 来源:发表于2020-12-22 16:24 被阅读0次

概述

本节重点分析下客户端启动流程

启动流程

客户端启动流程.png

1. 构造ZooKeeper,如下代码为例:

public static void main(String[] args) {
    ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 3000000, new DemoTest());
}

@Override
public void process(WatchedEvent event) {
    System.out.println("接收到watch通知:" + event);
}

2.构造hostProvider

// 解析服务端地址
ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
HostProvider hostProvider = new StaticHostProvider(connectStringParser.getServerAddresses());
  • 解析connectString每个IP分别生成一个IPV4 InetSocketAddress一个IPV6 InetSocketAddress

3.创建ClientCnxn

// 创建ClientCnxn   重点看 getClientCnxnSocket(), connectStringParser.getChrootPath():跟路径
cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
        hostProvider, sessionTimeout, this, watchManager,
        getClientCnxnSocket(), canBeReadOnly);
  • connectStringParser.getChrootPath解析跟路径,例如localhost:2181/test1,根据路径为/test1,后续改客户端所有操作都基于该根路径;
  • getClientCnxnSocket反射生成ClientCnxnSocket实例,默认为ClientCnxnSocketNIO(下面也是以ClientCnxnSocketNIO为例进行分析),也可以通过zookeeper.clientCnxnSocket进行配置;
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
        ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
        long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
    this.zooKeeper = zooKeeper;
    this.watcher = watcher;
    // 默认0
    this.sessionId = sessionId;
    // 默认 new byte[16]
    this.sessionPasswd = sessionPasswd;
    this.sessionTimeout = sessionTimeout;
    this.hostProvider = hostProvider;
    // 客户端根路径
    this.chrootPath = chrootPath;
    // 连接超时时间
    connectTimeout = sessionTimeout / hostProvider.size();
    // 响应超时时间
    readTimeout = sessionTimeout * 2 / 3;
    // 是否只读客户端,默认false
    readOnly = canBeReadOnly;
    // 创建请求处理线程
    sendThread = new SendThread(clientCnxnSocket);
    // 创建事件处理线程
    eventThread = new EventThread();
}

4.构造EventThread / SendThread

SendThread(ClientCnxnSocket clientCnxnSocket) {
    // 线程名后缀 -SendThread
    super(makeThreadName("-SendThread()"));
    // 连接中
    state = States.CONNECTING;
    this.clientCnxnSocket = clientCnxnSocket;
    setDaemon(true);
}

EventThread() {
    super(makeThreadName("-EventThread"));
    setDaemon(true);
}

5.启动启动EventThread线程,执行run方法

public void run() {
   try {
      isRunning = true;
      while (true) {
          // 从LinkedBlockingQueue中获取事件,为空时阻塞
         Object event = waitingEvents.take();
         // 连接关闭时默认放入 eventOfDeath
         if (event == eventOfDeath) {
            wasKilled = true;
         } else {
             // 处理事件
            processEvent(event);
         }
         if (wasKilled)
             // 连接关闭,并且等待事件队列为空时break
            synchronized (waitingEvents) {
               if (waitingEvents.isEmpty()) {
                  isRunning = false;
                  break;
               }
            }
      }
   } catch (InterruptedException e) {
      LOG.error("Event thread exiting due to interruption", e);
   }
    LOG.info("EventThread shut down for session: 0x{}",Long.toHexString(getSessionId()));
}
  • while (true)死循环;
  • waitingEvents.take()阻塞队列LinkedBlockingQueue waitingEvents为空阻塞;

6.启动SendThread线程,循环第一次执行run方法
忽略无关代码

public void run() {
    InetSocketAddress serverAddress = null;
    while (state.isAlive()) {
        try {
            // 未连接,SelectionKey为空
            if (!clientCnxnSocket.isConnected()) {
                // 建立连接
                startConnect(serverAddress);
            }
            clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
        } catch (Throwable e) {

        }
    }
}
  • state.isAlive()状态不是CLOSED或AUTH_FAILED一直循环;

7.建立连接

void connect(InetSocketAddress addr) throws IOException {
    // 创建SocketChannel
    SocketChannel sock = createSock();
    try {
        // 注册事件,创建连接
       registerAndConnect(sock, addr);
    } catch (IOException e) {
        LOG.error("Unable to open socket to " + addr);
        sock.close();
        throw e;
    }
    initialized = false;
    lenBuffer.clear();
    incomingBuffer = lenBuffer;
}

SocketChannel createSock() throws IOException {
    SocketChannel sock;
    // 打开SocketChannel,绑定客户端本地地址(可选,默认系统会随机分配一个可用的本地地址)
    sock = SocketChannel.open();
    // 设置非阻塞模式
    sock.configureBlocking(false);
    sock.socket().setSoLinger(false, -1);
    // 关闭Nagle算法
    sock.socket().setTcpNoDelay(true);
    return sock;
}

private final Selector selector = Selector.open();
void registerAndConnect(SocketChannel sock, InetSocketAddress addr) 
throws IOException {
    // 注册OP_CONNECT事件到多路复用器中
    sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
    // 异步连接服务端
    boolean immediateConnect = sock.connect(addr);// return false; 非阻塞同步返回false
    if (immediateConnect) {
        sendThread.primeConnection();
    }
}
  • SocketChannel.open()打开SocketChannel,绑定客户端本地地址(可选,默认系统会随机分配一个可用的本地地址);
  • sock.configureBlocking(false)设置非阻塞模式;
  • sock.socket().setTcpNoDelay(true)关闭Nagle算法,降低小包的延迟;
  • Selector.open()创建多路复用器;
  • sock.register(selector, SelectionKey.OP_CONNECT)注册OP_CONNECT事件到多路复用器中;
  • sock.connect(addr)非阻塞模式,服务端直接返回,异步创建连接;

8.doTransport多路复用器事件处理

void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
                     ClientCnxn cnxn) throws IOException, InterruptedException {
    // select没有事件产生的话阻塞waitTimeOut
    selector.select(waitTimeOut);
    Set<SelectionKey> selected;
    synchronized (this) {
        // 从多路复用器获取事件集合
        selected = selector.selectedKeys();
    }
    updateNow();
    for (SelectionKey k : selected) {
        SocketChannel sc = ((SocketChannel) k.channel());
        // OP_CONNECT事件
        if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
            if (sc.finishConnect()) {
                updateLastSendAndHeard();
                // 处理创建连接请求
                sendThread.primeConnection();
            }
        } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
            doIO(pendingQueue, outgoingQueue, cnxn);
        }
    }
    if (sendThread.getZkState().isConnected()) {
        synchronized(outgoingQueue) {
            if (findSendablePacket(outgoingQueue, cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
                enableWrite();
            }
        }
    }
    selected.clear();
}
  • selector.select(waitTimeOut)select没有事件产生的话阻塞waitTimeOut;
  • selector.selectedKeys()从多路复用器获取事件集合;
  • sendThread.primeConnection()如果是OP_CONNECT事件,交给sendThread处理连接请求;

9.primeConnection处理连接请求

void primeConnection() throws IOException {
    // 构造ConnectRequest
    ConnectRequest conReq = new ConnectRequest(0, lastZxid,
            sessionTimeout, sessId, sessionPasswd);
    // 构造ConnectRequest的Packet包入队outgoingQueue
    outgoingQueue.addFirst(new Packet(null, null, conReq,
                null, null, readOnly));
}
  • new ConnectRequest构造ConnectRequest
  • new Packet构造ConnectRequest的Packet包,注意RequestHeader为null,后面会通过RequestHeader是否为空判断是否是ConnectRequest包;
  • outgoingQueue.addFirstPacket入队outgoingQueue;SendThread再次循环调用到doTransport时处理这个packet;

10.循环第二次执行SendThread.run
此时多路复用器获取到OP_WRITE事件,执行doIO中sockKey.isWritable()逻辑,向服务端写连接请求包;

if (sockKey.isWritable()) {
    synchronized(outgoingQueue) {
        // 获取需要发送的 packet
        Packet p = findSendablePacket(outgoingQueue, cnxn.sendThread.clientTunneledAuthenticationInProgress());

        if (p != null) {
            // lastSend = now;
            updateLastSend();
            // If we already started writing p, p.bb will already exist
            if (p.bb == null) {
                if ((p.requestHeader != null) &&
                        (p.requestHeader.getType() != OpCode.ping) &&
                        (p.requestHeader.getType() != OpCode.auth)) {
                    p.requestHeader.setXid(cnxn.getXid());
                }
                // 生成ByteBuffer
                p.createBB();
            }
            // write bytebuffer
            sock.write(p.bb);
            // 数据包已经写完
            if (!p.bb.hasRemaining()) {
                // 已发送数据包计数+1
                sentCount++;
                // outgoingQueue中移除该packet
                outgoingQueue.removeFirstOccurrence(p);
                if (p.requestHeader != null
                        && p.requestHeader.getType() != OpCode.ping
                        && p.requestHeader.getType() != OpCode.auth) {
                    synchronized (pendingQueue) {
                        // packet放到已发送等待响应的队列 pedingQueue
                        pendingQueue.add(p);
                    }
                }
            }
        }
        if (outgoingQueue.isEmpty()) {
            // outgoingQueue为空,表示没有数据包要发送:关闭写兴趣标记;后续在doIO或doTranspost等地方调用enableWrite打开;
            disableWrite();
        } else if (!initialized && p != null && !p.bb.hasRemaining()) {
            disableWrite();
        } else {
            // 注册OP_WRITE事件
            enableWrite();
        }
    }
}
  • synchronized(outgoingQueue)outgoingQueue/pendingQueue是非线程安全队列,因此直接把整个逻辑块进行同步操作;
  • findSendablePacket从outgoingQueue获取Packet;
  • createBB序列化ConnectRequest,并写入ByteBuffer;
  • sock.write(p.bb)把ByteBuffer写到服务端Socket接收缓冲区;
  • outgoingQueue.removeFirstOccurrence(p)/pendingQueue.add(p)数据包写完后!p.bb.hasRemaining(),改Packet从待发送队列中移除,加入到已发送待响应队列;
  • disableWrite/enableWrite写出后,如果已经没有再需要写出的数据包,则停止注册OP_WRITE事件,后续等再有数据需要写时再通过enableWrite注册OP_WRITE事件;

11.循环第三次执行SendThread.run
此时服务端处理完ConnectRequest,多路复用器获取到OP_READ事件,执行doIO中sockKey.isReadable()逻辑,读取服务端连接响应包;

if (sockKey.isReadable()) {
    // 读取服务端响应包
    int rc = sock.read(incomingBuffer);
    if (rc < 0) {
        throw new EndOfStreamException(
                "Unable to read additional data from server sessionid 0x"
                        + Long.toHexString(sessionId) + ", likely server has closed socket");
    }
    // incomingBuffer默认4字节,先读取完4字节响应头
    if (!incomingBuffer.hasRemaining()) {
        incomingBuffer.flip();
        // incomingBuffer == lenBuffer表示第一次读取,即读取RPC包的前4字节,即包长度
        if (incomingBuffer == lenBuffer) {
            recvCount++;
            // incomingBuffer分配为响应体长度ByteBuffer
            readLength();
        } else if (!initialized) {
            // 读取连接响应包
            readConnectResult();
            // 注册OP_READ事件
            enableRead();
            if (findSendablePacket(outgoingQueue,
                    cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
                enableWrite();
            }
            lenBuffer.clear();
            incomingBuffer = lenBuffer;
            updateLastHeard();
            // initialized连接后为false,执行一次之后置为true
            initialized = true;
        } else {
            sendThread.readResponse(incomingBuffer);
            lenBuffer.clear();
            incomingBuffer = lenBuffer;
            updateLastHeard();
        }
    }
}
  • sock.read(incomingBuffer)读取ByteBuffer,incomingBuffer默认4字节,先读取完4字节响应头;
  • readLength()incomingBuffer重新分配为响应体长度ByteBuffer;
  • readConnectResult()读取连接响应体,下一步详细分析;
  • enableRead()注册OP_READ事件;
  • initialized = true默认以及每次连接后置为false,执行一次之后即建立连接之后置为true,以后都执行sendThread.readResponse(incomingBuffer)逻辑;

12.readConnectResult读取连接响应体

void readConnectResult() throws IOException {
    ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
    BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
    ConnectResponse conRsp = new ConnectResponse();
    // 反序列化ConnectResponse
    conRsp.deserialize(bbia, "connect");

    // read "is read-only" flag
    boolean isRO = false;
    try {
        isRO = bbia.readBool("readOnly");
    } catch (IOException e) {
        LOG.warn("Connected to an old server; r-o mode will be unavailable");
    }
    this.sessionId = conRsp.getSessionId();
    // 建立连接后处理,主要处理回调
    sendThread.onConnected(conRsp.getTimeOut(), this.sessionId, conRsp.getPasswd(), isRO);
}
  • deserialize响应体报文反序列化为ConnectResponse;
  • sendThread.onConnected建立连接后处理,主要处理回调;

13.queueEvent构造WatcherSetEventPair入队waitingEvents

public void queueEvent(WatchedEvent event) {
    if (event.getType() == EventType.None && sessionState == event.getState()) {
        return;
    }
    sessionState = event.getState();
    // materialize the watchers based on the event
    WatcherSetEventPair pair = new WatcherSetEventPair(watcher.materialize(event.getState(), event.getType(), event.getPath()), event);
    // queue the pair (watch set & event) for later processing
    waitingEvents.add(pair);
}
  • new WatcherSetEventPair构造WatcherSetEventPair(watcher.materialize下一步分析);
  • waitingEvents.add(pair)入队waitingEvents(LinkedBlockingQueue阻塞队列线程安全不需要同步操作);

14.materialize构造Watcher集合

public Set<Watcher> materialize(Watcher.Event.KeeperState state,
                                Watcher.Event.EventType type,
                                String clientPath)
{
    Set<Watcher> result = new HashSet<Watcher>();
    switch (type) {
    case None:
        result.add(defaultWatcher);
        boolean clear = ClientCnxn.getDisableAutoResetWatch() &&
                state != Watcher.Event.KeeperState.SyncConnected;

        synchronized(dataWatches) {
            for(Set<Watcher> ws: dataWatches.values()) {
                result.addAll(ws);
            }
            if (clear) {
                dataWatches.clear();
            }
        }

        synchronized(existWatches) {
            for(Set<Watcher> ws: existWatches.values()) {
                result.addAll(ws);
            }
            if (clear) {
                existWatches.clear();
            }
        }

        synchronized(childWatches) {
            for(Set<Watcher> ws: childWatches.values()) {
                result.addAll(ws);
            }
            if (clear) {
                childWatches.clear();
            }
        }
        return result;
    ......
}
  • result.add(defaultWatcher)构造ZooKeeper时watcher(ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 3000000, new DemoTest());)
  • dataWatches/existWatches/childWatches此时启动时都为空;

14.EventThread.run阻塞结束,处理WatcherSetEventPair事件

private void processEvent(Object event) {
  ......
  if (event instanceof WatcherSetEventPair) {
      // each watcher will process the event
      WatcherSetEventPair pair = (WatcherSetEventPair) event;
      for (Watcher watcher : pair.watchers) {
          try {
              // 回调watcher.process方法
              watcher.process(pair.event);
          } catch (Throwable t) {
              LOG.error("Error while calling watcher ", t);
          }
      }
  }
  ......
}
  • pair.watchers遍历处理watcher回调;
  • watcher.process(pair.event)回调watcher.process方法,即示例代码中process方法;
    -----over-----

相关文章

网友评论

      本文标题:Zookeeper(四)-客户端-启动流程

      本文链接:https://www.haomeiwen.com/subject/baoigktx.html