美文网首页
zookeeper架构源码 - 客户端设计

zookeeper架构源码 - 客户端设计

作者: sunpy | 来源:发表于2022-08-11 17:09 被阅读0次

zookeeper例子


public static void main(String[] args) throws Exception {
    ZooKeeper zk = new ZooKeeper("地址",
            5000, null);

    byte[] dataArr = zk.getData("/sunpy/data", null, new Stat());
    System.out.println(new String(dataArr));

}

客户端入口

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider hostProvider, ZKClientConfig clientConfig) throws IOException {
    LOG.info("Initiating client connection, connectString={} sessionTimeout={} watcher={}", new Object[]{connectString, sessionTimeout, watcher});
    this.clientConfig = clientConfig != null ? clientConfig : new ZKClientConfig();
    this.hostProvider = hostProvider;
    ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
    // 创建SendThread、EventThread线程
    this.cnxn = this.createConnection(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this.clientConfig, watcher, this.getClientCnxnSocket(), canBeReadOnly);
    // 运行SendThread、EventThread线程
    this.cnxn.start();
}

hostProvider:客户端地址列表管理器。
ClientCxn:客户端核心线程,内部包含SendThread和EventThread。
SendTread:I/O线程,负责zookeeper客户端与服务器之间的网络IO通信。
EventThread:事件线程,负责对服务端事件处理。

客户端实现逻辑


1. 启动客户端,初始化

调用createConnection方法
创建SendThread网络通信线程。
创建EventThread事件处理线程。

SendThread(ClientCnxnSocket clientCnxnSocket) throws IOException {
    super(makeThreadName("-SendThread()"));
    // 同步改变zookeeper的state状态
    changeZkState(States.CONNECTING);
    // 使用传入的ClientCnxnSocket对象进行初始化
    this.clientCnxnSocket = clientCnxnSocket;
    // 将网络通信线程设置为守护线程
    setDaemon(true);
}

EventThread() {
    super(makeThreadName("-EventThread"));
    // 将事件处理线程设置为守护线程
    setDaemon(true);
}
2. 建立连接

启动SendThread线程、EventThread线程。
SendThread线程实现逻辑:

// 进入SendThread线程类,执行线程实现的逻辑:
@Override
public void run() {
    // 传入参数:发送Packet的队列outgoingQueue
    clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
    // 判断当前zookeeper的状态,state不为CLOSED和AUTH_FAILED
    while (state.isAlive()) {
        try {
            // 校验ClientCnxnSocket(网络通信Sokcet)的状态
            if (!clientCnxnSocket.isConnected()) { 
                // 关闭退出
                // 获取服务器地址重新连接
            }
            // 校验状态已连接
            if (state.isConnected()) { 
                // 使用sasl权限认证校验
                if (zooKeeperSaslClient != null) {
                    if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
                        // 认证初始化
                    }
                    KeeperState authState = zooKeeperSaslClient.getKeeperState();
                    
                    if (sendAuthEvent) {
                        // 认证成功或者失败,将Event事件加入到EventThread中的WaitingEvents队列中
                        eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, authState, null));
                        if (state == States.AUTH_FAILED) {
                            eventThread.queueEventOfDeath();
                        }
                    }
                }
                // 发送数据时间-接听到数据时间
                to = readTimeout - clientCnxnSocket.getIdleRecv();
            } else { //状态未连接
                // 创建连接时间-接听到数据时间
                to = connectTimeout - clientCnxnSocket.getIdleRecv();
            }

            if (to <= 0) { 
                // 接听到数据,读超时,抛出会话异常
            }

            if (state.isConnected()) {
                // 写超时,发送ping心跳包测试
            }
            
            if (state == States.CONNECTEDREADONLY) {
                // 只读模式操作
            }

            /*
             * 传输数据
             * 采用NIO方式,ClientCnxnSocketNIO实现,那么就使用Selector通过感兴趣的key,来获取响应的通道传输
             * 采用Netty方式,ClientCnxnSocketNetty实现
             */
            clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
        } catch (Throwable e) {
            
        }
    }

    // 清空outgoingQueue队列中的packet
    synchronized (state) { 
        // When it comes to this point, it guarantees that later queued
        // packet to outgoingQueue will be notified of death.
        cleanup();
    }
    
    // 关闭ClientCnxnSocket
    clientCnxnSocket.close();
    
    // 如果当前状态为未关闭且有权限,那么向事件处理对象线程中添加事件对象
    if (state.isAlive()) {
        // 将WatchEvent事件添加到waitingEvents队列中
        eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Disconnected, null));
    }
    
    // 否则向事件处理对象线程中添加事件对象
    eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Closed, null));
}

doTransport方法传入pendingQueue用来接收响应的数据。


3. 网络IO操作

zookeeper提供了NIO(ClientCnxnSocketNIO)和Netty(ClientCnxnSocketNetty)两种具体实现。

// NIO实现ClientCnxnSocketNIO
@Override
void doTransport(
    int waitTimeOut,
    Queue<Packet> pendingQueue,
    ClientCnxn cnxn) throws IOException, InterruptedException {
    selector.select(waitTimeOut);
    Set<SelectionKey> selected;
    // 获取Selector感兴趣的key
    synchronized (this) {
        selected = selector.selectedKeys();
    }
    
    // 遍历所有key
    for (SelectionKey k : selected) {
        SocketChannel sc = ((SocketChannel) k.channel());
        // 建立连接的key
        if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
            if (sc.finishConnect()) {
                updateLastSendAndHeard();
                updateSocketAddresses();
                sendThread.primeConnection();
            }
        } 
        // 读写操作的key
        else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
            doIO(pendingQueue, cnxn);
        }
    }
    if (sendThread.getZkState().isConnected()) {
        if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
            enableWrite();
        }
    }
    //清除感兴趣的key
    selected.clear();
}

NIO的读写操作,隐藏在doIO方法

void doIO(Queue<Packet> pendingQueue, ClientCnxn cnxn) throws InterruptedException, IOException {
    SocketChannel sock = (SocketChannel) sockKey.channel();
    // 感兴趣键为可读
    if (sockKey.isReadable()) { 
        // 将服务器传过来的数据写入到缓冲区
        int rc = sock.read(incomingBuffer);
        // 缓冲区不为空
        if (!incomingBuffer.hasRemaining()) {
            // 缓冲区调整指针,开始写
            incomingBuffer.flip();
            if (incomingBuffer == lenBuffer) {
                recvCount.getAndIncrement();
                readLength();
            } else if (!initialized) {
                readConnectResult();
                enableRead();
                if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
                    enableWrite();
                }
                lenBuffer.clear();
                incomingBuffer = lenBuffer;
                updateLastHeard();
                initialized = true;
            } else { // 读取缓冲区数据
                sendThread.readResponse(incomingBuffer);
                lenBuffer.clear();
                incomingBuffer = lenBuffer;
                updateLastHeard();
            }
        }
    }
    
    // 感兴趣键为可写
    if (sockKey.isWritable()) { 
        Packet p = findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress());

        if (p != null) {
            updateLastSend();

            if (p.bb == null) {
                if ((p.requestHeader != null)
                    && (p.requestHeader.getType() != OpCode.ping)
                    && (p.requestHeader.getType() != OpCode.auth)) {
                    p.requestHeader.setXid(cnxn.getXid());
                }
                p.createBB();
            }
            sock.write(p.bb); // 获取outgoingQueue队列中Packet向服务器写
            if (!p.bb.hasRemaining()) {
                sentCount.getAndIncrement();
                outgoingQueue.removeFirstOccurrence(p);
                if (p.requestHeader != null
                    && p.requestHeader.getType() != OpCode.ping
                    && p.requestHeader.getType() != OpCode.auth) {
                    synchronized (pendingQueue) {
                        pendingQueue.add(p);
                    }
                }
            }
        }
        if (outgoingQueue.isEmpty()) { // outgoingQueue队列空了,就不用写了,否则继续写
            disableWrite();
        } else if (!initialized && p != null && !p.bb.hasRemaining()) {
            disableWrite();
        } else {
            enableWrite();
        }
    }
}
4. 事件处理线程操作

EventThread线程执行逻辑:

// 进入EventThread线程类,执行线程实现的逻辑:
@Override
@SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
public void run() {
    try {
        isRunning = true;
        while (true) {
            // 从waitingEvents队列中获取event事件(阻塞方式)
            Object event = waitingEvents.take();
            if (event == eventOfDeath) {
                wasKilled = true;
            } else { // event事件未死就可以执行event
                processEvent(event); // 执行watch监控事件
            }
            if (wasKilled) { // 事件死亡结束死循环
                synchronized (waitingEvents) {
                    if (waitingEvents.isEmpty()) {
                        isRunning = false;
                        break;
                    }
                }
            }
        }
    } catch (InterruptedException e) {}
}

服务器地址列表设计


zookeeper实现的服务器默认地址列表:

createDefaultHostProvider(connectString)

// default hostprovider
private static HostProvider createDefaultHostProvider(String connectString) {
    return new StaticHostProvider(new ConnectStringParser(connectString).getServerAddresses());
}

private void init(Collection<InetSocketAddress> serverAddresses, long randomnessSeed, Resolver resolver) {
    this.sourceOfRandomness = new Random(randomnessSeed);
    this.resolver = resolver;
    if (serverAddresses.isEmpty()) {
        throw new IllegalArgumentException("A HostProvider may not be empty!");
    }
    this.serverAddresses = shuffle(serverAddresses);
    currentIndex = -1;
    lastIndex = -1;
}

可以看到都是通过InetSocketAddress存储地址。

ConnectStringParser connectStringParser = new ConnectStringParser(connectString);

ConnectStringParser实现方式,就是对于集群地址,采用分割字符串方式,形成多个String,然后包装成InetSocketAddress对象,最后存放在serverAddresses中。

参考


《从Paxos到Zookeeper分布式一致性原理与实战》

相关文章

网友评论

      本文标题:zookeeper架构源码 - 客户端设计

      本文链接:https://www.haomeiwen.com/subject/diazwrtx.html