美文网首页程序员
ZooKeeper服务端启动源码 集群

ZooKeeper服务端启动源码 集群

作者: Kohler | 来源:发表于2019-01-29 11:50 被阅读3次

    集群和单机版启动类都是QuorumPeerMain,进入initializeAndRun方法

    启动

    1. 解析配置文件zoo.cfg
    2. 创建并启动历史文件清理器DatadirCleanupManager
    3. 根据集群模式还是单机模式的启动
    if (args.length == 1 && config.servers.size() > 0) {
      // 集群
      runFromConfig(config);
    } else {
      ZooKeeperServerMain.main(args);
    }
    

    集群模式会进入if块

    初始化

    运行runFromConfig方法,在runFromConfig方法内部可以看到,其核心实例是QuorumPeer,而不再是单机模式的ZooKeeperServer实例,QuorumPeer实例可以看作是集群的一个节点,集群中的所有的QuorumPeer实例协作完成集群的选举、投票。

    1. 创建并配置ServerCnxnFactory,和单机版一致。

      ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
      cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns());
      

      cnxnFactory会赋值给quorumPeerquorumPeer.setCnxnFactory(cnxnFactory);

    2. 实例化quorumPeer并设值

      quorumPeer = getQuorumPeer();
      // 设置集群所有的peer,集群机器之间互相通信
      quorumPeer.setQuorumPeers(config.getServers());
      ...
      

      这个就是根据配置中server.id解析出来的,如

      server.1=localhost:2888:3888
      server.2=localhost:2887:3887
      server.3=localhost:2886:3886
      
    3. 创建持久化文件管理器FileTxnSnapLog,并给quorumPeer赋值

      quorumPeer.setTxnFactory(new FileTxnSnapLog(
              new File(config.getDataLogDir()),
              new File(config.getDataDir())));
      
    4. 创建内存数据库,并赋值给quorumPeer

      quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
      
    5. 初始化并启动quorumPeer

      quorumPeer.initialize();
      quorumPeer.start();
      quorumPeer.join();
      

      QuorumPeer#start方法

      //QuorumPeer#start
      public synchronized void start() {
        loadDataBase();
        cnxnFactory.start();        
        startLeaderElection();
        super.start();
      }
      

      启动quorumPeer步骤有

      • 加载内存数据库
      • 启动cnxnFactory,客户端连接的IO线程
      • 集群选举
      • 选举线程启动
      1. 集群版加载内存数据库会去分析当前的Epoch
      private long acceptedEpoch = -1;
      private long currentEpoch = -1;
      
      1. 启动cnxnFactory后,这时候客户端IO线程是没法工作的,因为在创建客户端连接的时候需要zkServer变量,处理调用链

        protected NIOServerCnxn createConnection(SocketChannel sock, SelectionKey sk) {
          return new NIOServerCnxn(zkServer, sock, sk, this);
        }
        

        需要等集群选举完成、数据同步完成后,为其赋值,才能开启工作

      所以先主要分析集群选举和选举线程启动

    集群选举

    集群选举需要当前peer与其他机器在选举端口上建立连接,然后发送投票进行选举,选举端口在配置文件中配置

    server.id - This is the host:port[:port] that the server with the given id will use for the quorum protocol.

    其中,第一个端口用于指定Follower服务器与Leader进行运行时通信和数据同步时所使用的端口,第二个端口则专门用于进行Leader选举过程中的投票通信,在初始化时``quorumPeer`为其赋值。

    1. 初始化投票
      QuorumPeer#startLeaderElection方法初始化投票

      • 创建当前投票,优先给自己投票
        currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());

      • 创建选举算法,默认electionType=3,也就是FastLeaderElection

        // QuorumPeer#createElectionAlgorithm
        case 3:
          qcm = createCnxnManager();
           // 监听连接
          QuorumCnxManager.Listener listener = qcm.listener;
          if(listener != null) {
            listener.start();
            le = new FastLeaderElection(this, qcm);
          }
        

        创建Leader选举所需的网络IO层QuorumCnxManager,同时启动对Leader选举端口的监听,等待集群中其他服务器创建连接。

    调用start方法启动线程,进入run方法

    1. 注册JMX服务

      jmxQuorumBean = new QuorumBean(this);
      MBeanRegistry.getInstance().register(jmxQuorumBean, null);
      ...
      
    2. 检测当前服务器状态,并根据当前状态做处理

      switch (getPeerState()) {
        case LOOKING:
          ...
        case OBSERVING:
          ...
        case FOLLOWING:
          ...
        case LEADING:
           ...
      }
      

      集群启动状态当然是LOOKING

      private ServerState state = ServerState.LOOKING;
      

      LOOKING状态的机器需要去获取集群的Leader,如果当前没有Leader,则进入选举模式。

      setCurrentVote(makeLEStrategy().lookForLeader());
      
    3. Leader选举
      选举算法以默认的FastLeaderElection#lookForLeader为例,该方法开始新一轮Leader选举。每当QuorumPeer将其状态更改为LOOKING时,就会调用此方法,并向所有其他peers发送通知。具体选举算法单独分析。

    4. 完成选举后服务器状态为:OBSERVINGFOLLOWINGLEADING,对应角色分别是ObserverFollowerLeaderObserverFollower的区别在于Observer不参与任何投票。

    角色交互

    完成集群选举后,集群Leader和Followers之间需要进行数据同步,并在后续的消息处理中,Followers会将事物请求以Request的形式转发给Leader。

    Follower

    当节点中状态为FOLLOWING时,将设置当前角色为Follower,包括创建Follower并启动

    setFollower(makeFollower(logFactory));
    follower.followLeader();
    

    Follower#followLeader方法

    void followLeader() throws InterruptedException {
      ...
      QuorumServer leaderServer = findLeader();            
      try {
        connectToLeader(leaderServer.addr, leaderServer.hostname);
        long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
    
        // leader zxid比自己的zxid还要小,出错了
        long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
        if (newEpoch < self.getAcceptedEpoch()) {
          LOG.error("");
          throw new IOException("Error: Epoch of leader is lower");
        }
        syncWithLeader(newEpochZxid);                
        QuorumPacket qp = new QuorumPacket();
        while (this.isRunning()) {
          readPacket(qp);
          processPacket(qp);
        }
      }
      ...
    }
    

    步骤

    1. 找到当前leader,通过投票查找

      Vote current = self.getCurrentVote();
      for (QuorumServer s : self.getView().values()) {
        if (s.id == current.getId()) {
          s.recreateSocketAddresses();
          leaderServer = s;
          break;
        }
      }
      
    2. 连接到leader,重试连接上一步找到的leader

      sock = new Socket();        
      sock.setSoTimeout(self.tickTime * self.initLimit);
      for (int tries = 0; tries < 5; tries++) {
        sock.connect(addr, self.tickTime * self.syncLimit);
      }
      
    3. 向leader注册,
      这一步Follower向Leader同步投票的Epoch以及Follower的自己的最新事务id、Epoch,并接受Leader的Epoch。

    4. 同步数据
      上一步Leader收到Follower最新的zxid后,根据自己的zxid决定采用哪种方式同步数据。在Learner#syncWithLeader方法中,Leader通知Follower以何种方式进行同步

      readPacket(qp);
      if (qp.getType() == Leader.DIFF) {
       // 差异化同步
        snapshotNeeded = false;
      } else if (qp.getType() == Leader.SNAP) {
        // 全量同步
        zk.getZKDatabase().clear();
        zk.getZKDatabase().deserializeSnapshot(leaderIs);
        String signature = leaderIs.readString("signature");
        if (!signature.equals("BenWasHere")) {
          LOG.error("Missing signature. Got " + signature);
          throw new IOException("Missing signature");                   
        }
        zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
      } else if (qp.getType() == Leader.TRUNC) {
        //截断日志
        boolean truncated=zk.getZKDatabase().truncateLog(qp.getZxid());
        if (!truncated) {
          System.exit(13);
        }
        zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
      } else {
        System.exit(13);
      }
      

      Follower根据同步类型,处理本地日志文件及本地数据库

      • DIFF:差异化同步
      • SNAP:全量同步
      • TRUNC:截断日志

      然后Leader开始发送数据同步

      // 数据同步知道接收到UPTODATE类型的数据包结束
      outerLoop:
      while (self.isRunning()) {
        readPacket(qp);
        switch(qp.getType()) {
          // 投票
          case Leader.PROPOSAL:
            PacketInFlight pif = new PacketInFlight();
            pif.hdr = new TxnHeader();
            pif.rec = SerializeUtils.deserializeTxn(qp.getData(), pif.hdr);
            if (pif.hdr.getZxid() != lastQueued + 1) {
              
            }
            lastQueued = pif.hdr.getZxid();
            packetsNotCommitted.add(pif);
            break;
          // 提交
          case Leader.COMMIT:
            if (!writeToTxnLog) {
              pif = packetsNotCommitted.peekFirst();
              if (pif.hdr.getZxid() != qp.getZxid()) {
                LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid());
              } else {
                zk.processTxn(pif.hdr, pif.rec);
                packetsNotCommitted.remove();
              }
            } else {
              packetsCommitted.add(qp.getZxid());
            }
            break;
          // 只有observer才能得到这种类型的包。我们将此视为接收PROPOSAL和COMMIT。
          case Leader.INFORM:
            PacketInFlight packet = new PacketInFlight();
            packet.hdr = new TxnHeader();
            packet.rec = SerializeUtils.deserializeTxn(qp.getData(), packet.hdr);
            // Log warning message if txn comes out-of-order
            if (packet.hdr.getZxid() != lastQueued + 1) {
              
            }
            lastQueued = packet.hdr.getZxid();
            if (!writeToTxnLog) {
              // Apply to db directly if we haven't taken the snapshot
              zk.processTxn(packet.hdr, packet.rec);
            } else {
              packetsNotCommitted.add(packet);
              packetsCommitted.add(qp.getZxid());
            }
            break;
          // 同步完成
          case Leader.UPTODATE:
            if (isPreZAB1_0) {
              zk.takeSnapshot();
              self.setCurrentEpoch(newEpoch);
            }
            self.cnxnFactory.setZooKeeperServer(zk);                
            break outerLoop;
          case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery 
            File updating = new File(self.getTxnFactory().getSnapDir(),
                                     QuorumPeer.UPDATING_EPOCH_FILENAME);
            if (!updating.exists() && !updating.createNewFile()) {
              throw new IOException("Failed to create " + updating.toString());
            }
            if (snapshotNeeded) {
              zk.takeSnapshot();
            }
            self.setCurrentEpoch(newEpoch);
            if (!updating.delete()) {
              throw new IOException("Failed to delete " + updating.toString());
            }
            //需要将数据写入事务日志
            writeToTxnLog = true;
            isPreZAB1_0 = false;
            writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
            break;
        }
      }
      

      同步完成后

      发送响应

      ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
      writePacket(ack, true);
      

      开始接收客户端请求,这个zk在不同角色的节点上是不同的角色,FollowerZooKeeperServerObserverZooKeeperServer

      zk.startup();
      

      还需要补充内存数据库中snapshot与log之间的差异

    5. 不断与Leader通信,同步数据

      while (this.isRunning()) {
        readPacket(qp);
        processPacket(qp);
      }
      

      Follower#processPacket方法检查在qp中接收的数据包,并根据其内容进行分发。

      protected void processPacket(QuorumPacket qp) throws IOException{
        switch (qp.getType()) {
           // 心跳
          case Leader.PING:            
            ping(qp);            
            break;
          // 事务投票
          case Leader.PROPOSAL:            
            TxnHeader hdr = new TxnHeader();
            Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
            if (hdr.getZxid() != lastQueued + 1) {
            }
            lastQueued = hdr.getZxid();
            fzk.logRequest(hdr, txn);
            break;
          // 提交事物
          case Leader.COMMIT:
            fzk.commit(qp.getZxid());
            break;
          case Leader.UPTODATE:
            LOG.error("Received an UPTODATE message after Follower started");
            break;
          case Leader.REVALIDATE:
            revalidate(qp);
            break;
          // 通知Learner服务器已经完成了Sync操作
          case Leader.SYNC:
            fzk.sync();
            break;
          default:
            LOG.error("Invalid packet type: {} received by Observer", qp.getType());
        }
      }
      

      Follower后续还需要不断与Leader通信,进行事务投票。

    至此Follower开始对外提供服务。

    Leader

    Follower类似,

    setLeader(makeLeader(logFactory));
    leader.lead();
    

    QuorumPeer#makeLeader方法,

    protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException {
      return new Leader(this, new LeaderZooKeeperServer(logFactory,
                        this,new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb));
    }
    

    Leader内部处理请求的是LeaderZooKeeperServer

    Leader#lead的主要流程

    1. 加载内存数据库

      zk.loadData();
      
    2. 创建LearnerCnxAcceptor,启动等待来自新followers的连接请求的线程。

      cnxAcceptor = new LearnerCnxAcceptor();
      cnxAcceptor.start();
      

      Leader.LearnerCnxAcceptor#run方法中

      Socket s = ss.accept();
      // start with the initLimit, once the ack is processed
      // in LearnerHandler switch to the syncLimit
      s.setSoTimeout(self.tickTime * self.initLimit);
      s.setTcpNoDelay(nodelay);
      BufferedInputStream is = new BufferedInputStream(s.getInputStream());
      // 为每个Learner创建一条线程,处理投票、数据同步
      LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
      fh.start();
      
    3. 等待Leaner响应Ack

      readyToStart = true;
      long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
      
      zk.setZxid(ZxidUtils.makeZxid(epoch, 0));
      
      synchronized(this){
        lastProposed = zk.getZxid();
      }
      
      newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(), null, null);
      
      waitForEpochAck(self.getId(), leaderStateSummary);
      self.setCurrentEpoch(epoch);
      
      waitForNewLeaderAck(self.getId(), zk.getZxid());
      

      准备完毕,只需要等待过半数的Leaner的回复即可对外工作,在LeanerHandler中也会调用waitForEpochAckwaitForEpochAck唤醒Leader

    4. 对外提供服务

      startZkServer();
      
    5. 心跳,和Leaner保活

    至此ZooKeeper集群模式启动完毕,整个集群开始对外提供服务。

    相关文章

      网友评论

        本文标题:ZooKeeper服务端启动源码 集群

        本文链接:https://www.haomeiwen.com/subject/yvfxsqtx.html