美文网首页
zk源码阅读49:CommitProcessor源码解析

zk源码阅读49:CommitProcessor源码解析

作者: 赤子心_d709 | 来源:发表于2017-08-31 12:43 被阅读248次

摘要

事务提交处理器。对于非事务请求,该处理器会直接将其交付给下一级处理器处理;对于事务请求,其会等待集群内针对Proposal的投票直到该Proposal可被提交,利用CommitProcessor,每个服务器都可以很好地控制对事务请求的顺序处理。

属性

    private static final Logger LOG = LoggerFactory.getLogger(CommitProcessor.class);

    /**
     * Requests that we are holding until the commit comes in.
     */
    LinkedList<Request> queuedRequests = new LinkedList<Request>();//请求队列

    /**
     * Requests that have been committed.
     */
    LinkedList<Request> committedRequests = new LinkedList<Request>();

    RequestProcessor nextProcessor;//下一个处理器
    ArrayList<Request> toProcess = new ArrayList<Request>();//待处理的队列

    /**
     * This flag indicates whether we need to wait for a response to come back from the
     * leader or we just let the sync operation flow through like a read. The flag will
     * be true if the CommitProcessor is in a Leader pipeline.
     */
    boolean matchSyncs;//看sync的请求是等待leader回复,还是说直接处理,像读请求一样。对于leader是false,对于learner是true
    
    volatile boolean finished = false;

说明:

commitProcessor区分事务请求和非事务请求
matchSyncs 在leader端是false,learner端是true,因为learner端sync请求需要等待leader回复,而leader端本身则不需要

函数

构造函数

    public CommitProcessor(RequestProcessor nextProcessor, String id,
            boolean matchSyncs, ZooKeeperServerListener listener) {
        super("CommitProcessor:" + id, listener);
        this.nextProcessor = nextProcessor;
        this.matchSyncs = matchSyncs;
    }

processRequest

处理请求

    synchronized public void processRequest(Request request) {
        // request.addRQRec(">commit");
        if (LOG.isDebugEnabled()) {
            LOG.debug("Processing request:: " + request);
        }
        
        if (!finished) {
            queuedRequests.add(request);//生产到请求队列
            notifyAll();
        }
    }

注意上锁

commit

提交请求请求

   synchronized public void commit(Request request) {//事务请求提交
        if (!finished) {//只要没有结束
            if (request == null) {
                LOG.warn("Committed a null!",
                         new Exception("committing a null! "));
                return;
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Committing request:: " + request);
            }
            committedRequests.add(request);//进入已提交队列
            notifyAll();//通知
        }
    }

shutdown

关闭

    public void shutdown() {
        LOG.info("Shutting down");
        synchronized (this) {
            finished = true;
            queuedRequests.clear();
            notifyAll();
        }
        if (nextProcessor != null) {
            nextProcessor.shutdown();
        }
    }

run

核心的线程方法,先贴代码再分析

@Override
    public void run() {
        try {
            Request nextPending = null;//下一个未处理的事务请求(不含leader端的sync请求),只要为null,都会while循环从queuedRequests里面找到第一个事务请求,或者直到队列为空
            while (!finished) {//只要没有shutdown
                int len = toProcess.size();
                for (int i = 0; i < len; i++) {
                    nextProcessor.processRequest(toProcess.get(i));//待处理队列交给下个处理器,按顺序处理
                }
                toProcess.clear();//队列清空
                synchronized (this) {//注意这里上锁,不会出现执行到过程中,queuedRequests的size变了
                    if ((queuedRequests.size() == 0 || nextPending != null) //这部分结合尾部的while来读,要么 请求队列remove干净,要么从中找到一个事务请求,赋值给nextPending, 不允许size>0且nextPending == null的情况
                            && committedRequests.size() == 0) {//且 没有已提交事务
                        wait();
                        continue;
                    }
                    // First check and see if the commit came in for the pending
                    // request
                    if ((queuedRequests.size() == 0 || nextPending != null)// 不允许size>0且nextPending == null的情况
                            && committedRequests.size() > 0) {//如果有 已提交的请求
                        Request r = committedRequests.remove();
                        /*
                         * We match with nextPending so that we can move to the
                         * next request when it is committed. We also want to
                         * use nextPending because it has the cnxn member set
                         * properly.
                         */
                        if (nextPending != null
                                && nextPending.sessionId == r.sessionId
                                && nextPending.cxid == r.cxid) {//如果和nextPending匹配
                            // we want to send our version of the request.
                            // the pointer to the connection in the request
                            nextPending.hdr = r.hdr;
                            nextPending.txn = r.txn;
                            nextPending.zxid = r.zxid;
                            toProcess.add(nextPending);//加入待处理队列
                            nextPending = null;//下一个pend的请求清空
                        } else {
                            // this request came from someone else so just
                            // send the commit packet
                            toProcess.add(r);//这种情况是nextPending还没有来的及设置,nextPending==null的情况(代码应该再细分一下if else),不可能出现nextPending!=null而走到了这里的情况(算异常)
                        }
                    }
                }

                // We haven't matched the pending requests, so go back to
                // waiting
                if (nextPending != null) {//如果还有 未处理的事务请求(不含leader端的sync请求),就continue
                    continue;
                }

                synchronized (this) {//这一段的目的是找到一个 给nextPending赋值
                    // Process the next requests in the queuedRequests
                    while (nextPending == null && queuedRequests.size() > 0) {//只要queuedRequests队列不空,从中找到第一个 事务请求(不含leader端的sync请求),前面的其他请求全部加入待处理队列
                        Request request = queuedRequests.remove();
                        switch (request.type) {
                        case OpCode.create:
                        case OpCode.delete:
                        case OpCode.setData:
                        case OpCode.multi:
                        case OpCode.setACL:
                        case OpCode.createSession:
                        case OpCode.closeSession:
                            nextPending = request;
                            break;//大部分事务请求直接赋给nextPending,然后break
                        case OpCode.sync:
                            if (matchSyncs) {//如果需要等leader返回,该值learner端为true
                                nextPending = request;
                            } else {
                                toProcess.add(request);//不需要的话,直接加入待处理队列里
                            }
                            break;//leader端matchSyncs是false,learner端才需要等leader回复,这里也break
                        default:
                            toProcess.add(request);//非事务请求,都直接加入待处理队列
                        }
                    }
                }
            }
        } catch (InterruptedException e) {
            LOG.warn("Interrupted exception while waiting", e);
        } catch (Throwable e) {
            LOG.error("Unexpected exception causing CommitProcessor to exit", e);
        }
        LOG.info("CommitProcessor exited loop!");
    }

注意各种上锁控制并发

里面的代码写的晦涩难懂,是我看过zk代码里面最想吐槽的代码了。现在最新版本的zk这个类已经改的面目全非了。
https://github.com/apache/zookeeper/blob/master/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java

代码可以拆成几个部分

1,2部分 3,4部分

完全可以按照 1,4,2,3的顺序来读,

1部分:遍历toProcess队列(非事务请求或者已经提交的事务请求),交给下一个处理器处理,清空
4部分:只要不存在pend住的事务请求并且请求队列不为空,一直遍历请求队列直到出现第一个事务请求或者队列遍历完,其间所有非事务请求全部加入toProcess队列,代表可以直接交给下一个处理器处理的
2部分:在请求队列remove干净或者找到了事务请求的情况下,
如果没有提交的请求,就等待。
如果有提交的请求,取出来,看和之前记录的下一个pend的请求是否match。
  match的话,进入toProcess队列,nextPending置空
  不match的话,(基本上是nextPending为null,不会出现不为null且不匹配的情况),进入toProcess处理
3部分:如果 nextPending非空,就不用再去遍历请求队列,找到下一个事务请求(即4部分),因此continue掉

思考

事务连续性怎么保证的

《paoxs到zk》说这里保证的,对此强烈怀疑。
事务连续性看代码应该是各角色机器单线程处理保证的。(refer中 新版本就多线程了,一写多读)
因为run方法2部分里面的else根本没有检测和nextPending不match的情况
因此个人理解2部分的else中,基本都是nextPending为null,属于还没来的及找nextPending,然后commit方法就被调用了,就直接处理了
完善的写法应该是这里写清楚,至少做一个不为空且不match的检查才好

else里的不合理

run方法第2部分if语句的理解

(queuedRequests.size() == 0 || nextPending != null)

这个是针对第4部分while循环的条件,取!
就是说要么队列清空了 要么 找到nextPending
不允许 请求队列不为空 且不存在 nextPending的情况

run方法nextPending的意义

下一个要处理的事务请求

吐槽

run方法

这是我看zk以来最糟心的代码。
顺序上面已经说过了,按1,4,2,3来看
然后if条件,第二部分直接把
(queuedRequests.size() == 0 || nextPending != null)
抽到上层去不行吗,一定要写两遍吗。

然后else根本没有完成检查,让人一开始根本搞不清楚nextPending的意义是什么,
反正匹配不匹配,大家都进入toProcess队列。何必要写nextPending。

看起来像是保证事务顺序的,实际上事务顺序是单线程保证的,和nextPending也没关系。

refer

http://www.jianshu.com/p/68c91b42ccd8
https://github.com/apache/zookeeper/blob/master/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
《paxos到zk》

相关文章

网友评论

      本文标题:zk源码阅读49:CommitProcessor源码解析

      本文链接:https://www.haomeiwen.com/subject/rqvxjxtx.html