zk Source Code Reading 29: Network I/O When Servers Elect a Leader: Qu

Author: 赤子心_d709 | Published 2017-08-03 18:34 | Read 322 times

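    Summary

    During startup, each server starts a QuorumCnxManager, which handles the low-level network communication between servers during Leader election.
    Before reading the source in this section, I suggest first going through the references listed at the end. They are short and give a basic grasp of the concepts.

    This section covers:

    Inner classes
      SendWorker: the network I/O sender; takes messages from the send queue and sends them to the machine with the corresponding sid
      Message: defines the message structure, holding a sid and a ByteBuffer message body
      RecvWorker: the network I/O receiver
      Listener: listens on the electionPort and waits for connections from other machines
    Fields
      recvQueue: the receive queue
      queueSendMap: the send queue for each sid
    Functions
      Connection handling
      Producing and consuming on the send and receive queues
      Others
    Thoughts and summary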
    

    Inner classes

    QuorumCnxManager has four inner classes:
    SendWorker, Message, RecvWorker, and Listener

    SendWorker

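    This class is the “sender”. It extends ZooKeeperThread, and the thread repeatedly takes messages from the send queue and sends them to the machine with the corresponding sid.
    Fields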

            Long sid;// sid of the target machine, not this machine's own sid
            Socket sock;
            RecvWorker recvWorker;// the RecvWorker for this sid
            volatile boolean running = true;
            DataOutputStream dout;
    

    Main methods

    Constructor
    
            SendWorker(Socket sock, Long sid) {
                super("SendWorker:" + sid);
                this.sid = sid;
                this.sock = sock;
                recvWorker = null;
                try {
                    dout = new DataOutputStream(sock.getOutputStream());
                } catch (IOException e) {
                    LOG.error("Unable to access socket output stream", e);
                    closeSocket(sock);
                    running = false;
                }
                LOG.debug("Address of remote peer: " + this.sid);
            }
    
    run method
    
            @Override
            public void run() {
                threadCnt.incrementAndGet();// increment the thread count
                try {
                    /**
                     * If there is nothing in the queue to send, then we
                     * send the lastMessage to ensure that the last message
                     * was received by the peer. The message could be dropped
                     * in case self or the peer shutdown their connection
                     * (and exit the thread) prior to reading/processing
                     * the last message. Duplicate messages are handled correctly
                     * by the peer.
                     *
                     * If the send queue is non-empty, then we have a recent
                     * message than that stored in lastMessage. To avoid sending
                     * stale message, we should send the message in the send queue.
                     */
                    ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);// find the send queue for this sid
                    if (bq == null || isSendQueueEmpty(bq)) {
                       ByteBuffer b = lastMessageSent.get(sid);// nothing to send, so resend the last message (the peer handles duplicates correctly)
                       if (b != null) {
                           LOG.debug("Attempting to send lastMessage to sid=" + sid);
                           send(b);// send
                       }
                    }
                } catch (IOException e) {
                    LOG.error("Failed to send last message. Shutting down thread.", e);
                    this.finish();
                }
                
                try {
                    while (running && !shutdown && sock != null) {
    
                        ByteBuffer b = null;
                        try {
                            ArrayBlockingQueue<ByteBuffer> bq = queueSendMap
                                    .get(sid);
                            if (bq != null) {
                                b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);// take a message from the send queue
                            } else {// no queue recorded in the map for this sid
                                LOG.error("No queue of incoming messages for " +
                                          "server " + sid);
                                break;
                            }
    
                            if(b != null){
                                lastMessageSent.put(sid, b);// update the last message sent
                                send(b);// send
                            }
                        } catch (InterruptedException e) {
                            LOG.warn("Interrupted while waiting for message on queue",
                                    e);
                        }
                    }
                } catch (Exception e) {
                    LOG.warn("Exception when using channel: for id " + sid + " my id = " + 
                            self.getId() + " error = " + e);
                }
                this.finish();
                LOG.warn("Send worker leaving thread");
            }
        }
    
    
    send method
    
            synchronized void send(ByteBuffer b) throws IOException {
                byte[] msgBytes = new byte[b.capacity()];
                try {
                    b.position(0);
                    b.get(msgBytes);
                } catch (BufferUnderflowException be) {
                    LOG.error("BufferUnderflowException ", be);
                    return;
                }
                dout.writeInt(b.capacity());
                dout.write(b.array());
                dout.flush();
            }
    

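    The fields used here are covered below. One point to note:
    When a SendWorker thread starts (this check runs once, before the main loop), if ZooKeeper finds that the send queue for the target server is empty, it takes the most recently sent message from lastMessageSent and sends it again. This covers the case where the receiving server goes down before receiving the message, or after receiving it but before processing it, so that the message was never handled correctly. ZooKeeper guarantees that the receiver handles duplicate messages correctly.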
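    To make the start-up resend concrete, here is a minimal single-threaded sketch that assumes one peer and uses a List<String> in place of the socket. The class ResendSketch and its methods enqueue and runWorkerOnce are hypothetical illustrations, not ZooKeeper API. Only the field names queueSendMap and lastMessageSent come from the code above.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical single-threaded model of SendWorker#run for one sid.
// The List<String> "wire" stands in for the socket; nothing touches the network.
class ResendSketch {
    final Map<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap = new ConcurrentHashMap<>();
    final Map<Long, ByteBuffer> lastMessageSent = new ConcurrentHashMap<>();
    final List<String> wire = new ArrayList<>();

    static ByteBuffer msg(String text) {
        return ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
    }

    // Producer side: one capacity-1 queue per sid, like SEND_CAPACITY.
    void enqueue(long sid, String text) {
        queueSendMap.computeIfAbsent(sid, k -> new ArrayBlockingQueue<>(1)).offer(msg(text));
    }

    private void send(ByteBuffer b) {
        wire.add(new String(b.array(), StandardCharsets.UTF_8));
    }

    // One worker lifetime: the start-up check, then draining whatever is queued.
    void runWorkerOnce(long sid) {
        ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
        if (bq == null || bq.isEmpty()) {
            // Nothing newer to send: repeat the last message; the peer tolerates duplicates.
            ByteBuffer last = lastMessageSent.get(sid);
            if (last != null) {
                send(last);
            }
        }
        if (bq != null) {
            ByteBuffer b;
            while ((b = bq.poll()) != null) {
                lastMessageSent.put(sid, b); // remember it before sending
                send(b);
            }
        }
    }
}
```

    Calling runWorkerOnce twice for the same sid models a reconnect. The second worker finds the queue empty and re-sends vote-1, which the receiver has to treat as a duplicate.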

    Message

    This class defines the structure of the messages exchanged between servers. The source is:

        static public class Message {
            
            Message(ByteBuffer buffer, long sid) {
                this.buffer = buffer;
                this.sid = sid;
            }
    
            ByteBuffer buffer;
            long sid;
        }
    

    sid is the sid of the server the message came from, and buffer is the message body.

    RecvWorker

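    This class is the “receiver”. Like SendWorker, it extends ZooKeeperThread, and the thread repeatedly reads data from the network and puts it into the receive queue.
    Fields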

            Long sid;// sid of the sending side
            Socket sock;
            volatile boolean running = true;
            DataInputStream din;// input stream
            final SendWorker sw;
    

    Main methods

    Constructor
    
            RecvWorker(Socket sock, Long sid, SendWorker sw) {
                super("RecvWorker:" + sid);
                this.sid = sid;
                this.sock = sock;
                this.sw = sw;
                try {
                    din = new DataInputStream(sock.getInputStream());
                    // OK to wait until socket disconnects while reading.
                    sock.setSoTimeout(0);
                } catch (IOException e) {
                    LOG.error("Error while accessing socket for " + sid, e);
                    closeSocket(sock);
                    running = false;
                }
            }
    
    run method
            @Override
            public void run() {
                threadCnt.incrementAndGet();
                try {
                    while (running && !shutdown && sock != null) {
                        /**
                         * Reads the first int to determine the length of the
                         * message
                         */
                        int length = din.readInt();// read the length
                        if (length <= 0 || length > PACKETMAXSIZE) {
                            throw new IOException(
                                    "Received packet with invalid packet: "
                                            + length);
                        }
                        /**
                         * Allocates a new ByteBuffer to receive the message
                         */
                        byte[] msgArray = new byte[length];
                        din.readFully(msgArray, 0, length);
                        ByteBuffer message = ByteBuffer.wrap(msgArray);// wrap the bytes in a ByteBuffer
                        addToRecvQueue(new Message(message.duplicate(), sid));// add to the receive queue
                    }
                } catch (Exception e) {
                    LOG.warn("Connection broken for id " + sid + ", my id = " + 
                            self.getId() + ", error = " , e);
                } finally {
                    LOG.warn("Interrupting SendWorker");
                    sw.finish();
                    if (sock != null) {
                        closeSocket(sock);
                    }
                }
            }
        }
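    SendWorker#send and RecvWorker#run together define a simple length-prefixed frame: a 4-byte int length followed by the payload. Below is a sketch of that round trip over in-memory streams. FrameCodec and roundTrip are hypothetical names, and MAX_PACKET = 512 KB is an assumed stand-in for PACKETMAXSIZE.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

// Hypothetical codec for the frame used by SendWorker#send and RecvWorker#run:
// a 4-byte big-endian int length, then exactly that many payload bytes.
class FrameCodec {
    // Assumed limit standing in for QuorumCnxManager's PACKETMAXSIZE.
    static final int MAX_PACKET = 512 * 1024;

    // Like send(): assumes a heap buffer whose backing array is exactly the payload.
    static void writeFrame(DataOutputStream dout, ByteBuffer b) throws IOException {
        dout.writeInt(b.capacity());
        dout.write(b.array());
        dout.flush();
    }

    static ByteBuffer readFrame(DataInputStream din) throws IOException {
        int length = din.readInt();
        if (length <= 0 || length > MAX_PACKET) {
            throw new IOException("Received packet with invalid length: " + length);
        }
        byte[] msg = new byte[length];
        din.readFully(msg, 0, length); // blocks until all bytes arrive, or throws EOFException
        return ByteBuffer.wrap(msg);
    }

    // Writes one frame to memory and reads it back; returns null if reading fails.
    static byte[] roundTrip(byte[] payload) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            writeFrame(new DataOutputStream(bytes), ByteBuffer.wrap(payload));
            DataInputStream din = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
            return readFrame(din).array();
        } catch (IOException e) {
            return null;
        }
    }
}
```

    A zero-length payload fails the same length <= 0 check that RecvWorker#run applies, so roundTrip returns null for it.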
    

    Listener

    This class also extends ZooKeeperThread. It listens on the electionPort and keeps accepting incoming connections.
    The core of its run method is:

                         while (!shutdown) {
                            Socket client = ss.accept();
                            setSockOpts(client);
                            LOG.info("Received connection request "
                                    + client.getRemoteSocketAddress());
                            receiveConnection(client);// handle each accepted connection
                            numRetries = 0;
                        }
    

    Summary of the inner classes

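    SendWorker and RecvWorker depend on each other. The reason is given under “Thoughts” below.
    RecvWorker is easier to follow than SendWorker.

    Both have a sid field. When a machine connects to the other machines, its RecvWorkers and SendWorkers are distinguished by sid.
    For example, if sid1 connects to the other (n-1) servers, the workers are split by sid, giving (n-1) RecvWorkers and (n-1) SendWorkers.

    Message wraps a message, with a sid and a ByteBuffer as the message body.

    Listener listens on this machine's configured electionPort and keeps accepting incoming connections.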

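    Fields

    The important fields are summarized below:

    Field | Default | Notes
    RECV_CAPACITY | 100 | capacity of the receive queue
    SEND_CAPACITY | 1 | capacity of each send queue; the reason is given in “Thoughts”
    ConcurrentHashMap<Long, SendWorker> senderWorkerMap | - | the SendWorker for each sid
    ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap | - | message send queues, keyed by each machine's sid
    ConcurrentHashMap<Long, ByteBuffer> lastMessageSent | - | the last message sent to each sid
    ArrayBlockingQueue<Message> recvQueue | - | the receive queue

    These are fairly straightforward.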

    Functions

    Functions for initiating and accepting connections

    Initiating connections

    connectOne: connect to the server with a given sid

        synchronized void connectOne(long sid){// connect to the server with this sid
            if (senderWorkerMap.get(sid) == null){// no SendWorker recorded for this sid, i.e. no connection yet
                InetSocketAddress electionAddr;
                if (self.quorumPeers.containsKey(sid)) {
                    electionAddr = self.quorumPeers.get(sid).electionAddr;// look up that sid's election address from the configuration
                } else {
                    LOG.warn("Invalid server id: " + sid);
                    return;
                }
                try {
    
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Opening channel to server " + sid);
                    }
                    Socket sock = new Socket();
                    setSockOpts(sock);
                    sock.connect(self.getView().get(sid).electionAddr, cnxTO);// connect the socket
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Connected to server " + sid);
                    }
                    initiateConnection(sock, sid);// initialize the connection
                } catch (UnresolvedAddressException e) {
                    // Sun doesn't include the address that causes this
                    // exception to be thrown, also UAE cannot be wrapped cleanly
                    // so we log the exception in order to capture this critical
                    // detail.
                    LOG.warn("Cannot open channel to " + sid
                            + " at election address " + electionAddr, e);
                    // Resolve hostname for this server in case the
                    // underlying ip address has changed.
                    if (self.getView().containsKey(sid)) {
                        self.getView().get(sid).recreateSocketAddresses();
                    }
                    throw e;
                } catch (IOException e) {
                    LOG.warn("Cannot open channel to " + sid
                            + " at election address " + electionAddr,
                            e);
                    // We can't really tell if the server is actually down or it failed
                    // to connect to the server because the underlying IP address
                    // changed. Resolve the hostname again just in case.
                    if (self.getView().containsKey(sid)) {
                        self.getView().get(sid).recreateSocketAddresses();
                    }
                }
            } else {
                LOG.debug("There is a connection already for server " + sid);
            }
        }
    

    If senderWorkerMap has no entry for sid, there is currently no connection, so it connects. The core of this is the call to
    initiateConnection.

    initiateConnection: initialize the connection

        /**
         * If this server has initiated the connection, then it gives up on the
         * connection if it loses challenge. Otherwise, it keeps the connection.
         */
        public boolean initiateConnection(Socket sock, Long sid) {// initialize the connection
            DataOutputStream dout = null;
            try {
                // Sending id and challenge
                dout = new DataOutputStream(sock.getOutputStream());
                dout.writeLong(self.getId());// send our own sid
                dout.flush();
            } catch (IOException e) {
                LOG.warn("Ignoring exception reading or writing challenge: ", e);
                closeSocket(sock);
                return false;
            }
            
            // If lost the challenge, then drop the new connection
            if (sid > self.getId()) {// when initiating, only a larger sid may connect to a smaller one; our sid is smaller, so close
                LOG.info("Have smaller server identifier, so dropping the " +
                         "connection: (" + sid + ", " + self.getId() + ")");
                closeSocket(sock);
                // Otherwise proceed with the connection
            } else {// our sid is larger: create the SendWorker and RecvWorker
                SendWorker sw = new SendWorker(sock, sid);
                RecvWorker rw = new RecvWorker(sock, sid, sw);// rw keeps a reference to sw
                sw.setRecv(rw);// sw keeps a reference to rw
    
                SendWorker vsw = senderWorkerMap.get(sid);
                
                if(vsw != null)
                    vsw.finish();// finish the old SendWorker (vsw) for this sid
                
                senderWorkerMap.put(sid, sw);// register the new SendWorker, sw
                if (!queueSendMap.containsKey(sid)) {
                    queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
                            SEND_CAPACITY));
                }
                
                sw.start();
                rw.start();
                
                return true;    
                
            }
            return false;
        }
    

    Accepting connections

    receiveConnection: accept an incoming connection

        public void receiveConnection(Socket sock) {// accept a connection
            Long sid = null;
            
            try {
                // Read server id
                DataInputStream din = new DataInputStream(sock.getInputStream());
                sid = din.readLong();
                if (sid < 0) { // this is not a server id but a protocol version (see ZOOKEEPER-1633)
                    sid = din.readLong();
    
                    // next comes the #bytes in the remainder of the message
                    // note that 0 bytes is fine (old servers)
                    int num_remaining_bytes = din.readInt();
                    if (num_remaining_bytes < 0 || num_remaining_bytes > maxBuffer) {
                        LOG.error("Unreasonable buffer length: {}", num_remaining_bytes);
                        closeSocket(sock);
                        return;
                    }
                    byte[] b = new byte[num_remaining_bytes];
    
                    // remove the remainder of the message from din
                    int num_read = din.read(b);
                    if (num_read != num_remaining_bytes) {
                        LOG.error("Read only " + num_read + " bytes out of " + num_remaining_bytes + " sent by server " + sid);
                    }
                }
                if (sid == QuorumPeer.OBSERVER_ID) {
                    /*
                     * Choose identifier at random. We need a value to identify
                     * the connection.
                     */
                    
                    sid = observerCounter--;
                    LOG.info("Setting arbitrary identifier to observer: " + sid);
                }
            } catch (IOException e) {
                closeSocket(sock);
                LOG.warn("Exception reading or writing challenge: " + e.toString());
                return;
            }
            
            //If wins the challenge, then close the new connection.
            if (sid < self.getId()) {// our sid is larger: close this connection (the smaller sid initiated it) and connect to the peer ourselves
                /*
                 * This replica might still believe that the connection to sid is
                 * up, so we have to shut down the workers before trying to open a
                 * new connection.
                 */
                SendWorker sw = senderWorkerMap.get(sid);
                if (sw != null) {
                    sw.finish();
                }
    
                /*
                 * Now we start a new connection
                 */
                LOG.debug("Create new connection to server: " + sid);
                closeSocket(sock);
                connectOne(sid);
    
                // Otherwise start worker threads to receive data.
            } else {
                SendWorker sw = new SendWorker(sock, sid);
                RecvWorker rw = new RecvWorker(sock, sid, sw);
                sw.setRecv(rw);
    
                SendWorker vsw = senderWorkerMap.get(sid);
                
                if(vsw != null)
                    vsw.finish();
                
                senderWorkerMap.put(sid, sw);
                
                if (!queueSendMap.containsKey(sid)) {
                    queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
                            SEND_CAPACITY));
                }
                
                sw.start();
                rw.start();
                
                return;
            }
        }
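    The first bytes that receiveConnection reads can take two shapes. Older servers send just a long sid. Newer ones send a negative long protocol version followed by the sid, an int byte count, and that many extra bytes (the ZOOKEEPER-1633 case in the code). Below is a minimal parser sketch over an in-memory stream. HeaderSketch, MAX_BUFFER = 2048 and the version value -65536 are assumptions for illustration, and the OBSERVER_ID branch is left out.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical parser for the first bytes read by receiveConnection.
// Old format: [long sid]
// New format: [long protocolVersion (negative)][long sid][int n][n bytes]
class HeaderSketch {
    static final int MAX_BUFFER = 2048; // assumed stand-in for maxBuffer

    static long readSid(DataInputStream din) throws IOException {
        long sid = din.readLong();
        if (sid < 0) { // a protocol version, not a server id
            sid = din.readLong();
            int remaining = din.readInt();
            if (remaining < 0 || remaining > MAX_BUFFER) {
                throw new IOException("Unreasonable buffer length: " + remaining);
            }
            din.readFully(new byte[remaining]); // discard the rest of the header
        }
        return sid;
    }

    // Builds a header in either format and parses it back; returns -1 on error.
    static long parse(boolean newFormat, long sid, byte[] extra) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream dout = new DataOutputStream(bytes);
            if (newFormat) {
                dout.writeLong(-65536L); // assumed example version; any negative value works here
                dout.writeLong(sid);
                dout.writeInt(extra.length);
                dout.write(extra);
            } else {
                dout.writeLong(sid);
            }
            dout.flush();
            return readSid(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        } catch (IOException e) {
            return -1;
        }
    }
}
```

    Unlike the listing above, the sketch uses readFully to discard the remainder, because a single read(byte[]) call may return fewer bytes than requested.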
    

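    This is where the notions of winning and losing the challenge come in.
    To keep exactly one socket between each pair of servers, ZooKeeper only lets the server with the larger SID actively establish a connection to another machine. Any other connection is dropped.
    When initiating a connection, a server constructs and starts its SendWorker and RecvWorker only if its own sid is the larger one; otherwise it closes the socket.
    When accepting a connection, a server constructs and starts its SendWorker and RecvWorker only if its own sid is the smaller one; otherwise it closes the socket and connects to the peer itself.
    This is analyzed further in “Thoughts”.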
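    The two rules can be modeled as pure functions. That makes it easy to check that a dial attempt in either direction ends with the larger sid owning the socket. This is a hypothetical model: ChallengeSketch, Outcome and dial do not exist in ZooKeeper, and the model ignores timing and the finishing of existing workers.

```java
// Hypothetical model of the challenge rule; it ignores timing and existing workers.
class ChallengeSketch {
    enum Outcome { KEEP, DROP, DROP_AND_RECONNECT }

    // initiateConnection: after sending our sid, keep the socket unless the peer's sid is larger.
    static Outcome onInitiate(long myId, long peerSid) {
        return peerSid > myId ? Outcome.DROP : Outcome.KEEP;
    }

    // receiveConnection: if the peer's sid is smaller, close and dial back via connectOne.
    static Outcome onAccept(long myId, long peerSid) {
        return peerSid < myId ? Outcome.DROP_AND_RECONNECT : Outcome.KEEP;
    }

    // Follows one dial from `from` to `to`; returns {initiator, acceptor} of the surviving socket.
    static long[] dial(long from, long to) {
        boolean initiatorKeeps = onInitiate(from, to) == Outcome.KEEP;
        boolean acceptorKeeps = onAccept(to, from) == Outcome.KEEP;
        if (initiatorKeeps && acceptorKeeps) {
            return new long[] {from, to};
        }
        // Both ends rejected the socket; the acceptor (the larger sid) dials back.
        return dial(to, from);
    }
}
```

    dial(1, 3) is rejected on both ends, and server 3 dials back. dial(3, 1) is kept immediately. Both attempts end as [3, 1].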

    Producing and consuming on the send queue

    Producing: adding to a send queue

        private void addToSendQueue(ArrayBlockingQueue<ByteBuffer> queue,
              ByteBuffer buffer) {// the send queue has capacity 1: if it is full, remove the head, then add
            if (queue.remainingCapacity() == 0) {
                try {
                    queue.remove();
                } catch (NoSuchElementException ne) {
                    // element could be removed by poll()
                    LOG.debug("Trying to remove from an empty " +
                            "Queue. Ignoring exception " + ne);
                }
            }
            try {
                queue.add(buffer);
            } catch (IllegalStateException ie) {
                // This should never happen
                LOG.error("Unable to insert an element in the queue " + ie);
            }
        }
    

    Each send queue has capacity 1. To avoid sending stale data, the old entry is removed first.
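    The same drop-oldest policy can be written as a small generic helper. This is a sketch, not ZooKeeper's code. It uses poll() and offer() instead of remove()/add() with exception handling, so a race with a consumer yields null or false rather than an exception.

```java
import java.util.concurrent.ArrayBlockingQueue;

// Sketch of the drop-oldest insert behind addToSendQueue, written with
// poll()/offer() so races surface as null/false instead of exceptions.
class DropOldestQueue {
    static <T> boolean offerReplacingOldest(ArrayBlockingQueue<T> queue, T item) {
        if (queue.remainingCapacity() == 0) {
            queue.poll(); // discard the stale head; returns null if a consumer already took it
        }
        // false only if another producer refilled the queue in between
        return queue.offer(item);
    }
}
```

    With capacity 1, offering vote-1 and then vote-2 leaves only vote-2 queued. That is the “newer vote replaces the unsent one” behavior.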

    Consuming from a send queue

        private ByteBuffer pollSendQueue(ArrayBlockingQueue<ByteBuffer> queue,
              long timeout, TimeUnit unit) throws InterruptedException {
           return queue.poll(timeout, unit);
        }
    

    Producing and consuming on the receive queue

    Producing: adding to the receive queue

        public void addToRecvQueue(Message msg) {
            synchronized(recvQLock) {
                if (recvQueue.remainingCapacity() == 0) {
                    try {
                        recvQueue.remove();
                    } catch (NoSuchElementException ne) {
                        // element could be removed by poll()
                         LOG.debug("Trying to remove from an empty " +
                             "recvQueue. Ignoring exception " + ne);
                    }
                }
                try {
                recvQueue.add(msg);// add to the receive queue
                } catch (IllegalStateException ie) {
                    // This should never happen
                    LOG.error("Unable to insert element in the recvQueue " + ie);
                }
            }
        }
    

    It is not clear to me why, when the queue is full, the entry at the head is removed before adding. The queue's capacity is 100.

    Consuming from the receive queue

        public Message pollRecvQueue(long timeout, TimeUnit unit)
           throws InterruptedException {
           return recvQueue.poll(timeout, unit);
        }
    

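    Other functions

    toSend

    Adds a message to either the receive queue or a send queue depending on sid. In other words, it indirectly acts as the producer for both queues.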

        public void toSend(Long sid, ByteBuffer b) {
            /*
             * If sending message to myself, then simply enqueue it (loopback).
             */
            if (self.getId() == sid) {// sending to ourselves: add to the receive queue
                 b.position(0);
                 addToRecvQueue(new Message(b.duplicate(), sid));
                /*
                 * Otherwise send to the corresponding thread to send.
                 */
            } else {
                 /*
                  * Start a new connection if doesn't have one already.
                  */
                 if (!queueSendMap.containsKey(sid)) {// no send queue recorded for this sid yet
                     ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(
                             SEND_CAPACITY);// blocking queue with capacity 1
                     queueSendMap.put(sid, bq);
                     addToSendQueue(bq, b);
    
                 } else {
                     ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
                     if(bq != null){
                         addToSendQueue(bq, b);
                     } else {
                         LOG.error("No queue for server " + sid);
                     }
                 }
                 connectOne(sid);// establish a connection to this sid
                    
            }
        }
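    Below is a simplified sketch of the routing in toSend. It uses ByteBuffers in place of Message objects and records dialed sids in a list instead of calling connectOne. ToSendSketch is hypothetical, and computeIfAbsent is swapped in for the containsKey/put pair so that queue creation is atomic.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified toSend: loopback for our own sid, per-sid send queue otherwise.
// connectOne is replaced by recording which sids we would dial.
class ToSendSketch {
    final long myId;
    final ArrayBlockingQueue<ByteBuffer> recvQueue = new ArrayBlockingQueue<>(100);
    final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap = new ConcurrentHashMap<>();
    final List<Long> dialed = new ArrayList<>();

    ToSendSketch(long myId) {
        this.myId = myId;
    }

    void toSend(long sid, ByteBuffer b) {
        if (sid == myId) {
            b.position(0);
            recvQueue.offer(b.duplicate()); // loopback: never touches the network
            return;
        }
        ArrayBlockingQueue<ByteBuffer> bq =
            queueSendMap.computeIfAbsent(sid, k -> new ArrayBlockingQueue<>(1));
        if (bq.remainingCapacity() == 0) {
            bq.poll(); // keep only the newest message, as addToSendQueue does
        }
        bq.offer(b);
        dialed.add(sid); // stands in for connectOne(sid)
    }
}
```

    The loopback message for our own sid lands in recvQueue without any connection. The second message to sid 2 replaces the first one in that sid's queue.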
    

    haveDelivered

    Whether messages have been delivered

        /**
         * Check if all queues are empty, indicating that all messages have been delivered.
         */
        boolean haveDelivered() {// returns true as soon as any one queue is empty, which contradicts the Javadoc
            for (ArrayBlockingQueue<ByteBuffer> queue : queueSendMap.values()) {
                LOG.debug("Queue size: " + queue.size());
                if (queue.size() == 0) {
                    return true;
                }
            }
    
            return false;
        }
    

    One complaint here: the code and the comment disagree. This is explained in “Thoughts”.
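    The mismatch comes down to “any queue empty” versus “all queues empty”. The sketch below puts the two side by side, using the hypothetical method names anyEmpty and allEmpty.

```java
import java.util.Collection;
import java.util.concurrent.ArrayBlockingQueue;

// Contrasts the shipped logic of haveDelivered with what its Javadoc describes.
class DeliveredSketch {
    // What the code does: true as soon as ANY send queue is empty.
    static boolean anyEmpty(Collection<? extends ArrayBlockingQueue<?>> queues) {
        for (ArrayBlockingQueue<?> q : queues) {
            if (q.isEmpty()) {
                return true;
            }
        }
        return false;
    }

    // What the comment says: true only if ALL send queues are empty.
    static boolean allEmpty(Collection<? extends ArrayBlockingQueue<?>> queues) {
        for (ArrayBlockingQueue<?> q : queues) {
            if (!q.isEmpty()) {
                return false;
            }
        }
        return true;
    }
}
```

    With one drained queue and one pending queue, the shipped logic (anyEmpty) already reports true, while the documented behavior (allEmpty) would report false. With no queues at all, anyEmpty returns false, as haveDelivered does, whereas allEmpty returns true.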

    Connecting to every sid in queueSendMap

        public void connectAll(){// connect to every machine we have messages for
            long sid;
            for(Enumeration<Long> en = queueSendMap.keys();// connect every sid recorded in queueSendMap
                en.hasMoreElements();){
                sid = en.nextElement();
                connectOne(sid);
            }      
        }
    

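    Thoughts

    The tricky part: only one connection between each pair of servers

    Think of n servers that all need connections to one another,
    like n points joined by undirected edges, where [sidn,sid1] denotes a connection that sidn established to sid1.
    Then [sid1,sidn] is unnecessary, so n*(n-1)/2 edges are enough.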
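    Here is the count. Each unordered pair of servers keeps exactly one socket, so for n servers the total is the number of 2-element subsets. Each connection also contributes one SendWorker at each of its two ends, which matches the (n-1) workers per server mentioned earlier.

```latex
\#\text{connections} = \binom{n}{2} = \frac{n(n-1)}{2},
\qquad
\#\text{SendWorkers} = \sum_{i=1}^{n} (n-1) = n(n-1) = 2\binom{n}{2}
```

    For example, n = 5 gives 10 connections and 20 SendWorkers in total.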

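    The code stipulates that, under normal conditions, only a server with a larger sid establishes connections to servers with a smaller sid.
    Where is this enforced?

    When sending a connection request, if the peer's sid is larger than ours, we send only our own sid (a single long) and then close the socket:
    QuorumCnxManager#initiateConnection
    
    When receiving a connection request, if the peer's sid is smaller than ours, we close the socket and then connect to the peer ourselves:
    QuorumCnxManager#receiveConnection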
    

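    In other words, when a smaller-sid server sends a connection request to a larger-sid server, it is only telling the other side:
    “Your sid is larger, so you connect to me.”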

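    Initializing SendWorker and RecvWorker
    How is it ensured that both sides of a connection such as [sidn,sid1] initialize their two workers?
    When the connection request is sent, the larger-sid side, sidn, initializes its two workers.
    When the connection request is received, the smaller-sid side, sid1, initializes its two workers.
    The code for this is also in the two functions above.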

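    Why do RecvWorker and SendWorker each keep a reference to the other?

    In the code this is used on finish:
    SendWorker#finish calls the corresponding RecvWorker#finish.
    The finally block of RecvWorker#run calls SendWorker#finish.

    Also, senderWorkerMap is of type final ConcurrentHashMap<Long, SendWorker>,
    and there is no corresponding map for RecvWorker.
    So the reason is that the SendWorker can be found by sid, which makes it easy to call finish on it (and, through it, on the RecvWorker).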
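    Here is a hypothetical model of the mutual references. Finishing either worker tears down both, and an AtomicBoolean guard makes each finish run its cleanup once. PairedShutdown, Send and Recv are illustrative names. In ZooKeeper the RecvWorker side reaches sw.finish() from the finally block of run, which this sketch folds into Recv.finish.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model: each worker holds the other, so finishing one finishes both.
class PairedShutdown {
    static final AtomicInteger cleanups = new AtomicInteger();

    static class Recv {
        Send sw;
        final AtomicBoolean running = new AtomicBoolean(true);

        void finish() {
            if (running.compareAndSet(true, false)) { // run cleanup only once
                cleanups.incrementAndGet();
                sw.finish(); // like RecvWorker#run's finally block calling sw.finish()
            }
        }
    }

    static class Send {
        Recv recvWorker;
        final AtomicBoolean running = new AtomicBoolean(true);

        void finish() {
            if (running.compareAndSet(true, false)) { // run cleanup only once
                cleanups.incrementAndGet();
                if (recvWorker != null) {
                    recvWorker.finish(); // like SendWorker#finish finishing its RecvWorker
                }
            }
        }
    }

    // Wires a pair the same way the connection code does.
    static Send newPair() {
        Send sw = new Send();
        Recv rw = new Recv();
        rw.sw = sw;         // RecvWorker(sock, sid, sw)
        sw.recvWorker = rw; // sw.setRecv(rw)
        return sw;
    }
}
```

    Only the Send object is stored in the map, mirroring senderWorkerMap, yet finishing it also stops its Recv.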

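    Why does the send queue have capacity 1, and why is the head evicted when the queue is full on insertion?

    Capacity 1: QuorumCnxManager#SEND_CAPACITY
    Eviction: QuorumCnxManager#addToSendQueue

    The comment on SEND_CAPACITY explains it: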

    // Initialized to 1 to prevent sending
    // stale notifications to peers
    

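    Because this is leader-election voting, there is a special requirement: if the previous vote has not been sent yet and a new vote is produced, the old vote is simply obsolete and never needs to be sent.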

    Order of calls for producing, consuming, and sending on the send queue

    Produce
    QuorumCnxManager#toSend
    QuorumCnxManager#addToSendQueue
    
    Consume
    QuorumCnxManager.SendWorker#run
    QuorumCnxManager#pollSendQueue
    
    Send
    QuorumCnxManager.SendWorker#run
    SendWorker#send
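    The produce/consume/send chain can be simulated with one producer and one consumer thread sharing a capacity-1 ArrayBlockingQueue. SendPipelineSketch is hypothetical. It polls with a 100 ms timeout instead of the 1000 ms in SendWorker#run so the demo finishes quickly.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical producer/consumer pair mirroring toSend -> addToSendQueue (producer)
// and SendWorker#run -> pollSendQueue -> send (consumer).
class SendPipelineSketch {
    // Returns the votes that actually reached the simulated wire, in order.
    static List<String> run(List<String> votes) {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(1); // SEND_CAPACITY = 1
        List<String> wire = Collections.synchronizedList(new ArrayList<>());
        AtomicBoolean done = new AtomicBoolean(false);

        Thread consumer = new Thread(() -> {
            try {
                // Keep going until the producer is done AND nothing is left to send.
                while (!done.get() || !queue.isEmpty()) {
                    String b = queue.poll(100, TimeUnit.MILLISECONDS); // pollSendQueue
                    if (b != null) {
                        wire.add(b); // send(b)
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        for (String v : votes) { // toSend -> addToSendQueue
            if (queue.remainingCapacity() == 0) {
                queue.poll(); // a newer vote supersedes the unsent one
            }
            queue.offer(v);
        }
        done.set(true);

        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(wire);
    }
}
```

    Which intermediate votes reach the wire depends on thread scheduling. The last vote always does, because nothing is produced after it to supersede it.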
    

    What queueSendMap means

    final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap;// message send queues, keyed by each machine's sid

    To stress: the key is a sid, meaning this machine has established contact with the machine of that sid
    or has something to send to it.

    queueSendMap is only ever modified with put, never with remove or similar operations.
    The put is almost always of the form:

        queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
                            SEND_CAPACITY));
    

    QuorumCnxManager#haveDelivered: the comment and the code disagree

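    The source was shown earlier.
    Here the code and the comment disagree. addToSendQueue, the function that adds to a send queue, is called only from QuorumCnxManager#toSend.

    toSend calls addToSendQueue.
    When FastLeaderElection#lookForLeader is covered later, we will see that toSend is the entry point for establishing connections. The caller adds an entry to the send queue for the given sid in queueSendMap.
    In haveDelivered, if the queue for any one sid has length 0, the task in that send queue has been consumed, i.e. sent out, hence the name “haveDelivered”. It is the comment that is wrong.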

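    Open questions

    When SendWorker's send queue is empty, the last message sent is sent again.

    This is said to solve the problem of “the receiving server going down before or after receiving the message”.
    Then why not also resend the second-to-last message, or the third-to-last?

    QuorumCnxManager#addToRecvQueue removes the entry at the head when the receive queue is full.

    Why? Does it not matter if messages are lost?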

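    Summary

    This class is the scheduler for network I/O.
    SendWorker and RecvWorker are threads that keep sending and receiving messages.
    Listener listens for incoming connections.
    Key field: queueSendMap records which sids we have contact with and the send queue for each.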

    References

    http://www.cnblogs.com/leesf456/p/6107600.html, part 3, QuorumCnxManager: network I/O
    From Paxos to ZooKeeper (《从Paxos到Zookeeper》), section 7.6.3
