美文网首页
深入浅出Zookeeper Leader选举

深入浅出Zookeeper Leader选举

作者: fengjixcuhui | 来源:发表于2017-04-01 17:03 被阅读0次

    入口类QuorumPeerMain:主线程启动(runFromConfig)

    /**
     * Builds a QuorumPeer from the parsed configuration, starts it and blocks
     * the calling thread until the peer thread terminates.
     *
     * <p>Failure to register the log4j JMX beans is logged and otherwise ignored.
     * If the calling thread is interrupted while waiting for the peer, the
     * interruption is logged and the thread's interrupt status is restored so
     * the caller can still observe it.
     *
     * @param config parsed quorum configuration
     * @throws IOException if creating or configuring the server components fails
     */
    public void runFromConfig(QuorumPeerConfig config) throws IOException {
        try {
            ManagedUtil.registerLog4jMBeans();
        } catch (JMException e) {
            LOG.warn("Unable to register log4j JMX control", e);
        }

        LOG.info("Starting quorum peer");
        try {
            ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
            cnxnFactory.configure(config.getClientPortAddress(),
                                  config.getMaxClientCnxns());

            quorumPeer = new QuorumPeer();
            quorumPeer.setClientPortAddress(config.getClientPortAddress());
            quorumPeer.setTxnFactory(new FileTxnSnapLog(
                        new File(config.getDataLogDir()),
                        new File(config.getDataDir())));
            quorumPeer.setQuorumPeers(config.getServers());
            quorumPeer.setElectionType(config.getElectionAlg());
            quorumPeer.setMyid(config.getServerId());
            quorumPeer.setTickTime(config.getTickTime());
            quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
            quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
            quorumPeer.setInitLimit(config.getInitLimit());
            quorumPeer.setSyncLimit(config.getSyncLimit());
            quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
            quorumPeer.setCnxnFactory(cnxnFactory);
            quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
            quorumPeer.setLearnerType(config.getPeerType());
            quorumPeer.setSyncEnabled(config.getSyncEnabled());
            quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());

            quorumPeer.start();
            // Block until the peer thread exits.
            quorumPeer.join();
        } catch (InterruptedException e) {
            // warn, but generally this is ok
            LOG.warn("Quorum Peer interrupted", e);
            // join() cleared the interrupt flag; restore it so callers can react.
            Thread.currentThread().interrupt();
        }
    }
    

    QuorumPeer重写(override)了Thread.start方法

     /**
      * Brings this peer up: restores the on-disk state, starts the client
      * connection factory, prepares leader election, and finally launches
      * the peer's own thread via {@code Thread.start()}.
      */
     @Override
     public synchronized void start() {
         // Restore database and epochs before anything can serve requests.
         this.loadDataBase();
         this.cnxnFactory.start();
         this.startLeaderElection();
         // Launch the thread last, once everything it relies on is in place.
         super.start();
     }
    

    loadDataBase 恢复epoch

     /**
      * Restores the in-memory database from local files and reconciles the
      * persisted epoch files with it.
      *
      * <p>currentEpoch and acceptedEpoch are read from their files. A missing file
      * is recreated with the epoch of the last processed zxid. If the
      * UPDATING_EPOCH_FILENAME marker exists and the zxid epoch is ahead of
      * currentEpoch (the server stopped after a snapshot but before updating the
      * epoch), currentEpoch is advanced and the marker is deleted.
      *
      * @throws RuntimeException wrapping any IOException, including the epoch
      *         consistency checks performed at the end
      */
     private void loadDataBase() {
         File updatingMarker = new File(getTxnFactory().getSnapDir(),
                 UPDATING_EPOCH_FILENAME);
         try {
             // Restore the database from local files.
             zkDb.loadDataBase();

             // A zxid is 64 bits; its high 32 bits hold the epoch.
             long lastZxid = zkDb.getDataTree().lastProcessedZxid;
             long zxidEpoch = ZxidUtils.getEpochFromZxid(lastZxid);

             try {
                 currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
                 // && short-circuits: the marker file is only checked when needed.
                 boolean epochUpdateWasInterrupted =
                         zxidEpoch > currentEpoch && updatingMarker.exists();
                 if (epochUpdateWasInterrupted) {
                     LOG.info("{} found. The server was terminated after " +
                              "taking a snapshot but before updating current " +
                              "epoch. Setting current epoch to {}.",
                              UPDATING_EPOCH_FILENAME, zxidEpoch);
                     setCurrentEpoch(zxidEpoch);
                     if (!updatingMarker.delete()) {
                         throw new IOException("Failed to delete " +
                                               updatingMarker.toString());
                     }
                 }
             } catch (FileNotFoundException missingFile) {
                 // No currentEpoch file yet (expected once, on upgrade):
                 // seed it from the epoch of the last zxid.
                 currentEpoch = zxidEpoch;
                 LOG.info(CURRENT_EPOCH_FILENAME
                         + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
                         currentEpoch);
                 writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
             }

             if (zxidEpoch > currentEpoch) {
                 throw new IOException("The current epoch, " + ZxidUtils.zxidToString(currentEpoch) + ", is older than the last zxid, " + lastZxid);
             }

             try {
                 acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
             } catch (FileNotFoundException missingFile) {
                 // Same upgrade path as above, for the accepted epoch.
                 acceptedEpoch = zxidEpoch;
                 LOG.info(ACCEPTED_EPOCH_FILENAME
                         + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
                         acceptedEpoch);
                 writeLongToFile(ACCEPTED_EPOCH_FILENAME, acceptedEpoch);
             }

             if (acceptedEpoch < currentEpoch) {
                 throw new IOException("The accepted epoch, " + ZxidUtils.zxidToString(acceptedEpoch) + " is less than the current epoch, " + ZxidUtils.zxidToString(currentEpoch));
             }
         } catch (IOException ie) {
             LOG.error("Unable to load database on disk", ie);
             throw new RuntimeException("Unable to run quorum server ", ie);
         }
     }
    

    开始Leader选举

    /**
     * Prepares this peer for leader election.
     *
     * <p>Sets the initial vote to (myid, last logged zxid, current epoch), finds
     * this server's own quorum address in the view, starts a UDP responder
     * thread when electionType is 0, and instantiates the configured election
     * algorithm.
     *
     * @throws RuntimeException if the initial vote cannot be built (the
     *         underlying IOException is attached as the cause), if this
     *         server's id is not in the peer list, or if the UDP socket for
     *         election type 0 cannot be opened
     */
    synchronized public void startLeaderElection() {
        try {
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        } catch (IOException e) {
            // Chain the IOException as the cause instead of copying only its
            // message and stack trace, so no diagnostic context is lost.
            throw new RuntimeException(e.getMessage(), e);
        }
        for (QuorumServer p : getView().values()) {
            if (p.id == myid) {
                myQuorumAddr = p.addr;
                break;
            }
        }
        if (myQuorumAddr == null) {
            throw new RuntimeException("My id " + myid + " not in the peer list");
        }
        if (electionType == 0) {
            try {
                udpSocket = new DatagramSocket(myQuorumAddr.getPort());
                responder = new ResponderThread();
                responder.start();
            } catch (SocketException e) {
                throw new RuntimeException(e);
            }
        }
        this.electionAlg = createElectionAlgorithm(electionType);
    }
    

    创建选举算法实例,默认是FastLeaderElection(对应electionAlg=3)

    /**
     * Instantiates the leader-election implementation selected by
     * {@code electionAlgorithm}:
     * <ul>
     *   <li>0 - {@code LeaderElection}</li>
     *   <li>1 - {@code AuthFastLeaderElection}</li>
     *   <li>2 - {@code AuthFastLeaderElection} constructed with {@code true}</li>
     *   <li>3 - {@code FastLeaderElection}; also creates {@code qcm} and starts
     *       its listener thread</li>
     * </ul>
     *
     * @param electionAlgorithm election type taken from the configuration
     * @return the election object, or {@code null} when type 3 is requested but
     *         the connection manager has no listener, or when the type is
     *         unknown (which also trips an assertion if assertions are enabled)
     */
    protected Election createElectionAlgorithm(int electionAlgorithm){
        // TODO: use a factory rather than a switch
        switch (electionAlgorithm) {
        case 0:
            return new LeaderElection(this);
        case 1:
            return new AuthFastLeaderElection(this);
        case 2:
            return new AuthFastLeaderElection(this, true);
        case 3: {
            qcm = new QuorumCnxManager(this);
            QuorumCnxManager.Listener cnxListener = qcm.listener;
            if (cnxListener == null) {
                LOG.error("Null listener when initializing cnx manager");
                return null;
            }
            cnxListener.start();
            return new FastLeaderElection(this, qcm);
        }
        default:
            assert false;
            return null;
        }
    }
    

    FastLeaderElection初始化。构造函数调用了starter方法(原因见注释)。

     /**
      * Creates the fast leader election object for {@code self}, using the given
      * connection manager. Each peer should create only one such object during
      * an instance of the ZooKeeper service.
      *
      * <p>The remaining set-up (proposal reset, queues and Messenger) is
      * delegated to {@code starter}.
      *
      * @param self    QuorumPeer that owns this election object
      * @param manager connection manager used to exchange notifications
      */
     public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager){
         this.manager = manager;
         this.stop = false;
         starter(self, manager);
     }
    
       /**
        * Completes construction of this object. It records the owning peer and
        * resets the proposed leader and zxid to -1. It then creates the send and
        * receive queues and finally builds the Messenger, whose constructor starts
        * the worker threads.
        *
        * <p>Kept apart from the constructor so that any additional constructor
        * can share the same start-up sequence.
        *
        * @param self    QuorumPeer that created this object
        * @param manager connection manager handed to the Messenger
        */
       private void starter(QuorumPeer self, QuorumCnxManager manager) {
           proposedLeader = -1;
           proposedZxid = -1;
           this.self = self;

           recvqueue = new LinkedBlockingQueue<Notification>();
           sendqueue = new LinkedBlockingQueue<ToSend>();

           // Must stay last: the Messenger starts threads that read self and the queues.
           this.messenger = new Messenger(manager);
       }
         /**
          * Creates the messenger and immediately starts its two daemon worker
          * threads, named "WorkerSender[myid=&lt;id&gt;]" and
          * "WorkerReceiver[myid=&lt;id&gt;]".
          *
          * @param manager connection manager passed to both workers
          */
         Messenger(QuorumCnxManager manager) {
             this.ws = new WorkerSender(manager);
             launchDaemonWorker(this.ws, "WorkerSender[myid=" + self.getId() + "]");

             this.wr = new WorkerReceiver(manager);
             launchDaemonWorker(this.wr, "WorkerReceiver[myid=" + self.getId() + "]");
         }

         /** Runs {@code task} on a new, already-started daemon thread named {@code threadName}. */
         private void launchDaemonWorker(Runnable task, String threadName) {
             Thread worker = new Thread(task, threadName);
             worker.setDaemon(true);
             worker.start();
         }
    
    

    QuorumPeer线程启动

    // Main loop of the QuorumPeer thread (fragment of run(); the method header is
    // not shown in the article). Each iteration dispatches on the current peer
    // state. Every non-LOOKING role sets the state back to LOOKING when it ends or
    // fails, which sends the next iteration into a new election.
    try {
                /*
                 * Main loop
                 */
                while (running) {
                    switch (getPeerState()) {
                    case LOOKING:
                        LOG.info("LOOKING");
                         {
                            try {
                                setBCVote(null);
                                // Run an election; the returned vote becomes currentVote.
                                setCurrentVote(makeLEStrategy().lookForLeader());
                            } catch (Exception e) {
                                LOG.warn("Unexpected exception", e);
                                // Stay LOOKING so the next iteration retries the election.
                                setPeerState(ServerState.LOOKING);
                            }
                        }
                        break;
                    case OBSERVING:
                        try {
                            LOG.info("OBSERVING");
                            setObserver(makeObserver(logFactory));
                            // Blocks while observing the current leader.
                            observer.observeLeader();
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception",e );
                        } finally {
                            // NOTE(review): if makeObserver throws, `observer` may be
                            // null (or stale) here — confirm setObserver semantics.
                            observer.shutdown();
                            setObserver(null);
                            setPeerState(ServerState.LOOKING);
                        }
                        break;
                    case FOLLOWING:
                        try {
                            LOG.info("FOLLOWING");
                            setFollower(makeFollower(logFactory));
                            // Blocks while following the current leader.
                            follower.followLeader();
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception",e);
                        } finally {
                            // NOTE(review): same concern as OBSERVING if makeFollower throws.
                            follower.shutdown();
                            setFollower(null);
                            setPeerState(ServerState.LOOKING);
                        }
                        break;
                    case LEADING:
                        LOG.info("LEADING");
                        try {
                            setLeader(makeLeader(logFactory));
                            // Blocks while this server leads.
                            leader.lead();
                            // After a normal return the leader reference is cleared
                            // (presumably setLeader(null) nulls `leader`), so the
                            // forced shutdown below only runs when lead() threw.
                            setLeader(null);
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception",e);
                        } finally {
                            if (leader != null) {
                                leader.shutdown("Forcing shutdown");
                                setLeader(null);
                            }
                            setPeerState(ServerState.LOOKING);
                        }
                        break;
                    }
                }
            } finally {
                // Runs when `running` becomes false or something escapes the loop:
                // unregister all JMX beans and drop the references to them.
                LOG.warn("QuorumPeer main thread exited");
                try {
                    MBeanRegistry.getInstance().unregisterAll();
                } catch (Exception e) {
                    LOG.warn("Failed to unregister with JMX", e);
                }
                jmxQuorumBean = null;
                jmxLocalPeerBean = null;
            }
        }
    
    

    进入选举流程

    /**
         * Starts a new round of leader election. Whenever our QuorumPeer
         * changes its state to LOOKING, this method is invoked, and it
         * sends notifications to all other peers.
         *
         * @return the final vote when this method decides the outcome (it also
         *         sets the peer state to LEADING or the learning state), or
         *         null if the loop ends because the peer is no longer LOOKING
         *         or {@code stop} was set
         * @throws InterruptedException if interrupted while waiting on the
         *         receive queue
         */
        public Vote lookForLeader() throws InterruptedException {
            // Remember when the first election started; only set once.
            if (self.start_fle == 0) {
               self.start_fle = System.currentTimeMillis();
            }
            try {
                // Votes received in the current round, keyed by sender sid.
                HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();

                // Votes from peers already FOLLOWING/LEADING, keyed by sender sid.
                HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();

                // Poll timeout in ms: starts at finalizeWait, doubles on every
                // empty poll, capped at maxNotificationInterval.
                int notTimeout = finalizeWait;

                // Start a new round and propose ourselves.
                synchronized(this){
                    logicalclock++;
                    updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                }

                LOG.info("New election. My id =  " + self.getId() +
                        ", proposed zxid=0x" + Long.toHexString(proposedZxid));
                sendNotifications();

                /*
                 * Loop in which we exchange notifications until we find a leader
                 */

                while ((self.getPeerState() == ServerState.LOOKING) &&
                        (!stop)){
                    /*
                     * Remove the next notification from the queue, waiting at
                     * most notTimeout ms (see the backoff below).
                     */
                    Notification n = recvqueue.poll(notTimeout,
                            TimeUnit.MILLISECONDS);

                    /*
                     * Sends more notifications if haven't received enough.
                     * Otherwise processes new notification.
                     */
                    if(n == null){
                    // Everything queued was delivered: resend our notifications
                    // (repeated until a leader is elected).
                        if(manager.haveDelivered()){
                            sendNotifications();
                        } else {
                      // Messages are still queued; other servers may not be up yet,
                      // so try to connect to all of them.
                            manager.connectAll();
                        }

                        /*
                         * Exponential backoff
                         */
                        int tmpTimeOut = notTimeout*2;
                        notTimeout = (tmpTimeOut < maxNotificationInterval?
                                tmpTimeOut : maxNotificationInterval);
                        LOG.info("Notification time out: " + notTimeout);
                    }
                    else if(self.getVotingView().containsKey(n.sid)) {
                        /*
                         * Only proceed if the vote comes from a replica in the
                         * voting view.
                         */
                        switch (n.state) {
                        case LOOKING:
                            // If notification > current, replace and send messages out
                            if (n.electionEpoch > logicalclock) {
                                // Sender is in a newer round: join it, drop votes from
                                // the old round, and propose whichever of the sender's
                                // vote and our own initial vote wins the total order.
                                logicalclock = n.electionEpoch;
                                recvset.clear();
                                if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                        getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                                    updateProposal(n.leader, n.zxid, n.peerEpoch);
                                } else {
                                    updateProposal(getInitId(),
                                            getInitLastLoggedZxid(),
                                            getPeerEpoch());
                                }
                                sendNotifications();
                            } else if (n.electionEpoch < logicalclock) {
                                // Stale round: ignore this notification.
                                if(LOG.isDebugEnabled()){
                                    LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                                            + Long.toHexString(n.electionEpoch)
                                            + ", logicalclock=0x" + Long.toHexString(logicalclock));
                                }
                                break;
                            } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    proposedLeader, proposedZxid, proposedEpoch)) {
                                // Same round and the sender's vote beats ours: adopt it.
                                updateProposal(n.leader, n.zxid, n.peerEpoch);
                                sendNotifications();
                            }

                            if(LOG.isDebugEnabled()){
                                LOG.debug("Adding vote: from=" + n.sid +
                                        ", proposed leader=" + n.leader +
                                        ", proposed zxid=0x" + Long.toHexString(n.zxid) +
                                        ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                            }

                            // Record (or overwrite) the sender's vote for this round.
                            recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                            // termPredicate decides whether recvset supports our current
                            // proposal enough to finish (presumably a quorum check).
                            if (termPredicate(recvset,
                                    new Vote(proposedLeader, proposedZxid,
                                            logicalclock, proposedEpoch))) {

                                // Verify if there is any change in the proposed leader
                                // Drain for up to finalizeWait ms per poll; a better vote is
                                // put back on the queue and the election continues.
                                while((n = recvqueue.poll(finalizeWait,
                                        TimeUnit.MILLISECONDS)) != null){
                                    if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                            proposedLeader, proposedZxid, proposedEpoch)){
                                        recvqueue.put(n);
                                        break;
                                    }
                                }

                                /*
                                 * This predicate is true once we don't read any new
                                 * relevant message from the reception queue
                                 */
                                if (n == null) {
                                    // Decide our role: LEADING if we are the proposed
                                    // leader, otherwise the learning state.
                                    self.setPeerState((proposedLeader == self.getId()) ?
                                            ServerState.LEADING: learningState());

                                    Vote endVote = new Vote(proposedLeader,
                                                            proposedZxid,
                                                            logicalclock,
                                                            proposedEpoch);
                                    leaveInstance(endVote);
                                    return endVote;
                                }
                            }
                            break;
          // Notification from a peer that is not LOOKING, e.g. when this server has
          // just joined an ensemble that is already serving. Observers do not vote.
                        case OBSERVING:
                            LOG.debug("Notification from observer: " + n.sid);
                            break;
                        case FOLLOWING:
                        case LEADING:
                            /*
                             * Consider all notifications from the same epoch
                             * together.
                             */
                            if(n.electionEpoch == logicalclock){
                                recvset.put(n.sid, new Vote(n.leader,
                                                              n.zxid,
                                                              n.electionEpoch,
                                                              n.peerEpoch));

                                if(ooePredicate(recvset, outofelection, n)) {
                                    self.setPeerState((n.leader == self.getId()) ?
                                            ServerState.LEADING: learningState());

                                    Vote endVote = new Vote(n.leader,
                                            n.zxid,
                                            n.electionEpoch,
                                            n.peerEpoch);
                                    leaveInstance(endVote);
                                    return endVote;
                                }
                            }

                            /*
                             * Before joining an established ensemble, verify
                             * a majority is following the same leader.
                             */
                            outofelection.put(n.sid, new Vote(n.version,
                                                                n.leader,
                                                                n.zxid,
                                                                n.electionEpoch,
                                                                n.peerEpoch,
                                                                n.state));

                            if(ooePredicate(outofelection, outofelection, n)) {
                                // Adopt the established ensemble's round and role.
                                synchronized(this){
                                    logicalclock = n.electionEpoch;
                                    self.setPeerState((n.leader == self.getId()) ?
                                            ServerState.LEADING: learningState());
                                }
                                Vote endVote = new Vote(n.leader,
                                                        n.zxid,
                                                        n.electionEpoch,
                                                        n.peerEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                            break;
                        default:
                            LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)",
                                    n.state, n.sid);
                            break;
                        }
                    } else {
                        LOG.warn("Ignoring notification from non-cluster member " + n.sid);
                    }
                }
                return null;
            } finally {
                // Always unregister the leader-election JMX bean on the way out.
                try {
                    if(self.jmxLeaderElectionBean != null){
                        MBeanRegistry.getInstance().unregister(
                                self.jmxLeaderElectionBean);
                    }
                } catch (Exception e) {
                    LOG.warn("Failed to unregister with JMX", e);
                }
                self.jmxLeaderElectionBean = null;
            }
        }
    

    发送投票信息

    /**
     * Broadcasts our current proposal (leader, zxid, round, epoch) as a
     * LOOKING-state notification to every member of the voting view. Each
     * notification is only enqueued on {@code sendqueue}; the WorkerSender
     * thread performs the actual delivery.
     */
    private void sendNotifications() {
        for (QuorumServer peer : self.getVotingView().values()) {
            long recipient = peer.id;

            ToSend notification = new ToSend(ToSend.mType.notification,
                    proposedLeader,
                    proposedZxid,
                    logicalclock,
                    QuorumPeer.ServerState.LOOKING,
                    recipient,
                    proposedEpoch);

            if (LOG.isDebugEnabled()) {
                LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x"  +
                      Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock)  +
                      " (n.round), " + recipient + " (recipient), " + self.getId() +
                      " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
            }
            sendqueue.offer(notification);
        }
    }
    

    WorkerSender 线程

    /**
     * Worker that drains {@code sendqueue} and hands each notification to the
     * connection manager for delivery.
     */
    class WorkerSender extends ZooKeeperThread {
        // Set to true to make run() exit at its next loop check (at most ~3 s later).
        volatile boolean stop;
        QuorumCnxManager manager;

        WorkerSender(QuorumCnxManager manager){
            super("WorkerSender");
            this.stop = false;
            this.manager = manager;
        }

        /**
         * Polls {@code sendqueue} with a 3 s timeout, so {@code stop} is re-checked
         * regularly, and processes each message until stopped or interrupted.
         * On interruption the thread's interrupt status is restored before exiting.
         */
        public void run() {
            while (!stop) {
                try {
                    ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
                    if (m == null) {
                        continue;
                    }
                    process(m);
                } catch (InterruptedException e) {
                    // Preserve the interrupt for whoever owns this thread, then exit.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            LOG.info("WorkerSender is down");
        }

        /**
         * Called by run() once there is a new message to send: serializes it
         * and queues it on the connection manager for the target server.
         *
         * @param m     message to send
         */
        void process(ToSend m) {
            ByteBuffer requestBuffer = buildMsg(m.state.ordinal(),
                                                m.leader,
                                                m.zxid,
                                                m.electionEpoch,
                                                m.peerEpoch);
            manager.toSend(m.sid, requestBuffer);
        }
    }
    

    QuorumCnxManager 负责具体发送

    /**
     * Queues a message for delivery to server {@code sid}. Currently only leader
     * election uses this.
     *
     * <p>A message addressed to this server is looped back directly onto the
     * receive queue. Otherwise it is appended to the per-server send queue,
     * which is created on first use, and a connection to {@code sid} is opened
     * if none exists yet.
     *
     * @param sid target server id
     * @param b   serialized message; for loopback its position is reset to 0
     */
    public void toSend(Long sid, ByteBuffer b) {
        if (self.getId() == sid) {
            // Loopback: deliver straight to our own receive queue.
            b.position(0);
            addToRecvQueue(new Message(b.duplicate(), sid));
            return;
        }

        // Create the per-server queue atomically. A containsKey()/put() pair
        // lets two concurrent callers each install a fresh queue, so the
        // message added to the overwritten queue is silently lost.
        ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
        if (bq == null) {
            ArrayBlockingQueue<ByteBuffer> fresh =
                    new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY);
            ArrayBlockingQueue<ByteBuffer> existing = queueSendMap.putIfAbsent(sid, fresh);
            bq = (existing != null) ? existing : fresh;
        }
        addToSendQueue(bq, b);

        // Start a new connection if we don't have one already.
        connectOne(sid);
    }
    

    connectOne尝试建立连接

    /**
     * Opens an election connection to server {@code sid}, unless a SendWorker
     * for it already exists.
     *
     * <p>On failure the error is logged and the server's socket addresses are
     * re-resolved, because its IP may have changed. An
     * {@link UnresolvedAddressException} is also rethrown. The socket is closed
     * whenever the attempt fails, so repeated retries do not leak file
     * descriptors.
     *
     * @param sid id of the server to connect to
     */
    synchronized void connectOne(long sid){
        if (senderWorkerMap.get(sid) != null) {
            LOG.debug("There is a connection already for server " + sid);
            return;
        }

        InetSocketAddress electionAddr;
        if (self.quorumPeers.containsKey(sid)) {
            electionAddr = self.quorumPeers.get(sid).electionAddr;
        } else {
            LOG.warn("Invalid server id: " + sid);
            return;
        }

        Socket sock = null;
        try {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Opening channel to server " + sid);
            }
            sock = new Socket();
            setSockOpts(sock);
            // Connect to the address validated and logged here, rather than
            // performing a second, unchecked lookup.
            sock.connect(electionAddr, cnxTO);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Connected to server " + sid);
            }
            // From here on initiateConnection owns the socket: it either hands
            // it to the worker threads or closes it itself.
            initiateConnection(sock, sid);
        } catch (UnresolvedAddressException e) {
            // Sun doesn't include the address that causes this
            // exception to be thrown, also UAE cannot be wrapped cleanly
            // so we log the exception in order to capture this critical
            // detail.
            LOG.warn("Cannot open channel to " + sid
                    + " at election address " + electionAddr, e);
            if (sock != null) {
                closeSocket(sock);
            }
            // Resolve hostname for this server in case the
            // underlying ip address has changed.
            if (self.getView().containsKey(sid)) {
                self.getView().get(sid).recreateSocketAddresses();
            }
            throw e;
        } catch (IOException e) {
            LOG.warn("Cannot open channel to " + sid
                    + " at election address " + electionAddr,
                    e);
            if (sock != null) {
                closeSocket(sock);
            }
            // We can't really tell if the server is actually down or it failed
            // to connect to the server because the underlying IP address
            // changed. Resolve the hostname again just in case.
            if (self.getView().containsKey(sid)) {
                self.getView().get(sid).recreateSocketAddresses();
            }
        }
    }
    

    由于这个时候只有server.1启动,当它尝试去连接其他server时会报错,选举线程会一直重试。此时,server.1只收到了自己的选票。然后我们启动server.2,server.2也会主动去连接server.1,这个时候server.1和server.2会相互发起连接,但最终只会有一个连接被保留(由下面initiateConnection中的sid比较决定)。

    /**
     * Sends this server's id over a newly opened election socket, then applies
     * the id tie-break. If the remote server's id is larger, this side closes
     * the socket and gives up the connection. Otherwise it keeps the connection,
     * starts a SendWorker/RecvWorker pair for it (finishing any previous
     * SendWorker for {@code sid}), and makes sure a send queue exists.
     *
     * @param sock connected socket to server {@code sid}
     * @param sid  id of the remote server
     * @return true if the connection was kept and its workers started; false if
     *         the id could not be written or the challenge was lost (in both
     *         cases the socket is closed)
     */
    public boolean initiateConnection(Socket sock, Long sid) {
        try {
            // Sending id and challenge
            DataOutputStream dout = new DataOutputStream(sock.getOutputStream());
            dout.writeLong(self.getId());
            dout.flush();
        } catch (IOException e) {
            LOG.warn("Ignoring exception reading or writing challenge: ", e);
            closeSocket(sock);
            return false;
        }

        // If lost the challenge, then drop the new connection
        if (sid > self.getId()) {
            LOG.info("Have smaller server identifier, so dropping the " +
                     "connection: (" + sid + ", " + self.getId() + ")");
            closeSocket(sock);
            return false;
        }

        // Otherwise proceed with the connection
        SendWorker sw = new SendWorker(sock, sid);
        RecvWorker rw = new RecvWorker(sock, sid, sw);
        sw.setRecv(rw);

        SendWorker vsw = senderWorkerMap.get(sid);
        if (vsw != null) {
            vsw.finish();
        }

        senderWorkerMap.put(sid, sw);
        // putIfAbsent keeps this atomic: a plain containsKey()/put() could
        // replace a queue that toSend() just created and filled, dropping
        // the messages already in it.
        if (!queueSendMap.containsKey(sid)) {
            queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(
                    SEND_CAPACITY));
        }

        sw.start();
        rw.start();

        return true;
    }
    
    

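    最终只有sid更大的节点发起的连接会被保留。sid较小的一方即使主动发起了连接,也会在比较sid后关闭它(initiateConnection);sid较大的一方收到sid较小节点发来的连接时,会关闭该连接并由自己重新发起连接(receiveConnection)。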
    SendWorker Thread

    /**
     * Writes one frame to the peer and flushes the stream. A frame is a 4-byte
     * length followed by the payload.
     *
     * The payload is the buffer's content from position 0 up to its capacity.
     * It is copied into a fresh array, and that copy is what gets written.
     * Writing {@code b.array()} instead would:
     * - ignore {@code arrayOffset()};
     * - emit more bytes than the declared length for a sliced buffer, which
     *   corrupts the framing the receiver relies on;
     * - throw for read-only or direct buffers.
     *
     * If the buffer's limit is below its capacity, the copy underflows. The
     * error is logged and nothing is written.
     *
     * @param b message to send; its position is reset to 0 (and ends at the
     *          capacity after a successful copy)
     * @throws IOException if writing to the peer's stream fails
     */
    synchronized void send(ByteBuffer b) throws IOException {
        byte[] msgBytes = new byte[b.capacity()];
        try {
            b.position(0);
            b.get(msgBytes);
        } catch (BufferUnderflowException be) {
            LOG.error("BufferUnderflowException ", be);
            return;
        }
        // Write exactly the bytes that were copied, so the length prefix always
        // matches the payload.
        dout.writeInt(msgBytes.length);
        dout.write(msgBytes);
        dout.flush();
    }
    
           /**
            * Send loop for one peer connection.
            *
            * On start, it may re-send the last message recorded for this peer
            * (see the comment inside the first try block). It then repeatedly
            * polls the peer's send queue and writes each message with send().
            *
            * The loop ends when any of these happens:
            * - the running/shutdown/sock condition fails;
            * - no queue exists for the peer;
            * - an exception other than InterruptedException escapes.
            *
            * finish() is called on the way out.
            */
           @Override
            public void run() {
                threadCnt.incrementAndGet();
                try {
                    /**
                     * If there is nothing in the queue to send, then we
                     * send the lastMessage to ensure that the last message
                     * was received by the peer. The message could be dropped
                     * in case self or the peer shutdown their connection
                     * (and exit the thread) prior to reading/processing
                     * the last message. Duplicate messages are handled correctly
                     * by the peer.
                     *
                     * If the send queue is non-empty, then we have a recent
                     * message than that stored in lastMessage. To avoid sending
                     * stale message, we should send the message in the send queue.
                     */
                    ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
                    if (bq == null || isSendQueueEmpty(bq)) {
                       ByteBuffer b = lastMessageSent.get(sid);
                       if (b != null) {
                           LOG.debug("Attempting to send lastMessage to sid=" + sid);
                           send(b);
                       }
                    }
                } catch (IOException e) {
                    LOG.error("Failed to send last message. Shutting down thread.", e);
                    // NOTE(review): execution still falls through to the main loop
                    // below; whether finish() makes its condition false is not
                    // visible here -- confirm.
                    this.finish();
                }
    
                // Main loop: poll the peer's queue with a 1 s timeout, record each
                // message in lastMessageSent, then write it.
                try {
                    while (running && !shutdown && sock != null) {
    
                        ByteBuffer b = null;
                        try {
                            ArrayBlockingQueue<ByteBuffer> bq = queueSendMap
                                    .get(sid);
                            if (bq != null) {
                                b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);
                            } else {
                                // No queue registered for this peer: stop sending.
                                LOG.error("No queue of incoming messages for " +
                                          "server " + sid);
                                break;
                            }
    
                            // A null b means the poll timed out; loop and re-check.
                            if(b != null){
                                lastMessageSent.put(sid, b);
                                send(b);
                            }
                        } catch (InterruptedException e) {
                            // Interrupts are only logged; the loop condition is
                            // re-evaluated on the next iteration.
                            LOG.warn("Interrupted while waiting for message on queue",
                                    e);
                        }
                    }
                } catch (Exception e) {
                    // Any other failure (e.g. an IOException from send) ends the loop.
                    LOG.warn("Exception when using channel: for id " + sid + " my id = " +
                            self.getId() + " error = " + e);
                }
                this.finish();
                LOG.warn("Send worker leaving thread");
            }
    
    

    QuorumCnxManager Listener

    /**
     * Listener thread: binds this server's election address and accepts
     * incoming election connections, passing each one to receiveConnection().
     *
     * When quorumListenOnAllIPs is set, only the port of the configured
     * election address is used, bound on the wildcard address. Otherwise the
     * exact configured address is bound.
     *
     * Failure handling:
     * - an IOException while binding or accepting closes the server socket,
     *   waits one second and retries;
     * - numRetries is reset after every handled connection, so the thread only
     *   gives up after three consecutive failures;
     * - if it gives up while not shutting down, an error is logged, because
     *   this server can then no longer take part in leader election.
     */
    @Override
    public void run() {
        int numRetries = 0;
        InetSocketAddress addr;
        while ((!shutdown) && (numRetries < 3)) {
            try {
                ss = new ServerSocket();
                ss.setReuseAddress(true);
                if (self.getQuorumListenOnAllIPs()) {
                    int port = self.quorumPeers.get(self.getId()).electionAddr.getPort();
                    addr = new InetSocketAddress(port);
                } else {
                    addr = self.quorumPeers.get(self.getId()).electionAddr;
                }
                LOG.info("My election bind port: " + addr.toString());
                setName(self.quorumPeers.get(self.getId()).electionAddr
                        .toString());
                ss.bind(addr);
                while (!shutdown) {
                    Socket client = ss.accept();
                    setSockOpts(client);
                    LOG.info("Received connection request "
                            + client.getRemoteSocketAddress());
                    receiveConnection(client);
                    // A successfully handled connection resets the failure budget.
                    numRetries = 0;
                }
            } catch (IOException e) {
                LOG.error("Exception while listening", e);
                numRetries++;
                // If new ServerSocket() throws, ss keeps whatever it held before.
                // That is possibly null on the first attempt (the field's
                // initializer is not visible here). Guard it so an NPE cannot
                // kill the listener thread.
                if (ss != null) {
                    try {
                        ss.close();
                    } catch (IOException ie) {
                        LOG.error("Error closing server socket", ie);
                    }
                }
                // Back off even when close() failed, so the retry is never
                // immediate.
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                    LOG.error("Interrupted while sleeping. " +
                              "Ignoring exception", ie);
                }
            }
        }
        LOG.info("Leaving listener");
        if (!shutdown) {
            LOG.error("As I'm leaving the listener thread, "
                    + "I won't be able to participate in leader "
                    + "election any longer: "
                    + self.quorumPeers.get(self.getId()).electionAddr);
        }
    }
        /**
         * Handles a connection accepted by the listener.
         *
         * First the remote server id (a long) is read from the socket. Then the
         * tie-break rule is applied:
         * - if the remote sid is smaller than ours, any SendWorker we still have
         *   for it is finished and the incoming socket is closed. This server
         *   then connects to the peer itself via connectOne(sid).
         * - otherwise the incoming socket is kept. A SendWorker/RecvWorker pair
         *   is started for it, replacing any SendWorker already registered for
         *   sid, and a send queue is created for the peer if none exists.
         *
         * If reading the id fails, the socket is closed and the method returns
         * without starting any workers.
         *
         * @param sock socket accepted from a peer
         */
        public void receiveConnection(Socket sock) {
            Long sid = null;
            try {
                // The stream is intentionally not closed. Closing it would close
                // the socket, which is handed to the workers when the connection
                // is kept.
                DataInputStream din = new DataInputStream(sock.getInputStream());
                sid = din.readLong();
            } catch (IOException e) {
                // getInputStream()/readLong() throw IOException, which this method
                // does not declare. Drop the connection, as initiateConnection()
                // does when its handshake fails.
                LOG.warn("Exception reading or writing challenge: " + e.toString());
                closeSocket(sock);
                return;
            }

            // If wins the challenge, then close the new connection.
            if (sid < self.getId()) {
                /*
                 * This replica might still believe that the connection to sid is
                 * up, so we have to shut down the workers before trying to open a
                 * new connection.
                 */
                SendWorker sw = senderWorkerMap.get(sid);
                if (sw != null) {
                    sw.finish();
                }

                /*
                 * Now we start a new connection
                 */
                LOG.debug("Create new connection to server: " + sid);
                closeSocket(sock);
                connectOne(sid);
            } else {
                // Otherwise start worker threads to receive data.
                SendWorker sw = new SendWorker(sock, sid);
                RecvWorker rw = new RecvWorker(sock, sid, sw);
                sw.setRecv(rw);

                SendWorker vsw = senderWorkerMap.get(sid);
                if (vsw != null) {
                    vsw.finish();
                }

                senderWorkerMap.put(sid, sw);

                if (!queueSendMap.containsKey(sid)) {
                    queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
                            SEND_CAPACITY));
                }
                sw.start();
                rw.start();
            }
        }
    

    RecvWorker Thread

      /**
       * Receive loop for one peer connection.
       *
       * Each frame is a 4-byte length followed by that many payload bytes. A
       * length outside (0, PACKETMAXSIZE] is rejected with an IOException.
       * Each valid payload is wrapped in a Message tagged with the peer's sid
       * and handed to addToRecvQueue().
       *
       * Any exception ends the loop and is logged. In every case the paired
       * SendWorker is finished and the socket is closed.
       */
      @Override
      public void run() {
          threadCnt.incrementAndGet();
          try {
              while (running && !shutdown && sock != null) {
                  final int frameLength = din.readInt();
                  final boolean validLength = frameLength > 0 && frameLength <= PACKETMAXSIZE;
                  if (!validLength) {
                      throw new IOException("Received packet with invalid packet: " + frameLength);
                  }
                  final byte[] payload = new byte[frameLength];
                  din.readFully(payload, 0, frameLength);
                  addToRecvQueue(new Message(ByteBuffer.wrap(payload).duplicate(), sid));
              }
          } catch (Exception e) {
              LOG.warn("Connection broken for id " + sid + ", my id = " +
                      self.getId() + ", error = " , e);
          } finally {
              // Tear down the send side of this connection as well.
              LOG.warn("Interrupting SendWorker");
              sw.finish();
              if (sock != null) {
                  closeSocket(sock);
              }
          }
      }
        }
    

    业务层接受线程

    /**
     * Receive loop of the election messenger.
     *
     * It polls the connection manager for messages (3 s timeout), decodes each
     * one into a Notification, and decides whether to hand it to the election
     * algorithm and/or answer the sender. For each message:
     * - a sender that is not in the voting view (treated as an observer) gets
     *   this server's current vote back immediately;
     * - a buffer shorter than 28 bytes, or with an unknown state code, is
     *   dropped;
     * - while this server is LOOKING, the notification is offered to
     *   recvqueue. A LOOKING sender whose election epoch is behind logicalclock
     *   also receives our vote;
     * - while this server is not LOOKING, a LOOKING sender receives the vote
     *   this server currently holds. Senders that sent no version field get
     *   the backward-compatible vote instead.
     *
     * The loop runs until the stop flag is set.
     */
    public void run() {

        Message response;
        while (!stop) {
            // Sleeps on receive
            try {
                response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
                if (response == null) continue;

                /*
                 * If it is from an observer, respond right away.
                 * Note that the following predicate assumes that
                 * if a server is not a follower, then it must be
                 * an observer. If we ever have any other type of
                 * learner in the future, we'll have to change the
                 * way we check for observers.
                 */
                // Sender is an observer: reply with the current election result.
                if (!self.getVotingView().containsKey(response.sid)) {
                    Vote current = self.getCurrentVote();
                    ToSend notmsg = new ToSend(ToSend.mType.notification,
                            current.getId(),
                            current.getZxid(),
                            logicalclock,
                            self.getPeerState(),
                            response.sid,
                            current.getPeerEpoch());

                    sendqueue.offer(notmsg);
                } else {
                    // Receive new message
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Receive new notification message. My id = "
                                + self.getId());
                    }

                    /*
                     * We check for 28 bytes for backward compatibility
                     */
                    if (response.buffer.capacity() < 28) {
                        LOG.error("Got a short response: "
                                + response.buffer.capacity());
                        continue;
                    }
                    // Exactly 28 bytes means the sender did not include peerEpoch.
                    boolean backCompatibility = (response.buffer.capacity() == 28);
                    response.buffer.clear();

                    // Instantiate Notification and set its attributes
                    Notification n = new Notification();

                    // State of peer that sent this message
                    QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
                    switch (response.buffer.getInt()) {
                    case 0:
                        ackstate = QuorumPeer.ServerState.LOOKING;
                        break;
                    case 1:
                        ackstate = QuorumPeer.ServerState.FOLLOWING;
                        break;
                    case 2:
                        ackstate = QuorumPeer.ServerState.LEADING;
                        break;
                    case 3:
                        ackstate = QuorumPeer.ServerState.OBSERVING;
                        break;
                    default:
                        // Unknown state code: drop this message and poll again.
                        continue;
                    }

                    n.leader = response.buffer.getLong();
                    n.zxid = response.buffer.getLong();
                    n.electionEpoch = response.buffer.getLong();
                    n.state = ackstate;
                    n.sid = response.sid;
                    if (!backCompatibility) {
                        n.peerEpoch = response.buffer.getLong();
                    } else {
                        if (LOG.isInfoEnabled()) {
                            LOG.info("Backward compatibility mode, server id=" + n.sid);
                        }
                        n.peerEpoch = ZxidUtils.getEpochFromZxid(n.zxid);
                    }

                    /*
                     * Version added in 3.4.6
                     */

                    n.version = (response.buffer.remaining() >= 4) ?
                                 response.buffer.getInt() : 0x0;

                    /*
                     * Print notification info
                     */
                    if (LOG.isInfoEnabled()) {
                        printNotification(n);
                    }

                    /*
                     * If this server is looking, then send proposed leader
                     */

                    if (self.getPeerState() == QuorumPeer.ServerState.LOOKING) {
                        recvqueue.offer(n);

                        /*
                         * Send a notification back if the peer that sent this
                         * message is also looking and its logical clock is
                         * lagging behind.
                         */
                        if ((ackstate == QuorumPeer.ServerState.LOOKING)
                                && (n.electionEpoch < logicalclock)) {
                            Vote v = getVote();
                            ToSend notmsg = new ToSend(ToSend.mType.notification,
                                    v.getId(),
                                    v.getZxid(),
                                    logicalclock,
                                    self.getPeerState(),
                                    response.sid,
                                    v.getPeerEpoch());
                            sendqueue.offer(notmsg);
                        }
                    } else {
                        // This server is not electing but the sender is LOOKING:
                        // send it the current election result. This matters when a
                        // server joins an existing ensemble.
                        /*
                         * If this server is not looking, but the one that sent the ack
                         * is looking, then send back what it believes to be the leader.
                         */
                        Vote current = self.getCurrentVote();
                        if (ackstate == QuorumPeer.ServerState.LOOKING) {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Sending new notification. My id =  " +
                                        self.getId() + " recipient=" +
                                        response.sid + " zxid=0x" +
                                        Long.toHexString(current.getZxid()) +
                                        " leader=" + current.getId());
                            }

                            ToSend notmsg;
                            if (n.version > 0x0) {
                                notmsg = new ToSend(
                                        ToSend.mType.notification,
                                        current.getId(),
                                        current.getZxid(),
                                        current.getElectionEpoch(),
                                        self.getPeerState(),
                                        response.sid,
                                        current.getPeerEpoch());

                            } else {
                                // A sender without a version field gets the
                                // backward-compatible vote.
                                Vote bcVote = self.getBCVote();
                                notmsg = new ToSend(
                                        ToSend.mType.notification,
                                        bcVote.getId(),
                                        bcVote.getZxid(),
                                        bcVote.getElectionEpoch(),
                                        self.getPeerState(),
                                        response.sid,
                                        bcVote.getPeerEpoch());
                            }
                            sendqueue.offer(notmsg);
                        }
                    }
                }
            } catch (InterruptedException e) {
                // Report through the logger rather than stdout, so the event
                // reaches the server log with its stack trace.
                // The interrupt is not re-asserted: if pollRecvQueue is an
                // interruptible blocking poll (presumably), re-asserting would make
                // every following poll throw at once and spin this loop. The loop
                // is ended through the stop flag instead.
                LOG.warn("Interrupted Exception while waiting for new message", e);
            }
        }
        LOG.info("WorkerReceiver is down");
    }
            }
    

    由于整个集群只有3台机器,所以server.1和server.2启动后,即可选举出Leader。
    Leader选举小结
    1 server启动时默认选举自己,并向整个集群广播
    2 收到消息时,通过3层判断:选举轮数,zxid,server id大小判断是否同意对方,如果同意,则修改自己的选票,并向集群广播
    3 QuorumCnxManager负责IO处理。每两个server之间只保留一个连接,并且只保留id大的server发起的连接(id小的一方发起的连接会被关闭);每个连接启动单独的读写线程(SendWorker/RecvWorker)处理,使用阻塞IO
    4.默认超过半数机器同意时,则选举成功,修改自身状态为LEADING或FOLLOWING
    5.Observer机器不参与选举

    相关文章

      网友评论

          本文标题:深入浅出Zookeeper Leader选举

          本文链接:https://www.haomeiwen.com/subject/sbiiottx.html