Distributed transaction request processing in a ZooKeeper cluster


Author: sadamu0912 | Published 2020-05-30 23:02

    A cluster handles two kinds of requests: transactional and non-transactional. Non-transactional requests are processed much as in standalone mode: the node can serve the data locally. A transactional request must be forwarded to the Leader, which broadcasts it as a proposal and waits for ACKs from a majority of Followers; only after that quorum has synchronized the change is the result of the operation returned.
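    From the client's side the distinction is invisible: a write such as create goes through the full quorum protocol, while a read such as getData is answered locally by the connected server. A minimal sketch using the standard ZooKeeper client API (the connect string and path here are placeholders):

    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.ZooKeeper;

    public class QuorumDemo {
        public static void main(String[] args) throws Exception {
            // connect to any node of the ensemble (address is a placeholder)
            ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 30000, event -> {});

            // transactional request: forwarded to the leader, proposed,
            // acked by a quorum of followers, then committed everywhere
            zk.create("/demo", "v1".getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

            // non-transactional request: served by the connected server
            // from its local in-memory database
            byte[] data = zk.getData("/demo", false, null);
            System.out.println(new String(data));
            zk.close();
        }
    }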

    Regardless of mode or node type, client requests are accepted by a subclass of ServerCnxnFactory (NIOServerCnxnFactory by default); what differs is the zkServer instance that drives the internal processing chain. Standalone mode uses a plain ZooKeeperServer instance, while the other node types use subclasses of ZooKeeperServer. The UML diagram of the ZooKeeperServer subclasses:


    [Figure: the request processors involved in ZooKeeper request handling (zookeeper请求处理的各个processor.jpg)]

    1 With so many ZooKeeperServer subclasses, which method handles an incoming transaction request?

    1.1 In org.apache.zookeeper.server.quorum.QuorumPeer#run, the peer first determines its role.

    while (running) {
        switch (getPeerState()) {
        case LOOKING:
            // ... lines omitted ...
            // in LOOKING state, run leader election and vote for a leader
            setCurrentVote(makeLEStrategy().lookForLeader());
            break;
        case OBSERVING:
            try {
                LOG.info("OBSERVING");
                // a peer that comes out of the election OBSERVING
                // instantiates an ObserverZooKeeperServer
                setObserver(makeObserver(logFactory));
                // sync with the leader
                observer.observeLeader();
            }
            // ... lines omitted ...
            break;
        case FOLLOWING:
            try {
                LOG.info("FOLLOWING");
                // a peer that comes out of the election FOLLOWING
                // instantiates a FollowerZooKeeperServer
                setFollower(makeFollower(logFactory));
                // sync with the leader
                follower.followLeader();
            }
            // ... lines omitted ...
            break;
        case LEADING:
            LOG.info("LEADING");
            try {
                setLeader(makeLeader(logFactory));
                leader.lead();
                setLeader(null);
            }
            // ... lines omitted ...
            break;
        }
    }
    

    As this code shows, once the election completes each server's state is fixed, and with it the kind of ZooKeeperServer instance it runs.

    1.2 Each ZooKeeperServer instance has its own request-processing chain.

    The chain of responsibility is built by the setupRequestProcessors method.

    For example, FollowerZooKeeperServer:

    protected void setupRequestProcessors() {
            RequestProcessor finalProcessor = new FinalRequestProcessor(this);
            commitProcessor = new CommitProcessor(finalProcessor,
                    Long.toString(getServerId()), true,
                    getZooKeeperServerListener());
            commitProcessor.start();
            firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
            ((FollowerRequestProcessor) firstProcessor).start();
            syncProcessor = new SyncRequestProcessor(this,
                    new SendAckRequestProcessor((Learner)getFollower()));
            syncProcessor.start();
        }
    

    firstProcessor is a FollowerRequestProcessor, followed by CommitProcessor, then
    FinalRequestProcessor. A separate SyncRequestProcessor is also wired in, followed by SendAckRequestProcessor.
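    This wiring is the classic chain-of-responsibility pattern: each processor does its own part of the work and hands the request to the next one. A stripped-down illustration of the pattern (toy types invented for this sketch, not ZooKeeper's real classes):

    interface Processor {
        void process(String request);
    }

    class LoggingProcessor implements Processor {
        private final Processor next;
        LoggingProcessor(Processor next) { this.next = next; }
        public void process(String request) {
            System.out.println("log: " + request);
            next.process(request);  // hand off, as FollowerRequestProcessor does
        }
    }

    class FinalProcessor implements Processor {
        public void process(String request) {
            System.out.println("apply: " + request);
        }
    }

    // build from the tail forward, as setupRequestProcessors does:
    // Processor first = new LoggingProcessor(new FinalProcessor());
    // first.process("create /demo");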

    1.3 So when is this processing chain established?

    case OBSERVING:
        try {
            // instantiate ObserverZooKeeperServer
            setObserver(makeObserver(logFactory));
            // sync state from the leader and finish initializing
            // ObserverZooKeeperServer, including building the processor chain
            observer.observeLeader();
        }
        // ... lines omitted ...
        break;
    case FOLLOWING:
        try {
            LOG.info("FOLLOWING");
            // instantiate FollowerZooKeeperServer
            setFollower(makeFollower(logFactory));
            // sync state from the leader and finish initializing
            // FollowerZooKeeperServer, including building the processor chain
            follower.followLeader();
        }
        // ... lines omitted ...
        break;
    case LEADING:
        LOG.info("LEADING");
        try {
            setLeader(makeLeader(logFactory));
            leader.lead();
            setLeader(null);
        }
        break;
    }
    

    Line 469 of org.apache.zookeeper.server.quorum.Learner#syncWithLeader calls zk.startup(); line 431 of org.apache.zookeeper.server.quorum.Leader#lead calls startZkServer().
    Both paths invoke the setupRequestProcessors method below to build the chain of responsibility.
    Because each server class extends ZooKeeperServer (see the class diagram above) and overrides setupRequestProcessors,
    the call actually dispatches to the setupRequestProcessors of the concrete ZooKeeperServer instance.

    public synchronized void startup() {
            if (sessionTracker == null) {
                createSessionTracker();
            }
            startSessionTracker();
            setupRequestProcessors();
    
            registerJMX();
    
            setState(State.RUNNING);
            notifyAll();
        }
    

    2 Where do requests enter the server?

    "org.apache.zookeeper.server.NIOServerCnxnFactory#run"
    当读写事件就绪时,NIOServerCnxn对象进行IO任务。
    else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                            NIOServerCnxn c = (NIOServerCnxn) k.attachment();
                            c.doIO(k);
                        }
    
    "org.apache.zookeeper.server.ZooKeeperServer#processPacket"
    Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),
                      h.getType(), incomingBuffer, cnxn.getAuthInfo());
                    si.setOwner(ServerCnxn.me);
                    submitRequest(si);
    
    try {
                touch(si.cnxn);
                boolean validpacket = Request.isValid(si.type);
                if (validpacket) {
                  "丢给对应的firstProcessor去处理,事物逻辑。
    而对于不同的角色,比如说leader,对应的是LeaderZooKeeperServer 
    ,而 follower对应的是FollowerZookeeperServer"
                    firstProcessor.processRequest(si);
                    if (si.cnxn != null) {
                        incInProcess();
                    }
                } else {
                    LOG.warn("Received packet at server of unknown type " + si.type);
                    new UnimplementedRequestProcessor().processRequest(si);
                }
            }
    

    After submitRequest the request has been handed off to the concrete chain of responsibility; different roles run different ZooKeeperServer instances, each with a different firstProcessor.

    The rough flow looks like this:


    [Figure: ZooKeeper transaction processing flow (zookeeper事务处理的事务流程.png)]

    3 When a client request arrives at the leader, what does the transaction flow look like?

    The leader's firstProcessor: LeaderZooKeeperServer#setupRequestProcessors shows that PrepRequestProcessor is the firstProcessor. Its processRequest method merely adds the request to the submittedRequests blocking queue; the actual work happens in the run method.
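    For comparison, the leader's chain in the 3.4.x source is wired roughly as follows (paraphrased; constructor arguments differ slightly between versions): PrepRequestProcessor -> ProposalRequestProcessor -> CommitProcessor -> ToBeAppliedRequestProcessor -> FinalRequestProcessor.

    // LeaderZooKeeperServer#setupRequestProcessors, paraphrased from 3.4.x
    protected void setupRequestProcessors() {
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(
                finalProcessor, getLeader().toBeApplied);
        commitProcessor = new CommitProcessor(toBeAppliedProcessor,
                Long.toString(getServerId()), false, getZooKeeperServerListener());
        commitProcessor.start();
        ProposalRequestProcessor proposalProcessor =
                new ProposalRequestProcessor(this, commitProcessor);
        proposalProcessor.initialize();  // also wires SyncRequestProcessor + AckRequestProcessor
        firstProcessor = new PrepRequestProcessor(this, proposalProcessor);
        ((PrepRequestProcessor) firstProcessor).start();
    }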

    3.1 PrepRequestProcessor attaches a transaction header to transactional requests; non-transactional requests only get a checkSession

    // PrepRequestProcessor#run
    while (true) {
        Request request = submittedRequests.take();
        pRequest(request);
    }

    // PrepRequestProcessor#pRequest
    switch (request.type) {
    case OpCode.create:
        CreateRequest createRequest = new CreateRequest();
        // requests such as create/delete/setData that change the in-memory
        // database (ZKDatabase) are converted into transactional requests
        pRequest2Txn(request.type, zks.getNextZxid(), request, createRequest, true);
        break;
    // ... lines omitted ...

    // create/close session don't require request record
    case OpCode.createSession:
    case OpCode.closeSession:
        // Since all of ZooKeeper's transactional requests go through the
        // leader, the cluster-wide unique id only needs to be generated on
        // the leader side: zks.getNextZxid(). It is backed by an AtomicLong,
        // which keeps concurrent transactional requests thread-safe.
        pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
        break;
    // requests that do not change the in-memory ZKDatabase only checkSession
    // All the rest don't need to create a Txn - just verify session
    case OpCode.sync:
    case OpCode.exists:
    case OpCode.getData:
    case OpCode.getACL:
    case OpCode.getChildren:
    case OpCode.getChildren2:
    case OpCode.ping:
    case OpCode.setWatches:
        zks.sessionTracker.checkSession(request.sessionId,
                request.getOwner());
        break;
    default:
        LOG.warn("unknown type " + request.type);
        break;
    }
    request.zxid = zks.getZxid();
    nextProcessor.processRequest(request);
    
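    A note on the zxid that zks.getNextZxid() hands out: it is a 64-bit value whose high 32 bits carry the leader's epoch and whose low 32 bits are a per-epoch counter; a new election bumps the epoch and resets the counter. A small sketch of that layout (helper names invented here; ZooKeeper keeps similar utilities in ZxidUtils):

    // illustrative helpers: high 32 bits = epoch, low 32 bits = counter
    final class ZxidSketch {
        static long makeZxid(long epoch, long counter) {
            return (epoch << 32) | (counter & 0xffffffffL);
        }
        static long epochOf(long zxid)   { return zxid >>> 32; }
        static long counterOf(long zxid) { return zxid & 0xffffffffL; }

        public static void main(String[] args) {
            long zxid = makeZxid(2, 7);  // epoch 2, 7th transaction
            System.out.println(Long.toHexString(zxid));                 // 200000007
            System.out.println(epochOf(zxid) + " " + counterOf(zxid));  // 2 7
        }
    }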

    3.2 ProposalRequestProcessor: splitting transactional from non-transactional requests

    // ProposalRequestProcessor#processRequest
    if (request instanceof LearnerSyncRequest) {
        zks.getLeader().processSync((LearnerSyncRequest)request);
    } else {
        // every request is handed to the next processor, CommitProcessor
        nextProcessor.processRequest(request);
        // a non-null hdr marks a transactional request: delegate to the
        // leader, which broadcasts a PROPOSAL message
        if (request.hdr != null) {
            // We need to sync and get consensus on any transactions
            try {
                zks.getLeader().propose(request);
            } catch (XidRolloverException e) {
                throw new RequestProcessorException(e.getMessage(), e);
            }
            // and persist the transaction to the local txnLog first
            syncProcessor.processRequest(request);
        }
    }
    
    3.2.1 First, the path that flushes transactional messages to the txnLog: how syncProcessor persists the transaction log.

    // SyncRequestProcessor#run
    public void run() {
            try {
                int logCount = 0;
    
                // we do this in an attempt to ensure that not all of the servers
                // in the ensemble take a snapshot at the same time
                "不让所有的zookeeperServer一起发起快照"
                setRandRoll(r.nextInt(snapCount/2));
                while (true) {
                    Request si = null;
                      "第一轮循环是toFlush为空,进入下面的si!=null 的判断"
                    if (toFlush.isEmpty()) {
                        si = queuedRequests.take();
                    } else {
                        si = queuedRequests.poll();
                        if (si == null) {
                            flush(toFlush);
                            continue;
                        }
                    }
                    if (si == requestOfDeath) {
                        break;
                    }
                    if (si != null) {
                        // track the number of records written to the log
                        "调用FileTxnLog#append,追加transaction log 到日志文件"
                        if (zks.getZKDatabase().append(si)) {
                            logCount++;
                            if (logCount > (snapCount / 2 + randRoll)) {
                                setRandRoll(r.nextInt(snapCount/2));
                                // roll the log
                                zks.getZKDatabase().rollLog();
                                // take a snapshot
                                if (snapInProcess != null && snapInProcess.isAlive()) {
                                    LOG.warn("Too busy to snap, skipping");
                                } else {
                                    snapInProcess = new ZooKeeperThread("Snapshot Thread") {
                                            public void run() {
                                                try {
                                                    zks.takeSnapshot();
                                                } catch(Exception e) {
                                                    LOG.warn("Unexpected exception", e);
                                                }
                                            }
                                        };
                                    snapInProcess.start();
                                }
                                logCount = 0;
                            }
                        } else if (toFlush.isEmpty()) {
                            // optimization for read heavy workloads
                            // iff this is a read, and there are no pending
                            // flushes (writes), then just pass this to the next
                            // processor
                            if (nextProcessor != null) {
                                nextProcessor.processRequest(si);
                                if (nextProcessor instanceof Flushable) {
                                    ((Flushable)nextProcessor).flush();
                                }
                            }
                            continue;
                        }
                      "第一轮最后,添加到toFlush,第二轮就会flush,也就是hit the disk "
                        
                        toFlush.add(si);
                        if (toFlush.size() > 1000) {
                            flush(toFlush);
                        }
                    }
                }
            } catch (Throwable t) {
                handleException(this.getName(), t);
                running = false;
            }
            LOG.info("SyncRequestProcessor exited!");
        }
    

    The flush method:

    private void flush(LinkedList<Request> toFlush)
            throws IOException, RequestProcessorException
        {
            if (toFlush.isEmpty())
                return;
            "这里的commit才是真正的hit the disk , "
    "而前面的append只是加入到groupcommit的数组中"
    "LinkedList<FileOutputStream> streamsToFlush"
            zks.getZKDatabase().commit();
            while (!toFlush.isEmpty()) {
                Request i = toFlush.remove();
                if (nextProcessor != null) {
                    "这里进入AckRequestProcessor"
                    nextProcessor.processRequest(i);
                }
            }
            if (nextProcessor != null && nextProcessor instanceof Flushable) {
                ((Flushable)nextProcessor).flush();
            }
        }
    

    Tip:
    A server may ACK a proposal only after the transaction log has been forced to disk. Concretely, the server calls ZKDatabase's commit method, which ultimately calls FileChannel.force; this guarantees the transaction is durable before the ACK goes out. One caveat: modern disks have a write cache. If the write cache is enabled, a forced flush does not guarantee that the data is on the platter when it returns; the data may only have reached the cache. To guarantee that data is on disk when FileChannel.force() returns, the disk write cache must be disabled; operating systems offer several ways to do this.
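    A minimal sketch of that durability pattern in plain NIO (the file path is a placeholder): append the log record, then force it to disk before acknowledging.

    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.file.Paths;
    import java.nio.file.StandardOpenOption;

    public class ForceDemo {
        public static void main(String[] args) throws Exception {
            try (FileChannel log = FileChannel.open(Paths.get("/tmp/txn.log"),
                    StandardOpenOption.CREATE, StandardOpenOption.WRITE,
                    StandardOpenOption.APPEND)) {
                log.write(ByteBuffer.wrap("txn-1".getBytes()));
                // like ZKDatabase.commit(): block until the OS has handed the
                // bytes to the device (false = file data only, no metadata);
                // with the disk write cache enabled even this does not
                // guarantee the data is on the platter
                log.force(false);
                // only after force() returns may the server ACK the proposal
            }
        }
    }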

    3.2.2 Sending the PROPOSAL message

    // Leader#propose
    public Proposal propose(Request request) throws XidRolloverException {
        // ... lines omitted ...
        // packetType = PROPOSAL
        QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid,
                baos.toByteArray(), null);

        Proposal p = new Proposal();
        p.packet = pp;
        p.request = request;
        synchronized (this) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Proposing:: " + request);
            }

            lastProposed = p.packet.getZxid();
            // ConcurrentMap<Long, Proposal> outstandingProposals: proposals in flight
            outstandingProposals.put(lastProposed, p);
            sendPacket(pp);
        }
        return p;
    }

    void sendPacket(QuorumPacket qp) {
        // HashSet<LearnerHandler> forwardingFollowers
        // send the PROPOSAL to every follower;
        // LearnerHandler is the leader's communication link to each learner
        synchronized (forwardingFollowers) {
            for (LearnerHandler f : forwardingFollowers) {
                f.queuePacket(qp);
            }
        }
    }
    

    3.3 AckRequestProcessor checks whether the ackSet holds a quorum; if so, COMMIT and INFORM messages are sent

    // AckRequestProcessor#processRequest
    public void processRequest(Request request) {
            QuorumPeer self = leader.self;
            if(self != null)
                leader.processAck(self.getId(), request.zxid, null);
            else
                LOG.error("Null QuorumPeer");
        }
    
    // Leader#processAck
    synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {
        // ... lines omitted ...
        // ProposalRequestProcessor already put the proposal into this map
        Proposal p = outstandingProposals.get(zxid);

        // the leader adds its own sid to the ackSet: it always acks its own
        // proposal, so in a 3-node ensemble one more ACK reaches a quorum
        p.ackSet.add(sid);
        // check whether a majority has acked
        if (self.getQuorumVerifier().containsQuorum(p.ackSet)) {
            if (zxid != lastCommitted+1) {
                LOG.warn("Commiting zxid 0x{} from {} not first!",
                        Long.toHexString(zxid), followerAddr);
                LOG.warn("First is 0x{}", Long.toHexString(lastCommitted + 1));
            }
            outstandingProposals.remove(zxid);
            if (p.request != null) {
                toBeApplied.add(p);
            }

            if (p.request == null) {
                LOG.warn("Going to commmit null request for proposal: {}", p);
            }
            commit(zxid);
            inform(p);
            zk.commitProcessor.commit(p.request);
            if (pendingSyncs.containsKey(zxid)) {
                for (LearnerSyncRequest r: pendingSyncs.remove(zxid)) {
                    sendSync(r);
                }
            }
        }
    }
    

    A natural question here: what if the quorum check runs before a majority of ACKs has arrived? Is the proposal simply considered failed? No: processAck is invoked once per incoming ACK, so when the set is not yet a quorum the method just returns, and the proposal stays in outstandingProposals waiting for the next ACK to trigger the check again.
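    The majority test itself is simple; a sketch of what a flat (non-hierarchical) QuorumVerifier#containsQuorum amounts to (simplified here; real ZooKeeper also supports weighted and hierarchical quorums):

    import java.util.Set;

    final class MajoritySketch {
        private final int ensembleSize;
        MajoritySketch(int ensembleSize) { this.ensembleSize = ensembleSize; }

        boolean containsQuorum(Set<Long> ackSet) {
            // strictly more than half of the voting members must have acked
            return ackSet.size() > ensembleSize / 2;
        }
    }
    // e.g. with 5 servers, 3 ACKs (the leader's own plus two followers) pass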

    3.4 In follower.followLeader, the follower receives the PROPOSAL message

    // Follower#followLeader
    while (this.isRunning()) {
        readPacket(qp);
        processPacket(qp);
    }

    // Follower#processPacket: handling the PROPOSAL message
    case Leader.PROPOSAL:
        TxnHeader hdr = new TxnHeader();
        Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
        if (hdr.getZxid() != lastQueued + 1) {
            LOG.warn("Got zxid 0x"
                    + Long.toHexString(hdr.getZxid())
                    + " expected 0x"
                    + Long.toHexString(lastQueued + 1));
        }
        lastQueued = hdr.getZxid();
        // FollowerZooKeeperServer#logRequest pushes the request into the
        // syncProcessor; SyncRequestProcessor#flush then hands it to the
        // next processor
        fzk.logRequest(hdr, txn);
        break;
    

    3.5 SendAckRequestProcessor: the follower sends an ACK to the leader; note that observers do not send one

    // SendAckRequestProcessor#processRequest
    public void processRequest(Request si) {
            if(si.type != OpCode.sync){
                QuorumPacket qp = new QuorumPacket(Leader.ACK, si.hdr.getZxid(), null,
                    null);
                try {
                    learner.writePacket(qp, false);
                } catch (IOException e) {
                    LOG.warn("Closing connection to leader, exception during packet send", e);
                    try {
                        if (!learner.sock.isClosed()) {
                            learner.sock.close();
                        }
                    } catch (IOException e1) {
                        // Nothing to do, we are shutting things down, so an exception here is irrelevant
                        LOG.debug("Ignoring error closing the connection", e1);
                    }
                }
            }
        }
    

    3.6 LearnerHandler receives the ACK

    // LearnerHandler#run, line 576
    switch (qp.getType()) {
    case Leader.ACK:
        if (this.learnerType == LearnerType.OBSERVER) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Received ACK from Observer  " + this.sid);
            }
        }
        syncLimitCheck.updateAck(qp.getZxid());
        // this sid identifies the follower that sent the ACK
        leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
        break;
    

    Leader#processAck then adds that follower's sid to the corresponding Proposal's ackSet.

    // Leader#processAck, continued from 3.3: once the quorum is reached
    if (self.getQuorumVerifier().containsQuorum(p.ackSet)) {
        // ... lines omitted (see 3.3) ...
        outstandingProposals.remove(zxid);
        if (p.request != null) {
            // proposal that is about to be applied
            toBeApplied.add(p);
        }
        // send COMMIT to the followers and INFORM to the observers,
        // and commit on the leader's own CommitProcessor
        commit(zxid);
        inform(p);
        zk.commitProcessor.commit(p.request);
        // ... lines omitted ...
    }
    
    3.6.1 The follower receives the COMMIT message and commits

    // Follower#processPacket
    case Leader.COMMIT:
        fzk.commit(qp.getZxid());
    

    fzk.commit matches the zxid against the follower's pending transactions and hands the matched request to CommitProcessor; the commit then proceeds there.

    // CommitProcessor#run
    public void run() {
            try {
                Request nextPending = null;            
                while (!finished) {
                    "第一轮循环toProcess长度是0 ,committedRequests长度是1 "
                    int len = toProcess.size();
                    for (int i = 0; i < len; i++) {
                        nextProcessor.processRequest(toProcess.get(i));
                    }
                    toProcess.clear();
                    synchronized (this) {
                        if ((queuedRequests.size() == 0 || nextPending != null)
                                && committedRequests.size() == 0) {
                            wait();
                            continue;
                        }
                        // First check and see if the commit came in for the pending
                        // request
       "第一轮循环toProcess长度是0 ,committedRequests长度是1 走这里,加入到toProcess "
                        if ((queuedRequests.size() == 0 || nextPending != null)
                                && committedRequests.size() > 0) {
                            Request r = committedRequests.remove();
                            /*
                             * We match with nextPending so that we can move to the
                             * next request when it is committed. We also want to
                             * use nextPending because it has the cnxn member set
                             * properly.
                             */
                            if (nextPending != null
                                    && nextPending.sessionId == r.sessionId
                                    && nextPending.cxid == r.cxid) {
                                // we want to send our version of the request.
                                // the pointer to the connection in the request
                                nextPending.hdr = r.hdr;
                                nextPending.txn = r.txn;
                                nextPending.zxid = r.zxid;
                                toProcess.add(nextPending);
                                nextPending = null;
                            } else {
                                // this request came from someone else so just
                                // send the commit packet
                                toProcess.add(r);
                            }
                        }
                    }
    
                    // We haven't matched the pending requests, so go back to
                    // waiting
                    if (nextPending != null) {
                        continue;
                    }
    
                    synchronized (this) {
                        // Process the next requests in the queuedRequests
                        while (nextPending == null && queuedRequests.size() > 0) {
                            Request request = queuedRequests.remove();
                            switch (request.type) {
                            case OpCode.create:
                            case OpCode.delete:
                            case OpCode.setData:
                            case OpCode.multi:
                            case OpCode.setACL:
                            case OpCode.createSession:
                            case OpCode.closeSession:
                                nextPending = request;
                                break;
                            case OpCode.sync:
                                if (matchSyncs) {
                                    nextPending = request;
                                } else {
                                    toProcess.add(request);
                                }
                                break;
                            default:
                                toProcess.add(request);
                            }
                        }
                    }
                }
            } catch (InterruptedException e) {
                LOG.warn("Interrupted exception while waiting", e);
            } catch (Throwable e) {
                LOG.error("Unexpected exception causing CommitProcessor to exit", e);
            }
            LOG.info("CommitProcessor exited loop!");
        }
    
    3.6.2 Then comes the follower's FinalRequestProcessor

    // FinalRequestProcessor#processRequest
    synchronized (zks.outstandingChanges) {
                while (!zks.outstandingChanges.isEmpty()
                        && zks.outstandingChanges.get(0).zxid <= request.zxid) {
                    ChangeRecord cr = zks.outstandingChanges.remove(0);
                    if (cr.zxid < request.zxid) {
                        LOG.warn("Zxid outstanding "
                                + cr.zxid
                                + " is less than current " + request.zxid);
                    }
                    if (zks.outstandingChangesForPath.get(cr.path) == cr) {
                        zks.outstandingChangesForPath.remove(cr.path);
                    }
                }
                if (request.hdr != null) {
                   TxnHeader hdr = request.hdr;
                   Record txn = request.txn;
                    "进入ZkDatabase处理txn"
                   rc = zks.processTxn(hdr, txn);
                }
                // do not add non quorum packets to the queue.
                if (Request.isQuorum(request.type)) {
                    zks.getZKDatabase().addCommittedProposal(request);
                }
            }
    

    It then builds the response object and returns it to the client.
    The leader side works the same way.

    4 If a request arrives at a follower or an observer, it is forwarded to the leader in a REQUEST message for the leader to process

    // FollowerRequestProcessor#run
    while (!finished) {
                    Request request = queuedRequests.take();
                    if (LOG.isTraceEnabled()) {
                        ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK,
                                'F', request, "");
                    }
                    if (request == Request.requestOfDeath) {
                        break;
                    }
                    // We want to queue the request to be processed before we submit
                    // the request to the leader so that we are ready to receive
                    // the response
                    nextProcessor.processRequest(request);
                    
                    // We now ship the request to the leader. As with all
                    // other quorum operations, sync also follows this code
                    // path, but different from others, we need to keep track
                    // of the sync operations this follower has pending, so we
                    // add it to pendingSyncs.
                    switch (request.type) {
                    case OpCode.sync:
                        zks.pendingSyncs.add(request);
                          "发送REQUEST消息给leader"
                        zks.getFollower().request(request);
                        break;
                    case OpCode.create:
                    case OpCode.delete:
                    case OpCode.setData:
                    case OpCode.setACL:
                    case OpCode.createSession:
                    case OpCode.closeSession:
                    case OpCode.multi:
                      "发送REQUEST消息给leader"
                        zks.getFollower().request(request);
                        break;
                    }
                }
    
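    The forwarding itself happens in Learner#request, which serializes the session id, cxid, type, and payload into a QuorumPacket of type Leader.REQUEST and writes it to the leader (paraphrased from the 3.4.x source):

    // Learner#request, paraphrased from 3.4.x
    void request(Request request) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream oa = new DataOutputStream(baos);
        oa.writeLong(request.sessionId);
        oa.writeInt(request.cxid);
        oa.writeInt(request.type);
        if (request.request != null) {
            request.request.rewind();
            byte[] b = new byte[request.request.remaining()];
            request.request.get(b);
            request.request.rewind();
            oa.write(b);
        }
        oa.close();
        QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1,
                baos.toByteArray(), request.authInfo);
        writePacket(qp, true);
    }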
