美文网首页Java面试
zookeeper源码分析之集群模式服务端(下)

zookeeper源码分析之集群模式服务端(下)

作者: 1d96ba4c1912 | 来源:发表于2018-11-04 23:20 被阅读11次

    接上篇文章,本文主要分析一下一个ZK集群从刚启动到对外提供服务这段时间发生了什么

    一、执行流程概述

    首先在ZK集群中,不管是什么类型的节点,刚刚启动时都是LOOKING状态然后发起选举寻找Leader,只有确定Leader以后才最终确定自己以Leader,Follower,Observer中哪种方式启动对外提供服务。

    ZK整个恢复过程分为三步:

    1. 选取Leader,一个节点想要成为Leader首先它的epoch要大,已处理的事务要最多,如果有这两个条件都相同的多个节点则ServerId最大的节点成为Leader,之所以要这样是因为其他节点都会按照Leader节点的事务同步数据,如果Leader不是最新的就会造成数据的丢失。
    2. 数据同步,Leader选出来以后各个节点会以Leader为标准更新自己的事务。
    3. 提供服务,数据同步完成以后开始正式对外提供服务,接收客户端连接,处理读写请求。

    二、源码分析

    集群模式下启动所有的ZK节点启动入口都是QuorumPeerMain类的main方法。
    main方法加载配置文件以后,最终会调用到QuorumPeer的start方法,来看下:

    public synchronized void start() {
        //校验ServerId是否合法
        if (!getView().containsKey(myid)) {
            throw new RuntimeException("My id " + myid + " not in the peer list");
        }
        //载入之前持久化的一些信息
        loadDataBase();
        //启动线程监听
        startServerCnxnFactory();
        try {
            adminServer.start();
        } catch (AdminServerException e) {
            LOG.warn("Problem starting AdminServer", e);
            System.out.println(e);
        }
        //初始化选举投票以及算法
        startLeaderElection();
        //当前也是一个线程,注意run方法
        super.start();
    }
    

    我们已经知道了当一个节点启动时需要先发起选举寻找Leader节点,然后再根据Leader节点的事务信息进行同步,最后开始对外提供服务,这里我们先来看下初始化选举的逻辑,即上面的startLeaderElection方法:

    synchronized public void startLeaderElection() {
        try {
            //所有节点启动的初始状态都是LOOKING,因此这里都会是创建一张投自己为Leader的票
            if (getPeerState() == ServerState.LOOKING) {
                currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
            }
        } catch(IOException e) {
            //异常处理
        }
        //初始化选举算法,electionType默认为3
        this.electionAlg = createElectionAlgorithm(electionType);
    }
    
    protected Election createElectionAlgorithm(int electionAlgorithm){
        Election le = null;
        switch (electionAlgorithm) {
        case 1:
            //忽略
        case 2:
            //忽略
        case 3:
            //electionAlgorithm默认是3,直接走到这里
            qcm = createCnxnManager();
            //监听选举事件的listener
            QuorumCnxManager.Listener listener = qcm.listener;
            if(listener != null){
                //开启监听器
                listener.start();
                //初始化选举算法
                FastLeaderElection fle = new FastLeaderElection(this, qcm);
                //发起选举
                fle.start();
                le = fle;
            } else {
                LOG.error("Null listener when initializing cnx manager");
            }
            break;
        default:
            //忽略
        }
        return le;
    }
    

    初始化选举的地方一下开启两个线程,一个是Listener,一个是FastLeaderElection,接下来我们按照上面的顺序,先来看下Listener做的事情,由于它是一个线程,因此我们还是直接看run方法,为了简单起见,还是删除了很多代码,我们重点关注主流程:

    public void run() {
        while((!shutdown) && (numRetries < 3)){
            try {
                ss = new ServerSocket();
                ss.setReuseAddress(true);
                addr=根据配置信息获取地址
                setName(addr.toString());
                //监听选举端口
                ss.bind(addr);
                while (!shutdown) {
                    try {
                        //接收客户端连接
                        client = ss.accept();
                        //设置连接参数
                        setSockOpts(client);
                        //开始处理
                        receiveConnection(client);
                    } catch (SocketTimeoutException e) {
                    }
                }
            } catch (IOException e) {
            }
        }
        }
    }
    

    接下来重点看下当连接到来的时候receiveConnection方法的处理逻辑:

     //这个方法很简单不多说了
    public void receiveConnection(final Socket sock) {
        DataInputStream din = null;
        try {
            din = new DataInputStream(new BufferedInputStream(sock.getInputStream()));
            handleConnection(sock, din);
        } catch (IOException e) {
        }
    }
    
    private void handleConnection(Socket sock, DataInputStream din) throws IOException {
        //省略中从客户端发来的数据中解析serverId以及选举地址的代码...
    
        //这里的思路是如果请求连接的节点的ServerId小于当前节点,则关闭连接,并由当前节点发起连接
        //隐含的意思就是ZK集群中节点的连接都是由ServerId大的连ServerId小的
        if (sid < self.getId()) {
            //如果连接已经建立则关闭
            SendWorker sw = senderWorkerMap.get(sid);
            if (sw != null) {
                sw.finish();
            }
            closeSocket(sock);
            //当前节点去连接对方节点
            if (electionAddr != null) {
                connectOne(sid, electionAddr);
            } else {
                connectOne(sid);
            }
        } else {
            //如果接受该连接,则创建对应的读写worker
            SendWorker sw = new SendWorker(sock, sid);
            RecvWorker rw = new RecvWorker(sock, din, sid, sw);
            sw.setRecv(rw);
            //如果已经创建则关闭旧的
            SendWorker vsw = senderWorkerMap.get(sid);
            if (vsw != null) {
                vsw.finish();
            }
            senderWorkerMap.put(sid, sw);
            queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
            //启动读写事件处理
            sw.start();
            rw.start();
        }
    }
    

    上面处理连接的这段代码只需要关注两个重点:

    1. 只允许serverId大的节点去连接serverId小的节点
    2. 针对每个连接启动了读写两个worker负责IO处理

    对于两个worker来说,它们本身的逻辑很简单,SendWorker就是不断的把queueSendMap中存放的对应serverId的数据发出去。RecvWorker就是把收到的数据加入recvQueue队列中,这里就不再贴代码了。

    看完了Listener的逻辑,我们接着上面的代码看下FastLeaderElection选举算法的思路,从它的start方法开始看:

    public void start() {
        this.messenger.start();
    }
    
    void start(){
        //对应WorkerSender类
        this.wsThread.start();
        //对应WorkerReceiver类
        this.wrThread.start();
    }
    

    这里可以看到FastLeaderElection内部也是开启了两个线程负责读写,这里需要跟前面Listener的逻辑结合考虑。Listener开启的线程一个负责读取数据放入队列,一个负责把队列中的数据发出去,但读取的数据给谁用呢?发送的数据是哪来的呢?FastLeaderElection里的两线程就是跟它们交互的。

    先来看下WorkerSender的run方法:

    public void run() {
        while (!stop) {
            try {
                ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
                if(m == null) continue;
                //处理发送消息
                process(m);
            } catch (InterruptedException e) {
                break;
            }
        }
    }
    
    void process(ToSend m) {
        //序列化消息
        ByteBuffer requestBuffer = buildMsg(m.state.ordinal(),
                                            m.leader,
                                            m.zxid,
                                            m.electionEpoch,
                                            m.peerEpoch,
                                            m.configData);
        //发送数据
        manager.toSend(m.sid, requestBuffer);
    
    }
    public void toSend(Long sid, ByteBuffer b) {
        //如果数据时发送给自己的那么绕过IO直接加入到recv队列
        if (this.mySid == sid) {
             b.position(0);
             addToRecvQueue(new Message(b.duplicate(), sid));
        } else {
             //否则把数据加入到指定ServerId的待发送队列
             ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY);
             ArrayBlockingQueue<ByteBuffer> oldq = queueSendMap.putIfAbsent(sid, bq);
             if (oldq != null) {
                 addToSendQueue(oldq, b);
             } else {
                 addToSendQueue(bq, b);
             }
             //连接指定ServerId,该方法内部如果连接已经建立则会返回,否则创建连接
             connectOne(sid);
                
        }
    }
    

    上面的代码总结一下就是FastLeaderElection内部的WorkerSender线程会从sendqueue队列中读取数据包然后放到queueSendMap里,而Listener里面的SendWorker又会不断从queueSendMap取出数据进行发送。

    再来看一下WorkerReceiver的run方法:

    public void run() {
    
        Message response;
        while (!stop) {
            try {
                //这里本质上是从recvQueue里取出数据
                response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
                //没有数据则继续等待
                if(response == null) continue;
                //这里省略掉协议兼容,解析以及处理reconfig指令的逻辑..
    
                int rstate = response.buffer.getInt();
                long rleader = response.buffer.getLong();
                long rzxid = response.buffer.getLong();
                long relectionEpoch = response.buffer.getLong();
                long rpeerepoch;
                QuorumVerifier rqv = null;
                //如果不是一个有投票权的节点,例如Observer节点
                if(!validVoter(response.sid)) {
                    //直接把自己的投票信息返回
                    Vote current = self.getCurrentVote();
                    QuorumVerifier qv = self.getQuorumVerifier();
                    ToSend notmsg = new ToSend(ToSend.mType.notification,
                            current.getId(),
                            current.getZxid(),
                            logicalclock.get(),
                            self.getPeerState(),
                            response.sid,
                            current.getPeerEpoch(),
                            qv.toString().getBytes());
                    sendqueue.offer(notmsg);
                } else {
                    //获取发消息的节点的状态
                    QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
                    switch (rstate) {
                    case 0:
                        ackstate = QuorumPeer.ServerState.LOOKING;
                        break;
                    case 1:
                        ackstate = QuorumPeer.ServerState.FOLLOWING;
                        break;
                    case 2:
                        ackstate = QuorumPeer.ServerState.LEADING;
                        break;
                    case 3:
                        ackstate = QuorumPeer.ServerState.OBSERVING;
                        break;
                    default:
                        continue;
                    }
    
                    //赋值Notification
                    n.leader = rleader;
                    n.zxid = rzxid;
                    n.electionEpoch = relectionEpoch;
                    n.state = ackstate;
                    n.sid = response.sid;
                    n.peerEpoch = rpeerepoch;
                    n.version = version;
                    n.qv = rqv;
                    //如果当前节点正在寻找Leader
                    if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
                        //把收到的消息加入队列
                        recvqueue.offer(n);
                        //如果对方节点也是LOOKING状态,且周期小于自己,则把自己投票信息发回去
                        if((ackstate == QuorumPeer.ServerState.LOOKING) && (n.electionEpoch < logicalclock.get())){
                            Vote v = getVote();
                            QuorumVerifier qv = self.getQuorumVerifier();
                            ToSend notmsg = new ToSend(ToSend.mType.notification,
                                    v.getId(),
                                    v.getZxid(),
                                    logicalclock.get(),
                                    self.getPeerState(),
                                    response.sid,
                                    v.getPeerEpoch(),
                                    qv.toString().getBytes());
                            sendqueue.offer(notmsg);
                        }
                    } else {
                        //如果当前节点不是LOOKING状态,那么它已经知道谁是Leader了
                        Vote current = self.getCurrentVote();
                        //如果对方是LOOKING状态,那么就把自己认为的Leader信息返给对方
                        if(ackstate == QuorumPeer.ServerState.LOOKING){
                            QuorumVerifier qv = self.getQuorumVerifier();
                            ToSend notmsg = new ToSend(
                                    ToSend.mType.notification,
                                    current.getId(),
                                    current.getZxid(),
                                    current.getElectionEpoch(),
                                    self.getPeerState(),
                                    response.sid,
                                    current.getPeerEpoch(),
                                    qv.toString().getBytes());
                            sendqueue.offer(notmsg);
                        }
                    }
                }
            } catch (InterruptedException e) {
            }
        }
    }
    

    上面的代码比较长,但逻辑不复杂,总结一下就是:

    1. 如果消息是Observer节点发来的,则直接返回自己的投票信息,结束。
    2. 如果当前节点是LOOKING状态,则把消息存入recvqueue队列用于统计投票。
    3. 如果发消息的节点也是LOOKING状态,且当前节点的周期大,则把当前节点的投票信息返回。
    4. 如果当前节点不是LOOKING状态且发消息的节点是LOOKING状态,则返回当前节点认为的Leader信息。

    最后我们整理一下上面的四个处理IO的线程逻辑,首先是当前节点发送消息的时候是通过WorkerSender经由SendWorker发送出去,而接受消息是通过RecvWorker再传递到WorkerReceiver并且如果是投票节点,WorkerReceiver又会把收到的数据封装成Notification对象加入到recvqueue中用于统计票数。

    以上四个IO处理类只是经行了数据的转发,封装及保存,那么真正的选举逻辑在哪里呢?其实是在本文最开始的代码片段,也就是QuorumPeer类中start方法的最后一行super.start(),QuorumPeer本身也是一个线程类,一起来看下它的run方法:

    public void run() {
        try {
            while (running) {
                //根据当前节点的状态执行不同流程
                switch (getPeerState()) {
                case LOOKING:
                        try {
                            //寻找Leader节点
                            setCurrentVote(makeLEStrategy().lookForLeader());
                        } catch (Exception e) {
                            setPeerState(ServerState.LOOKING);
                        }                        
                    break;
                case OBSERVING:
                    try {
                        //当前节点启动模式为Observer
                        setObserver(makeObserver(logFactory));
                        //与Leader节点进行数据同步
                        observer.observeLeader();
                    } catch (Exception e) {
                    } finally {
                    }
                    break;
                case FOLLOWING:
                    try {
                        //当前节点启动模式为Follower
                        setFollower(makeFollower(logFactory));
                        //与Leader节点进行数据同步
                        follower.followLeader();
                    } catch (Exception e) {
                    } finally {
                    }
                    break;
                case LEADING:
                    try {
                        //当前节点启动模式为Leader
                        setLeader(makeLeader(logFactory));
                        //发送自己成为Leader的通知
                        leader.lead();
                        setLeader(null);
                    } catch (Exception e) {
                    } finally {
                    }
                    break;
                }
            }
        } 
    }
    

    节点初始化的状态为LOOKING,因此启动时直接会调用lookForLeader方法发起Leader选举,一起看下:

    public Vote lookForLeader() throws InterruptedException {
            try {
                Map<Long, Vote> recvset = new HashMap<Long, Vote>();
                Map<Long, Vote> outofelection = new HashMap<Long, Vote>();
                //向所有投票节点发送自己的投票信息
                sendNotifications();
                while ((self.getPeerState() == ServerState.LOOKING) && (!stop)){
                    //读取各个节点返回的投票信息
                    Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
                    //超时重发
                    if(n == null){
                        //如果前面待发送的消息已经全部发送,则重新发送
                        if(manager.haveDelivered()){
                            sendNotifications();
                        } else {
                            //否则尝试与各个节点建立连接
                            manager.connectAll();
                        }
                        //退避算法修改下次等待时间
                        int tmpTimeOut = notTimeout*2;
                        notTimeout = (tmpTimeOut < maxNotificationInterval? tmpTimeOut : maxNotificationInterval);
                    } 
                    else if (validVoter(n.sid) && validVoter(n.leader)) {
                        switch (n.state) {
                        case LOOKING:
                            //如果节点的周期大于自己的
                            if (n.electionEpoch > logicalclock.get()) {
                                logicalclock.set(n.electionEpoch);
                                //清除已收到的投票信息
                                recvset.clear();
                                //两个节点根据epoch,zxid,serverId来判断新的投票信息
                                if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                                    updateProposal(n.leader, n.zxid, n.peerEpoch);
                                } else {
                                    updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                                }
                                //修改选举周期以及投票信息,发起新一轮投票
                                sendNotifications();
                            } else if (n.electionEpoch < logicalclock.get()) {
                                //这里的break是跳出switch语句,别跟循环弄混
                                break;
                            } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    proposedLeader, proposedZxid, proposedEpoch)) {
                                //如果对方的epoch,zxid,serverId比自己大
                                //则更新自己的投票给n的投票节点
                                updateProposal(n.leader, n.zxid, n.peerEpoch);
                                //重新发送自己新的投票信息
                                sendNotifications();
                            }
                            //把节点的投票信息记录下
                            recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                            //统计投票信息,判断当前选举是否可以结束,也就是收到的票数信息已经足够确认Leader
                            if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid,
                                            logicalclock.get(), proposedEpoch))) {
                                while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null){
                                    if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)){
                                        recvqueue.put(n);
                                        break;
                                    }
                                }
                                //如果没有多余的投票信息则可以结束本次选举周期
                                if (n == null) {
                                    //根据serverId修改当前节点的类型
                                    self.setPeerState((proposedLeader == self.getId()) ? ServerState.LEADING: learningState());
                                    Vote endVote = new Vote(proposedLeader, proposedZxid, proposedEpoch);
                                    //清空接收消息队列
                                    leaveInstance(endVote);
                                    //返回最终的投票信息
                                    return endVote;
                                }
                            }
                            break;
                        case OBSERVING:
                            //Observer节点不参与投票,忽略
                            break;
                        case FOLLOWING:
                        case LEADING:
                            //如果周期相同,说明当前节点参与了这次选举
                            if(n.electionEpoch == logicalclock.get()){
                                //保存投票信息
                                recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                                //判断当前节点收到的票数是否可以结束选举
                                if(termPredicate(recvset, new Vote(n.leader,
                                                n.zxid, n.electionEpoch, n.peerEpoch, n.state))
                                                && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                                    self.setPeerState((n.leader == self.getId()) ?
                                            ServerState.LEADING: learningState());
    
                                    Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
                                    leaveInstance(endVote);
                                    return endVote;
                                }
                            }
                            //把Leader跟Follower的投票信息加入outofelection,确认下它们的信息是否一致
                            outofelection.put(n.sid, new Vote(n.leader, IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state));
                            if (termPredicate(outofelection, new Vote(n.leader,
                                    IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state))
                                    && checkLeader(outofelection, n.leader, IGNOREVALUE)) {
                                synchronized(this){
                                    logicalclock.set(n.electionEpoch);
                                    self.setPeerState((n.leader == self.getId()) ?
                                            ServerState.LEADING: learningState());
                                }
                                Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                            break;
                        default:
                            break;
                        }
                    } 
                }
                return null;
            }
    

    经过上面的发起投票,统计投票信息最终每个节点都会确认自己的身份,节点根据类型的不同会执行以下逻辑:

    1. 如果是Leader节点,首先会想其他节点发送一条NEWLEADER信息,确认自己的身份,等到各个节点的ACK消息以后开始正式对外提供服务,同时开启新的监听器,处理新节点加入的逻辑。
    2. 如果是Follower节点,首先向Leader节点发送一条FOLLOWERINFO信息,告诉Leader节点自己已处理的事务的最大Zxid,然后Leader节点会根据自己的最大Zxid与Follower节点进行同步,如果Follower节点落后的不多则会收到Leader的DIFF信息通过内存同步,如果Follower节点落后的很多则会收到SNAP通过快照同步,如果Follower节点的Zxid大于Leader节点则会收到TRUNC信息忽略多余的事务。
    3. 如果是Observer节点,则与Follower节点相同。

    同步数据的代码比较繁琐,这里就不贴了,但是大体思路就是我说的。

    三、总结

    本文是Zookeeper系列的最后一篇文章,整个系列代码贴的较多,图很少,因为实在不知道怎么画,另外几篇文章都没有深入到各个细节,我个人的习惯是从流程上理解,大方向把控。真遇到问题了再从流程上入手快速定位到代码块深入研究,希望对ZK感兴趣的同学可以有一定帮助。

    最后非常推荐大家去看一下ZAB算法论文,写的很赞,附上链接:
    ZooKeeper’s atomic broadcast protocol:Theory and practice

    相关文章

      网友评论

        本文标题:zookeeper源码分析之集群模式服务端(下)

        本文链接:https://www.haomeiwen.com/subject/tepkxqtx.html