在完成领导者选举之后,确定服务器角色之后,需要进行数据同步。然后才能构建请求处理链RequestProcessor处理请求
集群数据同步
![](https://img.haomeiwen.com/i7310356/52d5b69494ba8cde.png)
整体流程:当角色确立之后,leader调用leader.lead();方法运行,创建一个接收连接的LearnerCnxAcceptor线程,在LearnerCnxAcceptor线程内部又建立一个阻塞的LearnerCnxAcceptorHandler线程等待Learner端的连接。Learner端以follower为例,follower调用follower.followLeader();方法首先查找leader的Socket服务端,然后建立连接。当follower建立连接后,leader端会建立一个LearnerHandler线程相对应,用来处理follower与leader的数据包传输。
1、follower端封装当前zk服务器的Zxid和Leader.FOLLOWERINFO的LearnerInfo数据包发送给leader
2、leader端这时处于getEpochToPropose方法的阻塞时期,需要得到Learner端超过一半的服务器发送Epoch
3、getEpochToPropose解阻塞之后,LearnerHandler线程会把超过一半的Epoch与leader比较得到最新的newLeaderZxid,并封装成Leader.LEADERINFO包发送给Learner端
4、Learner端得到最新的Epoch,会更新当前服务器的Epoch。并把当前服务器所处的lastLoggedZxid位置封装成Leader.ACKEPOCH发送给leader
5、此时leader端处于waitForEpochAck方法的阻塞时期,需要得到Learner端超过一半的服务器发送EpochACK
6、当waitForEpochAck阻塞之后便可以在LearnerHandler线程内决定用那种方式进行同步。如果Learner端的lastLoggedZxid>leader端的,Learner端将会被删除多余的部分。如果小于leader端的,将会以不同方式进行同步
7、leader端发送Leader.NEWLEADER数据包给Learner端(6、7步骤都是另开一个线程来发送这些数据包)
8、Learner端同步之后,会在一个while循环内处理各种leader端发送数据包,包括两阶段提交的Leader.PROPOSAL、Leader.COMMIT、Leader.INFORM等。在同步数据后会处理Leader.NEWLEADER数据包,然后发送Leader.ACK给leader端
9、此时leader端处于waitForNewLeaderAck阻塞等待超过一半节点发送ACK。
当角色确立之后,需要结合着看
leader调用leader.lead();方法
![](https://img.haomeiwen.com/i7310356/560201df6729efa4.png)
follower调用follower.followLeader();方法
![](https://img.haomeiwen.com/i7310356/59bd2550fc1a5bb1.png)
observer调用observer.observeLeader();方法
![](https://img.haomeiwen.com/i7310356/774fc0ccee46ce98.png)
Leader
首先看看Leader做了些什么
org.apache.zookeeper.server.quorum.Leader#lead
1、loadData加载数据
![](https://img.haomeiwen.com/i7310356/fccd79fff1d2f771.png)
2、创建LearnerCnxAcceptor线程,接收followers的连接
![](https://img.haomeiwen.com/i7310356/7f6d7c73c3cfcf8a.png)
①、LearnerCnxAcceptorHandler接收连接线程
这里加了CountDownLatch根据配置的同步的地址的数量(例如:server.2=127.0.0.1:12882:13882 配置同步的端口是12882只有一个),所以serverSockets大小是一个
org.apache.zookeeper.server.quorum.Leader.LearnerCnxAcceptor#run
等待其他follower或observer连接
![](https://img.haomeiwen.com/i7310356/b13280496675ae6d.png)
②、LearnerCnxAcceptorHandler
这里接收到连接后,便会调用latch.countDown()解阻塞
org.apache.zookeeper.server.quorum.Leader.LearnerCnxAcceptor.LearnerCnxAcceptorHandler#run
![](https://img.haomeiwen.com/i7310356/c85f10f9e949ede6.png)
③、acceptConnections
在这里阻塞接收followers的连接,当有连接过来会生成一个socket对象。然后根据当前socket生成一个LearnerHandler线程
org.apache.zookeeper.server.quorum.Leader.LearnerCnxAcceptor.LearnerCnxAcceptorHandler#acceptConnections
每个Learner者都会开启一个LearnerHandler线程
![](https://img.haomeiwen.com/i7310356/6eb7d5660d96f0f1.png)
④、LearnerHandler
org.apache.zookeeper.server.quorum.LearnerHandler#run
这里就是读取或写数据包与Learner交换数据包。如果没有数据包读取,则会阻塞当前方法ia.readRecord(qp, "packet");
![](https://img.haomeiwen.com/i7310356/a2ec8a3ffb045d19.png)
3、getEpochToPropose
org.apache.zookeeper.server.quorum.Leader#lead
Leader节点在开启LearnerHandler线程之后会继续调用getEpochToPropose方法
![](https://img.haomeiwen.com/i7310356/d87db34657d4327f.png)
org.apache.zookeeper.server.quorum.Leader#getEpochToPropose
判断connectingFollowers发给leader端的Epoch是否过半,如果过半则会解阻塞
![](https://img.haomeiwen.com/i7310356/689db05052eb282b.png)
如果不过半则会一直阻塞在这里,直到Follower把自己的Epoch数据包发送过来并符合过半机制
![](https://img.haomeiwen.com/i7310356/d47e3ddf3f1dc001.png)
4、发送的Epoch过半之后,把当前zxid设置到zk
![](https://img.haomeiwen.com/i7310356/393af577928f9ea3.png)
5、waitForEpochAck
等待EpochAck
![](https://img.haomeiwen.com/i7310356/6c977b02f355e634.png)
org.apache.zookeeper.server.quorum.Leader#waitForEpochAck
是electingFollowers符合过半后唤醒,在Follower发送过半的Leader.ACKEPOCH数据包
![](https://img.haomeiwen.com/i7310356/bd1b913459dab0bc.png)
![](https://img.haomeiwen.com/i7310356/d6b188ad708bf3ab.png)
6、等待EpochAck解阻塞后
把得到最新的epoch更新到当前服务,设置当前leader节点的zab状态是SYNCHRONIZATION
![](https://img.haomeiwen.com/i7310356/2f54cbf9381d56f7.png)
7、等待NewLeaderAck
等待同步数据结束
org.apache.zookeeper.server.quorum.Leader#waitForNewLeaderAck
![](https://img.haomeiwen.com/i7310356/90f1ecd502149f11.png)
![](https://img.haomeiwen.com/i7310356/2cd576f21f094928.png)
8、startZkServer
启动zk服务端,并设置ZAB是ZabState.BROADCAST
![](https://img.haomeiwen.com/i7310356/92c0b352b78c6415.png)
org.apache.zookeeper.server.quorum.Leader#startZkServer
reconfigEnabled开启
![](https://img.haomeiwen.com/i7310356/235b097123eecdb4.png)
启动ZkServer服务端,更新currentVote、设置数据树dataTree.lastProcessedZxid
![](https://img.haomeiwen.com/i7310356/fb5f0855f795f25d.png)
9、ping和不超过过半停止运行
![](https://img.haomeiwen.com/i7310356/b1f280962e35b844.png)
![](https://img.haomeiwen.com/i7310356/f4c3470553137464.png)
![](https://img.haomeiwen.com/i7310356/239e170ec9ca0822.png)
LearnerHandler
LearnerHandler线程是对应于Learner连接Leader端后,建立的一个与Learner端交换数据的线程。每一个Learner端都会创建一个
org.apache.zookeeper.server.quorum.LearnerHandler#run
1、readRecord读取数据包
不断从learner节点读数据,如果没读到将会阻塞readRecord
![](https://img.haomeiwen.com/i7310356/8d8344f77a687a1f.png)
2、FOLLOWERINFO或OBSERVERINFO的数据包
如果数据包类型不是Leader.FOLLOWERINFO或Leader.OBSERVERINFO将会返回
![](https://img.haomeiwen.com/i7310356/75df9a80f513c9d8.png)
3、获取到learnerInfoData
![](https://img.haomeiwen.com/i7310356/ec7dcbbb3cb0d37b.png)
4、得到followerInfo和lastAcceptedEpoch
![](https://img.haomeiwen.com/i7310356/be54d4d06e27903a.png)
5、获取newEpoch通过getEpochToPropose方法
![](https://img.haomeiwen.com/i7310356/b0d29d5ce0f06116.png)
6、发送Leader.LEADERINFO数据包和waitForEpochAck
![](https://img.haomeiwen.com/i7310356/77502815fba9e57d.png)
7、syncFollower同步
needSnap表示是否需要快照同步
![](https://img.haomeiwen.com/i7310356/ba9b90bd728e4802.png)
org.apache.zookeeper.server.quorum.LearnerHandler#syncFollower
如果是新机器启动isPeerNewEpochZxid=0
![](https://img.haomeiwen.com/i7310356/3bd8a6d4b158df71.png)
8、数据树DataTree同步到其他节点
反序列化serializeSnapshot 数据树DataTree到其他节点
![](https://img.haomeiwen.com/i7310356/5f3b48183bf48515.png)
9、Leader.NEWLEADER数据包
把Leader.NEWLEADER数据包放入到queuedPackets
![](https://img.haomeiwen.com/i7310356/50ea85b11e7ce5e7.png)
10、startSendingPackets开始发送数据包
org.apache.zookeeper.server.quorum.LearnerHandler#startSendingPackets
![](https://img.haomeiwen.com/i7310356/0e28d0f4ecce1392.png)
11、waitForNewLeaderAck
![](https://img.haomeiwen.com/i7310356/be6fd36e0f7d4327.png)
12、发送Leader.ACK数据包
![](https://img.haomeiwen.com/i7310356/0eba3acad791fd25.png)
![](https://img.haomeiwen.com/i7310356/7fdd74d4b266eb00.png)
Follower
然后再看看Follower节点做了些那些事情
org.apache.zookeeper.server.quorum.Follower#followLeader
1、这里首先会寻找领导者leader的findLeader
![](https://img.haomeiwen.com/i7310356/5b2c1eb708760213.png)
org.apache.zookeeper.server.quorum.Learner#findLeader
根据投票信息currentVote得到那台是领导者
![](https://img.haomeiwen.com/i7310356/6add99e696abffb6.png)
连接到leader端
![](https://img.haomeiwen.com/i7310356/b8c3c6f63b4a2e38.png)
2、registerWithLeader
在建立连接后,会把follower的信息发送给lead。这些信息包括zxid(就是epoch届数)和sid。目的就是为了统一Epoch
org.apache.zookeeper.server.quorum.Learner#registerWithLeader
![](https://img.haomeiwen.com/i7310356/6c62086190497f42.png)
![](https://img.haomeiwen.com/i7310356/6cab5afa77c4bc42.png)
当有数据包传输,LearnerHandler的readRecord将会被解阻塞。会获取到这个数据包
org.apache.zookeeper.server.quorum.LearnerHandler#run
![](https://img.haomeiwen.com/i7310356/9f1539995a8a6d6b.png)
获取到follower节点的followerInfo信息之后,便会调用到getEpochToPropose方法。
org.apache.zookeeper.server.quorum.LearnerHandler#run
![](https://img.haomeiwen.com/i7310356/430172de0d63bcef.png)
3、把得到的Epoch数据包发送
此时数据包类型参数是Leader.LEADERINFO,newLeaderZxid最新的Leader的Zxid
![](https://img.haomeiwen.com/i7310356/a774f46ea994f567.png)
4、读取lead返回的Leader.LEADERINFO数据包
org.apache.zookeeper.server.quorum.Learner#registerWithLeader
![](https://img.haomeiwen.com/i7310356/0aa0c67fb125bfbe.png)
5、向leader写Leader.ACKEPOCH数据包
org.apache.zookeeper.server.quorum.Learner#registerWithLeader
![](https://img.haomeiwen.com/i7310356/b936f0d7ae0c43a0.png)
6、syncWithLeader
设置当前Follower的ZAB状态是SYNCHRONIZATION
![](https://img.haomeiwen.com/i7310356/4c8847544411f245.png)
org.apache.zookeeper.server.quorum.Learner#syncWithLeader
![](https://img.haomeiwen.com/i7310356/537ce5a55de69300.png)
![](https://img.haomeiwen.com/i7310356/0b139e54679d269c.png)
Leader.TRUNC删除超过的数据zxid
![](https://img.haomeiwen.com/i7310356/35b22922d433bf04.png)
7、Leader.NEWLEADER数据包
发送Leader.ACK数据包给leader
![](https://img.haomeiwen.com/i7310356/8707cd2ef9c4daef.png)
数据同步
![](https://img.haomeiwen.com/i7310356/b8e3d60b0e23efbc.png)
同步数据可以从CommittedLog(提交的日志)、TxnLog日志记录、ZKDatabase zk内存数据数DataTree进行同步
TxnLog日志记录和zk内存数据数DataTree是全的,而CommittedLog是只有一部分
org.apache.zookeeper.server.quorum.LearnerHandler#syncFollower
![](https://img.haomeiwen.com/i7310356/3c7f833609257d05.png)
这里首先判断这台Learner是否第一次启动,里面是否没有任何数据。如果没有isPeerNewEpochZxid=0
把同步的位置peerLastZxid赋值给currentZxid。默认情况下needSnap=true进行快照同步
![](https://img.haomeiwen.com/i7310356/9108de2aec6fce5a.png)
获取CommittedLog最大maxCommittedLog和最小的minCommittedLog位置,并且获取DataTree的lastProcessedZxid
![](https://img.haomeiwen.com/i7310356/dfda1d21425054e6.png)
如果CommittedLog是空的,则把DataTree的lastProcessedZxid赋值给最大maxCommittedLog和最小的minCommittedLog
1、forceSnapSync和lastProcessedZxid == peerLastZxid
![](https://img.haomeiwen.com/i7310356/41a98263aca35a90.png)
forceSnapSync=true是强制快照同步,这里不符合。
lastProcessedZxid == peerLastZxid说明Leader节点与需要同步Learner节点的相等,则不需要同步。Leader.DIFF类型数据包放入queuedPackets队列中
2、peerLastZxid>maxCommittedLog和minCommittedLog<peerLastZxid<maxCommittedLog
![](https://img.haomeiwen.com/i7310356/3133f54e8c70fb58.png)
如果Learner节点需要同步的的位置比Leader节点的maxCommittedLog还大,说明Learner节点数据比较多(可能peerLastZxid=120),要把这部分多的给删除掉(100到120)。发送Leader.TRUNC数据包到queuedPackets队列中,把超过maxCommittedLog的都给删了
如果同步的位置peerLastZxid在maxCommittedLog和minCommittedLog之间(peerLastZxid=85),可以通过CommittedLog直接进行同步。只需要同步peerLastZxid到maxCommittedLog最大位置zxid的数据(85到100)。
![](https://img.haomeiwen.com/i7310356/00566d8308d726f6.png)
3、peerLastZxid<minCommittedLog
如果同步的位置比CommittedLog最小的minCommittedLog还要小(peerLastZxid=60),那么将会要从TxnLog日志中同步peerLastZxid到minCommittedLog和CommittedLog的所有日志。如果TxnLog日志也不全,那么只能通过快照同步needSnap=true,这里没有把needSnap设置为false
![](https://img.haomeiwen.com/i7310356/f06a769131dc67d2.png)
org.apache.zookeeper.server.quorum.LearnerHandler#run
![](https://img.haomeiwen.com/i7310356/2e30769b19da134f.png)
needSnap=true就是把数据树DataTree反序列封装成Leader.SNAP数据包发送给Learner节点。
committedLog
committedLog表示处理过的写请求。committedLog不是一个无限的队列commitLogCount表示存储多少个默认commitLogCount=DEFAULT_COMMIT_LOG_COUNT=500,用两个值minCommittedLog、maxCommittedLog表示这个区间[minCommittedLog,maxCommittedLog],如果超过commitLogCount大小committedLog将会被替换。committedLog的添加是在FinalRequestProcessor处理的
1、processRequest
org.apache.zookeeper.server.FinalRequestProcessor#processRequest
![](https://img.haomeiwen.com/i7310356/d3a864050507e4b2.png)
2、applyRequest
这里也是更新ZkDataBase,触发Watch,集群模式下把Request添加到committedLog
org.apache.zookeeper.server.FinalRequestProcessor#applyRequest
![](https://img.haomeiwen.com/i7310356/6c69199d68f6033e.png)
3、processTxn
org.apache.zookeeper.server.ZooKeeperServer#processTxn(org.apache.zookeeper.server.Request)
![](https://img.haomeiwen.com/i7310356/f28c4ebf800a48e8.png)
boolean quorumRequest = request.isQuorum();
![](https://img.haomeiwen.com/i7310356/8566e290ef4a5a27.png)
![](https://img.haomeiwen.com/i7310356/fb25b136dae65fc3.png)
4、addCommittedProposal
org.apache.zookeeper.server.ZKDatabase#addCommittedProposal(org.apache.zookeeper.server.Request)
把协议Proposal添加到committedLog队列中,
![](https://img.haomeiwen.com/i7310356/1fb9e8647e244ea1.png)
总结:
Zookeeper集群数据同步是在确立角色之后进行的,其关键步骤是通过Epoch和EpochACK的过半验证阻塞之后进行的。在同步之后也需要收到NewLeaderAck超过一半才能正常启动初始化zookeeper的Leader端。
同步的原理是根据Zxid所处的位置来寻找最佳的同步方式
网友评论