ZK Source Code Reading 31: QuorumPeer Source Analysis for Clustered Servers

Author: 赤子心_d709 | Published 2017-08-08 18:45 | 850 reads

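Summary

This part walks through the QuorumPeer source code. There is little material on it online, and I am not sure what a good Chinese rendering of the name would be.
QuorumPeer records various information about the current server. It is also a thread that keeps cycling through leader election, alternating between the two phases of crash recovery and atomic broadcast.
There is a lot to cover; the main topics are: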

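Inner classes
  ServerState: the state the current server is in
  LearnerType: whether the server takes part in voting and leader election
  ResponderThread: the class used by the old UDP-based election
  QuorumServer: for each sid, the machine address, the server-to-server communication port, and the leader-election port
Fields
Methods
  Constructor
  Getters and setters
  Startup functions
  Election-state functions
  Epoch persistence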

Inner classes

(Figure: QuorumPeer inner classes)

A brief introduction to each follows.

ServerState

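This was covered in the previous part and is repeated here. It records the four states a server can be in:

  LOOKING: looking for a leader. A server in this state believes the cluster has no leader and therefore enters leader election.
  FOLLOWING: follower state. The server's role is Follower.
  LEADING: leader state. The server's role is Leader.
  OBSERVING: observer state. The server's role is Observer.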
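As a hedged illustration (not ZooKeeper code), the moves that QuorumPeer#run(), shown later, makes between these four states can be written as a small enum. The names PeerState and canTransitionTo are hypothetical; the rule they encode is read off run(): an election takes LOOKING to one of the role states (or leaves it LOOKING if lookForLeader() fails), and every role state returns to LOOKING when its role ends.

```java
// Hypothetical sketch: PeerState mirrors QuorumPeer.ServerState, and
// canTransitionTo() encodes the moves that QuorumPeer#run() makes.
// ZooKeeper has no such method; it exists here only for illustration.
enum PeerState {
    LOOKING, FOLLOWING, LEADING, OBSERVING;

    /** Returns true if run() can move a peer from this state directly to next. */
    boolean canTransitionTo(PeerState next) {
        if (this == LOOKING) {
            // An election ends in one of the three roles, or the peer stays
            // LOOKING (e.g. lookForLeader() threw and the state was reset).
            return true;
        }
        // FOLLOWING, LEADING and OBSERVING always fall back to LOOKING in
        // their finally blocks; they never jump straight to another role.
        return next == LOOKING;
    }
}
```

So a FOLLOWING server that is about to become leader always passes through LOOKING first.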

LearnerType

Marks whether the server can take part in voting and elections:

PARTICIPANT: can vote on proposals and in leader election
OBSERVER: can only observe

ResponderThread

Not covered here. It is the UDP-based leader election used before 3.4.0 and is now deprecated.

QuorumServer

This class records the main information about each server in the configuration.
Constructor:

        public QuorumServer(long id, String hostname,
                            Integer port, Integer electionPort,
                            LearnerType type) {
            this.id = id; // serverId of the machine
            this.hostname=hostname; // host
            if (port!=null){
                this.port=port; // after election: port for leader-learner communication
            }
            if (electionPort!=null){
                this.electionPort=electionPort; // port the servers use for leader election
            }
            if (type!=null){
                this.type = type; // server type: PARTICIPANT or OBSERVER
            }
            this.recreateSocketAddresses();
        }

The main method is recreateSocketAddresses, which resolves the hostname and creates the socket addresses:

        public void recreateSocketAddresses() { // resolve the host and create the InetSocketAddress(es)
            InetAddress address = null;
            try {
                address = InetAddress.getByName(this.hostname);
                LOG.info("Resolved hostname: {} to address: {}", this.hostname, address);
                this.addr = new InetSocketAddress(address, this.port);
                if (this.electionPort > 0){
                    this.electionAddr = new InetSocketAddress(address, this.electionPort);
                }
            } catch (UnknownHostException ex) {
                LOG.warn("Failed to resolve address: {}", this.hostname, ex);
                // Have we succeeded in the past?
                if (this.addr != null) {
                    // Yes, previously the lookup succeeded. Leave things as they are
                    return;
                }
                // The hostname has never resolved. Create our InetSocketAddress(es) as unresolved
                this.addr = InetSocketAddress.createUnresolved(this.hostname, this.port);
                if (this.electionPort > 0){
                    this.electionAddr = InetSocketAddress.createUnresolved(this.hostname,
                                                                           this.electionPort);
                }
            }
        }
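The fallback in recreateSocketAddresses() is easy to miss: a hostname that fails to resolve keeps the last address if one was already set, and otherwise becomes an unresolved InetSocketAddress that a later call can resolve. Below is a minimal sketch of that logic, assuming a hypothetical Resolver interface (and class name ServerAddress) so the lookup can be swapped out in tests; the real code calls InetAddress.getByName() directly and also builds the election address.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;

// Hypothetical sketch of the fallback in QuorumServer#recreateSocketAddresses().
// ServerAddress and Resolver are illustration-only names.
class ServerAddress {
    /** Assumed hook so the DNS lookup can be replaced in tests. */
    interface Resolver {
        InetAddress resolve(String host) throws UnknownHostException;
    }

    private final String hostname;
    private final int port;
    private InetSocketAddress addr; // last known address, possibly unresolved

    ServerAddress(String hostname, int port) {
        this.hostname = hostname;
        this.port = port;
    }

    InetSocketAddress addr() {
        return addr;
    }

    void recreate(Resolver resolver) {
        try {
            addr = new InetSocketAddress(resolver.resolve(hostname), port);
        } catch (UnknownHostException e) {
            if (addr != null) {
                return; // an earlier call already set an address: keep it
            }
            // Never resolved: keep host and port so a later call can retry.
            addr = InetSocketAddress.createUnresolved(hostname, port);
        }
    }
}
```

One reading of this design: a peer whose hostname cannot be resolved yet (for example, DNS not ready at startup) does not stop the server from being configured, and the address can be recreated later.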

Fields

There are many fields. The main ones (excluding logging, JMX, and the old ResponderThread) are:

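Field | Meaning | Default
QuorumCnxManager qcm | network I/O manager used during leader election | -
ZKDatabase zkDb | the database object | -
long OBSERVER_ID | identifies an observer | Long.MAX_VALUE
long start_fle, end_fle | start and end time of fast leader election | -
LearnerType learnerType | how the server participates | LearnerType.PARTICIPANT (participant by default)
Map<Long, QuorumServer> quorumPeers | maps each sid to its server | -
QuorumVerifier quorumConfig | quorum verifier (usually a majority check) | -
long myid | myid (sid) of this machine | -
Vote currentVote | the leader finally settled on in a voting round | -
Vote bcVote | vote kept for backward compatibility; also denotes the chosen leader | -
boolean running | whether to keep looping through leader election and atomic broadcast | true
int tickTime | same as in QuorumPeerConfig | -
int minSessionTimeout | same as in QuorumPeerConfig | -
int maxSessionTimeout | same as in QuorumPeerConfig | -
int initLimit, syncLimit | same as in QuorumPeerConfig | -
boolean syncEnabled | same as in QuorumPeerConfig | true
boolean quorumListenOnAllIPs | same as in QuorumPeerConfig | false
int tick | number of tickTime periods elapsed so far | -
ServerState state | current server state, initially LOOKING | ServerState.LOOKING
InetSocketAddress myQuorumAddr | address of this machine | -
int electionType | election algorithm number, mapped to an electionAlg | -
Election electionAlg | the election algorithm | -
ServerCnxnFactory cnxnFactory | handles I/O with clients | -
FileTxnSnapLog logFactory | transaction log and snapshots | -
QuorumStats quorumStats | stats object | -
Follower follower | the Follower object while in FOLLOWING state | -
Leader leader | the Leader object while in LEADING state | -
Observer observer | the Observer object while in OBSERVING state | -
String SYNC_ENABLED | name of the property that decides whether observers sync | "zookeeper.observer.syncEnabled"
long acceptedEpoch | the epoch being accepted or about to be adopted; it may still change and becomes currentEpoch once settled | -1
long currentEpoch | the epoch the server is currently in; stable | -1
String CURRENT_EPOCH_FILENAME | file name for currentEpoch | "currentEpoch"
String ACCEPTED_EPOCH_FILENAME | file name for acceptedEpoch | "acceptedEpoch"
String UPDATING_EPOCH_FILENAME | file name for updatingEpoch | "updatingEpoch"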

Note:

The meanings given for acceptedEpoch and currentEpoch are my own interpretation; the code comments do not explain them.

Methods

There are many of these too, so instead of a screenshot, here are the important ones.

Constructor

There are several constructors, but only one is actually called:

    public QuorumPeer() {
        super("QuorumPeer"); // thread name
        quorumStats = new QuorumStats(this); // quorumStats is never actually used
    }

Getters and setters

Only the important ones are listed.

getView

// Returns a view of the whole cluster at this moment: the ids of all servers and their configurations

    public Map<Long,QuorumPeer.QuorumServer> getView() {
        return Collections.unmodifiableMap(this.quorumPeers);
    }

getVotingView

// Returns a view of all servers in the cluster that are participants (voters)

    public Map<Long,QuorumPeer.QuorumServer> getVotingView() {
        Map<Long,QuorumPeer.QuorumServer> ret = 
            new HashMap<Long, QuorumPeer.QuorumServer>();
        Map<Long,QuorumPeer.QuorumServer> view = getView();
        for (QuorumServer server : view.values()) {            
            if (server.type == LearnerType.PARTICIPANT) {
                ret.put(server.id, server);
            }
        }        
        return ret;
    }

countParticipants

// Counts the PARTICIPANT servers in the map; used to build the quorum verifier

    protected static int countParticipants(Map<Long,QuorumServer> peers) {
      int count = 0;
      for (QuorumServer q : peers.values()) {
          if (q.type == LearnerType.PARTICIPANT) {
              count++;
          }
      }
      return count;
    }
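countParticipants() matters because only PARTICIPANT servers count toward a quorum; observers are left out. The verifier it feeds is usually a majority check. A minimal sketch of such a check, assuming a quorum means strictly more than half of the participants (the class and method names are hypothetical):

```java
import java.util.Set;

// Minimal sketch of a majority ("more than half") quorum check of the kind
// countParticipants() feeds. MajorityQuorum is a hypothetical name.
class MajorityQuorum {
    private final int half;

    /**
     * @param participants number of PARTICIPANT servers, e.g. from countParticipants();
     *                     observers are not included, so they never change the threshold
     */
    MajorityQuorum(int participants) {
        this.half = participants / 2;
    }

    /** True if the set of acknowledging sids is strictly larger than half the voters. */
    boolean containsQuorum(Set<Long> ackSids) {
        return ackSids.size() > half;
    }
}
```

With 5 participants, 3 acknowledgements form a quorum; with 4 participants, 2 are not enough and 3 are still needed.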

getLastLoggedZxid

Returns the last logged zxid, loading the database first if it has not been initialized:

    public long getLastLoggedZxid() {
        if (!zkDb.isInitialized()) {
            loadDataBase();
        }
        return zkDb.getDataTreeLastProcessedZxid();
    }
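The zxid returned by getLastLoggedZxid() is a 64-bit value whose high 32 bits hold the epoch and whose low 32 bits hold a per-epoch counter; loadDataBase() relies on this when it calls ZxidUtils.getEpochFromZxid(). The sketch below reimplements that split under a hypothetical class name, Zxids, instead of using ZooKeeper's ZxidUtils:

```java
// Sketch of how a 64-bit zxid splits into epoch (high 32 bits) and
// counter (low 32 bits). Zxids is a hypothetical stand-in for ZxidUtils.
final class Zxids {
    private Zxids() {}

    /** High 32 bits: the leader epoch that produced this transaction. */
    static long epochOf(long zxid) {
        return zxid >> 32L;
    }

    /** Low 32 bits: the transaction counter within that epoch. */
    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    /** Packs an epoch and a counter back into one zxid. */
    static long make(long epoch, long counter) {
        return (epoch << 32L) | (counter & 0xffffffffL);
    }
}
```

For example, Zxids.make(5, 42) is 0x50000002a, whose epoch is 5 and counter is 42.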

Startup functions

start

This is the entry point for starting the peer:

    public synchronized void start() {
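        loadDataBase(); // restore the DataTree from the transaction log dir (dataLogDir) and snapshot dir (dataDir)
        cnxnFactory.start(); // open the client port and start the ServerCnxnFactory main thread
        startLeaderElection(); // create the election algorithm
        super.start(); // start the QuorumPeer thread, which keeps checking the server state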
    }

loadDataBase

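I have not fully understood this function yet, mainly because I do not know how currentEpoch, acceptedEpoch and updatingEpoch differ.
Here is the code; I may come back to it later.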

    private void loadDataBase() {
        File updating = new File(getTxnFactory().getSnapDir(),
                                 UPDATING_EPOCH_FILENAME);
        try {
            zkDb.loadDataBase();

            // load the epochs
            long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
            long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
            try {
                currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
                if (epochOfZxid > currentEpoch && updating.exists()) {
                    LOG.info("{} found. The server was terminated after " +
                             "taking a snapshot but before updating current " +
                             "epoch. Setting current epoch to {}.",
                             UPDATING_EPOCH_FILENAME, epochOfZxid);
                    setCurrentEpoch(epochOfZxid);
                    if (!updating.delete()) {
                        throw new IOException("Failed to delete " +
                                              updating.toString());
                    }
                }
            } catch(FileNotFoundException e) {
                // pick a reasonable epoch number
                // this should only happen once when moving to a
                // new code version
                currentEpoch = epochOfZxid;
                LOG.info(CURRENT_EPOCH_FILENAME
                        + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
                        currentEpoch);
                writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
            }
            if (epochOfZxid > currentEpoch) {
                throw new IOException("The current epoch, " + ZxidUtils.zxidToString(currentEpoch) + ", is older than the last zxid, " + lastProcessedZxid);
            }
            try {
                acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
            } catch(FileNotFoundException e) {
                // pick a reasonable epoch number
                // this should only happen once when moving to a
                // new code version
                acceptedEpoch = epochOfZxid;
                LOG.info(ACCEPTED_EPOCH_FILENAME
                        + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
                        acceptedEpoch);
                writeLongToFile(ACCEPTED_EPOCH_FILENAME, acceptedEpoch);
            }
            if (acceptedEpoch < currentEpoch) {
                throw new IOException("The accepted epoch, " + ZxidUtils.zxidToString(acceptedEpoch) + " is less than the current epoch, " + ZxidUtils.zxidToString(currentEpoch));
            }
        } catch(IOException ie) {
            LOG.error("Unable to load database on disk", ie);
            throw new RuntimeException("Unable to run quorum server ", ie);
        }
    }
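Stripped of file I/O, loadDataBase() enforces two invariants on the epochs, plus one recovery case that explains updatingEpoch. Going by the log message in the code, the updatingEpoch file marks a window in which a snapshot has been taken but currentEpoch has not yet been updated; if the server died in that window, the epoch of the snapshot's last zxid is trusted. A hedged sketch of that decision logic (EpochCheck and reconcile are hypothetical names; deleting the updatingEpoch file and writing defaults for missing files are omitted):

```java
import java.io.IOException;

// Hypothetical sketch of the epoch checks in loadDataBase(), without file I/O.
final class EpochCheck {
    private EpochCheck() {}

    /**
     * @param lastProcessedZxid  zxid of the last transaction restored from disk
     * @param currentEpoch       value read from the currentEpoch file
     * @param acceptedEpoch      value read from the acceptedEpoch file
     * @param updatingFileExists whether the updatingEpoch marker file is present
     * @return the currentEpoch to use after recovery
     */
    static long reconcile(long lastProcessedZxid, long currentEpoch,
                          long acceptedEpoch, boolean updatingFileExists) throws IOException {
        long epochOfZxid = lastProcessedZxid >> 32L; // what ZxidUtils.getEpochFromZxid() computes
        if (epochOfZxid > currentEpoch && updatingFileExists) {
            // Snapshot was taken but currentEpoch was never written: trust the snapshot.
            currentEpoch = epochOfZxid;
        }
        if (epochOfZxid > currentEpoch) {
            throw new IOException("current epoch " + currentEpoch
                    + " is older than the epoch of the last zxid " + epochOfZxid);
        }
        if (acceptedEpoch < currentEpoch) {
            throw new IOException("accepted epoch " + acceptedEpoch
                    + " is less than current epoch " + currentEpoch);
        }
        return currentEpoch;
    }
}
```

Read this way, the ordering acceptedEpoch >= currentEpoch >= epoch of the last zxid is what must hold on disk.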

startLeaderElection

Initializes this server's own vote and creates the election algorithm:

    synchronized public void startLeaderElection() { // initialize our own vote and the election algorithm
        try {
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch()); // initial vote: propose ourselves as leader
        } catch(IOException e) {
            RuntimeException re = new RuntimeException(e.getMessage());
            re.setStackTrace(e.getStackTrace());
            throw re;
        }
        for (QuorumServer p : getView().values()) {
            if (p.id == myid) {
                myQuorumAddr = p.addr;
                break;
            }
        }
        if (myQuorumAddr == null) {
            throw new RuntimeException("My id " + myid + " not in the peer list");
        }
        if (electionType == 0) { // not entered in practice; type 0 is the deprecated UDP election
            try {
                udpSocket = new DatagramSocket(myQuorumAddr.getPort());
                responder = new ResponderThread();
                responder.start();
            } catch (SocketException e) {
                throw new RuntimeException(e);
            }
        }
        this.electionAlg = createElectionAlgorithm(electionType); // create the election algorithm matching electionType
    }

createElectionAlgorithm

// Returns the Election implementation for the given electionAlgorithm number; in the current version this is FastLeaderElection in practice

    protected Election createElectionAlgorithm(int electionAlgorithm){
        Election le=null;
                
        //TODO: use a factory rather than a switch
        switch (electionAlgorithm) {
        case 0:
            le = new LeaderElection(this);
            break;
        case 1:
            le = new AuthFastLeaderElection(this);
            break;
        case 2:
            le = new AuthFastLeaderElection(this, true);
            break;
        case 3:
            qcm = new QuorumCnxManager(this);
            QuorumCnxManager.Listener listener = qcm.listener;
            if(listener != null){
                listener.start();
                le = new FastLeaderElection(this, qcm);
            } else {
                LOG.error("Null listener when initializing cnx manager");
            }
            break;
        default:
            assert false;
        }
        return le;
    }
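The TODO in createElectionAlgorithm() suggests replacing the switch with a factory. One way that could look is a registry keyed by the election type number. Everything here (ElectionStub, ElectionRegistry) is a hypothetical stand-in, not a ZooKeeper API:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch of the "use a factory rather than a switch" TODO.
// ElectionStub stands in for ZooKeeper's Election interface.
interface ElectionStub {
    String name();
}

final class ElectionRegistry {
    private final Map<Integer, Supplier<ElectionStub>> factories = new HashMap<>();

    /** Registers how to build the algorithm for one electionType number. */
    void register(int electionType, Supplier<ElectionStub> factory) {
        factories.put(electionType, factory);
    }

    ElectionStub create(int electionType) {
        Supplier<ElectionStub> factory = factories.get(electionType);
        if (factory == null) {
            // The original switch hits `assert false` and returns null instead.
            throw new IllegalArgumentException("Unknown election type " + electionType);
        }
        return factory.get();
    }
}
```

For type 3, the registered supplier would be the natural place to create the QuorumCnxManager and start its listener, which the switch currently does inline.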

run

Initializes JMX, then keeps alternating between recovery mode (electing a leader) and broadcast mode (processing data) until shutdown.

    public void run() { // set up JMX, then alternate between recovery mode (leader election) and broadcast mode (processing data) until shutdown
        setName("QuorumPeer" + "[myid=" + getId() + "]" +
                cnxnFactory.getLocalAddress());

        LOG.debug("Starting quorum peer");
        try { // set up JMX first
            jmxQuorumBean = new QuorumBean(this);
            MBeanRegistry.getInstance().register(jmxQuorumBean, null);
            for(QuorumServer s: getView().values()){
                ZKMBeanInfo p;
                if (getId() == s.id) {
                    p = jmxLocalPeerBean = new LocalPeerBean(this);
                    try {
                        MBeanRegistry.getInstance().register(p, jmxQuorumBean);
                    } catch (Exception e) {
                        LOG.warn("Failed to register with JMX", e);
                        jmxLocalPeerBean = null;
                    }
                } else {
                    p = new RemotePeerBean(s);
                    try {
                        MBeanRegistry.getInstance().register(p, jmxQuorumBean);
                    } catch (Exception e) {
                        LOG.warn("Failed to register with JMX", e);
                    }
                }
            }
        } catch (Exception e) {
            LOG.warn("Failed to register with JMX", e);
            jmxQuorumBean = null;
        }

        try {
            /*
             * Main loop
             */
            while (running) {
                switch (getPeerState()) {
                case LOOKING:
                    LOG.info("LOOKING");

                    if (Boolean.getBoolean("readonlymode.enabled")) {
                        LOG.info("Attempting to start ReadOnlyZooKeeperServer");

                        // Create read-only server but don't start it immediately
                        final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(
                                logFactory, this,
                                new ZooKeeperServer.BasicDataTreeBuilder(),
                                this.zkDb);
    
                        // Instead of starting roZk immediately, wait some grace
                        // period before we decide we're partitioned.
                        //
                        // Thread is used here because otherwise it would require
                        // changes in each of election strategy classes which is
                        // unnecessary code coupling.
                        Thread roZkMgr = new Thread() {
                            public void run() {
                                try {
                                    // lower-bound grace period to 2 secs
                                    sleep(Math.max(2000, tickTime));
                                    if (ServerState.LOOKING.equals(getPeerState())) {
                                        roZk.startup();
                                    }
                                } catch (InterruptedException e) {
                                    LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
                                } catch (Exception e) {
                                    LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
                                }
                            }
                        };
                        try {
                            roZkMgr.start();
                            setBCVote(null);
                            setCurrentVote(makeLEStrategy().lookForLeader()); // run the election and record the resulting vote
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception",e);
                            setPeerState(ServerState.LOOKING);
                        } finally {
                            // If the thread is in the grace period, interrupt
                            // to come out of waiting.
                            roZkMgr.interrupt();
                            roZk.shutdown();
                        }
                    } else {
                        try {
                            setBCVote(null);
                            setCurrentVote(makeLEStrategy().lookForLeader()); // run the election and record the resulting vote
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception", e);
                            setPeerState(ServerState.LOOKING);
                        }
                    }
                    break;
                case OBSERVING:
                    try {
                        LOG.info("OBSERVING");
                        setObserver(makeObserver(logFactory)); // become an observer
                        observer.observeLeader(); // observe the leader
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception",e );                        
                    } finally {
                        observer.shutdown();
                        setObserver(null); // clear the observer when done
                        setPeerState(ServerState.LOOKING); // go back to LOOKING
                    }
                    break;
                case FOLLOWING:
                    try {
                        LOG.info("FOLLOWING");
                        setFollower(makeFollower(logFactory)); // become a follower
                        follower.followLeader();//follow leader
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception",e);
                    } finally {
                        follower.shutdown();
                        setFollower(null); // clear the follower
                        setPeerState(ServerState.LOOKING); // go back to LOOKING
                    }
                    break;
                case LEADING:
                    LOG.info("LEADING");
                    try {
                        setLeader(makeLeader(logFactory)); // become the leader
                        leader.lead();
                        setLeader(null); // clear the leader when done
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception",e);
                    } finally {
                        if (leader != null) {
                            leader.shutdown("Forcing shutdown");
                            setLeader(null);
                        }
                        setPeerState(ServerState.LOOKING); // go back to LOOKING
                    }
                    break;
                }
            }
        } finally {
            LOG.warn("QuorumPeer main thread exited");
            try {
                MBeanRegistry.getInstance().unregisterAll();
            } catch (Exception e) {
                LOG.warn("Failed to unregister with JMX", e);
            }
            jmxQuorumBean = null;
            jmxLocalPeerBean = null;
        }
    }
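The read-only branch in run() uses a pattern worth isolating: a helper thread waits for a grace period of max(2000 ms, tickTime), starts the fallback only if the peer is still LOOKING, and is interrupted in the finally block once lookForLeader() returns. A minimal sketch of that pattern with hypothetical names; the state check and the fallback are passed in as a BooleanSupplier and a Runnable instead of being tied to QuorumPeer and ReadOnlyZooKeeperServer:

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch of the grace-period helper thread used for
// ReadOnlyZooKeeperServer in run().
final class GracePeriodStarter extends Thread {
    private final long delayMs;
    private final BooleanSupplier stillLooking;
    private final Runnable startFallback;

    GracePeriodStarter(long tickTimeMs, BooleanSupplier stillLooking, Runnable startFallback) {
        this.delayMs = Math.max(2000, tickTimeMs); // same 2 s lower bound as the original
        this.stillLooking = stillLooking;
        this.startFallback = startFallback;
    }

    @Override
    public void run() {
        try {
            sleep(delayMs);
            if (stillLooking.getAsBoolean()) {
                startFallback.run(); // still no leader after the grace period
            }
        } catch (InterruptedException e) {
            // The election finished first: the fallback is never started.
        }
    }
}
```

Interrupting the helper while it sleeps makes sleep() throw InterruptedException, so the fallback is skipped whenever the election finishes within the grace period.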

Election-state functions

When an election ends, each server becomes a Follower, Leader, or Observer:

    protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException { // create the follower
        return new Follower(this, new FollowerZooKeeperServer(logFactory, 
                this,new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb));
    }
     
    protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException { // create the leader
        return new Leader(this, new LeaderZooKeeperServer(logFactory,
                this,new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb));
    }
    
    protected Observer makeObserver(FileTxnSnapLog logFactory) throws IOException { // create the observer
        return new Observer(this, new ObserverZooKeeperServer(logFactory,
                this, new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb));
    }

    synchronized protected void setLeader(Leader newLeader){
        leader=newLeader;
    }

    synchronized protected void setFollower(Follower newFollower){
        follower=newFollower;
    }
    
    synchronized protected void setObserver(Observer newObserver){
        observer=newObserver;
    }

    public String getServerState() { // return the current state as a string
        switch (getPeerState()) {
        case LOOKING:
            return QuorumStats.Provider.LOOKING_STATE;
        case LEADING:
            return QuorumStats.Provider.LEADING_STATE;
        case FOLLOWING:
            return QuorumStats.Provider.FOLLOWING_STATE;
        case OBSERVING:
            return QuorumStats.Provider.OBSERVING_STATE;
        }
        return QuorumStats.Provider.UNKNOWN_STATE;
    }
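The makeX and setX methods above are driven by the main loop in run(). The sketch below is a simplified, hypothetical model of that loop: an election picks a state, the matching role runs until it returns or throws, and a finally block always resets to LOOKING. Election outcomes and roles are injected so the loop runs without a cluster, and the current role is kept in a local variable rather than in separate follower/leader/observer fields:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the main loop in QuorumPeer#run().
// In ZooKeeper the outcomes come from lookForLeader() and the roles from
// makeFollower()/makeLeader()/makeObserver().
final class PeerLoop {
    enum State { LOOKING, FOLLOWING, LEADING, OBSERVING }

    /** Stand-in for followLeader(), lead() or observeLeader(): returns or throws when the role ends. */
    interface Role {
        void run() throws Exception;
    }

    final List<String> events = new ArrayList<>();
    private State state = State.LOOKING;

    /** Loops until no election outcome is left while LOOKING (in place of running = false). */
    void loop(Iterator<State> electionOutcomes, Map<State, Role> roles) {
        while (state != State.LOOKING || electionOutcomes.hasNext()) {
            if (state == State.LOOKING) {
                state = electionOutcomes.next(); // stands in for lookForLeader() setting the state
                continue;
            }
            Role role = roles.get(state); // one role at a time, kept in a local
            try {
                events.add("start " + state);
                role.run();
            } catch (Exception e) {
                events.add("failed " + state);
            } finally {
                events.add("stop " + state); // like follower.shutdown() + setFollower(null)
                state = State.LOOKING;       // every role falls back to LOOKING
            }
        }
    }
}
```

A follower that loses its leader therefore goes FOLLOWING, then LOOKING, and only then into whatever role the next election decides.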

Epoch persistence

These methods persist the acceptedEpoch and currentEpoch values, split into reading and writing:

    private long readLongFromFile(String name) throws IOException { // read the long stored in file `name` under snapDir
        File file = new File(logFactory.getSnapDir(), name);
        BufferedReader br = new BufferedReader(new FileReader(file));
        String line = "";
        try {
            line = br.readLine();
            return Long.parseLong(line);
        } catch(NumberFormatException e) {
            throw new IOException("Found " + line + " in " + file);
        } finally {
            br.close();
        }
    }

    private void writeLongToFile(String name, long value) throws IOException { // atomically write a long to file `name` under snapDir
        File file = new File(logFactory.getSnapDir(), name);
        AtomicFileOutputStream out = new AtomicFileOutputStream(file);
        BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out));
        boolean aborted = false;
        try {
            bw.write(Long.toString(value));
            bw.flush();
            
            out.flush();
        } catch (IOException e) {
            LOG.error("Failed to write new file " + file, e);
            // worst case here the tmp file/resources(fd) are not cleaned up
            //   and the caller will be notified (IOException)
            aborted = true;
            out.abort();
            throw e;
        } finally {
            if (!aborted) {
                // if the close operation (rename) fails we'll get notified.
                // worst case the tmp file may still exist
                out.close();
            }
        }
    }
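writeLongToFile() relies on AtomicFileOutputStream so that a crash does not leave a half-written currentEpoch or acceptedEpoch file. The same write-to-temp-then-rename idea can be sketched with java.nio; this is a substitute technique, not ZooKeeper's class, and the class name EpochFiles and the temp-file name (name + ".tmp") are assumptions:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

// Sketch of write-to-temp-then-rename with java.nio, a substitute for
// ZooKeeper's AtomicFileOutputStream.
final class EpochFiles {
    private EpochFiles() {}

    static void writeLong(Path dir, String name, long value) throws IOException {
        Path target = dir.resolve(name);
        Path tmp = dir.resolve(name + ".tmp");
        ByteBuffer buf = ByteBuffer.wrap(Long.toString(value).getBytes(StandardCharsets.UTF_8));
        try (FileChannel ch = FileChannel.open(tmp, StandardOpenOption.CREATE,
                StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE)) {
            while (buf.hasRemaining()) {
                ch.write(buf);
            }
            ch.force(true); // flush the bytes to disk before the rename makes them visible
        } catch (IOException e) {
            Files.deleteIfExists(tmp); // abort: drop the partial temp file
            throw e;
        }
        // On POSIX file systems this is rename(2), which replaces the target atomically.
        Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
    }

    static long readLong(Path dir, String name) throws IOException {
        Path file = dir.resolve(name);
        String line = new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim();
        try {
            return Long.parseLong(line);
        } catch (NumberFormatException e) {
            throw new IOException("Found " + line + " in " + file, e);
        }
    }
}
```

A reader then sees either the old value or the complete new one, never a partial number.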

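Thoughts

Difference between bcVote and currentVote

bcVote is probably for older versions, while newer code uses currentVote. I did not dig into it; it is most likely there for backward compatibility.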

Where QuorumPeer sits in the server startup sequence

(Figure: QuorumPeer's position in the server startup sequence)

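Clearing the old role when a server moves from a non-LOOKING state to another state

For example, going from FOLLOWING to LEADING.
Answer: each role's case in run() has a finally block that shuts the role down, clears it, and resets the state to LOOKING; the new role is only created after the next election.


(Figure: clearing the existing role on a state change)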

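Where are the state changes seen in QuorumPeer#run made?

FastLeaderElection#lookForLeader, covered in the previous part, calls QuorumPeer#setPeerState; the finally blocks in run() set the state back to LOOKING.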

QuorumPeer#run reflects the two phases of a ZK cluster

crash recovery and atomic broadcast

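What are acceptedEpoch and currentEpoch for?

http://blog.csdn.net/damacheng/article/details/42393793
acceptedEpoch is the epoch the server is about to move to; it may still change after comparison.
currentEpoch is the current, stable epoch; once acceptedEpoch has been compared and finally settled, it is assigned to currentEpoch.

This is analogous to the question raised in the Thoughts of part 30: what is the difference between getVote() and self.getCurrentVote()?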

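What is tick, how does it differ from tickTime, and what is it for?

tickTime is the length of one period, for example 3 s per period.
tick is the number of the current period; it starts at 0 and increments over time.
It comes up later in the Leader and LearnerHandler source, where it is used for timeout detection in Leader-Learner interaction.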
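To make the tick/tickTime relation concrete: limits such as initLimit and syncLimit are counted in ticks, so their wall-clock length is the limit times tickTime (for example, syncLimit = 5 with tickTime = 2000 ms gives 10 s). A hypothetical sketch of a tick counter with such a check; the class and method names are made up, and the strict comparison in expired() is an assumption:

```java
// Hypothetical sketch of the tick/tickTime relationship: tickTime is the
// length of one period in ms, tick counts elapsed periods, and limits are
// expressed in ticks.
final class TickClock {
    private final int tickTimeMs;
    private int tick; // starts at 0 and is incremented once per tickTime

    TickClock(int tickTimeMs) {
        this.tickTimeMs = tickTimeMs;
    }

    /** Called once per elapsed tickTime period. */
    void advance() {
        tick++;
    }

    int tick() {
        return tick;
    }

    /** Converts a limit expressed in ticks into milliseconds. */
    long limitMs(int limitInTicks) {
        return (long) limitInTicks * tickTimeMs;
    }

    /** True if more than limitInTicks periods have passed since lastSeenTick. */
    boolean expired(int lastSeenTick, int limitInTicks) {
        return tick - lastSeenTick > limitInTicks;
    }
}
```

Counting in ticks keeps timeout checks to an integer comparison, and changing tickTime rescales every limit at once.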

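Open questions

What is QuorumPeer#OBSERVER_ID for?

I only see it checked in QuorumCnxManager#receiveConnection when accepting a connection; I did not see this value being written anywhere when a request is sent.

Gripes

Why do the three fields follower, leader and observer all exist at once?

They represent different states and only one can be set at a time. I do not know why an enum-based state machine is not used instead.

Is the QuorumPeer#quorumStats field necessary?

It is not used anywhere.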


Article link: https://www.haomeiwen.com/subject/pbnrlxtx.html