美文网首页
zk源码阅读49:CommitProcessor源码解析

zk源码阅读49:CommitProcessor源码解析

作者: 赤子心_d709 | 来源:发表于2017-08-31 12:43 被阅读248次

    摘要

    事务提交处理器。对于非事务请求,该处理器会直接将其交付给下一级处理器处理;对于事务请求,其会等待集群内针对Proposal的投票直到该Proposal可被提交,利用CommitProcessor,每个服务器都可以很好地控制对事务请求的顺序处理。

    属性

        private static final Logger LOG = LoggerFactory.getLogger(CommitProcessor.class);
    
        /**
         * Requests that we are holding until the commit comes in.
         */
        LinkedList<Request> queuedRequests = new LinkedList<Request>();//请求队列
    
        /**
         * Requests that have been committed.
         */
        LinkedList<Request> committedRequests = new LinkedList<Request>();
    
        RequestProcessor nextProcessor;//下一个处理器
        ArrayList<Request> toProcess = new ArrayList<Request>();//待处理的队列
    
        /**
         * This flag indicates whether we need to wait for a response to come back from the
         * leader or we just let the sync operation flow through like a read. The flag will
         * be true if the CommitProcessor is in a Leader pipeline.
         */
        boolean matchSyncs;//看sync的请求是等待leader回复,还是说直接处理,像读请求一样。对于leader是false,对于learner是true
        
        volatile boolean finished = false;
    

    说明:

    commitProcessor区分事务请求和非事务请求
    matchSyncs 在leader端是false,learner端是true,因为learner端sync请求需要等待leader回复,而leader端本身则不需要
    
    

    函数

    构造函数

        public CommitProcessor(RequestProcessor nextProcessor, String id,
                boolean matchSyncs, ZooKeeperServerListener listener) {
            super("CommitProcessor:" + id, listener);
            this.nextProcessor = nextProcessor;
            this.matchSyncs = matchSyncs;
        }
    

    processRequest

    处理请求

        synchronized public void processRequest(Request request) {
            // request.addRQRec(">commit");
            if (LOG.isDebugEnabled()) {
                LOG.debug("Processing request:: " + request);
            }
            
            if (!finished) {
                queuedRequests.add(request);//生产到请求队列
                notifyAll();
            }
        }
    

    注意上锁

    commit

    提交请求请求

       synchronized public void commit(Request request) {//事务请求提交
            if (!finished) {//只要没有结束
                if (request == null) {
                    LOG.warn("Committed a null!",
                             new Exception("committing a null! "));
                    return;
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Committing request:: " + request);
                }
                committedRequests.add(request);//进入已提交队列
                notifyAll();//通知
            }
        }
    

    shutdown

    关闭

        public void shutdown() {
            LOG.info("Shutting down");
            synchronized (this) {
                finished = true;
                queuedRequests.clear();
                notifyAll();
            }
            if (nextProcessor != null) {
                nextProcessor.shutdown();
            }
        }
    

    run

    核心的线程方法,先贴代码再分析

    @Override
        public void run() {
            try {
                Request nextPending = null;//下一个未处理的事务请求(不含leader端的sync请求),只要为null,都会while循环从queuedRequests里面找到第一个事务请求,或者直到队列为空
                while (!finished) {//只要没有shutdown
                    int len = toProcess.size();
                    for (int i = 0; i < len; i++) {
                        nextProcessor.processRequest(toProcess.get(i));//待处理队列交给下个处理器,按顺序处理
                    }
                    toProcess.clear();//队列清空
                    synchronized (this) {//注意这里上锁,不会出现执行到过程中,queuedRequests的size变了
                        if ((queuedRequests.size() == 0 || nextPending != null) //这部分结合尾部的while来读,要么 请求队列remove干净,要么从中找到一个事务请求,赋值给nextPending, 不允许size>0且nextPending == null的情况
                                && committedRequests.size() == 0) {//且 没有已提交事务
                            wait();
                            continue;
                        }
                        // First check and see if the commit came in for the pending
                        // request
                        if ((queuedRequests.size() == 0 || nextPending != null)// 不允许size>0且nextPending == null的情况
                                && committedRequests.size() > 0) {//如果有 已提交的请求
                            Request r = committedRequests.remove();
                            /*
                             * We match with nextPending so that we can move to the
                             * next request when it is committed. We also want to
                             * use nextPending because it has the cnxn member set
                             * properly.
                             */
                            if (nextPending != null
                                    && nextPending.sessionId == r.sessionId
                                    && nextPending.cxid == r.cxid) {//如果和nextPending匹配
                                // we want to send our version of the request.
                                // the pointer to the connection in the request
                                nextPending.hdr = r.hdr;
                                nextPending.txn = r.txn;
                                nextPending.zxid = r.zxid;
                                toProcess.add(nextPending);//加入待处理队列
                                nextPending = null;//下一个pend的请求清空
                            } else {
                                // this request came from someone else so just
                                // send the commit packet
                                toProcess.add(r);//这种情况是nextPending还没有来的及设置,nextPending==null的情况(代码应该再细分一下if else),不可能出现nextPending!=null而走到了这里的情况(算异常)
                            }
                        }
                    }
    
                    // We haven't matched the pending requests, so go back to
                    // waiting
                    if (nextPending != null) {//如果还有 未处理的事务请求(不含leader端的sync请求),就continue
                        continue;
                    }
    
                    synchronized (this) {//这一段的目的是找到一个 给nextPending赋值
                        // Process the next requests in the queuedRequests
                        while (nextPending == null && queuedRequests.size() > 0) {//只要queuedRequests队列不空,从中找到第一个 事务请求(不含leader端的sync请求),前面的其他请求全部加入待处理队列
                            Request request = queuedRequests.remove();
                            switch (request.type) {
                            case OpCode.create:
                            case OpCode.delete:
                            case OpCode.setData:
                            case OpCode.multi:
                            case OpCode.setACL:
                            case OpCode.createSession:
                            case OpCode.closeSession:
                                nextPending = request;
                                break;//大部分事务请求直接赋给nextPending,然后break
                            case OpCode.sync:
                                if (matchSyncs) {//如果需要等leader返回,该值learner端为true
                                    nextPending = request;
                                } else {
                                    toProcess.add(request);//不需要的话,直接加入待处理队列里
                                }
                                break;//leader端matchSyncs是false,learner端才需要等leader回复,这里也break
                            default:
                                toProcess.add(request);//非事务请求,都直接加入待处理队列
                            }
                        }
                    }
                }
            } catch (InterruptedException e) {
                LOG.warn("Interrupted exception while waiting", e);
            } catch (Throwable e) {
                LOG.error("Unexpected exception causing CommitProcessor to exit", e);
            }
            LOG.info("CommitProcessor exited loop!");
        }
    

    注意各种上锁控制并发

    里面的代码写的晦涩难懂,是我看过zk代码里面最想吐槽的代码了。现在最新版本的zk这个类已经改的面目全非了。
    https://github.com/apache/zookeeper/blob/master/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java

    代码可以拆成几个部分

    1,2部分 3,4部分

    完全可以按照 1,4,2,3的顺序来读,

    1部分:遍历toProcess队列(非事务请求或者已经提交的事务请求),交给下一个处理器处理,清空
    4部分:只要不存在pend住的事务请求并且请求队列不为空,一直遍历请求队列直到出现第一个事务请求或者队列遍历完,其间所有非事务请求全部加入toProcess队列,代表可以直接交给下一个处理器处理的
    2部分:在请求队列remove干净或者找到了事务请求的情况下,
    如果没有提交的请求,就等待。
    如果有提交的请求,取出来,看和之前记录的下一个pend的请求是否match。
      match的话,进入toProcess队列,nextPending置空
      不match的话,(基本上是nextPending为null,不会出现不为null且不匹配的情况),进入toProcess处理
    3部分:如果 nextPending非空,就不用再去遍历请求队列,找到下一个事务请求(即4部分),因此continue掉
    

    思考

    事务连续性怎么保证的

    《paoxs到zk》说这里保证的,对此强烈怀疑。
    事务连续性看代码应该是各角色机器单线程处理保证的。(refer中 新版本就多线程了,一写多读)
    因为run方法2部分里面的else根本没有检测和nextPending不match的情况
    因此个人理解2部分的else中,基本都是nextPending为null,属于还没来的及找nextPending,然后commit方法就被调用了,就直接处理了
    完善的写法应该是这里写清楚,至少做一个不为空且不match的检查才好

    else里的不合理

    run方法第2部分if语句的理解

    (queuedRequests.size() == 0 || nextPending != null)
    

    这个是针对第4部分while循环的条件,取!
    就是说要么队列清空了 要么 找到nextPending
    不允许 请求队列不为空 且不存在 nextPending的情况

    run方法nextPending的意义

    下一个要处理的事务请求

    吐槽

    run方法

    这是我看zk以来最糟心的代码。
    顺序上面已经说过了,按1,4,2,3来看
    然后if条件,第二部分直接把
    (queuedRequests.size() == 0 || nextPending != null)
    抽到上层去不行吗,一定要写两遍吗。

    然后else根本没有完成检查,让人一开始根本搞不清楚nextPending的意义是什么,
    反正匹配不匹配,大家都进入toProcess队列。何必要写nextPending。

    看起来像是保证事务顺序的,实际上事务顺序是单线程保证的,和nextPending也没关系。

    refer

    http://www.jianshu.com/p/68c91b42ccd8
    https://github.com/apache/zookeeper/blob/master/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
    《paxos到zk》

    相关文章

      网友评论

          本文标题:zk源码阅读49:CommitProcessor源码解析

          本文链接:https://www.haomeiwen.com/subject/rqvxjxtx.html