美文网首页
zk源码阅读48:AckRequestProcessor和Sen

zk源码阅读48:AckRequestProcessor和Sen

作者: 赤子心_d709 | 来源:发表于2017-08-29 20:14 被阅读132次

摘要

在SyncRequestProcessor完成日志记录之后,不同角色服务器需要告知ACK代表是如日志记录完成
在leader端,该处理器就是AckRequestProcessor
在Follower端,该处理器就是SendAckRequestProcessor
在observer端,由于observer并没有投票权,不需要对应的处理器

本节主要讲解

AckRequestProcessor的意义,源码
SendAckRequestProcessor的意义,源码
以及两者的异同

AckRequestProcessor

是leader端的处理器
负责在SyncRequestProcessor完成事务日志记录后,向Proposal的投票收集器发送ACK反馈,以通知投票收集器当前服务器已经完成了对该Proposal的事务日志记录。

源码很简单

class AckRequestProcessor implements RequestProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(AckRequestProcessor.class);
    Leader leader;

    AckRequestProcessor(Leader leader) {
        this.leader = leader;
    }

    /**
     * Forward the request as an ACK to the leader
     */
    public void processRequest(Request request) {
        QuorumPeer self = leader.self;
        if(self != null)
            leader.processAck(self.getId(), request.zxid, null);
        else
            LOG.error("Null QuorumPeer");
    }

    public void shutdown() {
        // XXX No need to do anything
    }
}

核心就是调用leader.processAck函数

这是因为该类本身只有leader端有,并不需要进行网络IO,直接处理ack即可

SendAckRequestProcessor

是Follower端的处理器
负责在SyncRequestProcessor完成事务日志记录后,向Proposal的投票收集器发送ACK反馈,以通知投票收集器当前服务器已经完成了对该Proposal的事务日志记录。

源码如下

public class SendAckRequestProcessor implements RequestProcessor, Flushable {
    private static final Logger LOG = LoggerFactory.getLogger(SendAckRequestProcessor.class);
    
    Learner learner;

    SendAckRequestProcessor(Learner peer) {
        this.learner = peer;
    }

    public void processRequest(Request si) {
        if(si.type != OpCode.sync){//不是sync请求就处理
            QuorumPacket qp = new QuorumPacket(Leader.ACK, si.hdr.getZxid(), null,
                null);//生成ACK包
            try {
                learner.writePacket(qp, false);//发送给leader
            } catch (IOException e) {
                LOG.warn("Closing connection to leader, exception during packet send", e);
                try {
                    if (!learner.sock.isClosed()) {
                        learner.sock.close();
                    }
                } catch (IOException e1) {
                    // Nothing to do, we are shutting things down, so an exception here is irrelevant
                    LOG.debug("Ignoring error closing the connection", e1);
                }
            }
        }
    }
    
    public void flush() throws IOException {
        try {
            learner.writePacket(null, true);//发送一个空的packet
        } catch(IOException e) {
            LOG.warn("Closing connection to leader, exception during packet send", e);
            try {
                if (!learner.sock.isClosed()) {
                    learner.sock.close();
                }
            } catch (IOException e1) {
                    // Nothing to do, we are shutting things down, so an exception here is irrelevant
                    LOG.debug("Ignoring error closing the connection", e1);
            }
        }
    }

    public void shutdown() {
        // Nothing needed
    }

}

补充

leader端:AckRequestProcessor调用了Leader#processAck
Follower端:SendAckRequestProcessor 发出ACK后,被LearnerHandler处理到,最终也是调用Leader#processAck,之前在leader的代码讲到过(源码解析42)
这里把该函数在列

synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {//针对提议回复ACK的处理逻辑,如果过半验证了就通知所有Learner
        if (LOG.isTraceEnabled()) {//log相关
            LOG.trace("Ack zxid: 0x{}", Long.toHexString(zxid));
            for (Proposal p : outstandingProposals.values()) {
                long packetZxid = p.packet.getZxid();
                LOG.trace("outstanding proposal: 0x{}",
                        Long.toHexString(packetZxid));
            }
            LOG.trace("outstanding proposals all");
        }

        if ((zxid & 0xffffffffL) == 0) {
            /*
             * We no longer process NEWLEADER ack by this method. However,
             * the learner sends ack back to the leader after it gets UPTODATE
             * so we just ignore the message.
             */
            return;
        }
    
        if (outstandingProposals.size() == 0) {//没有处理当中的提议
            if (LOG.isDebugEnabled()) {
                LOG.debug("outstanding is 0");
            }
            return;
        }
        if (lastCommitted >= zxid) {//提议已经处理过了
            if (LOG.isDebugEnabled()) {
                LOG.debug("proposal has already been committed, pzxid: 0x{} zxid: 0x{}",
                        Long.toHexString(lastCommitted), Long.toHexString(zxid));
            }
            // The proposal has already been committed
            return;
        }
        Proposal p = outstandingProposals.get(zxid);//获取zxid对应的提议
        if (p == null) {
            LOG.warn("Trying to commit future proposal: zxid 0x{} from {}",
                    Long.toHexString(zxid), followerAddr);
            return;
        }
        
        p.ackSet.add(sid);//对应提议的ack集合添加sid记录
        if (LOG.isDebugEnabled()) {
            LOG.debug("Count for zxid: 0x{} is {}",
                    Long.toHexString(zxid), p.ackSet.size());
        }
        if (self.getQuorumVerifier().containsQuorum(p.ackSet)){//过半回复ack
            if (zxid != lastCommitted+1) {
                LOG.warn("Commiting zxid 0x{} from {} not first!",
                        Long.toHexString(zxid), followerAddr);
                LOG.warn("First is 0x{}", Long.toHexString(lastCommitted + 1));
            }
            outstandingProposals.remove(zxid);//该proposal已经处理完了
            if (p.request != null) {
                toBeApplied.add(p);//即将应用的队列 添加该proposal
            }

            if (p.request == null) {
                LOG.warn("Going to commmit null request for proposal: {}", p);
            }
            commit(zxid);//提交,发给所有参与者
            inform(p);//告诉所有观察者
            zk.commitProcessor.commit(p.request);//leader自己也提交,调用commitProcessor相关逻辑
            if(pendingSyncs.containsKey(zxid)){//处理pending住的同步请求
                for(LearnerSyncRequest r: pendingSyncs.remove(zxid)) {
                    sendSync(r);//发送同步请求给LearnerSyncRequest记录的server
                }
            }
        }
    }

就是收集ACK,如果通过过半验证了,就告诉所有参与者commit,告诉所有observer INFORM,然后调用commitProcessor逻辑,下面一节讲。涉及LearnerSyncRequest 部分是处理client端sync请求的,
可以参照源码阅读42理解。

思考

AckRequestProcessor和SendAckRequestProcessor 两个类的异同

区别是:

前者位于leader端,属性就记录Leader,不会进行网络IO
后者位于Follower端,属性就记录Learner,会给leader发送ACK
都代表事务日志记录完毕

共同是:

两者都没有用生产消费模型,shutdown函数就是空的

refer

《paxos到zk》

相关文章

网友评论

      本文标题:zk源码阅读48:AckRequestProcessor和Sen

      本文链接:https://www.haomeiwen.com/subject/brjadxtx.html