美文网首页
zookeeper架构源码 - 客户端设计

zookeeper架构源码 - 客户端设计

作者: sunpy | 来源:发表于2022-08-11 17:09 被阅读0次

    zookeeper例子


    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("地址",
                5000, null);
    
        byte[] dataArr = zk.getData("/sunpy/data", null, new Stat());
        System.out.println(new String(dataArr));
    
    }
    

    客户端入口

    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider hostProvider, ZKClientConfig clientConfig) throws IOException {
        LOG.info("Initiating client connection, connectString={} sessionTimeout={} watcher={}", new Object[]{connectString, sessionTimeout, watcher});
        this.clientConfig = clientConfig != null ? clientConfig : new ZKClientConfig();
        this.hostProvider = hostProvider;
        ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
        // 创建SendThread、EventThread线程
        this.cnxn = this.createConnection(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this.clientConfig, watcher, this.getClientCnxnSocket(), canBeReadOnly);
        // 运行SendThread、EventThread线程
        this.cnxn.start();
    }
    

    hostProvider:客户端地址列表管理器。
    ClientCxn:客户端核心线程,内部包含SendThread和EventThread。
    SendTread:I/O线程,负责zookeeper客户端与服务器之间的网络IO通信。
    EventThread:事件线程,负责对服务端事件处理。

    客户端实现逻辑


    1. 启动客户端,初始化

    调用createConnection方法
    创建SendThread网络通信线程。
    创建EventThread事件处理线程。

    SendThread(ClientCnxnSocket clientCnxnSocket) throws IOException {
        super(makeThreadName("-SendThread()"));
        // 同步改变zookeeper的state状态
        changeZkState(States.CONNECTING);
        // 使用传入的ClientCnxnSocket对象进行初始化
        this.clientCnxnSocket = clientCnxnSocket;
        // 将网络通信线程设置为守护线程
        setDaemon(true);
    }
    
    EventThread() {
        super(makeThreadName("-EventThread"));
        // 将事件处理线程设置为守护线程
        setDaemon(true);
    }
    
    2. 建立连接

    启动SendThread线程、EventThread线程。
    SendThread线程实现逻辑:

    // 进入SendThread线程类,执行线程实现的逻辑:
    @Override
    public void run() {
        // 传入参数:发送Packet的队列outgoingQueue
        clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
        // 判断当前zookeeper的状态,state不为CLOSED和AUTH_FAILED
        while (state.isAlive()) {
            try {
                // 校验ClientCnxnSocket(网络通信Sokcet)的状态
                if (!clientCnxnSocket.isConnected()) { 
                    // 关闭退出
                    // 获取服务器地址重新连接
                }
                // 校验状态已连接
                if (state.isConnected()) { 
                    // 使用sasl权限认证校验
                    if (zooKeeperSaslClient != null) {
                        if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
                            // 认证初始化
                        }
                        KeeperState authState = zooKeeperSaslClient.getKeeperState();
                        
                        if (sendAuthEvent) {
                            // 认证成功或者失败,将Event事件加入到EventThread中的WaitingEvents队列中
                            eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, authState, null));
                            if (state == States.AUTH_FAILED) {
                                eventThread.queueEventOfDeath();
                            }
                        }
                    }
                    // 发送数据时间-接听到数据时间
                    to = readTimeout - clientCnxnSocket.getIdleRecv();
                } else { //状态未连接
                    // 创建连接时间-接听到数据时间
                    to = connectTimeout - clientCnxnSocket.getIdleRecv();
                }
    
                if (to <= 0) { 
                    // 接听到数据,读超时,抛出会话异常
                }
    
                if (state.isConnected()) {
                    // 写超时,发送ping心跳包测试
                }
                
                if (state == States.CONNECTEDREADONLY) {
                    // 只读模式操作
                }
    
                /*
                 * 传输数据
                 * 采用NIO方式,ClientCnxnSocketNIO实现,那么就使用Selector通过感兴趣的key,来获取响应的通道传输
                 * 采用Netty方式,ClientCnxnSocketNetty实现
                 */
                clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
            } catch (Throwable e) {
                
            }
        }
    
        // 清空outgoingQueue队列中的packet
        synchronized (state) { 
            // When it comes to this point, it guarantees that later queued
            // packet to outgoingQueue will be notified of death.
            cleanup();
        }
        
        // 关闭ClientCnxnSocket
        clientCnxnSocket.close();
        
        // 如果当前状态为未关闭且有权限,那么向事件处理对象线程中添加事件对象
        if (state.isAlive()) {
            // 将WatchEvent事件添加到waitingEvents队列中
            eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Disconnected, null));
        }
        
        // 否则向事件处理对象线程中添加事件对象
        eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Closed, null));
    }
    

    doTransport方法传入pendingQueue用来接收响应的数据。


    3. 网络IO操作

    zookeeper提供了NIO(ClientCnxnSocketNIO)和Netty(ClientCnxnSocketNetty)两种具体实现。

    // NIO实现ClientCnxnSocketNIO
    @Override
    void doTransport(
        int waitTimeOut,
        Queue<Packet> pendingQueue,
        ClientCnxn cnxn) throws IOException, InterruptedException {
        selector.select(waitTimeOut);
        Set<SelectionKey> selected;
        // 获取Selector感兴趣的key
        synchronized (this) {
            selected = selector.selectedKeys();
        }
        
        // 遍历所有key
        for (SelectionKey k : selected) {
            SocketChannel sc = ((SocketChannel) k.channel());
            // 建立连接的key
            if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
                if (sc.finishConnect()) {
                    updateLastSendAndHeard();
                    updateSocketAddresses();
                    sendThread.primeConnection();
                }
            } 
            // 读写操作的key
            else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                doIO(pendingQueue, cnxn);
            }
        }
        if (sendThread.getZkState().isConnected()) {
            if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
                enableWrite();
            }
        }
        //清除感兴趣的key
        selected.clear();
    }
    

    NIO的读写操作,隐藏在doIO方法

    void doIO(Queue<Packet> pendingQueue, ClientCnxn cnxn) throws InterruptedException, IOException {
        SocketChannel sock = (SocketChannel) sockKey.channel();
        // 感兴趣键为可读
        if (sockKey.isReadable()) { 
            // 将服务器传过来的数据写入到缓冲区
            int rc = sock.read(incomingBuffer);
            // 缓冲区不为空
            if (!incomingBuffer.hasRemaining()) {
                // 缓冲区调整指针,开始写
                incomingBuffer.flip();
                if (incomingBuffer == lenBuffer) {
                    recvCount.getAndIncrement();
                    readLength();
                } else if (!initialized) {
                    readConnectResult();
                    enableRead();
                    if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
                        enableWrite();
                    }
                    lenBuffer.clear();
                    incomingBuffer = lenBuffer;
                    updateLastHeard();
                    initialized = true;
                } else { // 读取缓冲区数据
                    sendThread.readResponse(incomingBuffer);
                    lenBuffer.clear();
                    incomingBuffer = lenBuffer;
                    updateLastHeard();
                }
            }
        }
        
        // 感兴趣键为可写
        if (sockKey.isWritable()) { 
            Packet p = findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress());
    
            if (p != null) {
                updateLastSend();
    
                if (p.bb == null) {
                    if ((p.requestHeader != null)
                        && (p.requestHeader.getType() != OpCode.ping)
                        && (p.requestHeader.getType() != OpCode.auth)) {
                        p.requestHeader.setXid(cnxn.getXid());
                    }
                    p.createBB();
                }
                sock.write(p.bb); // 获取outgoingQueue队列中Packet向服务器写
                if (!p.bb.hasRemaining()) {
                    sentCount.getAndIncrement();
                    outgoingQueue.removeFirstOccurrence(p);
                    if (p.requestHeader != null
                        && p.requestHeader.getType() != OpCode.ping
                        && p.requestHeader.getType() != OpCode.auth) {
                        synchronized (pendingQueue) {
                            pendingQueue.add(p);
                        }
                    }
                }
            }
            if (outgoingQueue.isEmpty()) { // outgoingQueue队列空了,就不用写了,否则继续写
                disableWrite();
            } else if (!initialized && p != null && !p.bb.hasRemaining()) {
                disableWrite();
            } else {
                enableWrite();
            }
        }
    }
    
    4. 事件处理线程操作

    EventThread线程执行逻辑:

    // 进入EventThread线程类,执行线程实现的逻辑:
    @Override
    @SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
    public void run() {
        try {
            isRunning = true;
            while (true) {
                // 从waitingEvents队列中获取event事件(阻塞方式)
                Object event = waitingEvents.take();
                if (event == eventOfDeath) {
                    wasKilled = true;
                } else { // event事件未死就可以执行event
                    processEvent(event); // 执行watch监控事件
                }
                if (wasKilled) { // 事件死亡结束死循环
                    synchronized (waitingEvents) {
                        if (waitingEvents.isEmpty()) {
                            isRunning = false;
                            break;
                        }
                    }
                }
            }
        } catch (InterruptedException e) {}
    }
    

    服务器地址列表设计


    zookeeper实现的服务器默认地址列表:

    createDefaultHostProvider(connectString)
    
    // default hostprovider
    private static HostProvider createDefaultHostProvider(String connectString) {
        return new StaticHostProvider(new ConnectStringParser(connectString).getServerAddresses());
    }
    
    private void init(Collection<InetSocketAddress> serverAddresses, long randomnessSeed, Resolver resolver) {
        this.sourceOfRandomness = new Random(randomnessSeed);
        this.resolver = resolver;
        if (serverAddresses.isEmpty()) {
            throw new IllegalArgumentException("A HostProvider may not be empty!");
        }
        this.serverAddresses = shuffle(serverAddresses);
        currentIndex = -1;
        lastIndex = -1;
    }
    

    可以看到都是通过InetSocketAddress存储地址。

    ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
    

    ConnectStringParser实现方式,就是对于集群地址,采用分割字符串方式,形成多个String,然后包装成InetSocketAddress对象,最后存放在serverAddresses中。

    参考


    《从Paxos到Zookeeper分布式一致性原理与实战》

    相关文章

      网友评论

          本文标题:zookeeper架构源码 - 客户端设计

          本文链接:https://www.haomeiwen.com/subject/diazwrtx.html