Zookeeper (6): Leader Election, a Source Code Walkthrough

Author: 进击的蚂蚁zzzliu | Published: 2021-01-01 17:21

Overview

[Figure: election information flow]

The analysis below continues to follow this figure.

1. Startup in cluster mode: QuorumPeer.start
public synchronized void start() {
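    // Load the local snapshot and transaction log into memory (this restores the last processed zxid)
    loadDataBase();
    // Start the socket server that accepts client connections
    cnxnFactory.start();
    // Prepare leader election: initial vote for itself plus the election algorithm
    startLeaderElection();
    // Start this thread; run() drives the election and then the server's role
    super.start();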
}
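  • startLeaderElection prepares leader election: it sets the initial vote (for itself) and creates the election algorithm via createElectionAlgorithm. The actual voting happens later, in QuorumPeer.run;
  • super.start starts the QuorumPeer thread. Its run() loop checks this server's current state. While the state is LOOKING it runs the election, and once the election is over it enters the corresponding role (leader, follower or observer);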
2. Creating the election algorithm: QuorumPeer.createElectionAlgorithm
protected Election createElectionAlgorithm(int electionAlgorithm){
    Election le=null;
            
    //TODO: use a factory rather than a switch
    switch (electionAlgorithm) {
    case 0:
        le = new LeaderElection(this);
        break;
    case 1:
        le = new AuthFastLeaderElection(this);
        break;
    case 2:
        le = new AuthFastLeaderElection(this, true);
        break;
    case 3:
        qcm = createCnxnManager();
        // Listener thread that waits for connection requests from other servers
        QuorumCnxManager.Listener listener = qcm.listener;
        if(listener != null){
            listener.start();
            // Create the fast leader election algorithm object
            le = new FastLeaderElection(this, qcm);
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        break;
    default:
        assert false;
    }
    return le;
}
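  • case 3: the other three election algorithms are deprecated. The fast leader election algorithm (electionAlg=3) is the default;
  • createCnxnManager() creates the QuorumCnxManager, which manages the low-level network communication with the other servers;
  • listener.start() starts the connection listener on this server's election port. This is the second port in the server.N=host:port1:port2 entry, conventionally 3888; the first port, conventionally 2888, is the one followers use to connect to the leader. The listener then waits for other servers to connect and exchange votes;
  • new FastLeaderElection creates the fast leader election algorithm. Its constructor also starts the threads that send and receive votes;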
3. Listener.run
public void run() {
    // Number of retries
    int numRetries = 0;
    InetSocketAddress addr;
    while((!shutdown) && (numRetries < 3)){
        try {
            ss = new ServerSocket();
            ss.setReuseAddress(true);
            // Listen on all network interfaces; requires quorumListenOnAllIPs=true
            if (listenOnAllIPs) {
                // Map<Long, QuorumPeer.QuorumServer> view :: config.getServers()
                int port = view.get(QuorumCnxManager.this.mySid).electionAddr.getPort();
                addr = new InetSocketAddress(port);
            } else {
                addr = view.get(QuorumCnxManager.this.mySid).electionAddr;
            }
            LOG.info("My election bind port: " + addr.toString());
            setName(view.get(QuorumCnxManager.this.mySid).electionAddr.toString());
            ss.bind(addr);
            while (!shutdown) {
                // Accept connections from other servers; the first server to start receives none yet, so this blocks
                Socket client = ss.accept();
                setSockOpts(client);
                // With quorum SASL authentication enabled, connections are handled asynchronously
                if (quorumSaslAuthEnabled) {
                    // Effectively runs receiveConnection(client) in a thread pool
                    receiveConnectionAsync(client);
                } else {
                    // Handle the connection synchronously
                    receiveConnection(client);
                }

                numRetries = 0;
            }
        }
        ......
    }
}
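  • ss.setReuseAddress(true) sets SO_REUSEADDR. When a TCP connection is closed, the side that calls close() first enters the TIME_WAIT state. TIME_WAIT lasts twice the maximum segment lifetime (2MSL), typically from about 60 seconds to 4 minutes depending on the OS. While old connections on a port are in TIME_WAIT, binding a new listening socket to that port normally fails. SO_REUSEADDR lets the bind succeed anyway, so a restarted server can reuse its election port immediately. The option only takes effect if it is set before bind(), which is why server programs should always set it first;
  • ss.bind(addr) binds the listening address, i.e. the election port;
  • ss.accept() accepts connection requests from other servers. When the first server of the ensemble starts, no other server connects yet, so it blocks here;
  • setSockOpts(client) sets TCP_NODELAY, keep-alive and the read timeout on the socket;
  • receiveConnection(client) wraps the input stream of the newly accepted socket and hands it to handleConnection;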
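The socket setup order used in Listener.run can be tried out with nothing but java.net. The sketch below is a minimal one: the class and method names are hypothetical, and binding to port 0 (any free port) stands in for the configured election port.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

// Hypothetical sketch, not ZooKeeper code: the bind sequence used by Listener.run.
class ElectionListenerSketch {

    // Opens a listening socket; port 0 lets the OS pick a free port for this demo.
    static ServerSocket openElectionSocket(int port) throws IOException {
        ServerSocket ss = new ServerSocket();        // created unbound on purpose
        ss.setReuseAddress(true);                    // SO_REUSEADDR must be set before bind()
        ss.bind(new InetSocketAddress(port));        // wildcard address, like listenOnAllIPs
        return ss;
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocket ss = openElectionSocket(0)) {
            // The real Listener would now loop on ss.accept(); here we only report the binding.
            System.out.println("bound to port " + ss.getLocalPort()
                    + ", SO_REUSEADDR=" + ss.getReuseAddress());
        }
    }
}
```

If setReuseAddress were called after bind(), the Javadoc says the behavior is not defined. That is why the sketch keeps the same order as Listener.run.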
4. Handling a connection: QuorumCnxManager.handleConnection
private void handleConnection(Socket sock, DataInputStream din) throws IOException {
    Long sid = null;
    try {
        // Read server id
        sid = din.readLong();
        ......
    } 
    ......
    // A server with a smaller myid may not keep a connection it opened to this server, so each pair keeps one connection
    if (sid < this.mySid) {
        SendWorker sw = senderWorkerMap.get(sid);
        if (sw != null) {
            sw.finish();
        }
        // Close the connection opened by the smaller sid
        closeSocket(sock);
        // Connect to the smaller sid ourselves
        connectOne(sid);
    } else {
        // Thread that sends this server's votes to server sid
        SendWorker sw = new SendWorker(sock, sid);
        // Thread that receives votes from server sid
        RecvWorker rw = new RecvWorker(sock, din, sid, sw);
        sw.setRecv(rw);
        SendWorker vsw = senderWorkerMap.get(sid);  
        if(vsw != null)
            vsw.finish();
        
        senderWorkerMap.put(sid, sw);
        queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
        
        sw.start();
        rw.start(); 
        return;
    }
}
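  • sid < this.mySid: close this socket and instead open a connection from this server to server sid. A server with a smaller myid is not allowed to keep a connection it initiated to a server with a larger myid, so each pair of servers ends up with exactly one connection;
  • sw.finish(): if senderWorkerMap already contains a SendWorker for this sid, a connection already exists, so that connection is closed and its SendWorker thread stopped;
  • closeSocket(sock) closes the socket;
  • connectOne(sid) opens a connection from this server to server sid;
  • new SendWorker(sock, sid): when sid >= this.mySid, creates the thread that sends this server's votes to server sid;
  • new RecvWorker(sock, din, sid, sw) creates the thread that receives the votes sent by server sid;
  • queueSendMap.putIfAbsent creates the ArrayBlockingQueue that holds the messages to be sent to server sid, and puts it into queueSendMap if none exists yet;
  • sw.start()/rw.start() start the SendWorker/RecvWorker threads;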
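The sid comparison in handleConnection can be restated as a pure function. ConnectionTieBreak and its enum are hypothetical helpers for illustration only, not ZooKeeper APIs. The sketch covers only the accepting side.

```java
// Hypothetical restatement of the rule in QuorumCnxManager.handleConnection (not a ZooKeeper API).
class ConnectionTieBreak {

    enum Action { KEEP_INCOMING, DROP_AND_CONNECT_BACK }

    // Decides what the accepting server (mySid) does with a connection opened by remoteSid.
    static Action onIncoming(long mySid, long remoteSid) {
        // A smaller sid may not keep a connection it initiated to a larger sid:
        // the larger server closes it and dials the smaller one itself.
        return remoteSid < mySid ? Action.DROP_AND_CONNECT_BACK : Action.KEEP_INCOMING;
    }

    public static void main(String[] args) {
        System.out.println(onIncoming(3, 1)); // DROP_AND_CONNECT_BACK: server 1 dialed server 3
        System.out.println(onIncoming(1, 3)); // KEEP_INCOMING: server 3 dialed server 1
    }
}
```

Either way, the connection that survives between two servers is the one dialed by the larger sid.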
5. Opening a connection: QuorumCnxManager.connectOne
synchronized public void connectOne(long sid){
    // No SendWorker for sid yet: no connection from this server to server sid has been established
    if (!connectedToPeer(sid)){
        InetSocketAddress electionAddr;
        if (view.containsKey(sid)) {
            // Election address of server sid
            electionAddr = view.get(sid).electionAddr;
        } else {
            LOG.warn("Invalid server id: " + sid);
            return;
        }
        try {

            LOG.debug("Opening channel to server " + sid);
            Socket sock = new Socket();
            setSockOpts(sock);
            // Connect to server sid
            sock.connect(view.get(sid).electionAddr, cnxTO);
            LOG.debug("Connected to server " + sid);
            if (quorumSaslAuthEnabled) {
                // Initialize the connection asynchronously; this also ends up calling initiateConnection(sock, sid)
                initiateConnectionAsync(sock, sid);
            } else {
                initiateConnection(sock, sid);
            }
        } 
        ......
}
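  • view.get(sid).electionAddr gets the election address of server sid from the configuration;
  • sock.connect(view.get(sid).electionAddr, cnxTO) connects the socket to that server, using cnxTO as the connect timeout;
  • initiateConnection(sock, sid) sets up the connection and sends the initial connection request (this server's sid);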
6. Starting a connection: QuorumCnxManager.startConnection
private boolean startConnection(Socket sock, Long sid)
            throws IOException {
    DataOutputStream dout = null;
    DataInputStream din = null;
    try {
        // Sending id and challenge
        dout = new DataOutputStream(sock.getOutputStream());
        // Send mySid to server sid
        dout.writeLong(this.mySid);
        dout.flush();

        din = new DataInputStream(new BufferedInputStream(sock.getInputStream()));
    } catch (IOException e) {
    ......
}
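  • dout.writeLong(this.mySid) sends this server's sid to the remote server;
  • new DataInputStream(new BufferedInputStream(sock.getInputStream())) wraps the socket's input stream. Messages sent by the remote server are read from it later, by RecvWorker;
  • ...... the rest mirrors QuorumCnxManager.handleConnection, from the dialing side. If the remote sid is larger than this server's sid, the socket is closed, because the larger server will connect back. Otherwise the SendWorker/RecvWorker threads are started for the established connection;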
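On the wire, the handshake is just an 8-byte big-endian long. The sketch below round-trips it through in-memory streams instead of a socket. SidHandshakeSketch and its methods are hypothetical. dialerKeeps encodes an assumption based on the 3.4.x startConnection: the dialing server closes its socket when the remote sid is larger than its own.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical sketch of the sid handshake; in-memory streams stand in for the socket.
class SidHandshakeSketch {

    // Dialing side (startConnection): the first bytes on the wire are its own sid.
    static byte[] encodeHello(long mySid) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream dout = new DataOutputStream(bytes);
        dout.writeLong(mySid);   // 8 bytes, big-endian
        dout.flush();
        return bytes.toByteArray();
    }

    // Accepting side (handleConnection): sid = din.readLong().
    static long decodeHello(byte[] wire) throws IOException {
        DataInputStream din = new DataInputStream(new ByteArrayInputStream(wire));
        return din.readLong();
    }

    // Assumed dialing-side rule: keep the socket only if the remote sid is not larger.
    static boolean dialerKeeps(long mySid, long remoteSid) {
        return remoteSid <= mySid;
    }

    public static void main(String[] args) throws IOException {
        byte[] wire = encodeHello(2L);
        System.out.println(wire.length + " bytes, sid=" + decodeHello(wire)); // 8 bytes, sid=2
        System.out.println(dialerKeeps(2L, 3L)); // false: server 3 will dial server 2 instead
    }
}
```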
7. SendWorker.run
public void run() {
        threadCnt.incrementAndGet();
        try {
            ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
            // If bq is empty, resend the last vote sent to this sid
            if (bq == null || isSendQueueEmpty(bq)) {
               ByteBuffer b = lastMessageSent.get(sid);
               if (b != null) {
                   LOG.debug("Attempting to send lastMessage to sid=" + sid);
                   // Try to send lastMessage to sid
                   send(b);
               }
            }
        } catch (IOException e) {
            LOG.error("Failed to send last message. Shutting down thread.", e);
            this.finish();
        }
        
        try {
            while (running && !shutdown && sock != null) {
                ByteBuffer b = null;
                try {
                    ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
                    if (bq != null) {
                        // Take a vote from bq; waits up to 1 s and returns null if bq stays empty
                        b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);
                    } else {
                        LOG.error("No queue of incoming messages for " + "server " + sid);
                        break;
                    }

                    if(b != null){
                        // Remember it in lastMessageSent
                        lastMessageSent.put(sid, b);
                        // Send the vote
                        send(b);
                    }
                } catch (InterruptedException e) {
                    LOG.warn("Interrupted while waiting for message on queue", e);
                }
            }
        } catch (Exception e) {
            LOG.warn("Exception when using channel: for id " + sid
                     + " my id = " + QuorumCnxManager.this.mySid + " error = " + e);
        }
        this.finish();
        LOG.warn("Send worker leaving thread");
}
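  • queueSendMap.get(sid) gets the blocking queue of messages to be sent to server sid;
  • bq == null || isSendQueueEmpty(bq): a message can be lost if this server or the peer closes the connection (and exits the thread) before the last message has been read and processed. So when the queue is empty, the last message sent is sent again; the peer handles duplicate messages correctly;
  • running && !shutdown && sock != null: loop until the worker is stopped or the socket is gone;
  • pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS) takes a message from bq, waiting up to 1 s and returning null if the queue stays empty;
  • send(b) sends the ByteBuffer taken from the queue (i.e. the vote to be sent) over the socket;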
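The resend-then-poll behavior of SendWorker.run can be reproduced with a plain BlockingQueue. This hypothetical sketch makes four substitutions:

- String stands in for ByteBuffer.
- A list stands in for the socket.
- A bounded number of polls replaces the running/shutdown flags.
- A 10 ms poll replaces the 1 s poll.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the SendWorker.run logic (not ZooKeeper code).
class SendLoopSketch {

    // Returns what would have been written to the socket, in order.
    static List<String> run(BlockingQueue<String> bq, Map<Long, String> lastMessageSent,
                            long sid, int maxPolls) throws InterruptedException {
        List<String> wire = new ArrayList<>();   // stands in for send(b)
        // Nothing queued: resend the last message; the peer tolerates duplicates.
        if (bq.isEmpty()) {
            String last = lastMessageSent.get(sid);
            if (last != null) {
                wire.add(last);
            }
        }
        // The real code loops while (running && !shutdown && sock != null).
        for (int i = 0; i < maxPolls; i++) {
            String b = bq.poll(10, TimeUnit.MILLISECONDS); // the real code waits 1000 ms
            if (b != null) {
                lastMessageSent.put(sid, b);  // remember it for a possible resend
                wire.add(b);
            }
        }
        return wire;
    }

    public static void main(String[] args) throws InterruptedException {
        Map<Long, String> last = new HashMap<>();
        last.put(2L, "vote-old");
        BlockingQueue<String> bq = new ArrayBlockingQueue<>(1);
        System.out.println(run(bq, last, 2L, 2)); // [vote-old]
        bq.offer("vote-new");
        System.out.println(run(bq, last, 2L, 2)); // [vote-new]
        System.out.println(last.get(2L));         // vote-new
    }
}
```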
8. RecvWorker.run
public void run() {
        threadCnt.incrementAndGet();
        try {
            while (running && !shutdown && sock != null) {
                /**
                 * Reads the first int to determine the length of the message
                 * Blocks until data is available
                 */
                int length = din.readInt();
                if (length <= 0 || length > PACKETMAXSIZE) {
                    throw new IOException("Received packet with invalid packet: " + length);
                }
                byte[] msgArray = new byte[length];
                din.readFully(msgArray, 0, length);
                // Wrap the vote bytes
                ByteBuffer message = ByteBuffer.wrap(msgArray);
                // Wrap the vote into a Message and put it into recvQueue
                addToRecvQueue(new Message(message.duplicate(), sid));
            }
        } 
        ......
}
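  • din.readInt(): din is the socket's DataInputStream passed in when the RecvWorker was created. The first int read is the message length, and the call blocks until data arrives. Lengths that are not positive or that exceed PACKETMAXSIZE are rejected;
  • ByteBuffer.wrap(msgArray) wraps the vote bytes that were read into a ByteBuffer;
  • new Message(message.duplicate(), sid) wraps the ByteBuffer into a Message tagged with the sender's sid;
  • addToRecvQueue puts the wrapped Message into recvQueue;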
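The framing that RecvWorker.run reads is a 4-byte length followed by the payload. It can be sketched with in-memory streams. FrameSketch is hypothetical. PACKET_MAX_SIZE = 512 KB is an assumed value for the sketch, not a quote of PACKETMAXSIZE.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

// Hypothetical sketch of the length-prefixed framing read by RecvWorker.run.
class FrameSketch {

    static final int PACKET_MAX_SIZE = 512 * 1024; // assumed limit for this sketch

    // Writer side: int length, then the payload bytes.
    static byte[] frame(byte[] payload) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream dout = new DataOutputStream(bytes);
        dout.writeInt(payload.length);
        dout.write(payload);
        dout.flush();
        return bytes.toByteArray();
    }

    // Reader side: mirrors the checks in RecvWorker.run.
    static ByteBuffer readFrame(DataInputStream din) throws IOException {
        int length = din.readInt();                  // on a real socket this blocks until data arrives
        if (length <= 0 || length > PACKET_MAX_SIZE) {
            throw new IOException("Received packet with invalid packet: " + length);
        }
        byte[] msgArray = new byte[length];
        din.readFully(msgArray, 0, length);          // waits for the whole payload
        return ByteBuffer.wrap(msgArray);
    }

    public static void main(String[] args) throws IOException {
        byte[] wire = frame(new byte[] {1, 2, 3});
        DataInputStream din = new DataInputStream(new ByteArrayInputStream(wire));
        System.out.println(wire.length + " bytes on the wire, payload " + readFrame(din).remaining());
    }
}
```

The length check matters because the length field comes straight off the network. Without it, a corrupt or hostile peer could make the receiver allocate an arbitrarily large array.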

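This completes the analysis of how the Listener starts. Next, we go back to step 2 and look at how the fast leader election algorithm starts.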

9. Starting fast leader election: FastLeaderElection.starter
private void starter(QuorumPeer self, QuorumCnxManager manager) {
    this.self = self;
    proposedLeader = -1;
    proposedZxid = -1;

    // Queue of outgoing votes or ACKs
    sendqueue = new LinkedBlockingQueue<ToSend>();
    // Queue of incoming votes
    recvqueue = new LinkedBlockingQueue<Notification>();
    this.messenger = new Messenger(manager);
}

Messenger(QuorumCnxManager manager) {
    // Thread that processes outgoing votes
    this.ws = new WorkerSender(manager);
    Thread t = new Thread(this.ws, "WorkerSender[myid=" + self.getId() + "]");
    t.setDaemon(true);
    t.start();

    // Thread that processes incoming votes
    this.wr = new WorkerReceiver(manager);
    t = new Thread(this.wr, "WorkerReceiver[myid=" + self.getId() + "]");
    t.setDaemon(true);
    t.start();
}
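  • sendqueue = new LinkedBlockingQueue<ToSend>() is the queue of outgoing messages, each wrapped as a ToSend;
  • recvqueue = new LinkedBlockingQueue<Notification>() is the queue of messages received from other servers, each wrapped as a Notification;
  • the Messenger constructor starts the WorkerSender daemon thread, which processes outgoing votes;
  • it also starts the WorkerReceiver daemon thread, which processes incoming votes;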
10. WorkerSender.run
public void run() {
    while (!stop) {
        try {
            // Take the head of sendqueue; returns null if nothing arrives within 3 s
            ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
            if(m == null) continue;
            // Process the ToSend to be sent
            process(m);
        } catch (InterruptedException e) {
            break;
        }
    }
    LOG.info("WorkerSender is down");
}

void process(ToSend m) {
     ByteBuffer requestBuffer = buildMsg(m.state.ordinal(), 
                                           m.leader,
                                           m.zxid, 
                                           m.electionEpoch, 
                                           m.peerEpoch);
       manager.toSend(m.sid, requestBuffer);
}
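  • sendqueue.poll(3000, TimeUnit.MILLISECONDS) takes the head of sendqueue, returning null if nothing arrives within 3 seconds. Entries are added when QuorumPeer.run runs an election, or when WorkerReceiver.run replies after comparing votes;
  • process(m) handles the ToSend to be sent;
  • buildMsg converts the ToSend into a ByteBuffer;
  • manager.toSend delegates the actual sending to QuorumCnxManager;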
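The body of buildMsg is not shown above. The sketch below assumes the 3.4.x layout: state, leader, zxid, electionEpoch and peerEpoch, followed by a version int, for 40 bytes in total. It also assumes a version value of 1. NotificationCodecSketch is hypothetical. The field order before the version matches what WorkerReceiver.run reads.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of the notification layout (assumed, modeled on 3.4.x buildMsg).
class NotificationCodecSketch {

    static final int ASSUMED_VERSION = 0x1; // assumption for this sketch

    static ByteBuffer buildMsg(int state, long leader, long zxid,
                               long electionEpoch, long peerEpoch) {
        ByteBuffer buf = ByteBuffer.allocate(40); // 4 + 4 * 8 + 4 bytes, big-endian by default
        buf.putInt(state);          // ServerState.ordinal(): 0 LOOKING, 1 FOLLOWING, 2 LEADING, 3 OBSERVING
        buf.putLong(leader);        // sid of the proposed leader
        buf.putLong(zxid);          // last zxid of the proposed leader
        buf.putLong(electionEpoch); // sender's logicalclock (election round)
        buf.putLong(peerEpoch);     // epoch of the proposal
        buf.putInt(ASSUMED_VERSION);
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer b = buildMsg(0, 3L, 0x100000005L, 1L, 1L);
        System.out.println(b.capacity() + " bytes, leader=" + b.getLong(4)); // 40 bytes, leader=3
    }
}
```

The older 28-byte format stops right after electionEpoch. That is why WorkerReceiver treats a buffer capacity of 28 as backward-compatibility mode.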
11. Sending a message: QuorumCnxManager.toSend
public void toSend(Long sid, ByteBuffer b) {
    if (this.mySid == sid) {
         b.position(0);
         // Message to itself: put it directly into recvQueue
         addToRecvQueue(new Message(b.duplicate(), sid));
    } else {
         // Put the outgoing message into the queue for sid in queueSendMap
         ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY);
         ArrayBlockingQueue<ByteBuffer> bqExisting = queueSendMap.putIfAbsent(sid, bq);
         if (bqExisting != null) {
             addToSendQueue(bqExisting, b);
         } else {
             addToSendQueue(bq, b);
         }
         // Try to establish a connection to the server with id sid
         connectOne(sid);   
    }
}
  • this.mySid == sid: a message to itself does not go through a socket; it is wrapped as a Message and put directly into recvQueue;
  • addToSendQueue puts the ByteBuffer into the ArrayBlockingQueue<ByteBuffer> for sid; SendWorker.run later sends it over the socket;
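The routing in toSend, either self-delivery or a per-sid send queue, can be sketched with java.util.concurrent types. Two assumptions are baked in and labeled in the code:

- SEND_CAPACITY is 1.
- addToSendQueue drops the oldest queued message when the queue is full, so only the newest vote for each peer is kept.

computeIfAbsent replaces the original putIfAbsent idiom, and String replaces ByteBuffer. ToSendRoutingSketch is hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of QuorumCnxManager.toSend routing (not ZooKeeper code).
class ToSendRoutingSketch {

    static final int SEND_CAPACITY = 1; // assumption: one pending message per peer

    final long mySid;
    final ConcurrentHashMap<Long, ArrayBlockingQueue<String>> queueSendMap = new ConcurrentHashMap<>();
    final BlockingQueue<String> recvQueue = new LinkedBlockingQueue<>();

    ToSendRoutingSketch(long mySid) {
        this.mySid = mySid;
    }

    void toSend(long sid, String msg) {
        if (sid == mySid) {
            recvQueue.offer(msg);            // to self: skip the network entirely
            return;
        }
        ArrayBlockingQueue<String> bq =
                queueSendMap.computeIfAbsent(sid, k -> new ArrayBlockingQueue<>(SEND_CAPACITY));
        if (bq.remainingCapacity() == 0) {
            bq.poll();                       // assumed: drop the stale message (not atomic; fine for a sketch)
        }
        bq.offer(msg);
        // The real method now calls connectOne(sid) so that a SendWorker exists to drain bq.
    }

    public static void main(String[] args) {
        ToSendRoutingSketch m = new ToSendRoutingSketch(1L);
        m.toSend(1L, "my own vote");
        m.toSend(2L, "vote A");
        m.toSend(2L, "vote B");
        System.out.println(m.recvQueue.peek());      // my own vote
        System.out.println(m.queueSendMap.get(2L));  // [vote B]
    }
}
```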
12. WorkerReceiver.run
public void run() {

        Message response;
        while (!stop) {
            // Sleeps on receive
            try{
                // Vote sent by another server
                response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
                if(response == null) continue;
                if(!self.getVotingView().containsKey(response.sid)){
                    Vote current = self.getCurrentVote();
                    ToSend notmsg = new ToSend(ToSend.mType.notification,
                            current.getId(),
                            current.getZxid(),
                            logicalclock.get(),
                            self.getPeerState(),
                            response.sid,
                            current.getPeerEpoch());

                    sendqueue.offer(notmsg);
                } else {
                    ......
                    boolean backCompatibility = (response.buffer.capacity() == 28);
                    response.buffer.clear();

                    // Instantiate Notification and set its attributes
                    Notification n = new Notification();
                    
                    // State of peer that sent this message
                    QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
                    switch (response.buffer.getInt()) {
                    case 0:
                        ackstate = QuorumPeer.ServerState.LOOKING;
                        break;
                    case 1:
                        ackstate = QuorumPeer.ServerState.FOLLOWING;
                        break;
                    case 2:
                        ackstate = QuorumPeer.ServerState.LEADING;
                        break;
                    case 3:
                        ackstate = QuorumPeer.ServerState.OBSERVING;
                        break;
                    default:
                        continue;
                    }
                    
                    n.leader = response.buffer.getLong();
                    n.zxid = response.buffer.getLong();
                    n.electionEpoch = response.buffer.getLong();
                    n.state = ackstate;
                    n.sid = response.sid;
                    if(!backCompatibility){
                        n.peerEpoch = response.buffer.getLong();
                    } else {
                        if(LOG.isInfoEnabled()){
                            LOG.info("Backward compatibility mode, server id=" + n.sid);
                        }
                        n.peerEpoch = ZxidUtils.getEpochFromZxid(n.zxid);
                    }

                    if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
                        // This server is LOOKING: put the Notification into recvqueue
                        recvqueue.offer(n);

                        // If the sender is also LOOKING and its logical clock is behind, send back a notification
                        if((ackstate == QuorumPeer.ServerState.LOOKING)
                                && (n.electionEpoch < logicalclock.get())){
                            Vote v = getVote();
                            ToSend notmsg = new ToSend(ToSend.mType.notification,
                                    v.getId(),
                                    v.getZxid(),
                                    logicalclock.get(), // electionEpoch
                                    self.getPeerState(),
                                    response.sid,
                                    v.getPeerEpoch()); // epoch of the proposal
                            sendqueue.offer(notmsg);
                        }
                    } else {
                        // This server is not LOOKING but the sender is: send back the leader this server believes in
                        Vote current = self.getCurrentVote();
                        if(ackstate == QuorumPeer.ServerState.LOOKING){
                            if(LOG.isDebugEnabled()){
                                LOG.debug("Sending new notification. My id =  " +
                                        self.getId() + " recipient=" +
                                        response.sid + " zxid=0x" +
                                        Long.toHexString(current.getZxid()) +
                                        " leader=" + current.getId());
                            }
                            
                            ToSend notmsg;
                            if(n.version > 0x0) {
                                notmsg = new ToSend(
                                        ToSend.mType.notification,
                                        current.getId(),
                                        current.getZxid(),
                                        current.getElectionEpoch(),
                                        self.getPeerState(),
                                        response.sid,
                                        current.getPeerEpoch());
                                
                            } else {
                                Vote bcVote = self.getBCVote();
                                notmsg = new ToSend(
                                        ToSend.mType.notification,
                                        bcVote.getId(),
                                        bcVote.getZxid(),
                                        bcVote.getElectionEpoch(),
                                        self.getPeerState(),
                                        response.sid,
                                        bcVote.getPeerEpoch());
                            }
                            sendqueue.offer(notmsg);
                        }
                    }
                }
            } catch (InterruptedException e) {
                System.out.println("Interrupted Exception while waiting for new message" +
                        e.toString());
            }
        }
        LOG.info("WorkerReceiver is down");
}
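  • manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS) takes a Message from recvQueue;
  • !self.getVotingView().containsKey(response.sid): the sender is not a voting member (for example an Observer), so its message is not processed. This server simply replies with its current vote, i.e. the leader it currently believes in;
  • ackstate is the state of the sending server;
  • recvqueue.offer(n): if this server is LOOKING, the Notification is put into recvqueue;
  • ackstate == QuorumPeer.ServerState.LOOKING && (n.electionEpoch < logicalclock.get()): this server is LOOKING, the sender is also LOOKING, and the sender's election round is behind. In that case a ToSend is built from the current proposedLeader, proposedZxid and proposedEpoch and put into sendqueue, i.e. a reply to the sender;
  • ackstate == QuorumPeer.ServerState.LOOKING, when this server is not LOOKING: the sender is LOOKING, so this server replies with the leader it currently believes in. This is the follower-restart flow covered in the previous article;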
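The parsing part of WorkerReceiver.run can be isolated into a small decoder. This hypothetical sketch follows the field order of the code above. The peerEpoch fallback for 28-byte messages assumes that ZxidUtils.getEpochFromZxid returns the high 32 bits of the zxid.

```java
import java.nio.ByteBuffer;

// Hypothetical decoder mirroring the parsing steps in WorkerReceiver.run.
class NotificationDecodeSketch {

    // Same order as the switch above: 0 LOOKING, 1 FOLLOWING, 2 LEADING, 3 OBSERVING.
    enum ServerState { LOOKING, FOLLOWING, LEADING, OBSERVING }

    static final class Decoded {
        ServerState state;
        long leader, zxid, electionEpoch, peerEpoch;
    }

    // Returns null for an unknown state code (the real loop just continues).
    static Decoded decode(ByteBuffer buffer) {
        boolean backCompatibility = buffer.capacity() == 28; // old format without peerEpoch
        buffer.clear();                                      // read from position 0
        int code = buffer.getInt();
        if (code < 0 || code >= ServerState.values().length) {
            return null;
        }
        Decoded n = new Decoded();
        n.state = ServerState.values()[code];
        n.leader = buffer.getLong();
        n.zxid = buffer.getLong();
        n.electionEpoch = buffer.getLong();
        // Old senders: derive the epoch from the high 32 bits of the zxid (assumption, see above).
        n.peerEpoch = backCompatibility ? (n.zxid >> 32) : buffer.getLong();
        return n;
    }

    public static void main(String[] args) {
        ByteBuffer old = ByteBuffer.allocate(28);
        old.putInt(1).putLong(3L).putLong(0x500000007L).putLong(9L);
        Decoded n = decode(old);
        // FOLLOWING leader=3 peerEpoch=5
        System.out.println(n.state + " leader=" + n.leader + " peerEpoch=" + n.peerEpoch);
    }
}
```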

This completes the startup of the WorkerSender/WorkerReceiver threads, i.e. the analysis of startLeaderElection() from step 1. Next comes super.start() from step 1, i.e. what the QuorumPeer thread executes.

13. QuorumPeer.run
public void run() {
    ......
    try {
        /*
         * Main loop
         */
        while (running) {
            switch (getPeerState()) {
            case LOOKING:
                ......
                setBCVote(null);
                // lookForLeader() runs the election and returns the leader this server settled on
                setCurrentVote(makeLEStrategy().lookForLeader());
                ......
                break;
            case OBSERVING:
                ......
                LOG.info("OBSERVING");
                setObserver(makeObserver(logFactory));
                observer.observeLeader();
                ......
                break;
            case FOLLOWING:
                ......
                LOG.info("FOLLOWING");
                setFollower(makeFollower(logFactory));
                follower.followLeader();
                ......
                break;
            case LEADING:
                .......
                setLeader(makeLeader(logFactory));
                leader.lead();
                setLeader(null);
                .......
                break;
            }
        }
    } 
    ......
}
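  • switch (getPeerState()) checks this server's state on every pass of the loop; until the election is finished, the state is LOOKING;
  • makeLEStrategy().lookForLeader() runs leader election with the fast leader election algorithm;
    This section only covers the LOOKING state. The other states, which are reached after the election, are analyzed separately later.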
14. FastLeaderElection.lookForLeader
public Vote lookForLeader() throws InterruptedException {
    try {
        // Ballot box
        HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
        HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
        int notTimeout = finalizeWait;
        synchronized(this){
            // Increment the logical clock (election round)
            logicalclock.incrementAndGet();
            // Update the proposal: if this server's learnerType is PARTICIPANT, proposedLeader is set to its own myid,
            // i.e. at the start of an election every server votes for itself
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }

        LOG.info("New election. My id =  " + self.getId() + ", proposed zxid=0x" + Long.toHexString(proposedZxid));
        // Queue a ToSend in sendqueue for every voting server in the config; WorkerSender.run picks them up
        sendNotifications();
        while ((self.getPeerState() == ServerState.LOOKING) && (!stop)){
            Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
            if(n == null){
                if(manager.haveDelivered()){
                    sendNotifications();
                } else {
                    // Not everything was delivered: (re)connect to every server that still has queued messages
                    manager.connectAll();
                }
                int tmpTimeOut = notTimeout*2;
                notTimeout = (tmpTimeOut < maxNotificationInterval? tmpTimeOut : maxNotificationInterval);
                LOG.info("Notification time out: " + notTimeout);
            } else if(self.getVotingView().containsKey(n.sid)) {
                switch (n.state) {
                case LOOKING:
                    // If notification > current, replace and send messages out
                    // The sender's election round is ahead of this server's logical clock
                    if (n.electionEpoch > logicalclock.get()) {
                        // Move this server's round up to the sender's
                        logicalclock.set(n.electionEpoch);
                        // Empty the ballot box (its votes belong to an older round)
                        recvset.clear();
                        if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                            // Update the proposed leader, zxid and epoch
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                        } else {
                            updateProposal(getInitId(),
                                    getInitLastLoggedZxid(),
                                    getPeerEpoch());
                        }
                        // Send the current proposal to all other servers
                        sendNotifications();
                    } else if (n.electionEpoch < logicalclock.get()) {
                        // The sender's round is behind: ignore its vote
                        if(LOG.isDebugEnabled()){
                            LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                                    + Long.toHexString(n.electionEpoch)
                                    + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
                        }
                        break;
                    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                            proposedLeader, proposedZxid, proposedEpoch)) {
                        // Update the proposed leader, zxid and epoch
                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                        // Send the current proposal to all other servers
                        sendNotifications();
                    }

                    if(LOG.isDebugEnabled()){
                        LOG.debug("Adding vote: from=" + n.sid +
                                ", proposed leader=" + n.leader +
                                ", proposed zxid=0x" + Long.toHexString(n.zxid) +
                                ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                    }
                    // Put the sender's vote into the ballot box: sender's sid -> the vote it cast
                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                    // Using the ballot box and this server's vote, check whether its proposed leader has a majority
                    if (termPredicate(recvset,
                            new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch))) {

                        // Verify if there is any change in the proposed leader
                        while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null){
                            if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    proposedLeader, proposedZxid, proposedEpoch)){
                                recvqueue.put(n);
                                break;
                            }
                        }
                        if (n == null) {
                            self.setPeerState((proposedLeader == self.getId()) ?
                                    ServerState.LEADING: learningState());

                            Vote endVote = new Vote(proposedLeader,
                                                    proposedZxid,
                                                    logicalclock.get(),
                                                    proposedEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }
                    break;
                case OBSERVING:
                    LOG.debug("Notification from observer: " + n.sid);
                    break;
                case FOLLOWING:
                case LEADING:
                    /*
                     * Consider all notifications from the same epoch together.
                     */
                    if(n.electionEpoch == logicalclock.get()){
                        recvset.put(n.sid, new Vote(n.leader,
                                                      n.zxid,
                                                      n.electionEpoch,
                                                      n.peerEpoch));
                       
                        if(ooePredicate(recvset, outofelection, n)) {
                            self.setPeerState((n.leader == self.getId()) ?
                                    ServerState.LEADING: learningState());

                            Vote endVote = new Vote(n.leader, 
                                    n.zxid, 
                                    n.electionEpoch, 
                                    n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }

                    /*
                     * Before joining an established ensemble, verify
                     * a majority is following the same leader.
                     */
                    outofelection.put(n.sid, new Vote(n.version,
                                                        n.leader,
                                                        n.zxid,
                                                        n.electionEpoch,
                                                        n.peerEpoch,
                                                        n.state));
       
                    if(ooePredicate(outofelection, outofelection, n)) {
                        synchronized(this){
                            logicalclock.set(n.electionEpoch);
                            self.setPeerState((n.leader == self.getId()) ?
                                    ServerState.LEADING: learningState());
                        }
                        Vote endVote = new Vote(n.leader,
                                                n.zxid,
                                                n.electionEpoch,
                                                n.peerEpoch);
                        leaveInstance(endVote);
                        return endVote;
                    }
                    break;
                default:
                    LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)",
                            n.state, n.sid);
                    break;
                }
            } else {
                LOG.warn("Ignoring notification from non-cluster member " + n.sid);
            }
        }
        return null;
    } finally {
        try {
            if(self.jmxLeaderElectionBean != null){
                MBeanRegistry.getInstance().unregister(
                        self.jmxLeaderElectionBean);
            }
        } catch (Exception e) {
            LOG.warn("Failed to unregister with JMX", e);
        }
        self.jmxLeaderElectionBean = null;
        LOG.debug("Number of connection processing threads: {}",
                manager.getConnectionThreadCount());
    }
}
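  • recvset is the ballot box: sender's sid -> the vote cast by that sender;
  • logicalclock.incrementAndGet() increments the logical clock, i.e. the election round;
  • updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()) updates the proposal; at the start of an election every server votes for itself;
  • sendNotifications() builds a ToSend for every voting server in the configuration and puts it into sendqueue. Servers that started earlier then receive the connection request in the accept() call of Listener.run;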

-----LOOKING------

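  • recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS) takes a Notification from recvqueue. notTimeout starts at finalizeWait (200 ms). After each poll that returns null it is doubled, up to maxNotificationInterval (60 s);
  • manager.connectAll(): if not all queued messages have been delivered, the server tries to (re)connect to every server that still has messages queued;
  • n.electionEpoch > logicalclock.get(): the sender's election round is ahead of this server's:
    1. logicalclock.set(n.electionEpoch) moves this server's logicalclock up to the sender's round;
    2. recvset.clear() empties the ballot box, because the votes in it belong to an older round;
    3. totalOrderPredicate compares two votes: first by epoch, then by zxid, then by server id. Here the sender's vote is compared against this server's own initial vote;
    4. updateProposal updates the proposal to the winning vote;
    5. sendNotifications() broadcasts the updated proposal to the other servers;
  • n.electionEpoch < logicalclock.get(): the sender's election round is behind, so its vote is ignored;
  • n.electionEpoch == logicalclock.get() and the sender's vote wins the comparison:
    1. updateProposal updates the proposal to the winning vote;
    2. sendNotifications() broadcasts the updated proposal to the other servers;
  • recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)) puts the sender's vote into the ballot box;
  • termPredicate uses the ballot box and this server's own vote to check whether the leader this server proposes has the votes of a majority (a quorum);
  • while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null): before finishing, the server keeps polling recvqueue with a finalizeWait timeout. If a notification arrives with a vote better than the current proposal, it is put back into recvqueue and the election loop continues. This prevents the server from settling on a leader while a better vote is still in transit. Notifications that are not better are discarded;
  • n == null: no better vote arrived within finalizeWait, so the election ends. The server becomes LEADING if it proposed itself, otherwise FOLLOWING or OBSERVING (learningState()). leaveInstance clears recvqueue, and endVote is returned;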
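The three numeric rules used in the LOOKING branch can be written as small pure functions: vote comparison, quorum check and timeout back-off. This is a simplified, hypothetical sketch with three caveats:

- totalOrderPredicate omits the real method's check for servers with zero weight.
- termPredicate assumes a plain majority quorum over a known number of voting members.
- The 200 ms and 60 s values in the demo are assumed defaults for finalizeWait and maxNotificationInterval.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified restatement of the LOOKING-branch rules in FastLeaderElection.
class VoteMathSketch {

    static final class Vote {
        final long leader, zxid, electionEpoch, peerEpoch;

        Vote(long leader, long zxid, long electionEpoch, long peerEpoch) {
            this.leader = leader;
            this.zxid = zxid;
            this.electionEpoch = electionEpoch;
            this.peerEpoch = peerEpoch;
        }

        boolean sameAs(Vote o) {
            return leader == o.leader && zxid == o.zxid
                    && electionEpoch == o.electionEpoch && peerEpoch == o.peerEpoch;
        }
    }

    // The new vote wins with a higher peerEpoch, then a higher zxid, then a higher sid.
    static boolean totalOrderPredicate(long newId, long newZxid, long newEpoch,
                                       long curId, long curZxid, long curEpoch) {
        return newEpoch > curEpoch
                || (newEpoch == curEpoch
                    && (newZxid > curZxid || (newZxid == curZxid && newId > curId)));
    }

    // Assumes a simple majority quorum: more than half of all voting members back `proposal`.
    static boolean termPredicate(Map<Long, Vote> recvset, Vote proposal, int votingMembers) {
        long support = recvset.values().stream().filter(proposal::sameAs).count();
        return support > votingMembers / 2;
    }

    // notTimeout doubles after every empty poll, capped at maxNotificationInterval.
    static int nextTimeout(int notTimeout, int maxNotificationInterval) {
        int tmp = notTimeout * 2;
        return tmp < maxNotificationInterval ? tmp : maxNotificationInterval;
    }

    public static void main(String[] args) {
        // Same epoch and zxid: the higher sid (3) wins the comparison.
        System.out.println(totalOrderPredicate(3, 10, 1, 2, 10, 1)); // true
        Map<Long, Vote> recvset = new HashMap<>();
        Vote forThree = new Vote(3, 10, 1, 1);
        recvset.put(3L, forThree);
        System.out.println(termPredicate(recvset, forThree, 3));     // false: 1 of 3
        recvset.put(2L, new Vote(3, 10, 1, 1));
        System.out.println(termPredicate(recvset, forThree, 3));     // true: 2 of 3
        int t = 200;
        for (int i = 0; i < 10; i++) {
            t = nextTimeout(t, 60_000);
        }
        System.out.println(t);                                       // 60000
    }
}
```

Each server also sends its notification to itself, through the self-delivery path in toSend. As a result, its own vote ends up in recvset alongside the others, which is why the demo counts server 3's own entry.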

-----OBSERVING------

  • OBSERVING: a notification from a server in OBSERVING state is only logged and then ignored, because observers do not vote in leader election;

-----FOLLOWING/LEADING------

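  • FOLLOWING/LEADING: notifications with state FOLLOWING or LEADING arise in three scenarios:
    1. A new server (not an Observer) joins a running ensemble. Its initial state is LOOKING, so it calls lookForLeader() and sends notifications proposing itself as leader. The current leader and the followers reply, with state LEADING and FOLLOWING respectively;
    2. The current leader crashes, but not all followers notice at the same time. A server that notices first switches to LOOKING and sends notifications to the others. Those that have not noticed yet still reply with state FOLLOWING;
    3. In the current round, the other servers have already elected a new leader and changed their state, but this server has not been notified yet. The servers that finished the election send it notifications with state LEADING or FOLLOWING;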

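  • n.electionEpoch == logicalclock.get(): this corresponds to scenario 3 above. The vote is added to recvset, and if ooePredicate confirms a majority for the sender's leader, the election ends;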

  • outofelection.put: this covers a running ensemble that a new server joins. Votes from servers in states other than LOOKING are all stored in outofelection;

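  • ooePredicate(outofelection, outofelection, n): in a running ensemble that a new server joins, the election ends once a majority of the collected votes support the same leader and that leader has itself reported the LEADING state;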
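A hedged sketch of the out-of-election check follows. ooePredicate is assumed here to combine a majority check with a checkLeader step, which works as follows:

- When the proposed leader is another server, that server's own vote must be in the map with state LEADING.
- When the proposed leader is this server, the election epoch must match logicalclock.

Votes are compared by leader only, and a plain majority quorum is assumed. All names are hypothetical simplifications.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified sketch of ooePredicate/checkLeader (not ZooKeeper code).
class OutOfElectionSketch {

    enum State { LOOKING, FOLLOWING, LEADING, OBSERVING }

    static final class Vote {
        final long leader, electionEpoch;
        final State state;

        Vote(long leader, long electionEpoch, State state) {
            this.leader = leader;
            this.electionEpoch = electionEpoch;
            this.state = state;
        }
    }

    // The proposed leader must itself have reported LEADING (or be this server in the same round).
    static boolean checkLeader(Map<Long, Vote> votes, long leader, long electionEpoch,
                               long mySid, long logicalclock) {
        if (leader != mySid) {
            Vote leaderVote = votes.get(leader);
            return leaderVote != null && leaderVote.state == State.LEADING;
        }
        return logicalclock == electionEpoch;
    }

    // Majority for `leader` in recv, plus a confirmed leader in ooe.
    static boolean ooePredicate(Map<Long, Vote> recv, Map<Long, Vote> ooe, long leader,
                                long electionEpoch, int votingMembers, long mySid, long logicalclock) {
        long support = recv.values().stream().filter(v -> v.leader == leader).count();
        return support > votingMembers / 2
                && checkLeader(ooe, leader, electionEpoch, mySid, logicalclock);
    }

    public static void main(String[] args) {
        // Server 3 restarts while servers 1 and 2 keep running with server 2 as leader.
        Map<Long, Vote> outofelection = new HashMap<>();
        outofelection.put(1L, new Vote(2, 5, State.FOLLOWING));
        System.out.println(ooePredicate(outofelection, outofelection, 2, 5, 3, 3, 1)); // false: 1 of 3
        outofelection.put(2L, new Vote(2, 5, State.LEADING));
        System.out.println(ooePredicate(outofelection, outofelection, 2, 5, 3, 3, 1)); // true
    }
}
```

Requiring the leader's own LEADING vote stops a joining server from following a leader that only other followers still believe in.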
    This completes the analysis of the whole leader election flow.
    -----------over----------


      Article link: https://www.haomeiwen.com/subject/xzkroktx.html