美文网首页
zookeeper的原理-leader选举

zookeeper的原理-leader选举

作者: 剑道_7ffc | 来源:发表于2020-05-15 07:42 被阅读0次

    Zookeeper 的一致性

    Zookeeper 的来源

    通过算法来解决分布式环境下多个节点中选举出一个leader。

    Zookeeper 的一致性是什么情况?

    根据zab协议的同步流程,数据同步是采用过半提交策略,意味着它是最终一致性,而不是强一致性。
    zookeeper 是一个顺序一致性模型。

    什么是顺序一致性呢

    下图是弱一致性


    image.png

    下图是更强的一直性保证,若如果 B1 得到的 x 的值为 1,那么 C1 看到的值也一定是 1


    image.png
    顺序一致性是针对单个操作,单个数据对象。属于 CAP 中 C这个范畴。一个数据被更新后,能够立马被后续的读操作读到。
    zookeeper 的顺序一致性实现是缩水版,zookeeper可以在执行读取操作前调用sync来解决弱一致性的问题(客户端Az执行了更新操作,客户端B可能读取是旧值,也可以是新值);zookeeper 基于 zxid 以及阻塞队列的方式来实现请求的顺序一致性,若客户端刚开始从最新的follower读取,从而获取了zxid,若又向旧的follower连接,则连接失败,原因是客户端的zxid大于服务端的zxid。因此zookeeper保证的是时间轴的一致性。

    Single System Image

    client 只要连接过一次 zookeeper,就不会有历史的倒退。

    leader 选举的原理

    leader选举的两个阶段,一是服务器启动时的leader选举,二是运行过程中leader宕机导致的leader选举。

    重要的参数

    服务器 ID(myId)

    编号越大在选举算法的权重越大。

    事务id(zxid)

    值越大说明数据越新,在选举算法的权重越大。

    逻辑时钟(epoch – logicalclock)

    投票的次数,每投完一次票,则加1

    选举状态

    LOOKING,竞选状态。
    FOLLOWING,随从状态,同步 leader 状态,参与投票。
    OBSERVING,观察状态,同步 leader 状态,不参与投票。
    LEADING,领导者状态。

    服务器启动时的 leader 选举

    1 每个服务器处于LOOKING,发起一个投票(epoch,ZXID,myId),并把投票发送到其他机器。
    2 接受其他机器的投票,并检验有效性如检查是否是本轮投票等
    3 处理投票:将别人的投票和本机器的投票进行比较,比较顺序是epoch,ZXID和myid,大的是leader。
    4 统计投票:判断是否已有过半的相同的投票信息
    5 改变服务器状态:一旦确定leader则更新自己的服务器状态,leader更新为LEADING,follow更新为FOLLOWING。u若没有则重新发起投票。

    运行过程中的 leader 选举

    1 变更状态:将剩余非Observer服务器的状态变为LOOKING.
    2 剩余的其他流程和启动时一样。

    leader 选举的源码分析

    入口

    通过server.sh的来启动QuorumPeerMain的main方法

    选举逻辑

    正常启动的流程图
    image.png
    核心源码

    1 org.apache.zookeeper.server.quorum.FastLeaderElection#lookForLeader

    public Vote lookForLeader() throws InterruptedException {
        try {
            //接收到的票据的集合
            HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
    
            //
            HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
    
            int notTimeout = finalizeWait;
    
            synchronized(this){
                //逻辑时钟->epoch
                logicalclock.incrementAndGet();
                //proposal
                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
            }
    
            LOG.info("New election. My id =  " + self.getId() +
                    ", proposed zxid=0x" + Long.toHexString(proposedZxid));
            sendNotifications();//我要广播自己的票据
    
            /*
             * Loop in which we exchange notifications until we find a leader
             */
    
            //接收到了票据
            while ((self.getPeerState() == ServerState.LOOKING) &&
                    (!stop)){
                /*
                 * Remove next notification from queue, times out after 2 times
                 * the termination time
                 */
                //recvqueue是从网络上接收到的其他机器的Notification
                Notification n = recvqueue.poll(notTimeout,
                        TimeUnit.MILLISECONDS);
    
                /*
                 * Sends more notifications if haven't received enough.
                 * Otherwise processes new notification.
                 */
                if(n == null){
                    if(manager.haveDelivered()){
                        sendNotifications();
                    } else {
                        manager.connectAll();//重新连接集群中的所有节点
                    }
    
                    /*
                     * Exponential backoff
                     */
                    int tmpTimeOut = notTimeout*2;
                    notTimeout = (tmpTimeOut < maxNotificationInterval?
                            tmpTimeOut : maxNotificationInterval);
                    LOG.info("Notification time out: " + notTimeout);
                }
    
                else if(validVoter(n.sid) && validVoter(n.leader)) {//判断是否是一个有效的票据
                    /*
                     * Only proceed if the vote comes from a replica in the
                     * voting view for a replica in the voting view.
                     */
                    switch (n.state) {
                    case LOOKING: //第一次进入到这个case
                        // If notification > current, replace and send messages out
                        if (n.electionEpoch > logicalclock.get()) { //
                            logicalclock.set(n.electionEpoch);
                            recvset.clear();//清空
                            //收到票据之后,当前的server要听谁的。
                            //可能是听server1的、也可能是听server2,也可能是听server3
                            //zab  leader选举算法
                            if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                                //把自己的票据更新成对方的票据,那么下一次,发送的票据就是新的票据
                                updateProposal(n.leader, n.zxid, n.peerEpoch);
                            } else {
                                //收到的票据小于当前的节点的票据,下一次发送票据,仍然发送自己的
                                updateProposal(getInitId(),
                                        getInitLastLoggedZxid(),
                                        getPeerEpoch());
                            }
                            //继续发送通知
                            sendNotifications();
                        } else if (n.electionEpoch < logicalclock.get()) { //说明当前的数据已经过期了
                            if(LOG.isDebugEnabled()){
                                LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                                        + Long.toHexString(n.electionEpoch)
                                        + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
                            }
                            break;
                        } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                proposedLeader, proposedZxid, proposedEpoch)) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                            sendNotifications();
                        }
    
                        if(LOG.isDebugEnabled()){
                            LOG.debug("Adding vote: from=" + n.sid +
                                    ", proposed leader=" + n.leader +
                                    ", proposed zxid=0x" + Long.toHexString(n.zxid) +
                                    ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                        }
    
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                        //决断时刻(当前节点的更新后的vote信息,和recvset集合中的票据进行归纳,)
                        if (termPredicate(recvset,
                                new Vote(proposedLeader, proposedZxid,
                                        logicalclock.get(), proposedEpoch))) {
    
                            // Verify if there is any change in the proposed leader
                            while((n = recvqueue.poll(finalizeWait,
                                    TimeUnit.MILLISECONDS)) != null){
                                if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                        proposedLeader, proposedZxid, proposedEpoch)){
                                    recvqueue.put(n);
                                    break;
                                }
                            }
    
                            /*
                             * This predicate is true once we don't read any new
                             * relevant message from the reception queue
                             */
                            if (n == null) {
                                self.setPeerState((proposedLeader == self.getId()) ?
                                        ServerState.LEADING: learningState());
    
                                Vote endVote = new Vote(proposedLeader,
                                                        proposedZxid,
                                                        logicalclock.get(),
                                                        proposedEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }
                        break;
            }
            return null;
        }
    }
    

    2 org.apache.zookeeper.server.quorum.FastLeaderElection#totalOrderPredicate

        protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
            LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
                    Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
            if(self.getQuorumVerifier().getWeight(newId) == 0){
                return false;
            }
            
            /*
             *
             * We return true if one of the following three cases hold:
             * 1- New epoch is higher
             * 2- New epoch is the same as current epoch, but new zxid is higher
             * 3- New epoch is the same as current epoch, new zxid is the same
             *  as current zxid, but server id is higher.
             */
            
            return ((newEpoch > curEpoch) || 
                    ((newEpoch == curEpoch) &&
                    ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
        }
    

    3 org.apache.zookeeper.server.quorum.FastLeaderElection#sendNotifications

        private void sendNotifications() {
            for (QuorumServer server : self.getVotingView().values()) {
                long sid = server.id;
    
                ToSend notmsg = new ToSend(ToSend.mType.notification,
                        proposedLeader, //myid
                        proposedZxid, //zxid
                        logicalclock.get(),//epoch
                        QuorumPeer.ServerState.LOOKING,//
                        sid, //myid
                        proposedEpoch); //发起票据epoch
                if(LOG.isDebugEnabled()){
                    LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x"  +
                          Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get())  +
                          " (n.round), " + sid + " (recipient), " + self.getId() +
                          " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
                }
                sendqueue.offer(notmsg); //阻塞队列,  线程->生产者消费者模式
            }
        }
    

    4 org.apache.zookeeper.server.quorum.FastLeaderElection#termPredicate

        protected boolean termPredicate(
                HashMap<Long, Vote> votes,
                Vote vote) {
    
            HashSet<Long> set = new HashSet<Long>();
    
            /*
             * First make the views consistent. Sometimes peers will have
             * different zxids for a server depending on timing.
             *
             */
            for (Map.Entry<Long,Vote> entry : votes.entrySet()) {
                if (vote.equals(entry.getValue())){ //对票据进行归纳
                    set.add(entry.getKey()); //如果存在2票,set里面是不是有2个?
                }
            }
    
            return self.getQuorumVerifier().containsQuorum(set); //验证
        }
    

    5 org.apache.zookeeper.server.quorum.flexible.QuorumMaj#containsQuorum

        public boolean containsQuorum(Set<Long> set){
            return (set.size() > half); //已经归纳的票据是否大于half .2>1  -> leader选举、 数据同步
        }
    

    消息传输

    通信流程图
    image.png
    image.png

    选举完成的处理逻辑

    FOLLOWING

    1 流程图


    image.png

    2 核心代码方法
    org.apache.zookeeper.server.quorum.Follower#followLeader

    LEADING

    1 核心代码方法
    org.apache.zookeeper.server.quorum.Leader#lead

    相关文章

      网友评论

          本文标题:zookeeper的原理-leader选举

          本文链接:https://www.haomeiwen.com/subject/tqlqohtx.html