APACHE ZOOKEEPER 3.5.3 CODE REVI

Author: hnail | Published: 2018-06-18 20:26

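    Introduction

    1. How does ZAB crash recovery implement leader election and data synchronization?
    2. In the ZAB message broadcast phase, how are proposals issued, acks collected, and transactions committed, while keeping transactions in a consistent order?
    3. How do consensus algorithms such as Paxos, ZAB, and Raft differ from PBFT and from blockchain consensus algorithms such as PoW, PoS, and Ripple, and which scenarios suit each? //TODO: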
    

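    The ZAB protocol has two phases: crash recovery and message broadcast. The traces below follow a ZooKeeper 3.5.3 ensemble startup and a ZooKeeper.setData call to walk through each phase in turn.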


    I. Index

    1.1 Crash recovery

    org.apache.zookeeper.server.quorum.QuorumPeerMain.main
        ->QuorumPeerMain.runFromConfig
            ->NettyServerCnxnFactory.configure //configure the ServerCnxnFactory
            ->quorumPeer = getQuorumPeer();
            ->quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(),config.getDataDir())) //set the FileTxnSnapLog, which manages the txn logs and snapshots
            ->quorumPeer.setMyid(config.getServerId()) //set myid
            ->quorumPeer.setInitLimit(config.getInitLimit());quorumPeer.setSyncLimit(config.getSyncLimit()); //timeout limits
            ->quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));//initialize an empty ZKDatabase
            ->quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false); //set the QuorumVerifier
            ->quorumPeer.setCnxnFactory(cnxnFactory)
            ->org.apache.zookeeper.server.quorum.QuorumPeer.start() //start the QuorumPeer
                ->QuorumPeer.loadDataBase
                    ->zkDb.loadDataBase() //restore the in-memory state from before the crash
                        ->long zxid = snapLog.restore(dataTree,sessionsWithTimeouts,listener);
                            ->processTransaction(hdr,dt,sessions, itr.getTxn());//replay every transaction
                            ->listener.onTxnLoaded(hdr, itr.getTxn()); //add the committed entries to org.apache.zookeeper.server.ZKDatabase.committedLog and update the minCommittedLog and maxCommittedLog offsets for use during sync
                                ->addCommittedProposal(r)
                    ->currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME); //read the previously recorded epoch
                    ->acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
                ->QuorumPeer.startServerCnxnFactory //bind the port and start serving
                ->QuorumPeer.startLeaderElection //start the election
                    ->currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());//set this server's vote from the previously recorded epoch and zxid
                    ->this.electionAlg = createElectionAlgorithm(electionType); //set the election algorithm (the default)
                        ->qcm = new QuorumCnxManager(this);
                        ->org.apache.zookeeper.server.quorum.QuorumCnxManager.Listener.run
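                            ->org.apache.zookeeper.server.quorum.QuorumCnxManager.Listener.ss.bind(addr)//bind the election port
                            ->client = ss.accept();//wait for connections
                            ->org.apache.zookeeper.server.quorum.QuorumCnxManager.receiveConnection //if the connecting peer's sid is smaller than this server's id, this server drops the connection and connects back as the client; otherwise it starts the send/receive threads and begins the election flow. This avoids multiple connections between the same two servers
                                ->closeSocket(sock);//close the previous connection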
                                ->org.apache.zookeeper.server.quorum.QuorumCnxManager.connectOne
                                    ->QuorumCnxManager.initiateConnection //network-level sending and receiving of data
                                        ->dout.writeLong(self.getId());dout.flush(); //the first message sent on a new connection is this server's myid
                                        ->SendWorker sw = new SendWorker(sock, sid); //sends out the messages queued in QuorumCnxManager.queueSendMap
                                        ->RecvWorker rw = new RecvWorker(sock, sid, sw);//adds received messages to QuorumCnxManager.recvQueue
                                        ->sw.start();rw.start(); 
                        ->org.apache.zookeeper.server.quorum.FastLeaderElection.Messenger.start //start the message handlers
                            ->org.apache.zookeeper.server.quorum.FastLeaderElection.Messenger.WorkerSender.run
                            ->org.apache.zookeeper.server.quorum.FastLeaderElection.Messenger.WorkerReceiver.run
                ->QuorumPeer.run
                    ->while (running) {
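                        ->case LOOKING //note: the zxid this node uses in leader election is org.apache.zookeeper.server.DataTree.lastProcessedZxid, which is updated in FinalRequestProcessor.processRequest. FinalRequestProcessor comes after ProposalRequestProcessor->CommitProcessor, so the field is only updated in the commit phase. ZooKeeper therefore goes by the zxid that has reached the commit phase: a write counts as successful once a quorum has logged it and at least one COMMIT has been sent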
                            ->setCurrentVote(makeLEStrategy().lookForLeader());
                                ->org.apache.zookeeper.server.quorum.FastLeaderElection.lookForLeader
                                    ->synchronized(this){logicalclock.incrementAndGet(); updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());} //advance the election round and set this server's proposal to itself: FastLeaderElection.proposedLeader, proposedZxid, proposedEpoch
                                    ->sendNotifications //send this server's vote to every node in the ensemble
                                        ->ToSend notmsg = new ToSend(ToSend.mType.notification,proposedLeader,proposedZxid,logicalclock.get(),QuorumPeer.ServerState.LOOKING,sid,proposedEpoch, qv.toString().getBytes());
                                        ->sendqueue.offer(notmsg);
                                    ->while ((self.getPeerState() == ServerState.LOOKING) &&(!stop)){ //loop until a leader is determined
                                        ->Notification n = recvqueue.poll(notTimeout,TimeUnit.MILLISECONDS); //fetch a received Notification
                                        ->switch (n.state) {
                                            ->case LOOKING:
                                                ->if (n.electionEpoch > logicalclock.get()) { //if the received vote's election round is newer than this node's, adopt that round and clear the votes received so far
                                                    ->logicalclock.set(n.electionEpoch);
                                                    ->recvset.clear();
                                                    ->if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {updateProposal(n.leader, n.zxid, n.peerEpoch);} else{updateProposal(getInitId(),getInitLastLoggedZxid(),getPeerEpoch());} //update this node's vote: compare epochs first, then zxids if the epochs are equal, then myids if the zxids are equal; the larger one becomes this node's vote from here on
                                                    ->sendNotifications();//send the updated vote to the ensemble
                                                ->else if (n.electionEpoch < logicalclock.get()) {} //if the received vote's round is older than this node's, ignore it and take the next Notification
                                                ->else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {updateProposal(n.leader, n.zxid, n.peerEpoch); sendNotifications();} //the received vote wins, so adopt it and send it to the other nodes
                                                ->recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)); //unless the notification was ignored above, record the sender's vote in recvset
                                                ->if (termPredicate(recvset,new Vote(proposedLeader, proposedZxid,logicalclock.get(), proposedEpoch))) { //check whether the received votes form a quorum (QuorumMaj is the majority strategy); if not, keep looping for Notifications, otherwise set the result and finish the election
                                                    ->SyncedLearnerTracker.addAck(entry.getKey());
                                                        ->QuorumVerifierAcksetPair.getAckset().add(sid);
                                                    ->SyncedLearnerTracker.hasAllQuorums
                                                        ->qvAckset.getQuorumVerifier().containsQuorum(qvAckset.getAckset())
                                                            ->QuorumMaj.containsQuorum
                                                -> while((n = recvqueue.poll(finalizeWait..//read the remaining notifications until none are left
                                                -> self.setPeerState((proposedLeader == self.getId()) ?ServerState.LEADING: learningState()); //if the elected leader is this node, set QuorumPeer.state to LEADING; otherwise set it to FOLLOWING (or OBSERVING for an observer)
                                                ->Vote endVote = new Vote(proposedLeader, proposedZxid, proposedEpoch);
                                                ->return endVote;//end the election
                                        ->}
                        ->case LEADING: //this node is the leader
                            ->QuorumPeer.makeLeader
                            ->org.apache.zookeeper.server.quorum.Leader.Leader
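                                ->Leader.ss.bind(self.getQuorumAddress());//the leader binds its port and waits for followers to connect; this port carries data sync as well as leader-follower heartbeats, proposals, acks, and other exchanges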
                            ->org.apache.zookeeper.server.quorum.Leader.lead
                                ->zk.loadData();
                                    ->setZxid(zkDb.getDataTreeLastProcessedZxid()); //set the zxid
                                    ->killSession(session, zkDb.getDataTreeLastProcessedZxid()); //clean up expired sessions
                                    ->takeSnapshot();//persist a clean snapshot
                                ->cnxAcceptor = new LearnerCnxAcceptor();cnxAcceptor.start();
                                    ->LearnerCnxAcceptor.run
                                        -> while (!stop) {Socket s = ss.accept();LearnerHandler fh = new LearnerHandler(s, Leader.this);fh.start();} //wait for connections and create one LearnerHandler per follower as its message handler
                                            ->LearnerHandler.run //main flow of the leader handling a follower's requests
                                                ->ia.readRecord(qp, "packet");//read the first message sent by the follower [#1.1], extract the sid, and set the LearnerHandler's sid
                                                    ->this.sid = bbsid.getLong();
                                                ->QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null); //send LEADERINFO
                                                ->[#1.2]oa.writeRecord(newEpochPacket, "packet");
                                                ->ia.readRecord(ackEpochPacket, "packet");//read the follower's ack message [#1.3]
                                                ->ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
                                                ->leader.waitForEpochAck(this.getSid(), ss);
                                                    ->Leader.waitForEpochAck //block until a quorum of followers has returned the ack
                                                        ->electingFollowers.add(id);
                                                        ->if (electingFollowers.contains(self.getId()) && verifier.containsQuorum(electingFollowers)) 
                                                            ->electionFinished = true;
                                                            ->electingFollowers.notifyAll();
                                                ->boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader); //sync with the follower
                                                    ->long maxCommittedLog = db.getmaxCommittedLog(); //fetch three values: the last processed zxid plus the largest and smallest zxids held in the in-memory ZKDatabase.committedLog
                                                    ->long minCommittedLog = db.getminCommittedLog();
                                                    ->long lastProcessedZxid = db.getDataTreeLastProcessedZxid();
                                                    ->[#1.4]if (lastProcessedZxid == peerLastZxid) {queueOpPacket(Leader.DIFF, peerLastZxid);}//if they are equal, send a Leader.DIFF message
                                                    ->[#1.5]if (peerLastZxid > maxCommittedLog && !isPeerNewEpochZxid) //if the follower's log is ahead of the leader's, send a TRUNC message so the follower truncates it
                                                        ->queueOpPacket(Leader.TRUNC, maxCommittedLog);
                                                        ->currentZxid = maxCommittedLog;
                                                    ->if ((maxCommittedLog >= peerLastZxid)&& (minCommittedLog <= peerLastZxid)) //if peerLastZxid lies between minCommittedLog and maxCommittedLog, the follower is missing some entries; send the missing part of the in-memory ZKDatabase.committedLog as ordinary PROPOSALs, each followed by a COMMIT
                                                        ->LearnerHandler.queueCommittedProposals
                                                            ->while (itr.hasNext()) 
                                                                ->Proposal propose = itr.next()
                                                                ->queuePacket(propose.packet);//send the proposal
                                                                ->queueOpPacket(Leader.COMMIT, packetZxid);//commit the proposal
                                                            ->queueOpPacket(Leader.DIFF, lastCommitedZxid); //a DIFF message is sent in every case to signal the sync
                                                    ->if (peerLastZxid < minCommittedLog && txnLogSyncEnabled) {//if the in-memory log does not reach back far enough, send entries from the on-disk txn log and snapshot as well
                                                    ->leaderLastZxid = leader.startForwarding(this, currentZxid);[TODO://]//what is the purpose of handling toBeApplied and outstandingProposals here?
                                                ->[#1.6]QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,newLeaderZxid, leader.self.getLastSeenQuorumVerifier().toString().getBytes(), null);queuedPackets.add(newLeaderQP);bufferedOutput.flush();//send the NEWLEADER message
                                                ->ia.readRecord(qp, "packet");//read the [#1.7] message, i.e. the ack for NEWLEADER
                                                ->leader.waitForNewLeaderAck(getSid(), qp.getZxid(), getLearnerType());//block until enough followers have acked NEWLEADER
                                                    ->Leader.newLeaderProposal.addAck(sid);
                                                ->[#1.9]while(!leader.zk.isRunning() && !this.isInterrupted()){leader.zk.wait(20);}//block until the leader's ZooKeeperServer.state becomes RUNNING
                                                ->[#1.10]queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null)); //send the UPTODATE message to the follower
                                                ->while (true) //crash recovery is complete; enter normal message handling. For leader-follower message handling, see the message broadcast trace
                                                    ->ia.readRecord(qp, "packet");
                                                    ->.....
                                ->zk.setZxid(ZxidUtils.makeZxid(epoch, 0));
                                ->lastProposed = zk.getZxid();
                                ->waitForEpochAck(self.getId(), leaderStateSummary);//the leader is itself a voting member and counts toward the quorum for the new epoch
                                ->self.setCurrentEpoch(epoch);
                                ->[#1.6]newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(),null, null);
                                ->waitForNewLeaderAck(self.getId(), zk.getZxid(), LearnerType.PARTICIPANT);//block and cast the leader's own ack, which counts toward the newLeaderProposal quorum together with the LearnerHandlers
                                    ->Leader.newLeaderProposal.addAck(sid);
                                ->startZkServer(); //start the leader
                                    ->Leader.startZkServer
                                        ->lastCommitted = zk.getZxid()
                                        ->zk.startup();
                                            ->ZooKeeperServer.startup
                                                ->startSessionTracker();
                                                ->org.apache.zookeeper.server.quorum.LeaderZooKeeperServer.setupRequestProcessors();//set up the processor chain; its logic is detailed in the message broadcast section
                                                ->setState(State.RUNNING);//marks leader startup as complete and lets the LearnerHandler at [#1.9] proceed
                                        ->self.updateElectionVote(getEpoch());//set QuorumPeer.currentVote
                                        ->zk.getZKDatabase().setlastProcessedZxid(zk.getZxid());
                                ->while (true) //loop sending PING messages to the followers to collect session information and the like; PING handling is not expanded here
                                    ->f.ping();
                                        ->LearnerHandler.ping
                                            ->QuorumPacket ping = new QuorumPacket(Leader.PING, id, null, null);
                                            ->queuePacket(ping);
                        ->case FOLLOWING: //this node is a follower
                            ->setFollower(makeFollower(logFactory));
                            ->org.apache.zookeeper.server.quorum.Follower.followLeader
                                ->connectToLeader(addr);
                                ->long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
                                    ->Learner.registerWithLeader
                                        ->qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));
                                        ->boa.writeRecord(li, "LearnerInfo");
                                        ->[#1.1]writePacket(qp, true); //send the first message to the leader to register
                                        ->readPacket(qp);//read the LEADERINFO returned by the leader [#1.2]
                                        ->if (qp.getType() == Leader.LEADERINFO) 
                                            ->QuorumPeer.setAcceptedEpoch
                                                ->acceptedEpoch = e; //set QuorumPeer.acceptedEpoch
                                                ->writeLongToFile(ACCEPTED_EPOCH_FILENAME, e);
                                        ->QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
                                        ->[#1.3]writePacket(ackNewEpoch, true);
                                    ->syncWithLeader(newEpochZxid);
                                        ->Learner.syncWithLeader
                                            ->readPacket(qp);
                                            ->if (qp.getType() == Leader.DIFF) {snapshotNeeded = false;} //receive the DIFF message sent at [#1.4]; it triggers no action and serves only as a marker
                                            ->if (qp.getType() == Leader.TRUNC) //handle the [#1.5] message: truncate the extra log entries
                                                ->boolean truncated=zk.getZKDatabase().truncateLog(qp.getZxid());
                                            ->zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
                                            ->zk.createSessionTracker();
                                            ->outerLoop:
                                                ->while (self.isRunning()) 
                                                    ->readPacket(qp);
                                                    ->case Leader.PROPOSAL:
                                                        ->packetsNotCommitted.add(pif);//add the entry to the LinkedList packetsNotCommitted
                                                    ->case Leader.COMMIT:
                                                        ->pif = packetsNotCommitted.peekFirst();//look up the entry previously added to packetsNotCommitted
                                                        ->packetsCommitted.add(qp.getZxid());//add it to packetsCommitted
                                                    ->case Leader.NEWLEADER: //handle the [#1.6] message
                                                        ->[#1.7]writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
                                                    ->case Leader.UPTODATE:
                                                        ->break outerLoop; //exit outerLoop; the follower's crash recovery phase is complete
                                    ->while (this.isRunning()) {
                                        ->Follower.readPacket(qp);
                                        ->Follower.processPacket(qp);//not expanded here; see the message broadcast trace
                                    ->}     
                    ->}
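
The vote handling in the election loop above boils down to two checks: which of two votes wins, and whether the winning proposal already has a majority. The sketch below restates those two checks in isolation. `VoteSketch`, `ElectionSketch`, `supersedes`, and `hasMajority` are hypothetical names made up for illustration, not ZooKeeper classes; the comparison follows the order described in the trace (epoch, then zxid, then server id), and the majority rule assumes a plain QuorumMaj-style "more than half" policy with no weights.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified stand-in for a vote; not ZooKeeper's Vote class. */
final class VoteSketch {
    final long leader;    // server id of the proposed leader
    final long zxid;      // last zxid of the proposed leader
    final long peerEpoch; // epoch of the proposed leader

    VoteSketch(long leader, long zxid, long peerEpoch) {
        this.leader = leader;
        this.zxid = zxid;
        this.peerEpoch = peerEpoch;
    }

    /** Two votes back the same proposal when all three fields match. */
    boolean sameProposal(VoteSketch other) {
        return leader == other.leader && zxid == other.zxid && peerEpoch == other.peerEpoch;
    }
}

final class ElectionSketch {
    /** True when candidate beats current: compare epoch, then zxid, then server id. */
    static boolean supersedes(VoteSketch candidate, VoteSketch current) {
        if (candidate.peerEpoch != current.peerEpoch) {
            return candidate.peerEpoch > current.peerEpoch;
        }
        if (candidate.zxid != current.zxid) {
            return candidate.zxid > current.zxid;
        }
        return candidate.leader > current.leader;
    }

    /** True when strictly more than half of the ensemble backs the proposal. */
    static boolean hasMajority(Map<Long, VoteSketch> recvset, VoteSketch proposal, int ensembleSize) {
        int backers = 0;
        for (VoteSketch v : recvset.values()) {
            if (v.sameProposal(proposal)) {
                backers++;
            }
        }
        return backers > ensembleSize / 2;
    }

    public static void main(String[] args) {
        VoteSketch mine = new VoteSketch(1, 0x100000005L, 1);
        VoteSketch theirs = new VoteSketch(3, 0x100000005L, 1);
        // Same epoch and zxid, so the higher server id decides.
        System.out.println(supersedes(theirs, mine));

        Map<Long, VoteSketch> recvset = new HashMap<>();
        recvset.put(1L, theirs); // server 1 switched to the winning vote
        recvset.put(3L, theirs); // server 3 voted for itself
        System.out.println(hasMajority(recvset, theirs, 3));
    }
}
```

Keying the received votes by sender id, as recvset does in the trace, means a server that changes its mind simply overwrites its earlier vote instead of being counted twice.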
    
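The branches of syncFollower traced above ([#1.4], [#1.5], and the committedLog range check) can be read as a single decision on where the follower's last zxid sits relative to the leader's state. The sketch below is only that reading, under simplifying assumptions: `SyncDecisionSketch`, `Mode`, and `choose` are hypothetical names, and the `isPeerNewEpochZxid` and `txnLogSyncEnabled` conditions from the trace are deliberately left out.

```java
/** Hypothetical sketch of picking a sync mode; simplified from the trace, not ZooKeeper code. */
final class SyncDecisionSketch {
    enum Mode {
        EMPTY_DIFF,              // follower is already in sync, send only DIFF
        TRUNC,                   // follower is ahead, tell it to truncate
        DIFF_FROM_COMMITTED_LOG, // replay the missing part of the in-memory committedLog
        TXNLOG_OR_SNAP           // in-memory log is too short, fall back to disk
    }

    static Mode choose(long peerLastZxid, long lastProcessedZxid,
                       long minCommittedLog, long maxCommittedLog) {
        if (peerLastZxid == lastProcessedZxid) {
            return Mode.EMPTY_DIFF;
        }
        if (peerLastZxid > maxCommittedLog) {
            return Mode.TRUNC;
        }
        if (peerLastZxid >= minCommittedLog) {
            return Mode.DIFF_FROM_COMMITTED_LOG;
        }
        return Mode.TXNLOG_OR_SNAP;
    }

    public static void main(String[] args) {
        // Leader holds zxids 100..200 in memory and has processed up to 200.
        System.out.println(choose(200, 200, 100, 200));
        System.out.println(choose(250, 200, 100, 200));
        System.out.println(choose(150, 200, 100, 200));
        System.out.println(choose(50, 200, 100, 200));
    }
}
```

With the sample values in `main`, the four calls exercise the four modes in the order they are declared.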

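Several steps in the trace call ZxidUtils.makeZxid(epoch, 0), for example when the leader resets its zxid for the new epoch and when the follower registers with its accepted epoch. A zxid packs the epoch into the high 32 bits and a per-epoch counter into the low 32 bits, which is why comparing zxids as plain longs orders transactions by epoch first. The sketch below illustrates that layout; `ZxidSketch` and its method names are hypothetical and stand in for, rather than reproduce, ZooKeeper's own utility class.

```java
/** Hypothetical illustration of the zxid layout: epoch in the high 32 bits, counter in the low 32. */
final class ZxidSketch {
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xffffffffL);
    }

    static long epochOf(long zxid) {
        return zxid >> 32;
    }

    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    public static void main(String[] args) {
        long firstOfEpoch2 = makeZxid(2, 0);
        long lateInEpoch1 = makeZxid(1, 999);
        System.out.println(Long.toHexString(firstOfEpoch2)); // prints 200000000
        // Any zxid from epoch 2 is larger than every zxid from epoch 1.
        System.out.println(firstOfEpoch2 > lateInEpoch1);    // prints true
        System.out.println(epochOf(lateInEpoch1) + " " + counterOf(lateInEpoch1)); // prints 1 999
    }
}
```
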
    1.2 Message broadcast

    [Client]org.apache.zookeeper.test.ClientTest.performClientTest
        ->MyWatcher watcher = new MyWatcher();
        ->org.apache.zookeeper.ZooKeeper.ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,boolean canBeReadOnly, HostProvider aHostProvider,ZKClientConfig clientConfig)
            ->clientConfig = new ZKClientConfig();
            ->watchManager = defaultWatchManager();
            ->cnxn = new ClientCnxn(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this, watchManager,getClientCnxnSocket(), canBeReadOnly)
                ->ClientCnxn.ClientCnxn()
                    ->sendThread = new SendThread(clientCnxnSocket);eventThread = new EventThread();
            ->cnxn.start();
                ->ClientCnxn.SendThread.run()
                    ->while (state.isAlive())
                        ->SendThread.startConnect();//pick any server and connect to it
                            ->clientCnxnSocket.connect(addr);
                                ->org.apache.zookeeper.ClientCnxnSocketNetty.connect
                                    ->bootstrap.setPipelineFactory(new ZKClientPipelineFactory());
                                        ->pipeline.addLast("handler", new ZKClientHandler()); //ZKClientHandler serves as the client's ChannelHandler
                                    ->connectFuture = bootstrap.connect(addr); 
                                        ->[Follower]org.apache.zookeeper.server.NettyServerCnxnFactory.CnxnChannelHandler.channelConnected
                                            ->allChannels.add(ctx.getChannel());
                                            ->addCnxn(cnxn);
                                    ->org.apache.zookeeper.ClientCnxnSocketNetty.connect.operationComplete //connection established
                                        ->org.apache.zookeeper.ClientCnxn.SendThread.primeConnection();
                                            ->ConnectRequest conReq = new ConnectRequest(0, lastZxid,sessionTimeout, sessId, sessionPasswd);
                                            ->[#2.1]outgoingQueue.addFirst(new Packet(null, null, conReq,null, null, readOnly));
                                                ->[Follower]org.apache.zookeeper.server.NettyServerCnxnFactory.CnxnChannelHandler.processMessage
                                                    ->org.apache.zookeeper.server.NettyServerCnxn.receiveMessage
                                                      ->zks.processConnectRequest(this, bb);//on the first message org.apache.zookeeper.server.NettyServerCnxn.initialized is still false, so this branch is taken
                                                        ->org.apache.zookeeper.server.ZooKeeperServer.processConnectRequest //handle the ConnectRequest from [#2.1]
                                                            ->if (sessionId == 0) createSession(cnxn, passwd, sessionTimeout);
                                                                ->long sessionId = sessionTracker.createSession(timeout);
                                                                    ->org.apache.zookeeper.server.quorum.LearnerSessionTracker.nextSessionId.getAndIncrement();
                                                                ->Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, to, null);
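                                                                ->submitRequest(si);//createSession is a transactional request and follows the normal transaction flow: the leader issues a Proposal and commits once it is acked. Not expanded here; the transaction flow is traced below using setData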
                                                                    ->org.apache.zookeeper.server.ZooKeeperServer.firstProcessor.processRequest(si);
                                                                        ->FollowerRequestProcessor.processRequest(si);
                                                      ->initialized = true;
                                            ->clientCnxnSocket.connectionPrimed();
                                        ->wakeupCnxn();
                                            ->outgoingQueue.add(WakeupPacket.getInstance());//send an empty Packet
                        ->sendPing();//send a heartbeat once the idle threshold is reached
                            ->RequestHeader h = new RequestHeader(-2, OpCode.ping);
                            ->queuePacket(h, null, null, null, null, null, null, null, null);
                        ->clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);//loop sending the queued Packets (taken from org.apache.zookeeper.ClientCnxn.outgoingQueue; sent Packets awaiting a reply are tracked in pendingQueue)
                ->ClientCnxn.EventThread.run()
        ->zk.setData("/benwashere", "hi".getBytes(), 57);//transaction flow
            ->final String serverPath = prependChroot(clientPath);
            ->h.setType(ZooDefs.OpCode.setData);
            ->org.apache.zookeeper.ClientCnxn.submitRequest
                ->Packet packet = queuePacket(h, r, request, response, null, null, null,null, watchRegistration, watchDeregistration); //put the message on org.apache.zookeeper.ClientCnxn.outgoingQueue
                    ->[Follower]org.apache.zookeeper.server.NettyServerCnxn.receiveMessage
                        ->org.apache.zookeeper.server.ZooKeeperServer.processPacket
                            ->Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),h.getType(), incomingBuffer, cnxn.getAuthInfo());
                            ->si.setOwner(ServerCnxn.me);
                            ->org.apache.zookeeper.server.ZooKeeperServer.submitRequest
                                ->touch(si.cnxn);
                                ->firstProcessor.processRequest(si);
                                    ->FollowerRequestProcessor.processRequest
                                        ->org.apache.zookeeper.server.quorum.FollowerRequestProcessor.run
                                            ->nextProcessor.processRequest(request);//first placed in queuedRequests; after the COMMIT arrives it is placed in committedRequests
                                                ->CommitProcessor.processRequest(request);
                                                    ->org.apache.zookeeper.server.quorum.CommitProcessor.run
                                            ->zks.getFollower().request(request); //forward the transactional request to the leader
                                                ->org.apache.zookeeper.server.quorum.Learner.request
                                                    ->QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos.toByteArray(), request.authInfo);
                                                    ->writePacket(qp, true);
                                                        ->leaderOs.writeRecord(pp, "packet");
                                                            ->[Leader]org.apache.zookeeper.server.quorum.LearnerHandler.run -> case Leader.REQUEST ->leader.zk.submitLearnerRequest(si);
                                                                ->org.apache.zookeeper.server.quorum.LeaderZooKeeperServer.submitLearnerRequest -> prepRequestProcessor.processRequest(request);
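                                                                    ->org.apache.zookeeper.server.PrepRequestProcessor.processRequest //requests are placed in submittedRequests and then handled in order by PrepRequestProcessor's single-threaded run method, so PrepRequestProcessor.pRequest runs strictly on one thread and has no concurrency issues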
                                                                        ->org.apache.zookeeper.server.PrepRequestProcessor.pRequest
                                                                            ->SetDataRequest setDataRequest = new SetDataRequest();
                                                                            ->org.apache.zookeeper.server.PrepRequestProcessor.pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest, true); //note: the zxid is generated here, via org.apache.zookeeper.server.ZooKeeperServer.hzxid.incrementAndGet();
                                                                                ->zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
                                                                                ->checkACL(zks, nodeRecord.acl, ZooDefs.Perms.WRITE, request.authInfo);
                                                                                ->request.setTxn(new SetDataTxn(path, setDataRequest.getData(), newVersion));
                                                                                ->[#2.2]org.apache.zookeeper.server.PrepRequestProcessor.addChangeRecord(nodeRecord);
                                                                                    ->zks.outstandingChanges.add(c);
                                                                                    ->zks.outstandingChangesForPath.put(c.path, c);
                                                                            ->request.zxid = zks.getZxid();//set the auto-incremented zxid generated by the call above
                                                                            ->nextProcessor.processRequest(request); //forks into two paths, CommitProcessor and SyncRequestProcessor, and sends the proposal to the followers in the ensemble
                                                                                ->ProposalRequestProcessor.processRequest
                                                                                    ->nextProcessor.processRequest(request);
                                                                                        ->CommitProcessor.processRequest(request);
                                                                                    ->zks.getLeader().propose(request); -> org.apache.zookeeper.server.quorum.Leader.propose
                                                                                        ->QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid,baos.toByteArray(), null);
                                                                                        ->Proposal p = new Proposal();p.packet = pp;
                                                                                        ->p.addQuorumVerifier(self.getQuorumVerifier());
                                                                                        ->lastProposed = p.packet.getZxid();
                                                                                        ->outstandingProposals.put(lastProposed, p);//the issued proposal is stored in org.apache.zookeeper.server.quorum.Leader.outstandingProposals
                                                                                        ->sendPacket(pp);//send the proposal to every follower
                                                                                            ->for (LearnerHandler f : forwardingFollowers) {f.queuePacket(qp);} //placed in org.apache.zookeeper.server.quorum.LearnerHandler.queuedPackets
                                                                                                ->org.apache.zookeeper.server.quorum.LearnerHandler.sendPackets
                                                                                                    ->[Follower]org.apache.zookeeper.server.quorum.Follower.processPacket -> case Leader.PROPOSAL: 
                                                                                                        ->lastQueued = hdr.getZxid(); //saved in Follower.lastQueued; the follower processes txns in strict order
                                                                                                        ->FollowerZooKeeperServer.logRequest(hdr, txn);
                                                                                                            ->Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
                                                                                                            ->pendingTxns.add(request); //saved in org.apache.zookeeper.server.quorum.FollowerZooKeeperServer.pendingTxns for use when the COMMIT arrives
                                                                                                            ->syncProcessor.processRequest(request);
                                                                                                                ->org.apache.zookeeper.server.SyncRequestProcessor.run
                                                                                                                    ->si = queuedRequests.take();
                                                                                                                    ->zks.getZKDatabase().append(si)
                                                                                                                        ->this.snapLog.append(si);
                                                                                                                            ->org.apache.zookeeper.server.persistence.FileTxnLog.append
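                                                                                                                    ->zks.getZKDatabase().rollLog(); || zks.takeSnapshot(); //check whether a snapshot and a log roll are needed
                                                                                                                    ->toFlush.add(si);if (toFlush.size() > 1000) {flush(toFlush);} //flush when the threshold is reached or when no new entries are pending; only the flush persists the entries to the log file and passes the requests further down the chain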
                                                                                                                        ->SyncRequestProcessor.flush
                                                                                                                            ->zks.getZKDatabase().commit(); //force the log entries appended above to disk
                                                                                                                                ->org.apache.zookeeper.server.persistence.FileTxnLog.commit
                                                                                                                            ->nextProcessor.processRequest(i);
                                                                                                                                ->SendAckRequestProcessor.processRequest
                                                                                                                                    ->QuorumPacket qp = new QuorumPacket(Leader.ACK, si.getHdr().getZxid(), null,null);
                                                                                                                                    ->learner.writePacket(qp, false); -> leaderOs.writeRecord(pp, "packet"); //send the ACK back to the leader
                                                                                                                                        ->[Leader]org.apache.zookeeper.server.quorum.LearnerHandler.run -> case Leader.ACK
                                                                                                                                            ->leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress()); -> Leader.processAck
                                                                                                                                                ->Proposal p = outstandingProposals.get(zxid);
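                                                                                                                                                ->p.addAck(sid); //add this sid to the ack set
                                                                                                                                                ->boolean hasCommitted = Leader.tryToCommit(p, zxid, followerAddr); //check the commit threshold on every ack; the leader sends proposals from a single thread, so they are strictly ordered, and under the majority policy, if the current zxid has not met the commit condition, zxid+1 cannot meet it either
                                                                                                                                                    ->if (!p.hasAllQuorums()) {return false;} ->QuorumMaj.containsQuorum -> (ackSet.size() > half); //below the threshold: wait for the next ack to trigger the check again; QuorumMaj is the majority policy
                                                                                                                                                    ->if (zxid != lastCommitted+1) {...} //check that the zxid being committed is the last committed zxid + 1, since zab executes in strict order and a transaction commit never fails
                                                                                                                                                    ->outstandingProposals.remove(zxid); //remove the txn stored during the proposal phase
                                                                                                                                                    ->toBeApplied.add(p);//put the Proposal awaiting apply into org.apache.zookeeper.server.quorum.Leader.toBeApplied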
                                                                                                                                                    ->commit(zxid);
                                                                                                                                                        ->lastCommitted = zxid; //set Leader.lastCommitted
                                                                                                                                                        ->QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);
                                                                                                                                                        ->sendPacket(qp);
                                                                                                                                                            ->[Follower]org.apache.zookeeper.server.quorum.Follower.processPacket -> case Leader.COMMIT: 
                                                                                                                                                                ->long firstElementZxid = pendingTxns.element().zxid; //peek at the head of org.apache.zookeeper.server.quorum.FollowerZooKeeperServer.pendingTxns saved during the proposal phase
                                                                                                                                                                ->if (firstElementZxid != zxid) {System.exit(12);} //if the committed zxid differs from the proposal-phase zxid, something is wrong with the system, so exit immediately
                                                                                                                                                                ->Request request = pendingTxns.remove(); //remove from pendingTxns
                                                                                                                                                                ->commitProcessor.commit(request);
                                                                                                                                                                    ->committedRequests.add(request); -> CommitProcessor.run() //add to CommitProcessor.committedRequests
                                                                                                                                                                        ->nextPending.set(request); //put into org.apache.zookeeper.server.quorum.CommitProcessor.nextPending
                                                                                                                                                                        ->processCommitted();
                                                                                                                                                                            ->request = committedRequests.poll();//poll committedRequests and compare with nextPending; if they match
                                                                                                                                                                            ->Request pending = nextPending.get();
                                                                                                                                                                            ->if (pending != null &&pending.sessionId == request.sessionId &&pending.cxid == request.cxid)
                                                                                                                                                                                ->currentlyCommitting.set(pending);//saved in CommitProcessor.currentlyCommitting
                                                                                                                                                                                ->sendToNextProcessor(pending); //workerPool groups work by request.sessionId here
                                                                                                                                                                                    ->workerPool.schedule(new CommitWorkRequest(request), request.sessionId);
                                                                                                                                                                                        ->org.apache.zookeeper.server.quorum.CommitProcessor.CommitWorkRequest.doWork
                                                                                                                                                                                            ->nextProcessor.processRequest(request); -> Leader.ToBeAppliedRequestProcessor.processRequest
                                                                                                                                                                                                ->next.processRequest(request);
                                                                                                                                                                                                    ->FinalRequestProcessor.processRequest(request);
                                                                                                                                                                                                        ->rc = zks.processTxn(request); //apply the transaction to the in-memory database
                                                                                                                                                                                                            ->org.apache.zookeeper.server.DataTree.processTxn ->case OpCode.setData:
                                                                                                                                                                                                                ->DataTree.setData
                                                                                                                                                                                                                    ->DataNode n = nodes.get(path);n.data = data; //update the in-memory data node
                                                                                                                                                                                                                    ->dataWatches.triggerWatch(path, EventType.NodeDataChanged); //fire the data watches
                                                                                                                                                                                                            ->lastProcessedZxid = rc.zxid; //update org.apache.zookeeper.server.DataTree.lastProcessedZxid //this field marks the committed offset; both leader election and data sync use it
                                                                                                                                                                                                        ->rsp = new SetDataResponse(rc.stat);
                                                                                                                                                                                                ->leader.toBeApplied.iterator().remove();//remove the applied proposal from Leader.toBeApplied
                                                                                                                                                                                            ->currentlyCommitting.compareAndSet(request, null); //reset currentlyCommitting to null
                                                                                                                                                    ->inform(p); //send this Proposal to the observers
                                                                                                                                                        ->QuorumPacket qp = new QuorumPacket(Leader.INFORM, proposal.request.zxid,proposal.packet.getData(), null);
                                                                                                                                                        ->sendObserverPacket(qp);
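                                                                                                                                                    ->zk.commitProcessor.commit(p.request); //run the Leader's own commit; the logic follows the Follower's, the only difference being that it removes the data put into outstandingChanges and outstandingChangesForPath in phase [#2.2]
                                                                                                                                                    ->...//pendingSyncs skipped; the case where the leader receives client requests is not considered for now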
                                                                                                                            ->((Flushable)nextProcessor).flush();
                                                                                                                                ->SendAckRequestProcessor.flush
                                                                                    ->syncProcessor.processRequest(request);
                ->while (!packet.finished) {packet.wait();} //block, waiting for the reply
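
The leader-side part of the trace (Leader.processAck -> tryToCommit -> commit) can be illustrated with a minimal sketch. This is not ZooKeeper's code: the class `QuorumAckSketch` and all of its members are hypothetical names, zxids are plain consecutive longs, and the rule "do not commit while zxid-1 is still outstanding" is an assumption of the sketch standing in for the ordering check in the trace. Only the majority test (`acks > members / 2`) mirrors the `QuorumMaj.containsQuorum` line quoted above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the leader's ack bookkeeping; not ZooKeeper's classes.
final class QuorumAckSketch {
    private final int votingMembers;                                  // size of the voting ensemble
    private final Map<Long, Set<Long>> outstanding = new HashMap<>(); // zxid -> sids that acked
    private final List<Long> committed = new ArrayList<>();           // zxids in commit order

    QuorumAckSketch(int votingMembers) {
        this.votingMembers = votingMembers;
    }

    // PROPOSAL phase: remember the zxid until a quorum has acked it.
    void propose(long zxid) {
        outstanding.put(zxid, new HashSet<>());
    }

    // ACK phase: record one ack; returns true only if this ack commits zxid.
    boolean processAck(long sid, long zxid) {
        Set<Long> acks = outstanding.get(zxid);
        if (acks == null) {
            return false; // unknown zxid, or already committed
        }
        acks.add(sid);
        if (outstanding.containsKey(zxid - 1)) {
            return false; // sketch assumption: never commit past an uncommitted predecessor
        }
        if (acks.size() <= votingMembers / 2) {
            return false; // majority rule: strictly more than half must ack
        }
        outstanding.remove(zxid);
        committed.add(zxid); // a real leader would broadcast COMMIT at this point
        return true;
    }

    List<Long> committed() {
        return committed;
    }

    public static void main(String[] args) {
        QuorumAckSketch leader = new QuorumAckSketch(5);
        leader.propose(1L);
        leader.propose(2L);
        for (long sid = 1; sid <= 3; sid++) {
            leader.processAck(sid, 2L); // quorum reached, but zxid 1 is still outstanding
        }
        for (long sid = 1; sid <= 3; sid++) {
            leader.processAck(sid, 1L); // the third ack commits zxid 1
        }
        leader.processAck(4L, 2L);      // the next ack for zxid 2 now commits it
        System.out.println(leader.committed());
    }
}
```

As in the trace, the commit check only runs when an ack arrives, so a zxid that already has a quorum waits for its next ack once its predecessor has been committed.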
    
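
On the follower side, the trace stores each PROPOSAL in pendingTxns and, at COMMIT, requires the head of that queue to carry the committed zxid. A minimal sketch of that FIFO check follows. `FollowerPendingSketch` and its members are hypothetical names; it keeps bare zxids instead of Request objects, and it throws an exception where the traced code calls System.exit(12), which is a choice made for this sketch only.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical, simplified model of the follower's pendingTxns handling; not ZooKeeper's classes.
final class FollowerPendingSketch {
    private final Queue<Long> pendingTxns = new ArrayDeque<>();    // proposals logged but not yet committed
    private final List<Long> handedToCommit = new ArrayList<>();   // zxids passed on for applying

    // PROPOSAL: queue the zxid in arrival order.
    void onProposal(long zxid) {
        pendingTxns.add(zxid);
    }

    // COMMIT: the committed zxid must be the oldest pending proposal.
    void onCommit(long zxid) {
        if (pendingTxns.isEmpty()) {
            throw new IllegalStateException("COMMIT " + zxid + " without a pending PROPOSAL");
        }
        long first = pendingTxns.element(); // peek only; nothing is removed yet
        if (first != zxid) {
            // sketch choice: fail loudly instead of exiting the process
            throw new IllegalStateException("expected COMMIT for " + first + " but got " + zxid);
        }
        handedToCommit.add(pendingTxns.remove());
    }

    List<Long> handedToCommit() {
        return handedToCommit;
    }

    public static void main(String[] args) {
        FollowerPendingSketch follower = new FollowerPendingSketch();
        follower.onProposal(1L);
        follower.onProposal(2L);
        follower.onCommit(1L);
        follower.onCommit(2L);
        System.out.println(follower.handedToCommit());
    }
}
```

Because the queue is strictly first-in, first-out, any gap or reordering between PROPOSAL and COMMIT is detected at the first mismatching zxid.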

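
The SyncRequestProcessor part of the trace batches appended requests and only passes them on (and thus acks them) after a flush, which happens when the batch exceeds a threshold or when no more requests are queued. The sketch below illustrates that group-commit idea under stated assumptions: `GroupCommitSketch`, `onQueueIdle` and the counters are hypothetical names, disk I/O is replaced by a counter, and the threshold is a constructor parameter rather than the 1000 shown in the trace.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of batch-then-flush; not ZooKeeper's SyncRequestProcessor.
final class GroupCommitSketch {
    private final int batchLimit;                          // flush once the batch grows past this size
    private final List<Long> toFlush = new ArrayList<>();  // appended but not yet forced to disk
    private final List<Long> acked = new ArrayList<>();    // passed downstream after a flush
    private int flushCount;                                // stands in for the number of disk syncs

    GroupCommitSketch(int batchLimit) {
        this.batchLimit = batchLimit;
    }

    // Append one request to the log buffer; flush if the batch is over the limit.
    void append(long zxid) {
        toFlush.add(zxid);
        if (toFlush.size() > batchLimit) {
            flush();
        }
    }

    // Called when the input queue has no more requests waiting.
    void onQueueIdle() {
        flush();
    }

    private void flush() {
        if (toFlush.isEmpty()) {
            return;
        }
        flushCount++;            // one sync covers the whole batch
        acked.addAll(toFlush);   // only now do requests continue to the ack step
        toFlush.clear();
    }

    List<Long> acked() {
        return acked;
    }

    int flushCount() {
        return flushCount;
    }

    public static void main(String[] args) {
        GroupCommitSketch sync = new GroupCommitSketch(2);
        sync.append(1L);
        sync.append(2L);
        sync.append(3L);    // batch size 3 exceeds the limit of 2, so this flushes
        sync.append(4L);
        sync.onQueueIdle(); // nothing else queued, so the remainder is flushed
        System.out.println(sync.acked() + " in " + sync.flushCount() + " flushes");
    }
}
```

The trade-off is latency for throughput: a request is never acked before it is durable, but one disk sync is shared by every request in the batch.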
    II. Reference

    III. Next installment

    REDIS 3.2.8 CODE REVIEW

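    While the company was gearing up for the 618 sales event, I took the chance to review zk. I read source code quickly and forget it just as quickly, so writing an index to record the line of thought keeps things much clearer. I had planned to write up redis, spark-core, spark-streaming, spark-graphx, hive, jdk, netty and so on as well, but each review takes a surprising amount of energy. Many questions I had worked out at the time now leave me baffled when I reread my old TODOs; I have simply forgotten them. Some of the versions I read are fairly old (spark1.8, for example), and I do not plan to review them again; I will go straight to the SQL implementation in 2.x later. Others I only read in part: for hive I barely looked at the lexer or the physical execution plan, so a review would lack an overall picture, and writing up individual modules in isolation does not seem worthwhile. I am digging myself a hole here for now; the rest will be filled in later...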

        Article link: https://www.haomeiwen.com/subject/lqtoeftx.html