ZK Source Code Reading 42: Leader Source Code Analysis

Author: 赤子心_d709 | Published: 2017-08-27 16:02 | Read 348 times

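    Summary

    The previous two sections covered Learner, which defines the Learner role,
    and LearnerHandler, which handles the interaction between a Learner and the Leader.
    This section covers Leader, which defines the Leader role. The main topics are:

    Inner classes
      Proposal: the data structure for a proposal
      ToBeAppliedRequestProcessor
      XidRolloverException
      LearnerCnxAcceptor: a thread that listens for Learner connections and starts a LearnerHandler for each one
    Fields
      Message-type constants
      Others
    Functions
      Startup-phase functions
        lead: the entry point
        getEpochToPropose: takes the largest lastAcceptedEpoch in the ensemble and adds 1 to produce the new acceptedEpoch
        waitForEpochAck: waits until a quorum of servers (Learners plus the leader) has answered the Leader's LEADERINFO with ACKEPOCH
        waitForNewLeaderAck: waits until a quorum of participants has answered the Leader's NEWLEADER with ACK
        startZkServer: starts the ZooKeeper server
      Runtime processing
        Handling ACKs
          processAck: handles an ACK for a proposal; once a quorum has acknowledged, notifies all Learners
          commit, sendPacket: once a proposal is confirmed, notify the participants
          inform, sendObserverPacket: once a proposal is confirmed, notify the observers
        Handling proposals
          propose: builds a proposal from a Request and sends it to all participants
        Handling sync requests
          processSync: handles a sync request (not fully understood yet)
          sendSync: sends a SYNC packet to the appropriate server
      LearnerHandler-related:
        startForwarding: when interaction with a Learner starts, sends it the proposals held in memory and the proposals about to take effect (none of which are recorded in the transaction log yet)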
    

    Inner classes

    Inner classes of Leader

    Proposal

    This is the "proposal" that comes up so often.
    The source is as follows

        static public class Proposal {// a proposal
            public QuorumPacket packet;// the packet exchanged between ensemble members
    
            public HashSet<Long> ackSet = new HashSet<Long>();// sids of the servers whose ACK has been received
    
            public Request request;// the request
    
            @Override
            public String toString() {
                return packet.getType() + ", " + packet.getZxid() + ", " + request;
            }
        }
    

    ToBeAppliedRequestProcessor

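    This is one link in the request-processing chain, written here as an inner class; it will be covered later together with the processor chain.

    XidRolloverException

    Xid rollover exception, used when the low 32 bits of the zxid are exhausted (the high 32 bits hold the epoch number).
    It is nothing more than a single constructor.

        public static class XidRolloverException extends Exception {// xid rollover exception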
            public XidRolloverException(String message) {
                super(message);
            }
        }
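
The bit layout mentioned above can be sketched as follows. This is a minimal standalone illustration rather than ZooKeeper's own utility class: the class name ZxidLayoutSketch and its method names are made up for this example, and it assumes only what the text states, namely that the high 32 bits of a zxid hold the epoch and the low 32 bits hold a counter.

```java
// Standalone sketch of the zxid bit layout; all names here are hypothetical.
final class ZxidLayoutSketch {
    private static final long COUNTER_MASK = 0xffffffffL;

    /** Packs an epoch (high 32 bits) and a counter (low 32 bits) into one zxid. */
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & COUNTER_MASK);
    }

    /** Extracts the epoch from the high 32 bits. */
    static long epochOf(long zxid) {
        return zxid >>> 32;
    }

    /** Extracts the per-epoch counter from the low 32 bits. */
    static long counterOf(long zxid) {
        return zxid & COUNTER_MASK;
    }

    /** True when one more increment would spill into the epoch bits. */
    static boolean counterExhausted(long zxid) {
        return (zxid & COUNTER_MASK) == COUNTER_MASK;
    }
}
```

With this layout, a counter of 0xffffffff is the situation that XidRolloverException signals: the leader shuts down so that a re-election starts a new epoch with the counter back at 0.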
    

    LearnerCnxAcceptor

    Accepts connections from Learners and starts a LearnerHandler for each one

        class LearnerCnxAcceptor extends ZooKeeperThread{
            private volatile boolean stop = false;
    
            public LearnerCnxAcceptor() {
                super("LearnerCnxAcceptor-" + ss.getLocalSocketAddress());
            }
    
            @Override
            public void run() {
                try {
                    while (!stop) {
                        try{
                            Socket s = ss.accept();// accept the socket connection
                            // start with the initLimit, once the ack is processed
                            // in LearnerHandler switch to the syncLimit
                            s.setSoTimeout(self.tickTime * self.initLimit);
                            s.setTcpNoDelay(nodelay);
                            LearnerHandler fh = new LearnerHandler(s, Leader.this);// build the LearnerHandler
                            fh.start();// start the LearnerHandler
                        } catch (SocketException e) {
                            if (stop) {
                                LOG.info("exception while shutting down acceptor: "
                                        + e);
    
                                // When Leader.shutdown() calls ss.close(),
                                // the call to accept throws an exception.
                                // We catch and set stop to true.
                                stop = true;
                            } else {
                                throw e;
                            }
                        }
                    }
                } catch (Exception e) {
                    LOG.warn("Exception while accepting follower", e);
                }
            }
            
            public void halt() {
                stop = true;
            }
        }
    

    Fields

    Ensemble message-type constants

    The meaning of each type was covered in source analysis part 38; the source is as follows

        final static int DIFF = 13;
        final static int TRUNC = 14;
        final static int SNAP = 15;
        final static int OBSERVERINFO = 16;
        final static int NEWLEADER = 10;// sent by the leader to a learner once the packets carrying the sync data have been sent
        final static int FOLLOWERINFO = 11;
        final static int UPTODATE = 12;
        public static final int LEADERINFO = 17;
        public static final int ACKEPOCH = 18;  
        final static int REQUEST = 1;
        public final static int PROPOSAL = 2;
        final static int ACK = 3;
        final static int COMMIT = 4;
        final static int PING = 5;
        final static int REVALIDATE = 6;
        final static int SYNC = 7;
        final static int INFORM = 8;// tells an observer that a proposal has been committed
    

    Other main fields

        final LeaderZooKeeperServer zk;
    
        final QuorumPeer self;
    
        private boolean quorumFormed = false;// whether a quorum of participants has acknowledged the current leader and finished syncing
    
        LearnerCnxAcceptor cnxAcceptor;// thread that accepts Learner connections
    
        private final HashSet<LearnerHandler> learners =
                new HashSet<LearnerHandler>();// all LearnerHandlers
    
        private final HashSet<LearnerHandler> forwardingFollowers =
                new HashSet<LearnerHandler>();// LearnerHandlers of participants
    
        private final HashSet<LearnerHandler> observingLearners =
                new HashSet<LearnerHandler>();// LearnerHandlers of observers
    
        private final HashMap<Long,List<LearnerSyncRequest>> pendingSyncs =
            new HashMap<Long,List<LearnerSyncRequest>>();// syncs in progress (each completes only after a quorum ACKs the zxid it is keyed on)
    
        final AtomicLong followerCounter = new AtomicLong(-1);// just a scratch counter, not the number of Followers; see the callers
    
        ConcurrentMap<Long, Proposal> outstandingProposals = new ConcurrentHashMap<Long, Proposal>();// proposals issued but not yet committed, keyed by zxid
    
        ConcurrentLinkedQueue<Proposal> toBeApplied = new ConcurrentLinkedQueue<Proposal>();// proposals about to take effect (already acknowledged by a quorum)
    
        Proposal newLeaderProposal = new Proposal();// the NEWLEADER proposal
    
        StateSummary leaderStateSummary;// summary of the leader's state
    
        long epoch = -1;// the new acceptedEpoch: max(lastAcceptedEpoch) over all servers + 1; defaults to -1
        boolean waitingForNewEpoch = true;// whether we are still waiting for the new acceptedEpoch to be decided
        long lastCommitted = -1;// zxid of the most recent commit
        long lastProposed;// zxid of the most recent proposal
        private HashSet<Long> connectingFollowers = new HashSet<Long>();// sids of the servers that have connected to the leader
        private HashSet<Long> electingFollowers = new HashSet<Long>();// sids that answered LEADERINFO with ACKEPOCH
        private boolean electionFinished = false;// whether a quorum of servers has answered with ACKEPOCH
    

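    Functions

    Trivial getters, setters and the like are skipped

    Startup-phase functions

    When the leader starts, it first completes data synchronization and related steps with the learners. The functions involved are as follows

    lead

    The leader's entry point once leader election has finished. The source is as follows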

        void lead() throws IOException, InterruptedException {
            self.end_fle = System.currentTimeMillis();
            LOG.info("LEADING - LEADER ELECTION TOOK - " +
                  (self.end_fle - self.start_fle));
            self.start_fle = 0;// reset the fast leader election start time
            self.end_fle = 0;// reset the end time
    
            zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);
    
            try {
                self.tick = 0;// tick starts at 0; it counts elapsed tickTime periods
                zk.loadData();// load the data first
                
                leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());// record the state as (epoch, zxid)
    
                // Start thread that waits for connection requests from 
                // new followers.
                cnxAcceptor = new LearnerCnxAcceptor();// waits for learner connections
                cnxAcceptor.start();
                
                readyToStart = true;
                long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
                
                zk.setZxid(ZxidUtils.makeZxid(epoch, 0));
                
                synchronized(this){
                    lastProposed = zk.getZxid();// take the current zxid
                }
                
                newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(),
                        null, null);// build the NEWLEADER packet
    
    
                if ((newLeaderProposal.packet.getZxid() & 0xffffffffL) != 0) {
                    LOG.info("NEWLEADER proposal has Zxid of "
                            + Long.toHexString(newLeaderProposal.packet.getZxid()));
                }
                
                waitForEpochAck(self.getId(), leaderStateSummary);
                self.setCurrentEpoch(epoch);
    
                // We have to get at least a majority of servers in sync with
                // us. We do this by waiting for the NEWLEADER packet to get
                // acknowledged
                try {
                    waitForNewLeaderAck(self.getId(), zk.getZxid(), LearnerType.PARTICIPANT);// wait a bounded time for ACKs from a quorum of participants
                } catch (InterruptedException e) {
                    shutdown("Waiting for a quorum of followers, only synced with sids: [ "
                            + getSidSetString(newLeaderProposal.ackSet) + " ]");
                    HashSet<Long> followerSet = new HashSet<Long>();
                    for (LearnerHandler f : learners)
                        followerSet.add(f.getSid());
                        
                    if (self.getQuorumVerifier().containsQuorum(followerSet)) {
                        LOG.warn("Enough followers present. "
                                + "Perhaps the initTicks need to be increased.");
                    }
    
                    Thread.sleep(self.tickTime);
                    self.tick++;
                    return;
                }
                
                startZkServer();// start the ZooKeeper server
                
                /**
                 * WARNING: do not use this for anything other than QA testing
                 * on a real cluster. Specifically to enable verification that quorum
                 * can handle the lower 32bit roll-over issue identified in
                 * ZOOKEEPER-1277. Without this option it would take a very long
                 * time (on order of a month say) to see the 4 billion writes
                 * necessary to cause the roll-over to occur.
                 * 
                 * This field allows you to override the zxid of the server. Typically
                 * you'll want to set it to something like 0xfffffff0 and then
                 * start the quorum, run some operations and see the re-election.
                 */
                String initialZxid = System.getProperty("zookeeper.testingonly.initialZxid");
                if (initialZxid != null) {
                    long zxid = Long.parseLong(initialZxid);
                    zk.setZxid((zk.getZxid() & 0xffffffff00000000L) | zxid);
                }
                
                if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) {
                    self.cnxnFactory.setZooKeeperServer(zk);
                }
                // Everything is a go, simply start counting the ticks
                // WARNING: I couldn't find any wait statement on a synchronized
                // block that would be notified by this notifyAll() call, so
                // I commented it out
                //synchronized (this) {
                //    notifyAll();
                //}
                // We ping twice a tick, so we only update the tick every other
                // iteration
                boolean tickSkip = true;// we ping twice per tick
        
                while (true) {
                    Thread.sleep(self.tickTime / 2);// sleep for half a tickTime
                    if (!tickSkip) {
                        self.tick++;// every second iteration counts as one tick
                    }
                    HashSet<Long> syncedSet = new HashSet<Long>();
    
                    // lock on the followers when we use it.
                    syncedSet.add(self.getId());
    
                    for (LearnerHandler f : getLearners()) {
                        // Synced set is used to check we have a supporting quorum, so only
                        // PARTICIPANT, not OBSERVER, learners should be used
                        if (f.synced() && f.getLearnerType() == LearnerType.PARTICIPANT) {// synced, and a participant
                            syncedSet.add(f.getSid());
                        }
                        f.ping();// ping the learner; the handler also checks whether one of its proposals has gone unacknowledged past the timeout
                    }
    
                    // check leader running status
                    if (!this.isRunning()) {
                        shutdown("Unexpected internal error");
                        return;
                    }
    
                  if (!tickSkip && !self.getQuorumVerifier().containsQuorum(syncedSet)) {// on a checking iteration, the quorum check failed
                    //if (!tickSkip && syncedCount < self.quorumPeers.size() / 2) {
                        // Lost quorum, shutdown
                        shutdown("Not sufficient followers synced, only synced with sids: [ "
                                + getSidSetString(syncedSet) + " ]");
                        // make sure the order is the same!
                        // the leader goes to looking
                        return;
                  } 
                  tickSkip = !tickSkip;// flip, moving on to the other half of the tick
                }
            } finally {
                zk.unregisterJMX(this);
            }
        }
    

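    The details of how LearnerCnxAcceptor starts a LearnerHandler that then interacts with its Learner are not expanded here.
    The main things that happen are:

    Start LearnerCnxAcceptor, so that a LearnerHandler performs the IO with each Learner
    The Leader and every LearnerHandler call getEpochToPropose, passing in the Leader's own lastAcceptedEpoch or the one carried in the LearnerInfo a Learner sends when it registers with the Leader; the resulting epoch becomes the new AcceptedEpoch and CurrentEpoch
    Each LearnerHandler sends a LEADERINFO packet carrying the epoch decided above
    waitForEpochAck is called to wait until a quorum (learners plus the leader) has answered LEADERINFO with an ACKEPOCH packet
    (omitted: in between, each LearnerHandler keeps syncing data to its Learner)
    Each LearnerHandler sends NEWLEADER; the Learner receives it and returns an ACK
    The LearnerHandlers and the Leader call waitForNewLeaderAck to wait until a quorum of PARTICIPANTs has replied with ACK
    Start zkServer, then keep pinging to make sure a quorum of participants stays in sync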
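
The final step, the loop at the end of lead(), runs twice per tick, pings on every pass, and only advances the tick and checks the quorum on every second pass. A minimal sketch of just that counting logic follows, with the sleep, the pings and the quorum check replaced by counters. TickLoopSketch and simulate are hypothetical names used only for this illustration.

```java
// Standalone model of the tickSkip toggle in lead(); names are hypothetical.
final class TickLoopSketch {
    /**
     * Simulates the given number of half-tick iterations and returns
     * {ticks advanced, quorum checks performed, ping rounds sent}.
     */
    static int[] simulate(int halfTickIterations) {
        int tick = 0;
        int quorumChecks = 0;
        int pingRounds = 0;
        boolean tickSkip = true;
        for (int i = 0; i < halfTickIterations; i++) {
            // The real loop sleeps tickTime / 2 here.
            if (!tickSkip) {
                tick++;                 // every second pass counts as one tick
            }
            pingRounds++;               // every pass pings all learners
            if (!tickSkip) {
                quorumChecks++;         // real code: shut down if the synced set is not a quorum
            }
            tickSkip = !tickSkip;       // alternate between the two halves of a tick
        }
        return new int[] { tick, quorumChecks, pingRounds };
    }
}
```

Because tickSkip starts as true, the first pass only pings; the first tick increment and quorum check happen on the second pass, one full tickTime after the loop starts.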
    

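    The functions involved, in call order, are
    getEpochToPropose, waitForEpochAck, waitForNewLeaderAck and startZkServer, explained below

    getEpochToPropose

    Called by both the Leader and the LearnerHandlers. It takes the largest lastAcceptedEpoch in the ensemble and adds 1 to produce the new acceptedEpoch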

        public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException {// called by the Leader and by each LearnerHandler
            synchronized(connectingFollowers) {
                if (!waitingForNewEpoch) {// the new epoch has already been decided
                    return epoch;
                }
                if (lastAcceptedEpoch >= epoch) {// track the largest lastAcceptedEpoch among the leader and the learners
                    epoch = lastAcceptedEpoch+1;// set our epoch to that maximum + 1
                }
                connectingFollowers.add(sid);// record the connecting server
                QuorumVerifier verifier = self.getQuorumVerifier();
                if (connectingFollowers.contains(self.getId()) && 
                                                verifier.containsQuorum(connectingFollowers)) {// the leader itself has joined and a quorum of servers has connected
                    waitingForNewEpoch = false;// no need to wait any longer
                    self.setAcceptedEpoch(epoch);// set acceptedEpoch
                    connectingFollowers.notifyAll();
                } else {
                    long start = System.currentTimeMillis();
                    long cur = start;
                    long end = start + self.getInitLimit()*self.getTickTime();
                    while(waitingForNewEpoch && cur < end) {// no quorum yet: wait a bounded time for other servers to complete it
                        connectingFollowers.wait(end - cur);
                        cur = System.currentTimeMillis();
                    }
                    if (waitingForNewEpoch) {
                        throw new InterruptedException("Timeout while waiting for epoch from quorum");        
                    }
                }
                return epoch;
            }
        }
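
To make the behaviour of getEpochToPropose concrete, here is a simplified standalone model of it. It is a sketch under stated assumptions, not the ZooKeeper class: EpochNegotiationSketch, its fields and demo() are hypothetical names, the quorum test is assumed to be a simple majority of ensembleSize, and timeoutMs stands in for initLimit * tickTime.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified model of the epoch negotiation; all names are hypothetical.
final class EpochNegotiationSketch {
    private final long selfId;        // sid of the leader itself
    private final int ensembleSize;   // number of voting servers
    private final long timeoutMs;     // stands in for initLimit * tickTime
    private final Set<Long> connecting = new HashSet<>();
    private long epoch = -1;
    private boolean waitingForNewEpoch = true;

    EpochNegotiationSketch(long selfId, int ensembleSize, long timeoutMs) {
        this.selfId = selfId;
        this.ensembleSize = ensembleSize;
        this.timeoutMs = timeoutMs;
    }

    synchronized long getEpochToPropose(long sid, long lastAcceptedEpoch)
            throws InterruptedException {
        if (!waitingForNewEpoch) {
            return epoch;                     // already decided: late callers get the fixed value
        }
        if (lastAcceptedEpoch >= epoch) {
            epoch = lastAcceptedEpoch + 1;    // running maximum + 1
        }
        connecting.add(sid);
        // Assumed simple majority; the leader itself must be among the callers.
        if (connecting.contains(selfId) && connecting.size() > ensembleSize / 2) {
            waitingForNewEpoch = false;
            notifyAll();                      // wake the callers that arrived earlier
        } else {
            long cur = System.currentTimeMillis();
            long end = cur + timeoutMs;
            while (waitingForNewEpoch && cur < end) {
                wait(end - cur);
                cur = System.currentTimeMillis();
            }
            if (waitingForNewEpoch) {
                throw new InterruptedException("Timeout while waiting for epoch from quorum");
            }
        }
        return epoch;
    }

    /** 3-server scenario; returns the epochs seen by learner 2, leader 1 and late learner 3. */
    static long[] demo() {
        final EpochNegotiationSketch leader = new EpochNegotiationSketch(1L, 3, 5000L);
        final long[] seen = new long[3];
        Thread learner = new Thread(() -> {
            try {
                seen[0] = leader.getEpochToPropose(2L, 7L);
            } catch (InterruptedException e) {
                seen[0] = -1L;
            }
        });
        learner.start();
        try {
            seen[1] = leader.getEpochToPropose(1L, 5L);
            learner.join();
            seen[2] = leader.getEpochToPropose(3L, 20L); // arrives after the quorum formed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }
}
```

In demo(), whichever of the first two callers arrives first blocks until the other completes the quorum, and both see epoch 8 (the larger lastAcceptedEpoch, 7, plus 1). The third caller reports 20 but still receives 8, because the value is fixed once the quorum has formed.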
    

    waitForEpochAck

    Checks that the leader's StateSummary is the most recent one,
    then waits until a quorum of servers (Learners plus the leader) has answered the Leader's LEADERINFO with ACKEPOCH

        public void waitForEpochAck(long id, StateSummary ss) throws IOException, InterruptedException {
            synchronized(electingFollowers) {
                if (electionFinished) {// a quorum has already answered
                    return;
                }
                if (ss.getCurrentEpoch() != -1) {
                    if (ss.isMoreRecentThan(leaderStateSummary)) {// this server has a currentEpoch and its (epoch, zxid) is newer than the Leader's own
                        throw new IOException("Follower is ahead of the leader, leader summary: " 
                                                        + leaderStateSummary.getCurrentEpoch()
                                                        + " (current epoch), "
                                                        + leaderStateSummary.getLastZxid()
                                                        + " (last zxid)");
                    }
                    electingFollowers.add(id);
                }
                QuorumVerifier verifier = self.getQuorumVerifier();
                if (electingFollowers.contains(self.getId()) && verifier.containsQuorum(electingFollowers)) {// quorum check passed
                    electionFinished = true;
                    electingFollowers.notifyAll();
                } else {                
                    long start = System.currentTimeMillis();
                    long cur = start;
                    long end = start + self.getInitLimit()*self.getTickTime();
                    while(!electionFinished && cur < end) {// wait a bounded time
                        electingFollowers.wait(end - cur);
                        cur = System.currentTimeMillis();
                    }
                    if (!electionFinished) {// the wait ended without reaching a quorum
                        throw new InterruptedException("Timeout while waiting for epoch to be acked by quorum");
                    }
                }
            }
        }
    

    waitForNewLeaderAck

    Waits until a quorum of participants has answered the Leader's NEWLEADER with ACK

        public void waitForNewLeaderAck(long sid, long zxid, LearnerType learnerType)
                throws InterruptedException {
    
            synchronized (newLeaderProposal.ackSet) {
    
                if (quorumFormed) {// a quorum has already synced
                    return;
                }
    
                long currentZxid = newLeaderProposal.packet.getZxid();
                if (zxid != currentZxid) {
                    LOG.error("NEWLEADER ACK from sid: " + sid
                            + " is from a different epoch - current 0x"
                            + Long.toHexString(currentZxid) + " receieved 0x"
                            + Long.toHexString(zxid));
                    return;
                }
    
                if (learnerType == LearnerType.PARTICIPANT) {
                    newLeaderProposal.ackSet.add(sid);// only ACKs from participants count
                }
    
                if (self.getQuorumVerifier().containsQuorum(
                        newLeaderProposal.ackSet)) {// a quorum has synced
                    quorumFormed = true;// mark the quorum as formed
                    newLeaderProposal.ackSet.notifyAll();
                } else {
                    long start = System.currentTimeMillis();
                    long cur = start;
                    long end = start + self.getInitLimit() * self.getTickTime();
                    while (!quorumFormed && cur < end) {// early arrivals wait a bounded time for a quorum of servers to sync
                        newLeaderProposal.ackSet.wait(end - cur);
                        cur = System.currentTimeMillis();
                    }
                    if (!quorumFormed) {// still no quorum after waiting: fail
                        throw new InterruptedException(
                                "Timeout while waiting for NEWLEADER to be acked by quorum");
                    }
                }
            }
        }
    

    startZkServer

    Starts the ZooKeeper server

        private synchronized void startZkServer() {// start the ZooKeeper server
            // Update lastCommitted and Db's zxid to a value representing the new epoch
            lastCommitted = zk.getZxid();
            LOG.info("Have quorum of supporters, sids: [ "
                    + getSidSetString(newLeaderProposal.ackSet)
                    + " ]; starting up and setting last processed zxid: 0x{}",
                    Long.toHexString(zk.getZxid()));
            zk.startup();
            /*
             * Update the election vote here to ensure that all members of the
             * ensemble report the same vote to new servers that start up and
             * send leader election notifications to the ensemble.
             * 
             * @see https://issues.apache.org/jira/browse/ZOOKEEPER-1732
             */
            self.updateElectionVote(getEpoch());
    
            zk.getZKDatabase().setlastProcessedZxid(zk.getZxid());
        }
    

    Runtime processing

    processAck

    Handles an ACK for a proposal; once a quorum has acknowledged, it notifies all Learners

        synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {
            if (LOG.isTraceEnabled()) {// logging only
                LOG.trace("Ack zxid: 0x{}", Long.toHexString(zxid));
                for (Proposal p : outstandingProposals.values()) {
                    long packetZxid = p.packet.getZxid();
                    LOG.trace("outstanding proposal: 0x{}",
                            Long.toHexString(packetZxid));
                }
                LOG.trace("outstanding proposals all");
            }
    
            if ((zxid & 0xffffffffL) == 0) {
                /*
                 * We no longer process NEWLEADER ack by this method. However,
                 * the learner sends ack back to the leader after it gets UPTODATE
                 * so we just ignore the message.
                 */
                return;
            }
        
            if (outstandingProposals.size() == 0) {// no proposals in flight
                if (LOG.isDebugEnabled()) {
                    LOG.debug("outstanding is 0");
                }
                return;
            }
            if (lastCommitted >= zxid) {// the proposal has already been committed
                if (LOG.isDebugEnabled()) {
                    LOG.debug("proposal has already been committed, pzxid: 0x{} zxid: 0x{}",
                            Long.toHexString(lastCommitted), Long.toHexString(zxid));
                }
                // The proposal has already been committed
                return;
            }
            Proposal p = outstandingProposals.get(zxid);// look up the proposal for this zxid
            if (p == null) {
                LOG.warn("Trying to commit future proposal: zxid 0x{} from {}",
                        Long.toHexString(zxid), followerAddr);
                return;
            }
            
            p.ackSet.add(sid);// record the sid in the proposal's ack set
            if (LOG.isDebugEnabled()) {
                LOG.debug("Count for zxid: 0x{} is {}",
                        Long.toHexString(zxid), p.ackSet.size());
            }
            if (self.getQuorumVerifier().containsQuorum(p.ackSet)){// a quorum has ACKed
                if (zxid != lastCommitted+1) {
                    LOG.warn("Commiting zxid 0x{} from {} not first!",
                            Long.toHexString(zxid), followerAddr);
                    LOG.warn("First is 0x{}", Long.toHexString(lastCommitted + 1));
                }
                outstandingProposals.remove(zxid);// this proposal is no longer outstanding
                if (p.request != null) {
                    toBeApplied.add(p);// add the proposal to the to-be-applied queue
                }
    
                if (p.request == null) {
                    LOG.warn("Going to commmit null request for proposal: {}", p);
                }
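                commit(zxid);// commit: send COMMIT to all participants
                inform(p);// tell all observers
                zk.commitProcessor.commit(p.request);// the leader commits locally as well
                if(pendingSyncs.containsKey(zxid)){
                    for(LearnerSyncRequest r: pendingSyncs.remove(zxid)) {// release the syncs that were waiting on this zxid
                        sendSync(r);// send SYNC to the server recorded in the LearnerSyncRequest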
                    }
                }
            }
        }
    

    It calls the commit and inform functions, shown below

    commit

    Updates lastCommitted, builds a COMMIT packet and sends it to all participants

        public void commit(long zxid) {
            synchronized(this){
                lastCommitted = zxid;// update the zxid of the most recent commit
            }
            QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);// build the COMMIT message
            sendPacket(qp);// send to all participants
        }
    

    inform

    Builds an INFORM packet and sends it to all observers

        public void inform(Proposal proposal) {
            QuorumPacket qp = new QuorumPacket(Leader.INFORM, proposal.request.zxid, 
                                                proposal.packet.getData(), null);
            sendObserverPacket(qp);// tell all observers
        }
    

    These in turn call sendPacket and sendObserverPacket, shown below

    sendPacket & sendObserverPacket

    They add the QuorumPacket to the send queue of each relevant LearnerHandler

        void sendPacket(QuorumPacket qp) {// send the QuorumPacket to all participants
            synchronized (forwardingFollowers) {
                for (LearnerHandler f : forwardingFollowers) {                
                    f.queuePacket(qp);
                }
            }
        }
    
        void sendObserverPacket(QuorumPacket qp) {// send the QuorumPacket to all observers
            for (LearnerHandler f : getObservingLearners()) {
                f.queuePacket(qp);
            }
        }
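
The ACK-counting core of processAck and commit can be reduced to a few lines. The sketch below is a simplified standalone model, not the real class: AckTrackerSketch and its methods are hypothetical names, the quorum test is assumed to be a simple majority, and sending COMMIT or INFORM packets is replaced by a boolean return value.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Simplified model of per-proposal ACK tracking; all names are hypothetical.
final class AckTrackerSketch {
    private final int ensembleSize;
    private final Map<Long, Set<Long>> outstanding = new HashMap<>(); // zxid -> sids that ACKed
    private long lastCommitted = -1;

    AckTrackerSketch(int ensembleSize) {
        this.ensembleSize = ensembleSize;
    }

    /** Registers a new proposal that is waiting for ACKs. */
    synchronized void propose(long zxid) {
        outstanding.put(zxid, new HashSet<Long>());
    }

    /** Records one ACK; returns true only for the ACK that completes the quorum. */
    synchronized boolean processAck(long sid, long zxid) {
        if (lastCommitted >= zxid) {
            return false;                       // already committed, nothing to do
        }
        Set<Long> ackSet = outstanding.get(zxid);
        if (ackSet == null) {
            return false;                       // unknown proposal
        }
        ackSet.add(sid);
        if (ackSet.size() > ensembleSize / 2) { // assumed simple majority
            outstanding.remove(zxid);
            lastCommitted = zxid;               // the real code now sends COMMIT and INFORM
            return true;
        }
        return false;
    }

    synchronized long lastCommitted() {
        return lastCommitted;
    }
}
```

In a 3-server ensemble the second distinct ACK commits the proposal, and a third ACK for the same zxid is ignored by the lastCommitted check, mirroring the early returns in processAck.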
    

    propose

    Builds a proposal from a Request and sends it to all participants

        public Proposal propose(Request request) throws XidRolloverException {// build a proposal from the Request and send it to all participants
            /**
             * Address the rollover issue. All lower 32bits set indicate a new leader
             * election. Force a re-election instead. See ZOOKEEPER-1277
             */
            if ((request.zxid & 0xffffffffL) == 0xffffffffL) {// the low 32 bits of the zxid are exhausted; to avoid overflow, shut down and force a re-election
                String msg =
                        "zxid lower 32 bits have rolled over, forcing re-election, and therefore new epoch start";
                shutdown(msg);
                throw new XidRolloverException(msg);// xid rollover exception
            }
    
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
            try {
                request.hdr.serialize(boa, "hdr");
                if (request.txn != null) {
                    request.txn.serialize(boa, "txn");
                }
                baos.close();
            } catch (IOException e) {
                LOG.warn("This really should be impossible", e);
            }
            QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, 
                    baos.toByteArray(), null);// build the PROPOSAL packet
            
            Proposal p = new Proposal();
            p.packet = pp;
            p.request = request;
            synchronized (this) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Proposing:: " + request);
                }
    
                lastProposed = p.packet.getZxid();// update the zxid of the most recent proposal
                outstandingProposals.put(lastProposed, p);// one more proposal is now outstanding
                sendPacket(pp);// send the proposal to all participants
            }
            return p;
        }
    

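    processSync

    Handles a sync request; I do not fully understand this part

        synchronized public void processSync(LearnerSyncRequest r){// handles a sync request; note that it holds the lock
            if(outstandingProposals.isEmpty()){// no proposals in flight
                sendSync(r);// send SYNC to the server recorded in the LearnerSyncRequest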
            } else {
                List<LearnerSyncRequest> l = pendingSyncs.get(lastProposed);// look up the list keyed on the current lastProposed
                if (l == null) {
                    l = new ArrayList<LearnerSyncRequest>();
                }
                l.add(r);// queue the request under lastProposed; every sync in this list waits for that zxid
                pendingSyncs.put(lastProposed, l);// store in the map until that proposal commits
            }
        }
    

    It calls sendSync, shown below

    sendSync

    Sends a SYNC packet to the appropriate server (the one recorded in the LearnerSyncRequest)

        public void sendSync(LearnerSyncRequest r){
            QuorumPacket qp = new QuorumPacket(Leader.SYNC, 0, null, null);
            r.fh.queuePacket(qp);
        }
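
One way to read processSync together with the pendingSyncs handling in processAck: a sync request is answered immediately when nothing is in flight, and is otherwise parked under the zxid of the latest proposal until that zxid commits. The sketch below models only this bookkeeping. It is single-threaded, every name is hypothetical, requesters are plain strings instead of LearnerSyncRequest objects, and "answering" just appends to a list.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified model of the pendingSyncs bookkeeping; all names are hypothetical.
final class PendingSyncSketch {
    private final Map<Long, List<String>> pendingSyncs = new HashMap<>();
    private final Set<Long> outstanding = new HashSet<>();  // proposed but not yet committed
    private final List<String> answered = new ArrayList<>();
    private long lastProposed = 0;

    void propose(long zxid) {
        outstanding.add(zxid);
        lastProposed = zxid;
    }

    void processSync(String requester) {
        if (outstanding.isEmpty()) {
            answered.add(requester);            // nothing in flight: reply right away
            return;
        }
        List<String> waiting = pendingSyncs.get(lastProposed);
        if (waiting == null) {
            waiting = new ArrayList<>();
            pendingSyncs.put(lastProposed, waiting);
        }
        waiting.add(requester);                 // park until lastProposed commits
    }

    void commit(long zxid) {
        outstanding.remove(zxid);
        List<String> released = pendingSyncs.remove(zxid);
        if (released != null) {
            answered.addAll(released);          // the real code calls sendSync for each one
        }
    }

    List<String> answered() {
        return new ArrayList<>(answered);
    }
}
```

Under this reading, the SYNC reply is queued to the learner only after the COMMIT for every proposal that was in flight when the sync arrived, so it acts as a marker that the learner has been sent everything up to that point.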
    

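    LearnerHandler-related

    startForwarding

    While a learner is syncing, how it syncs depends on the leader's commitLog. The proposals held in memory and the proposals about to take effect during that period must also be sent to the learner

        synchronized public long startForwarding(LearnerHandler handler,
                long lastSeenZxid) {// lets the leader know the Follower is syncing, and sends over any new proposals or commits the leader currently holds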
            // Queue up any outstanding requests enabling the receipt of
            // new requests
            if (lastProposed > lastSeenZxid) {// the leader has proposed beyond the zxid the learner has seen
                for (Proposal p : toBeApplied) {
                    if (p.packet.getZxid() <= lastSeenZxid) {
                        continue;// the learner already has this to-be-applied zxid
                    }
                    handler.queuePacket(p.packet);
                    // Since the proposal has been committed we need to send the
                    // commit message also
                    QuorumPacket qp = new QuorumPacket(Leader.COMMIT, p.packet// add the matching COMMIT
                            .getZxid(), null, null);
                    handler.queuePacket(qp);// add to the handler's send queue
                }
                // Only participant need to get outstanding proposals
                if (handler.getLearnerType() == LearnerType.PARTICIPANT) {// participants also receive the outstanding proposals
                    List<Long>zxids = new ArrayList<Long>(outstandingProposals.keySet());
                    Collections.sort(zxids);
                    for (Long zxid: zxids) {
                        if (zxid <= lastSeenZxid) {
                            continue;
                        }
                        handler.queuePacket(outstandingProposals.get(zxid).packet);// queue the packet
                    }
                }
            }
            if (handler.getLearnerType() == LearnerType.PARTICIPANT) {// a participant
                addForwardingFollower(handler);// add to the participant set
            } else {
                addObserverLearnerHandler(handler);// otherwise add to the observer set
            }
                    
            return lastProposed;
        }
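
The order in which startForwarding queues packets can be shown with a small pure function. This is a sketch only: StartForwardingSketch and catchUpPackets are hypothetical names, packets are represented as strings, and the outer lastProposed > lastSeenZxid guard is left out because the per-zxid filters give the same result whenever every zxid is at most lastProposed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Simplified model of the catch-up queueing in startForwarding; names are hypothetical.
final class StartForwardingSketch {
    static List<String> catchUpPackets(List<Long> toBeApplied, List<Long> outstanding,
                                       long lastSeenZxid, boolean participant) {
        List<String> queue = new ArrayList<>();
        // Committed but not yet applied: resend the proposal followed by its COMMIT.
        for (long zxid : toBeApplied) {
            if (zxid <= lastSeenZxid) {
                continue;                       // the learner already has it
            }
            queue.add("PROPOSAL " + zxid);
            queue.add("COMMIT " + zxid);
        }
        // Only participants receive proposals that are still waiting for a quorum.
        if (participant) {
            List<Long> sorted = new ArrayList<>(outstanding);
            Collections.sort(sorted);           // send in zxid order
            for (long zxid : sorted) {
                if (zxid <= lastSeenZxid) {
                    continue;
                }
                queue.add("PROPOSAL " + zxid);
            }
        }
        return queue;
    }
}
```

For example, with toBeApplied holding zxids 3 and 4, outstanding holding 6 and 5, and lastSeenZxid equal to 3, a participant is queued PROPOSAL 4, COMMIT 4, PROPOSAL 5, PROPOSAL 6, while an observer is queued only the first two.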
    

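    Thoughts

    CurrentEpoch and AcceptedEpoch

    As already discussed in the thoughts section of source analysis part 31,
    CurrentEpoch is the current epoch and AcceptedEpoch is the epoch about to take effect.
    The former is like a final decision, the latter like the proposal made before the decision (my own understanding)

    Who calls getEpochToPropose

    Both the Leader and the LearnerHandlers call it, so the resulting epoch is the largest lastAcceptedEpoch across all servers (the leader included) + 1
    (I used to assume the Leader initialized the epoch first and the LearnerHandlers called it afterwards, and wondered how a Learner's lastAcceptedEpoch could ever be higher than the Leader's)

    In the Leader-Learner interaction, when may each side start its zkServer

    Leader: after it has sent NEWLEADER and a quorum of participants has returned ACK, it may start
    Learner: once its LearnerHandler knows the Leader has met the condition above, it sends the Learner an UPTODATE message; the Learner then knows a quorum of servers has accepted the Leader's data and may start

    What getEpochToPropose, waitForEpochAck and waitForNewLeaderAck have in common

    All three wait for a bounded time (initLimit * tickTime); if the quorum check has not passed by then, they treat it as a timeout and throw an exception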

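    Questions

    The tension between getEpochToPropose's return-on-quorum mechanism and lastAcceptedEpoch

    What the code says is that once a quorum of servers (the leader plus Learners) has called this function, any lastAcceptedEpoch that arrives later is ignored, even if it is larger than the AcceptedEpoch already decided.
    This looks contradictory: ignoring network delay, suppose a majority of Learners has called the function but the Leader has not; the Leader's own epoch could then be larger than the one everyone settled on, which seems unreasonable.
    Note, though, that the quorum condition also requires connectingFollowers.contains(self.getId()), so the epoch is never fixed before the Leader itself has called; the case that remains open is a Learner with a larger lastAcceptedEpoch arriving after the quorum has formed.

    processSync

    I do not fully understand this function yet. Its caller is ProposalRequestProcessor,
    and it handles LearnerSyncRequest, which corresponds to OpCode.sync,
    so it is only triggered when ZooKeeper#sync is called,
    that is, when the zookeeper client API's sync function is invoked.
    I do not yet see what this function is for; others such as getData are easy to understand.
    Judging from the comments, the client seems to be asking to catch up to the latest state?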

    Gripes

    Variable names and comments

    The followerCounter variable has little to do with its name
    


          Article link: https://www.haomeiwen.com/subject/fcdudxtx.html