【Zookeeper】 Server的启动流程

作者: 桥头桥尾 | 来源:发表于2017-07-20 16:13 被阅读0次

    一:前言

    当服务通过选举算法进行选举完之后,各个服务器就需要设置自己的角色,并启动相对应的服务(也就是服务的初始化),之后就等待客户端的请求,处理响应的请求。

    二:流程分析

    2.1、 LEADER

    功能:接收客户端的请求, 事务请求的提议者。

    首先我们查看启动代码:

        // makeLeader(logFactory) 新建一个Leader实体,打开Leader服务器的交换信息的接口,等待与Learner通信
        setLeader(makeLeader(logFactory));
        // Leader服务启动的主方法
        leader.lead();
    

    Leader服务启动的主方法leader.lead();流程分析:

      void lead() throws IOException, InterruptedException {
           ...
                //1> 加载FileTxnSnapLog中的数据, 并把每一个事务数据封装成一个Proposal,放入committedLog
                //   中,并计算minCommittedLog, maxCommittedLog, 数据放入ZKDatabase
                //2> 处理事务数据时,若事务Type为Session的数据,响应的增删到sessionsWithTimeouts中
                //3> 设置Leader的highestZxid
                //4> 通过ZKDatabase中的sessions与sessionsWithTimeouts进行比较,Kill失效的session, 并将
                //     失效相关联的临时节点进行删除
                zk.loadData();</br>
    
                // 开启接收Leaner的连接线程, 并把每一个Leaner的连接封装成一个LearnerHandler实体, 添加到
                // Leader的learners列表中,每一个LearnerHandler开启一个线程处理响应的Leaner的信息
                cnxAcceptor = new LearnerCnxAcceptor();
                cnxAcceptor.start();</br>
                
                readyToStart = true;
                // 获取当前选举的轮次, 同步等待法定人数的Leaner注册身份(FOLLOWERINFO/OBSERVERINFO)
                // 到Leader 超时时间为  initLimit \* tickTime
                long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
                // 根据当前的轮次,初始化zxid
                zk.setZxid(ZxidUtils.makeZxid(epoch, 0));</br>
                ...</br>
                
                newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(),
                        null, null);</br>
                ...</br>
                //Leaner向Leaner发送LEADERINFO消息,并等待法定人数的Leaner的响应epoch的消息
                waitForEpochAck(self.getId(), leaderStateSummary);
                self.setCurrentEpoch(epoch);
                try {
                    //等待法定人数的Leaner初始化同步数据响应的消息,超时时间为 initLimit \* tickTime
                    waitForNewLeaderAck(self.getId(), zk.getZxid(), LearnerType.PARTICIPANT);
                } catch (InterruptedException e) {
                    ...
                }
                
                //开启对客户端的服务的主方法
                startZkServer();
                
                //是否有zxid的初始化设置
                String initialZxid = System.getProperty("zookeeper.testingonly.initialZxid");
                if (initialZxid != null) {
                    long zxid = Long.parseLong(initialZxid);
                    zk.setZxid((zk.getZxid() & 0xffffffff00000000L) | zxid);
                }
                
                if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) {
                    self.cnxnFactory.setZooKeeperServer(zk);
                }
               
               //对每一个Leaner发起ping检测消息, 检测的时间间隔为tickTime / 2
               // 检测learnerType = LearnerType.PARTICIPANT的人数是否少于法定的人数,
               // 如果少于,则   shutDown服务,进行新一轮的选举
                boolean tickSkip = true;
                while (true) {
                    Thread.sleep(self.tickTime / 2);
                    if (!tickSkip) {
                        self.tick++;
                    }
                    HashSet<Long> syncedSet = new HashSet<Long>();
                    syncedSet.add(self.getId());
    
                    for (LearnerHandler f : getLearners()) {
                        if (f.synced() && f.getLearnerType() == LearnerType.PARTICIPANT) {
                            syncedSet.add(f.getSid());
                        }
                        f.ping();
                    }
    
                  if (!tickSkip && !self.getQuorumVerifier().containsQuorum(syncedSet)) {
                        shutdown("Not sufficient followers synced, only synced with sids: [ "
                                + getSidSetString(syncedSet) + " ]");
                        return;
                  } 
                  tickSkip = !tickSkip;
                }
               //...
        }
    

    下面我们队上面代码的两个流程进行流程分析①: Leaner向Leader的注册同步流程;②:Leader与客户端服务的流程。

    2.1.1、 Leaner向Leader的注册同步流程

    • 1.、接收到Leaner的信息注册消息 type = Leader.FOLLOWERINFO/OBSERVERINFO + acceptedEpoch + myid, 根据消息设置LeanerHandler的字段值, this.sid = li.getServerid();this.version = li.getProtocolVersion();, 同步等待法定人数Leander的注册消息得到newEpoch(如果每次接收到的lastAcceptedEpoch >= epoch, 则设置epoch = lastAcceptedEpoch+1;), 发送Leader信息(type:Leader.LEADERINFO, zxid: newEpoch + 0)给Leander
    • 2、同步等待Leaner接收到Leander的信息包之后返回的ACKEPOCH消息, 从ACKEPOCH消息获取到Leaner的lastLoggerZxid(为了防止与Leader的lastLoggerZxid冲突, 用peerLastZxid代替)。处理同步数据逻辑:
    • i: 如果peerLastZxid == LastZxid,则packetToSend置为DIFF, zxidToSend置为peerLastZxid
    • ii: 如果proposals.size() != 0 (proposals为committedLog的列表)
      • a. 子条件maxCommittedLog >= peerLastZxid && minCommittedLog <= peerLastZxid, 比较proposals中最接近且小于等于peerLastZxid的zxid, 如果小于, packetToSend置为TRUNC, zxidToSend置为此值; 否则packetToSend置为DIFF, zxidToSend置为maxCommittedLog, 然后把所有大于peerLastZxid 的propose 封装成QuorumPacket (type: commit, zxid: propose.packet.getZxid())放入queuedPackets队列中,
      • b.子条件peerLastZxid > maxCommittedLog, 则packetToSend置为TRUNC, zxidToSend置为maxCommittedLog
    • iii: peerLastZxid > maxCommittedLog, packetToSend置为TRUNC, zxidToSend置为 maxCommittedLog
    • iv: 否则packetToSend置为SNAP
    • 3、将toBeApplied(ToBeAppliedRequestProcessor未完成的请求)列表中的数据(type: Leader.COMMIT)添加到queuedPackets队列中, 如果handler.LearnerType() == LearnerType.PARTICIPANT(即Leander的角色为Follower), 则将outstandingProposals(ProposalRequestProcessor未完成的请求) 中的提议数据(type: Leader.PROPOSAL)添加到queuedPackets队列中, 将Leaner 根据leanerType分别放入forwardingFollowers 和 observingLearners集合中
    • 4、将新一轮的Leader的信息(type: NEWLEADER, zxid: newEpoch + 0)放入queuedPackets队列中
    • 5、发送同步信息(type: packetToSend, zxidToSend: zxidToSend), 如果为SNAP消息, 则zxidToSend为Leader的lastLoggerZxid, 且将ZKDatabase序列化为数据流发送给Leaner
    • 6、开启线程, 发送queuedPackets 队列中包给Leander
    • 7、接收来自Leander同步的Ack消息, 同步等待法定人数Leaner的人数的回应
    • 8、发送Leader.UPTODATE消息给客户端, 表示可以使用数据了
      <b>注:</b>可以看到queuedPackets队列中的数据的顺序为:Leader.COMMIT —> Leader.PROPOSAL——> Leader.NEWLEADER —> Leader.UPTODATE

    Leader异步等待法定的客户端注册同步:Leader的Lead()方法中:

    // 异步等待Leaner信息的注册
    long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
    // 异步等待Leaner接收到Leader的信息的EpochAck
    waitForEpochAck(self.getId(), leaderStateSummary);
    // 异步等待Leander对Leader的NEWLEADER消息的Ack
    waitForNewLeaderAck(self.getId(), zk.getZxid(), LearnerType.PARTICIPANT);
    

    2.1.2、 Leader与客户端服务的流程

    zk处理客户端的请求都是通过Proccessor进行链路处理。对于LeaderZooKeeperServer对应的Proccessor的处理关系为: PrepRequestProcessor ——> ProposalRequestProcessor ——> CommitProcessor ——> Leader.ToBeAppliedRequestProcessor ——>FinalRequestProcessor, 同时ProposalRequestProcessor 将事务请求(request.hdr != null)交给同步刷盘处理器处理 ProposalRequestProcessor ——> SyncRequestProcessor ——> AckRequestProcessor。AckRequestProcessor处理器将刷盘成功的请求交给Leader作为一个提议,作为Leader判断提议成功的法定人数, 成立交给CommitProcessor

    图片.png

    2.2、 FOLLOWER

    功能:接收客户端的请求, 事务请求提议的参与者,将事务请求转发给Leader。

    首先查看启动代码

         // makeFollower(logFactory)新建一个Follwer实体,建立与Leader通信
        setFollower(makeFollower(logFactory));
        // follower启动的主方法
        follower.followLeader();
    

    Follower服务启动的主方法follower.followLeader();流程分析:

    void followLeader() throws InterruptedException {
           ...
            try {
                InetSocketAddress addr = findLeader();            
                try {
                    //与Leader建立通信
                    connectToLeader(addr);
                    //将自己的信息注册到Leader
                    long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);</br>
    
                    //初始化同步Leader的数据
                    long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
                    if (newEpoch < self.getAcceptedEpoch()) {
                        LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid)
                                + " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch()));
                        throw new IOException("Error: Epoch of leader is lower");
                    }
                    syncWithLeader(newEpochZxid);</br>
    
                    //接收Leader通信数据,并做响应的业务处理
                    QuorumPacket qp = new QuorumPacket();
                    while (self.isRunning()) {
                        readPacket(qp);
                        processPacket(qp);
                    }
                } catch (Exception e) {
                   ...
                }
            } finally {
                zk.unregisterJMX((Learner)this);
            }
        }
    

    2.2.1 Follwer注册与同步流程分析

    • 1:发送自身信息包(type: Leader.FOLLOWERINFO, zxid: acceptedEpoch + 1)给Leader
    • 2: 接收到Leader的信息包(type: Leader.LEADERINFO, zxid: newEpoch + 1), 判断newEpoch 与 自身acceptedEpoch 的大小: 大于, 则将 自身acceptedEpoch设置为 newEpoch, 发送的epoch设置为旧的acceptedEpoch; 相等, 发送epoch设置为-1; 否则异常。 包(type: Leader.ACKEPOCH, zxid: lastLoggedZxid)
    • 3: 接收来自Leader根据lastLoggedZxid发送过来的初始化同步信息: Leader.DIFF, 不做任何操作; Leader.SNAP, 则将包中的数据序列化作为自己的ZKDatabase; Leader.TRUNC, 则根据接收到的zxid来TRUNC当前ZKDatabase的数据, 设置zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
    • 4: 接收来自Leader的同步数据: Leader.COMMIT——> Leader.PROPOSAL——> Leader.NEWLEADER ——> Leader.UPTODATE
      Leader.PROPOSAL: 提议的数据, 添加到packetsNotCommitted 队列中
      Leader.COMMIT: 需要提交的数据,添加到packetsCommitted队列
      Leader.NEWLEADER: 接收新的Leader领导的事务,保存快照数据, 设置self.setCurrentEpoch(newEpoch);, 回复AcK消息给Leader
      Leader.UPTODATE, 同步完成,设置FollowerZooKeeperServer(self.cnxnFactory.setZooKeeperServer(zk)), 跳出同步流程
    • 5: 对packetsNotCommitted 中的数据调用fzk.logRequest(p.hdr, p.rec)(刷盘, 返回提议AcK),对packetsCommitted中的数据调用fzk.commit(zxid);(提交操作)

    2.2.1 Follwer与客户端的服务流程

    对于角色Follwer的处理客户端的请求是通过下面的RequestProcessor进行处理: FollowerRequestProcessor ——> CommitProcessor ——> FinalRequestProcessor; 同时将其转发给Leader是并把请求,Leader会进行提议请求,将接收到的提议请求交给SyncRequestProcessor ——> SendAckRequestProcessor, SendAckRequestProcessor将ack消息发送给Leader,作为Leader判断提议成功的法定人数,成立交给CommitProcessor 。

    图片.png

    2.3、 OBSERVER

    功能:接收客户端的请求, 将事务请求转发给Leader。

    首先查看启动代码

    // makeObserver(logFactory)新建一个Observer实体,建立与Leader通信
    setObserver(makeObserver(logFactory));
    // observer启动的主方法
    observer.observeLeader();
    

    Observer服务启动的主方法observer.observeLeader();流程分析:

    void observeLeader() throws InterruptedException {
         ...
                try {
                    //建立与Leader的连接
                    connectToLeader(addr);
                    //将自身的信息注册到Leader中, 返回Leader的lastLoggerZxid
                    long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO);
                    //同步Leader的数据信息
                    syncWithLeader(newLeaderZxid);
                     //接收Leader通信数据,并做响应的业务处理
                    QuorumPacket qp = new QuorumPacket();
                    while (self.isRunning()) {
                        readPacket(qp);
                        processPacket(qp);                   
                    }
                } catch (Exception e) {
                   ...
                }
           ...
        }
    

    3.3.1 Observer注册与同步流程分析

    • 1:发送自身信息包(type: Leader.OBSERVERINFO, zxid: acceptedEpoch + 1)给Leader
    • 2: 接收到Leader的信息包(type: Leader.LEADERINFO, zxid: newEpoch + 1), 判断newEpoch 与 自身acceptedEpoch 的大小: 大于, 则将 自身acceptedEpoch设置为 newEpoch, 发送的epoch设置为旧的acceptedEpoch; 相等, 发送epoch设置为-1; 否则异常。 包(type: Leader.ACKEPOCH, zxid: lastLoggedZxid)
    • 3: 接收来自Leader根据lastLoggedZxid发送过来的初始化同步信息: Leader.DIFF, 不做任何操作; Leader.SNAP, 则将包中的数据序列化作为自己的ZKDatabase; Leader.TRUNC, 则根据接收到的zxid来TRUNC当前ZKDatabase的数据, 设置zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
    • 4: 接收来自Leader的同步数据: Leader.COMMIT——> Leader.NEWLEADER ——> Leader.UPTODATE
      Leader.COMMIT: 需要提交的数据,添加到packetsCommitted队列
      Leader.NEWLEADER: 接收新的Leader领导的事务,保存快照数据, 设置self.setCurrentEpoch(newEpoch);, 回复AcK消息给Leader
      Leader.UPTODATE, 同步完成,设置FollowerZooKeeperServer(self.cnxnFactory.setZooKeeperServer(zk)), 跳出同步流程

    3.3.2 Observer与客户端的服务流程

    对于Observer的处理客户端的请求是通过下面的RequestProcessor进行处理:ObserverRequestProcessor ——> CommitProcessor ——> FinalRequestProcessor ; 同时对于Leader的Leader.INFORM消息会同时交给SyncRequestProcessor(刷盘操作)跟CommitProcessor。

    图片.png

    相关文章

      网友评论

        本文标题:【Zookeeper】 Server的启动流程

        本文链接:https://www.haomeiwen.com/subject/uknfkxtx.html