美文网首页Zookeeper
Zookeeper之集群数据同步源码分析

Zookeeper之集群数据同步源码分析

作者: loveFXX | 来源:发表于2020-06-27 10:10 被阅读0次

    在完成领导者选举之后,确定服务器角色之后,需要进行数据同步。然后才能构建请求处理链RequestProcessor处理请求

    集群数据同步

    image.png

    整体流程:当角色确立之后,leader调用leader.lead();方法运行,创建一个接收连接的LearnerCnxAcceptor线程,在LearnerCnxAcceptor线程内部又建立一个阻塞的LearnerCnxAcceptorHandler线程等待Learner端的连接。Learner端以follower为例,follower调用follower.followLeader();方法首先查找leader的Socket服务端,然后建立连接。当follower建立连接后,leader端会建立一个LearnerHandler线程相对应,用来处理follower与leader的数据包传输。
    1、follower端封装当前zk服务器的Zxid和Leader.FOLLOWERINFO的LearnerInfo数据包发送给leader
    2、leader端这时处于getEpochToPropose方法的阻塞时期,需要得到Learner端超过一半的服务器发送Epoch
    3、getEpochToPropose解阻塞之后,LearnerHandler线程会把超过一半的Epoch与leader比较得到最新的newLeaderZxid,并封装成Leader.LEADERINFO包发送给Learner端
    4、Learner端得到最新的Epoch,会更新当前服务器的Epoch。并把当前服务器所处的lastLoggedZxid位置封装成Leader.ACKEPOCH发送给leader
    5、此时leader端处于waitForEpochAck方法的阻塞时期,需要得到Learner端超过一半的服务器发送EpochACK
    6、当waitForEpochAck阻塞之后便可以在LearnerHandler线程内决定用那种方式进行同步。如果Learner端的lastLoggedZxid>leader端的,Learner端将会被删除多余的部分。如果小于leader端的,将会以不同方式进行同步
    7、leader端发送Leader.NEWLEADER数据包给Learner端(6、7步骤都是另开一个线程来发送这些数据包)
    8、Learner端同步之后,会在一个while循环内处理各种leader端发送数据包,包括两阶段提交的Leader.PROPOSAL、Leader.COMMIT、Leader.INFORM等。在同步数据后会处理Leader.NEWLEADER数据包,然后发送Leader.ACK给leader端
    9、此时leader端处于waitForNewLeaderAck阻塞等待超过一半节点发送ACK。

    当角色确立之后,需要结合着看

    leader调用leader.lead();方法


    image.png

    follower调用follower.followLeader();方法


    image.png
    observer调用observer.observeLeader();方法
    image.png

    Leader

    首先看看Leader做了些什么
    org.apache.zookeeper.server.quorum.Leader#lead
    1、loadData加载数据


    image.png

    2、创建LearnerCnxAcceptor线程,接收followers的连接


    image.png
    ①、LearnerCnxAcceptorHandler接收连接线程
    这里加了CountDownLatch根据配置的同步的地址的数量(例如:server.2=127.0.0.1:12882:13882 配置同步的端口是12882只有一个),所以serverSockets大小是一个
    org.apache.zookeeper.server.quorum.Leader.LearnerCnxAcceptor#run
    等待其他follower或observer连接
    image.png

    ②、LearnerCnxAcceptorHandler
    这里接收到连接后,便会调用latch.countDown()解阻塞
    org.apache.zookeeper.server.quorum.Leader.LearnerCnxAcceptor.LearnerCnxAcceptorHandler#run


    image.png
    ③、acceptConnections
    在这里阻塞接收followers的连接,当有连接过来会生成一个socket对象。然后根据当前socket生成一个LearnerHandler线程
    org.apache.zookeeper.server.quorum.Leader.LearnerCnxAcceptor.LearnerCnxAcceptorHandler#acceptConnections
    每个Learner者都会开启一个LearnerHandler线程
    image.png
    ④、LearnerHandler
    org.apache.zookeeper.server.quorum.LearnerHandler#run
    这里就是读取或写数据包与Learner交换数据包。如果没有数据包读取,则会阻塞当前方法ia.readRecord(qp, "packet");
    image.png
    3、getEpochToPropose
    org.apache.zookeeper.server.quorum.Leader#lead
    Leader节点在开启LearnerHandler线程之后会继续调用getEpochToPropose方法
    image.png

    org.apache.zookeeper.server.quorum.Leader#getEpochToPropose
    判断connectingFollowers发给leader端的Epoch是否过半,如果过半则会解阻塞


    image.png
    如果不过半则会一直阻塞在这里,直到Follower把自己的Epoch数据包发送过来并符合过半机制
    image.png
    4、发送的Epoch过半之后,把当前zxid设置到zk
    image.png
    5、waitForEpochAck
    等待EpochAck
    image.png
    org.apache.zookeeper.server.quorum.Leader#waitForEpochAck
    是electingFollowers符合过半后唤醒,在Follower发送过半的Leader.ACKEPOCH数据包
    image.png
    image.png
    6、等待EpochAck解阻塞后
    把得到最新的epoch更新到当前服务,设置当前leader节点的zab状态是SYNCHRONIZATION
    image.png
    7、等待NewLeaderAck
    等待同步数据结束
    org.apache.zookeeper.server.quorum.Leader#waitForNewLeaderAck
    image.png
    image.png
    8、startZkServer
    启动zk服务端,并设置ZAB是ZabState.BROADCAST
    image.png
    org.apache.zookeeper.server.quorum.Leader#startZkServer
    reconfigEnabled开启
    image.png
    启动ZkServer服务端,更新currentVote、设置数据树dataTree.lastProcessedZxid
    image.png
    9、ping和不超过过半停止运行
    image.png
    image.png
    image.png

    LearnerHandler

    LearnerHandler线程是对应于Learner连接Leader端后,建立的一个与Learner端交换数据的线程。每一个Learner端都会创建一个
    org.apache.zookeeper.server.quorum.LearnerHandler#run
    1、readRecord读取数据包
    不断从learner节点读数据,如果没读到将会阻塞readRecord


    image.png

    2、FOLLOWERINFO或OBSERVERINFO的数据包
    如果数据包类型不是Leader.FOLLOWERINFO或Leader.OBSERVERINFO将会返回


    image.png
    3、获取到learnerInfoData
    image.png
    4、得到followerInfo和lastAcceptedEpoch
    image.png
    5、获取newEpoch通过getEpochToPropose方法
    image.png

    6、发送Leader.LEADERINFO数据包和waitForEpochAck


    image.png
    7、syncFollower同步
    needSnap表示是否需要快照同步
    image.png
    org.apache.zookeeper.server.quorum.LearnerHandler#syncFollower
    如果是新机器启动isPeerNewEpochZxid=0
    image.png
    8、数据树DataTree同步到其他节点
    反序列化serializeSnapshot 数据树DataTree到其他节点
    image.png
    9、Leader.NEWLEADER数据包
    把Leader.NEWLEADER数据包放入到queuedPackets
    image.png
    10、startSendingPackets开始发送数据包
    org.apache.zookeeper.server.quorum.LearnerHandler#startSendingPackets
    image.png

    11、waitForNewLeaderAck


    image.png
    12、发送Leader.ACK数据包
    image.png
    image.png

    Follower

    然后再看看Follower节点做了些那些事情
    org.apache.zookeeper.server.quorum.Follower#followLeader
    1、这里首先会寻找领导者leader的findLeader


    image.png

    org.apache.zookeeper.server.quorum.Learner#findLeader
    根据投票信息currentVote得到那台是领导者


    image.png
    连接到leader端
    image.png
    2、registerWithLeader
    在建立连接后,会把follower的信息发送给lead。这些信息包括zxid(就是epoch届数)和sid。目的就是为了统一Epoch

    org.apache.zookeeper.server.quorum.Learner#registerWithLeader


    image.png
    image.png
    当有数据包传输,LearnerHandler的readRecord将会被解阻塞。会获取到这个数据包
    org.apache.zookeeper.server.quorum.LearnerHandler#run
    image.png
    获取到follower节点的followerInfo信息之后,便会调用到getEpochToPropose方法。
    org.apache.zookeeper.server.quorum.LearnerHandler#run
    image.png
    3、把得到的Epoch数据包发送
    此时数据包类型参数是Leader.LEADERINFO,newLeaderZxid最新的Leader的Zxid
    image.png
    4、读取lead返回的Leader.LEADERINFO数据包
    org.apache.zookeeper.server.quorum.Learner#registerWithLeader
    image.png

    5、向leader写Leader.ACKEPOCH数据包
    org.apache.zookeeper.server.quorum.Learner#registerWithLeader


    image.png
    6、syncWithLeader
    设置当前Follower的ZAB状态是SYNCHRONIZATION
    image.png
    org.apache.zookeeper.server.quorum.Learner#syncWithLeader
    image.png
    image.png
    Leader.TRUNC删除超过的数据zxid
    image.png
    7、Leader.NEWLEADER数据包
    发送Leader.ACK数据包给leader
    image.png

    数据同步

    image.png

    同步数据可以从CommittedLog(提交的日志)、TxnLog日志记录、ZKDatabase zk内存数据数DataTree进行同步
    TxnLog日志记录和zk内存数据数DataTree是全的,而CommittedLog是只有一部分
    org.apache.zookeeper.server.quorum.LearnerHandler#syncFollower


    image.png

    这里首先判断这台Learner是否第一次启动,里面是否没有任何数据。如果没有isPeerNewEpochZxid=0
    把同步的位置peerLastZxid赋值给currentZxid。默认情况下needSnap=true进行快照同步


    image.png
    获取CommittedLog最大maxCommittedLog和最小的minCommittedLog位置,并且获取DataTree的lastProcessedZxid
    image.png
    如果CommittedLog是空的,则把DataTree的lastProcessedZxid赋值给最大maxCommittedLog和最小的minCommittedLog
    1、forceSnapSync和lastProcessedZxid == peerLastZxid
    image.png

    forceSnapSync=true是强制快照同步,这里不符合。
    lastProcessedZxid == peerLastZxid说明Leader节点与需要同步Learner节点的相等,则不需要同步。Leader.DIFF类型数据包放入queuedPackets队列中
    2、peerLastZxid>maxCommittedLog和minCommittedLog<peerLastZxid<maxCommittedLog


    image.png
    如果Learner节点需要同步的的位置比Leader节点的maxCommittedLog还大,说明Learner节点数据比较多(可能peerLastZxid=120),要把这部分多的给删除掉(100到120)。发送Leader.TRUNC数据包到queuedPackets队列中,把超过maxCommittedLog的都给删了
    如果同步的位置peerLastZxid在maxCommittedLog和minCommittedLog之间(peerLastZxid=85),可以通过CommittedLog直接进行同步。只需要同步peerLastZxid到maxCommittedLog最大位置zxid的数据(85到100)。
    image.png
    3、peerLastZxid<minCommittedLog
    如果同步的位置比CommittedLog最小的minCommittedLog还要小(peerLastZxid=60),那么将会要从TxnLog日志中同步peerLastZxid到minCommittedLog和CommittedLog的所有日志。如果TxnLog日志也不全,那么只能通过快照同步needSnap=true,这里没有把needSnap设置为false
    image.png
    org.apache.zookeeper.server.quorum.LearnerHandler#run
    image.png
    needSnap=true就是把数据树DataTree反序列封装成Leader.SNAP数据包发送给Learner节点。

    committedLog

    committedLog表示处理过的写请求。committedLog不是一个无限的队列commitLogCount表示存储多少个默认commitLogCount=DEFAULT_COMMIT_LOG_COUNT=500,用两个值minCommittedLog、maxCommittedLog表示这个区间[minCommittedLog,maxCommittedLog],如果超过commitLogCount大小committedLog将会被替换。committedLog的添加是在FinalRequestProcessor处理的
    1、processRequest
    org.apache.zookeeper.server.FinalRequestProcessor#processRequest


    image.png

    2、applyRequest
    这里也是更新ZkDataBase,触发Watch,集群模式下把Request添加到committedLog
    org.apache.zookeeper.server.FinalRequestProcessor#applyRequest


    image.png
    3、processTxn
    org.apache.zookeeper.server.ZooKeeperServer#processTxn(org.apache.zookeeper.server.Request)
    image.png
    boolean quorumRequest = request.isQuorum();
    image.png
    image.png

    4、addCommittedProposal
    org.apache.zookeeper.server.ZKDatabase#addCommittedProposal(org.apache.zookeeper.server.Request)
    把协议Proposal添加到committedLog队列中,


    image.png

    总结:

    Zookeeper集群数据同步是在确立角色之后进行的,其关键步骤是通过Epoch和EpochACK的过半验证阻塞之后进行的。在同步之后也需要收到NewLeaderAck超过一半才能正常启动初始化zookeeper的Leader端。
    同步的原理是根据Zxid所处的位置来寻找最佳的同步方式

    相关文章

      网友评论

        本文标题:Zookeeper之集群数据同步源码分析

        本文链接:https://www.haomeiwen.com/subject/xzjmfktx.html