美文网首页
zk源码阅读50:ToBeAppliedRequestProce

zk源码阅读50:ToBeAppliedRequestProce

作者: 赤子心_d709 | 来源:发表于2017-08-31 13:20 被阅读90次

介绍

该处理器有一个toBeApplied队列,用来存储那些已经被CommitProcessor处理过的可被提交的Proposal。其会将这些请求交付给FinalRequestProcessor处理器处理,待其处理完后,再将其从toBeApplied队列中移除。

源码在Leader.ToBeAppliedRequestProcessor,是leader的内部类

只有leader角色才有,其他角色没有该处理器,在leader中的位置如下

在leader中的位置

源码

代码很简单

static class ToBeAppliedRequestProcessor implements RequestProcessor {
        private RequestProcessor next;

        private ConcurrentLinkedQueue<Proposal> toBeApplied;//注意是concurrent的

        /**
         * This request processor simply maintains the toBeApplied list. For
         * this to work next must be a FinalRequestProcessor and
         * FinalRequestProcessor.processRequest MUST process the request
         * synchronously!
         * 
         * @param next
         *                a reference to the FinalRequestProcessor
         */
        ToBeAppliedRequestProcessor(RequestProcessor next,
                ConcurrentLinkedQueue<Proposal> toBeApplied) {
            if (!(next instanceof FinalRequestProcessor)) {//下一个必须是FinalRequestProcessor
                throw new RuntimeException(ToBeAppliedRequestProcessor.class
                        .getName()
                        + " must be connected to "
                        + FinalRequestProcessor.class.getName()
                        + " not "
                        + next.getClass().getName());
            }
            this.toBeApplied = toBeApplied;
            this.next = next;
        }

        /*
         * (non-Javadoc)
         * 
         * @see org.apache.zookeeper.server.RequestProcessor#processRequest(org.apache.zookeeper.server.Request)
         */
        public void processRequest(Request request) throws RequestProcessorException {
            // request.addRQRec(">tobe");
            next.processRequest(request);
            Proposal p = toBeApplied.peek();
            if (p != null && p.request != null
                    && p.request.zxid == request.zxid) {
                toBeApplied.remove();//进行相关清除操作
            }
        }

        /*
         * (non-Javadoc)
         * 
         * @see org.apache.zookeeper.server.RequestProcessor#shutdown()
         */
        public void shutdown() {
            LOG.info("Shutting down");
            next.shutdown();
        }
    }

思考

这个类的意义是什么

这个类就是消费toBeApplied队列的
之前讲leader源码的时候,processAck是用来生产toBeApplied队列的(用来代表),见源码42分析,来告诉leader,某事务请求已经被某个服务器写完了日志,发送ACK了

toBeApplied消费时的检查

其实这个和上一节讲的CommitProcessor有点像,本身是想维护一个事务顺序的,但是这里又没有对不按事务顺序的逻辑进行检查

没有检查

可以参考refer最新的代码,会检查事务,然后检查顺序,这样才体现出来toBeApplied的意义

refer

《paxos到zk》
https://github.com/apache/zookeeper/blob/master/src/java/main/org/apache/zookeeper/server/quorum/Leader.java

相关文章

网友评论

      本文标题:zk源码阅读50:ToBeAppliedRequestProce

      本文链接:https://www.haomeiwen.com/subject/bfrqjxtx.html