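To keep data consistent, a ZooKeeper cluster uses a two-phase commit.
A ZooKeeper cluster has three roles: leader, follower and observer.
The roles handle read and write requests differently:
Read requests: served directly from the data on the current node
Write requests: the leader runs the two-phase commit itself; any other node forwards the request to the leader
So analysing the two-phase commit means analysing how requests are processed in cluster mode. In standalone mode a request is handled by a chain of RequestProcessors, and cluster mode uses the same mechanism with a different chain for each role.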
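Processing a single ZooKeeper request involves three main steps:
1. Generate a txn (transaction log entry) for the request
2. Persist the txn
3. Apply the txn to the in-memory database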
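A minimal Java sketch of those three steps for a single write, assuming a toy model: the txn log is an in-memory list and the database is a HashMap from path to data. Txn, SingleServerPipeline and setData are hypothetical names, not ZooKeeper classes; the comments note which standalone processor performs each step.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of one standalone write; not ZooKeeper's API.
class Txn {
    final long zxid;
    final String path;
    final String data;

    Txn(long zxid, String path, String data) {
        this.zxid = zxid;
        this.path = path;
        this.data = data;
    }
}

class SingleServerPipeline {
    private long lastZxid = 0;
    final List<Txn> txnLog = new ArrayList<>();            // stands in for the on-disk txn log
    final Map<String, String> dataTree = new HashMap<>();  // stands in for the in-memory database

    long setData(String path, String data) {
        // 1. generate a txn with the next zxid (PrepRequestProcessor's job)
        Txn txn = new Txn(++lastZxid, path, data);
        // 2. persist the txn (SyncRequestProcessor's job)
        txnLog.add(txn);
        // 3. apply the txn to the database (FinalRequestProcessor's job)
        dataTree.put(txn.path, txn.data);
        return txn.zxid;
    }
}
```

In cluster mode, the two-phase commit is inserted between step 2 and step 3: the txn is applied only after a majority has persisted it.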
Two-phase commit (2PC) steps:
![](https://img.haomeiwen.com/i7310356/973326ad8a475e16.png)
Building the leader's request processor chain
![](https://img.haomeiwen.com/i7310356/1dc59007e8a970cd.png)
The processors marked in green (PrepRequestProcessor, SyncRequestProcessor, CommitProcessor) extend ZooKeeperCriticalThread, so each one runs in its own thread.
org.apache.zookeeper.server.quorum.LeaderZooKeeperServer#setupRequestProcessors
![](https://img.haomeiwen.com/i7310356/4994d97261535867.png)
org.apache.zookeeper.server.quorum.ProposalRequestProcessor#ProposalRequestProcessor
![](https://img.haomeiwen.com/i7310356/a6b58c292e49e08c.png)
ProposalRequestProcessor contains a SyncRequestProcessor, which in turn feeds an AckRequestProcessor:
LeaderRequestProcessor----->PrepRequestProcessor---->ProposalRequestProcessor(SyncRequestProcessor--->AckRequestProcessor)----->CommitProcessor--->Leader.ToBeAppliedRequestProcessor---->FinalRequestProcessor
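The chain can be modelled as plain objects that each do their part and then call the next one. This simplified Java sketch assumes a hypothetical Processor interface standing in for ZooKeeper's RequestProcessor, and it ignores threads and the SyncRequestProcessor/AckRequestProcessor side branch. It only shows the order in which a request visits the main chain, and that the chain is wired back to front, each processor receiving its successor in its constructor, as setupRequestProcessors does.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for RequestProcessor: do this stage's work, then hand off.
interface Processor {
    void processRequest(String request);
}

class NamedStage implements Processor {
    private final String name;
    private final Processor next;        // null marks the end of the chain
    private final List<String> trace;    // records which stages saw the request

    NamedStage(String name, Processor next, List<String> trace) {
        this.name = name;
        this.next = next;
        this.trace = trace;
    }

    @Override
    public void processRequest(String request) {
        trace.add(name);
        if (next != null) {
            next.processRequest(request);
        }
    }
}

class LeaderChainDemo {
    // Built from the last processor to the first, so each stage already knows its successor.
    static List<String> run(String request) {
        List<String> trace = new ArrayList<>();
        Processor fin = new NamedStage("Final", null, trace);
        Processor toBeApplied = new NamedStage("ToBeApplied", fin, trace);
        Processor commit = new NamedStage("Commit", toBeApplied, trace);
        Processor proposal = new NamedStage("Proposal", commit, trace);
        Processor prep = new NamedStage("Prep", proposal, trace);
        Processor first = new NamedStage("Leader", prep, trace);
        first.processRequest(request);
        return trace;
    }
}
```

In the real leader, the hand-offs into PrepRequestProcessor, SyncRequestProcessor and CommitProcessor go through queues consumed by their own threads, so the order holds per request but the calls are not synchronous as they are here.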
1. LeaderRequestProcessor
org.apache.zookeeper.server.quorum.LeaderRequestProcessor#processRequest
![](https://img.haomeiwen.com/i7310356/4230a2b8a742ad0e.png)
① Check whether the request comes from a local session; creating an ephemeral node upgrades the local session to a global one
org.apache.zookeeper.server.quorum.QuorumZooKeeperServer#checkUpgradeSession
![](https://img.haomeiwen.com/i7310356/5fc28152f0a0152d.png)
![](https://img.haomeiwen.com/i7310356/214778f0af364693.png)
![](https://img.haomeiwen.com/i7310356/fe46e694f6af1946.png)
② Pass the request to the next processor
![](https://img.haomeiwen.com/i7310356/1ce410b3345973c4.png)
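2. PrepRequestProcessor
Same role as in standalone mode: it fills in the request's Hdr and Txn, then passes the request to the next processor.
3. ProposalRequestProcessor (the 2PC proposal)
Every request is passed on to the next processor, CommitProcessor. If it is a write request (request.getHdr() != null), the leader also wraps it in a proposal and sends it to the followers; after sending, the request is handed to SyncRequestProcessor to be persisted.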
![](https://img.haomeiwen.com/i7310356/8409b07aaa86d879.png)
org.apache.zookeeper.server.quorum.Leader#propose
![](https://img.haomeiwen.com/i7310356/0325c88d97eb3da2.png)
![](https://img.haomeiwen.com/i7310356/d0818f736d8e23a8.png)
![](https://img.haomeiwen.com/i7310356/e54354e353ec5686.png)
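What propose and sendPacket do can be sketched as bookkeeping plus a broadcast. This is a simplified model, not the real Leader code: packets are a hypothetical Packet class with a string type instead of QuorumPacket, outstandingProposals maps a zxid directly to the packet instead of to a Proposal object, and each follower's LearnerHandler.queuedPackets is modelled as a plain list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical packet type; the real code uses QuorumPacket with int type constants.
class Packet {
    final String type;
    final long zxid;

    Packet(String type, long zxid) {
        this.type = type;
        this.zxid = zxid;
    }
}

class LeaderProposeSketch {
    // zxid -> proposal, kept until a quorum of ACKs arrives
    final Map<Long, Packet> outstandingProposals = new ConcurrentHashMap<>();
    // one outbound queue per follower, standing in for each LearnerHandler's queuedPackets
    final List<List<Packet>> forwardingFollowers = new ArrayList<>();
    long lastProposed;

    Packet propose(long zxid) {
        Packet proposal = new Packet("PROPOSAL", zxid);
        outstandingProposals.put(zxid, proposal);
        lastProposed = zxid;
        // sendPacket(): queue the same packet for every follower;
        // in the real server a sender thread per follower writes it to the socket later
        for (List<Packet> queue : forwardingFollowers) {
            queue.add(proposal);
        }
        return proposal;
    }
}
```

Queuing per follower means a slow follower does not block the leader from proposing to the others.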
org.apache.zookeeper.server.quorum.Leader#sendPacket
Sends the packet to every follower in forwardingFollowers
![](https://img.haomeiwen.com/i7310356/8f5ae9eb81221da9.png)
org.apache.zookeeper.server.quorum.LearnerHandler#queuePacket
Adds the packet to the LearnerHandler's queuedPackets queue
![](https://img.haomeiwen.com/i7310356/c01b3409c69717ac.png)
org.apache.zookeeper.server.quorum.LearnerHandler#sendPackets
![](https://img.haomeiwen.com/i7310356/12ed4a3d9b97ece5.png)
3.1. SyncRequestProcessor (2PC persistence)
Puts the request into the queuedRequests blocking queue
![](https://img.haomeiwen.com/i7310356/643988ec9fa3b3af.png)
① Persist the request, the same way as in standalone mode
org.apache.zookeeper.server.SyncRequestProcessor#run
![](https://img.haomeiwen.com/i7310356/01a6eb2abe54ec11.png)
![](https://img.haomeiwen.com/i7310356/b7eb22361efce5d9.png)
② Pass the request on to the next processor, AckRequestProcessor
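The persistence step batches requests before flushing them, roughly like a group commit: everything queued is appended, the log is flushed once, and only then is each request passed to the next processor (AckRequestProcessor on the leader, SendAckRequestProcessor on a follower). This single-threaded sketch assumes zxids stand in for requests and a list stands in for the txn log; SyncSketch and drainOnce are hypothetical names, and snapshotting is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical, simplified model of the SyncRequestProcessor loop; not ZooKeeper's API.
class SyncSketch {
    final LinkedBlockingQueue<Long> queuedRequests = new LinkedBlockingQueue<>();
    final List<Long> log = new ArrayList<>();   // stands in for the txn log
    int flushes = 0;
    private final Consumer<Long> next;          // e.g. the ACK-sending processor

    SyncSketch(Consumer<Long> next) {
        this.next = next;
    }

    void processRequest(long zxid) {
        queuedRequests.add(zxid);
    }

    // One pass of the run loop: append everything queued, flush once, then hand off.
    void drainOnce() {
        List<Long> toFlush = new ArrayList<>();
        queuedRequests.drainTo(toFlush);
        if (toFlush.isEmpty()) {
            return;
        }
        log.addAll(toFlush);   // append each txn
        flushes++;             // a single flush (fsync) covers the whole batch
        for (Long zxid : toFlush) {
            next.accept(zxid); // ACK only after the data is durable
        }
    }
}
```

Handing off only after the flush is what makes an ACK meaningful: a server acknowledges a proposal only once it is on disk.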
3.2. AckRequestProcessor (leader-side 2PC handling)
The leader sends an ACK for the request to itself (the 2PC ACK), so its own vote counts toward the quorum
![](https://img.haomeiwen.com/i7310356/21fa8372cf2a42d2.png)
org.apache.zookeeper.server.quorum.Leader#processAck
![](https://img.haomeiwen.com/i7310356/ad55a6e2596b11ab.png)
![](https://img.haomeiwen.com/i7310356/07249b7ba30d33e7.png)
![](https://img.haomeiwen.com/i7310356/54e398a889a7a458.png)
org.apache.zookeeper.server.quorum.Leader#tryToCommit
![](https://img.haomeiwen.com/i7310356/27242e2b6c06c8e4.png)
![](https://img.haomeiwen.com/i7310356/fb8d5fef6f320562.png)
![](https://img.haomeiwen.com/i7310356/0c3ed14f54032108.png)
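The quorum check in processAck/tryToCommit can be modelled as counting distinct ACKing server ids per zxid. This simplified sketch assumes a plain majority of voting members (the real code asks the QuorumVerifier) and reduces the follow-up calls commit, inform and commitProcessor.commit to a comment; AckProposal and AckQuorumSketch are hypothetical names. It keeps the in-order rule: a proposal is not committed while the proposal with the previous zxid is still outstanding.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical, simplified model of Leader#processAck / #tryToCommit; not ZooKeeper's API.
class AckProposal {
    final long zxid;
    final Set<Long> ackSet = new HashSet<>();   // ids of servers that ACKed

    AckProposal(long zxid) {
        this.zxid = zxid;
    }
}

class AckQuorumSketch {
    final int votingMembers;   // leader + followers; observers do not vote
    final Map<Long, AckProposal> outstandingProposals = new HashMap<>();
    final ConcurrentLinkedQueue<AckProposal> toBeApplied = new ConcurrentLinkedQueue<>();
    long lastCommitted = 0;

    AckQuorumSketch(int votingMembers) {
        this.votingMembers = votingMembers;
    }

    void propose(long zxid) {
        outstandingProposals.put(zxid, new AckProposal(zxid));
    }

    void processAck(long sid, long zxid) {
        if (zxid <= lastCommitted) {
            return;   // late ACK for a proposal that is already committed
        }
        AckProposal p = outstandingProposals.get(zxid);
        if (p == null) {
            return;
        }
        p.ackSet.add(sid);
        tryToCommit(p);
    }

    boolean tryToCommit(AckProposal p) {
        if (outstandingProposals.containsKey(p.zxid - 1)) {
            return false;   // the previous proposal must commit first
        }
        if (p.ackSet.size() <= votingMembers / 2) {
            return false;   // no strict majority yet
        }
        outstandingProposals.remove(p.zxid);
        toBeApplied.add(p);
        lastCommitted = p.zxid;
        // the real Leader now calls commit(zxid), inform(p) and commitProcessor.commit(request)
        return true;
    }
}
```

With three voting servers, the leader's own ACK plus one follower ACK is already a majority, so the leader does not wait for the slowest follower.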
org.apache.zookeeper.server.quorum.Leader#commit
Creates a Leader.COMMIT packet and sends it to all followers
![](https://img.haomeiwen.com/i7310356/bb0004b96c739851.png)
org.apache.zookeeper.server.quorum.Leader#inform
Creates an INFORM packet and sends it to all observers
![](https://img.haomeiwen.com/i7310356/6dc56d97f1116c1f.png)
org.apache.zookeeper.server.quorum.CommitProcessor#commit
Commits the request locally by adding it to committedRequests; it is eventually applied to the database
![](https://img.haomeiwen.com/i7310356/64db5b28fb12542f.png)
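The three calls that follow a successful quorum differ mainly in who receives what. In this sketch (hypothetical OutPacket and CommitBroadcastSketch classes, string packet types instead of QuorumPacket constants, lists instead of LearnerHandler queues), COMMIT carries only the zxid because followers already logged the proposal, while INFORM carries the txn data because observers never received the PROPOSAL.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, simplified model of Leader#commit, #inform and CommitProcessor#commit.
class OutPacket {
    final String type;
    final long zxid;
    final byte[] data;   // null when the receiver already has the txn

    OutPacket(String type, long zxid, byte[] data) {
        this.type = type;
        this.zxid = zxid;
        this.data = data;
    }
}

class CommitBroadcastSketch {
    final List<List<OutPacket>> followerQueues = new ArrayList<>();
    final List<List<OutPacket>> observerQueues = new ArrayList<>();
    // stands in for the leader's CommitProcessor.committedRequests
    final LinkedBlockingQueue<Long> committedRequests = new LinkedBlockingQueue<>();

    void onQuorum(long zxid, byte[] txnData) {
        // commit(): followers already have the txn, so the zxid is enough
        OutPacket commit = new OutPacket("COMMIT", zxid, null);
        for (List<OutPacket> q : followerQueues) {
            q.add(commit);
        }
        // inform(): observers never saw the PROPOSAL, so the txn data travels with it
        OutPacket inform = new OutPacket("INFORM", zxid, txnData);
        for (List<OutPacket> q : observerQueues) {
            q.add(inform);
        }
        // commitProcessor.commit(): let the leader's own chain apply the request
        committedRequests.add(zxid);
    }
}
```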
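4. CommitProcessor
CommitProcessor fields:
queuedRequests: all incoming requests (reads and writes) that have not been processed yet
queuedWriteRequests: incoming write requests that are still waiting for the two-phase commit
committedRequests: requests that are ready to be committed; once a majority has ACKed, the request is committed locally and added to this queue
commitIsWaiting: whether there is a request waiting to be committed (true when committedRequests is non-empty)
pendingRequests: a map from each client sessionId to that session's queued requests
Leader fields:
outstandingProposals: proposals that have been sent out; a proposal is removed once a majority has ACKed it
toBeApplied: committed proposals that have not been applied yet; they are removed by ToBeAppliedRequestProcessor once FinalRequestProcessor has applied them
① processRequest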
org.apache.zookeeper.server.quorum.CommitProcessor#processRequest
![](https://img.haomeiwen.com/i7310356/5bc7fbee30af1a9f.png)
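The request is always added to queuedRequests; the method then checks whether it needs the two-phase commit, and if so also adds it to queuedWriteRequests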
org.apache.zookeeper.server.quorum.CommitProcessor#needCommit
Returns true for operations that modify data
![](https://img.haomeiwen.com/i7310356/4cd23800a68ff527.png)
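The enqueue side of CommitProcessor is small. A sketch with a hypothetical, reduced set of operation types (Op, Req and CommitQueueSketch are not ZooKeeper classes): every request goes into queuedRequests, and the ones for which needCommit returns true are also tracked in queuedWriteRequests. As in the real needCommit, createSession and closeSession only need a commit when the session is not local.

```java
import java.util.EnumSet;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, reduced operation set; the real code switches on OpCode int constants.
enum Op { CREATE, DELETE, SET_DATA, SET_ACL, MULTI, GET_DATA, EXISTS, GET_CHILDREN, CREATE_SESSION, CLOSE_SESSION }

class Req {
    final long sessionId;
    final Op op;
    final boolean localSession;

    Req(long sessionId, Op op, boolean localSession) {
        this.sessionId = sessionId;
        this.op = op;
        this.localSession = localSession;
    }
}

class CommitQueueSketch {
    private static final Set<Op> WRITES = EnumSet.of(Op.CREATE, Op.DELETE, Op.SET_DATA, Op.SET_ACL, Op.MULTI);

    final LinkedBlockingQueue<Req> queuedRequests = new LinkedBlockingQueue<>();
    final LinkedBlockingQueue<Req> queuedWriteRequests = new LinkedBlockingQueue<>();

    static boolean needCommit(Req r) {
        if (WRITES.contains(r.op)) {
            return true;
        }
        if (r.op == Op.CREATE_SESSION || r.op == Op.CLOSE_SESSION) {
            return !r.localSession;   // global sessions are agreed on by the cluster
        }
        return false;                 // reads never wait for a commit
    }

    void processRequest(Req r) {
        queuedRequests.add(r);        // every request, read or write
        if (needCommit(r)) {
            queuedWriteRequests.add(r);
        }
        // the real method then calls wakeup() to notify the run() thread
    }
}
```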
② CommitProcessor#run
CommitProcessor is a thread, so its core logic lives in the run method
org.apache.zookeeper.server.quorum.CommitProcessor#run
a. Reading commitIsWaiting and requestsToProcess
![](https://img.haomeiwen.com/i7310356/63afe0290b89a73b.png)
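First it reads commitIsWaiting (true if committedRequests has entries) and requestsToProcess (the number of requests waiting in queuedRequests)
b. wait()
If both queuedRequests and committedRequests are empty, the thread calls wait() until new work arrives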
![](https://img.haomeiwen.com/i7310356/71a465bcdcf73eda.png)
c. pendingRequests
![](https://img.haomeiwen.com/i7310356/af6cf149a593626b.png)
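If the request needs a commit, it goes straight into pendingRequests. If it is a read, the processor checks whether its sessionId already has entries in pendingRequests; if so, the read is added there too, so it stays behind that session's pending write. Otherwise it is an independent client read and is passed straight on with sendToNextProcessor(request).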
![](https://img.haomeiwen.com/i7310356/342c241b11a43fab.png)
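d. Next, look at when this while loop exits:
① queuedRequests.poll() returns null, or requestsToProcess has counted down to 0.
② The read batch limit is reached. The loop only continues while maxReadBatchSize < 0 || readsProcessed <= maxReadBatchSize.
maxReadBatchSize defaults to -1, meaning no limit; if it is configured, the loop exits once readsProcessed exceeds it.
③ No read batch limit is set and both pendingRequests and committedRequests are non-empty: commitIsWaiting is set to true and the loop breaks so the waiting commit can be processed.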
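Steps c and d together form the loop that moves requests from queuedRequests into pendingRequests or on to the next processor. A single-threaded sketch of that loop, assuming a hypothetical request class R with a write flag in place of needCommit, and a list standing in for sendToNextProcessor; FeedLoopSketch is a hypothetical name, and the wait()/notify handling and worker pool are left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical, simplified model of the feeding loop in CommitProcessor#run.
class R {
    final long sessionId;
    final boolean write;
    final String name;

    R(long sessionId, boolean write, String name) {
        this.sessionId = sessionId;
        this.write = write;
        this.name = name;
    }
}

class FeedLoopSketch {
    final Queue<R> queuedRequests = new ArrayDeque<>();
    final Queue<R> committedRequests = new ArrayDeque<>();
    final Map<Long, Deque<R>> pendingRequests = new HashMap<>();
    final List<R> sentToNext = new ArrayList<>();   // stands in for sendToNextProcessor
    int maxReadBatchSize = -1;                       // -1: no limit on reads per pass
    boolean commitIsWaiting;

    void feed() {
        int requestsToProcess = queuedRequests.size();
        int readsProcessed = 0;
        commitIsWaiting = !committedRequests.isEmpty();
        R request;
        while (requestsToProcess > 0
                && (maxReadBatchSize < 0 || readsProcessed <= maxReadBatchSize)
                && (request = queuedRequests.poll()) != null) {
            requestsToProcess--;
            if (request.write || pendingRequests.containsKey(request.sessionId)) {
                // writes, and reads behind a pending write of the same session, must wait
                pendingRequests.computeIfAbsent(request.sessionId, sid -> new ArrayDeque<>()).addLast(request);
            } else {
                readsProcessed++;
                sentToNext.add(request);   // independent read: serve it now
            }
            if (maxReadBatchSize < 0 && !pendingRequests.isEmpty() && !committedRequests.isEmpty()) {
                commitIsWaiting = true;    // a commit can be matched now, stop feeding
                break;
            }
        }
    }
}
```

With a write from session 1 queued, a later read from session 1 waits in pendingRequests, while a read from session 2 goes straight through. This is how per-session ordering is preserved without blocking other sessions.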
e. commitIsWaiting: there are requests waiting to be committed
![](https://img.haomeiwen.com/i7310356/23232f6073874407.png)
A while loop takes requests from committedRequests and processes the writes
![](https://img.haomeiwen.com/i7310356/8611e3661c07a717.png)
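It gets sessionQueue, the waiting queue for this client's sessionId, from pendingRequests (it can contain both reads and writes).
The session queue keeps that session's requests in arrival order. For example, if a client sends write, read, write, read, its sessionQueue is {write, read, write, read}: once a write is pending, the session's later reads wait behind it.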
![](https://img.haomeiwen.com/i7310356/0611a1b7f8839312.png)
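The first request is taken off sessionQueue and assigned to topPending, which is then processed in place of the committed copy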
![](https://img.haomeiwen.com/i7310356/3bdef3903fa8f289.png)
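The request's sessionId is added to queuesToDrain, the request is removed from committedRequests, commitsProcessed is incremented, and commitsToProcess (which starts at maxCommitBatchSize) is decremented. The decrement is what eventually ends the loop while (commitIsWaiting && !stopped && commitsToProcess > 0). Finally, processWrite hands the write to the next processor.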
![](https://img.haomeiwen.com/i7310356/9ab51cf07fee95a4.png)
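f. queuesToDrain
This works together with commitsToProcess: the loop stops after commitsToProcess writes even if commitIsWaiting is still true, and the processor then first handles the reads that were queued behind those writes in each session listed in queuesToDrain.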
![](https://img.haomeiwen.com/i7310356/7c79f4e71f4d3861.png)
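Steps e and f can be sketched together. This simplified model (hypothetical Rq and CommitDrainSketch classes; only the zxid is copied from the committed request, and processWrite/sendToNextProcessor are replaced by appending to a list) shows the matching rule. A committed request that matches the head of queuedWriteRequests belongs to a local client, and its write must also be at the head of its session queue; a committed request from another server is processed directly. Afterwards, reads queued behind the committed writes are drained. A LinkedHashSet is used only to make the drain order predictable.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// Hypothetical, simplified model of the commit half of CommitProcessor#run.
class Rq {
    final long sessionId;
    final int cxid;        // client-side sequence number within the session
    final boolean write;
    long zxid = -1;        // filled in from the committed copy

    Rq(long sessionId, int cxid, boolean write) {
        this.sessionId = sessionId;
        this.cxid = cxid;
        this.write = write;
    }
}

class CommitDrainSketch {
    final Queue<Rq> committedRequests = new ArrayDeque<>();
    final Queue<Rq> queuedWriteRequests = new ArrayDeque<>();
    final Map<Long, Deque<Rq>> pendingRequests = new HashMap<>();
    final List<Rq> sentToNext = new ArrayList<>();
    int maxCommitBatchSize = 1;

    void processCommits() {
        int commitsToProcess = maxCommitBatchSize;
        Set<Long> queuesToDrain = new LinkedHashSet<>();
        while (!committedRequests.isEmpty() && commitsToProcess > 0) {
            Rq request = committedRequests.peek();
            Rq head = queuedWriteRequests.peek();
            if (head != null && head.sessionId == request.sessionId && head.cxid == request.cxid) {
                // the commit belongs to a write from one of this server's clients
                Deque<Rq> sessionQueue = pendingRequests.get(request.sessionId);
                if (sessionQueue == null || sessionQueue.isEmpty() || !sessionQueue.peek().write) {
                    break;   // not at the head of its session yet
                }
                Rq topPending = sessionQueue.poll();
                topPending.zxid = request.zxid;   // copy committed txn info onto the local request
                request = topPending;
                queuedWriteRequests.poll();
                queuesToDrain.add(request.sessionId);
            }
            // otherwise: a write from another server, applied without a local client waiting
            committedRequests.remove();
            commitsToProcess--;
            sentToNext.add(request);   // processWrite()
        }
        // reads that were queued behind the committed writes can go now
        for (Long sessionId : queuesToDrain) {
            Deque<Rq> sessionQueue = pendingRequests.get(sessionId);
            while (!sessionQueue.isEmpty() && !sessionQueue.peek().write) {
                sentToNext.add(sessionQueue.poll());
            }
            if (sessionQueue.isEmpty()) {
                pendingRequests.remove(sessionId);
            }
        }
    }
}
```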
5. ToBeAppliedRequestProcessor
org.apache.zookeeper.server.quorum.Leader.ToBeAppliedRequestProcessor#processRequest
After passing the request on to FinalRequestProcessor, it removes the matching proposal from toBeApplied
![](https://img.haomeiwen.com/i7310356/b5fab6067313ea03.png)
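A sketch of this step, assuming zxids stand in for proposals and a LongConsumer stands in for FinalRequestProcessor (ToBeAppliedSketch is a hypothetical name). The order matters: the request is applied first, and only then is the matching head of toBeApplied removed. Reads carry no header and never appear in toBeApplied.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.LongConsumer;

// Hypothetical, simplified model of Leader.ToBeAppliedRequestProcessor.
class ToBeAppliedSketch {
    // zxids that are committed but not yet applied, in commit order
    final ConcurrentLinkedQueue<Long> toBeApplied = new ConcurrentLinkedQueue<>();
    private final LongConsumer next;   // stands in for FinalRequestProcessor

    ToBeAppliedSketch(LongConsumer next) {
        this.next = next;
    }

    void processRequest(long zxid, boolean isWrite) {
        next.accept(zxid);          // apply to the database first
        if (!isWrite) {
            return;                 // reads never entered toBeApplied
        }
        Long head = toBeApplied.peek();
        if (head != null && head == zxid) {
            toBeApplied.poll();     // applied now, so no longer "to be applied"
        }
        // the real processor logs an error when the head does not match
    }
}
```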
Building the follower's request processor chain
![](https://img.haomeiwen.com/i7310356/66eac2039a67e4de.png)
The processors marked in green (FollowerRequestProcessor, CommitProcessor, SyncRequestProcessor) extend ZooKeeperCriticalThread, so each one runs in its own thread.
org.apache.zookeeper.server.quorum.FollowerZooKeeperServer#setupRequestProcessors
![](https://img.haomeiwen.com/i7310356/d02b6e16c045cf1f.png)
Two chains are set up:
FollowerRequestProcessor(firstProcessor)---->CommitProcessor----->FinalRequestProcessor
SyncRequestProcessor---->SendAckRequestProcessor
1. FollowerRequestProcessor
org.apache.zookeeper.server.quorum.FollowerRequestProcessor#processRequest
The request is added to the queuedRequests queue
![](https://img.haomeiwen.com/i7310356/73a3ced30eb675d4.png)
![](https://img.haomeiwen.com/i7310356/240912a24ca783ee.png)
FollowerRequestProcessor is a thread that takes requests from queuedRequests
org.apache.zookeeper.server.quorum.FollowerRequestProcessor#run
![](https://img.haomeiwen.com/i7310356/aff6478353a15c0b.png)
![](https://img.haomeiwen.com/i7310356/3c2edd6b510db7a6.png)
org.apache.zookeeper.server.quorum.Learner#request
Builds a request packet and forwards it to the leader for processing
![](https://img.haomeiwen.com/i7310356/810ec7249bcb05e1.png)
createSession and closeSession are also forwarded to the leader, unless the session is local
![](https://img.haomeiwen.com/i7310356/0585be43b746c908.png)
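The forwarding decision can be sketched as a switch on the operation type. In this simplified model (hypothetical FOp, FReq and FollowerForwardSketch; lists stand in for the CommitProcessor and for Learner#request), every request is queued locally first so the follower is ready when the COMMIT arrives, and only writes, syncs and non-local session operations go to the leader.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, reduced operation set; the real code switches on OpCode int constants.
enum FOp { GET_DATA, SYNC, CREATE, DELETE, SET_DATA, CREATE_SESSION, CLOSE_SESSION }

class FReq {
    final FOp op;
    final boolean localSession;

    FReq(FOp op, boolean localSession) {
        this.op = op;
        this.localSession = localSession;
    }
}

class FollowerForwardSketch {
    final List<FReq> commitProcessorQueue = new ArrayList<>();  // nextProcessor (CommitProcessor)
    final List<FReq> sentToLeader = new ArrayList<>();          // Learner#request: REQUEST packet to the leader
    final List<FReq> pendingSyncs = new ArrayList<>();

    void handle(FReq r) {
        // queue locally first so the follower is ready when the COMMIT comes back
        commitProcessorQueue.add(r);
        switch (r.op) {
            case SYNC:
                pendingSyncs.add(r);
                sentToLeader.add(r);
                break;
            case CREATE:
            case DELETE:
            case SET_DATA:
                sentToLeader.add(r);
                break;
            case CREATE_SESSION:
            case CLOSE_SESSION:
                if (!r.localSession) {
                    sentToLeader.add(r);   // local sessions stay on this server
                }
                break;
            default:
                break;                     // reads are answered locally
        }
    }
}
```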
2. SendAckRequestProcessor
org.apache.zookeeper.server.quorum.SendAckRequestProcessor#processRequest
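Before SendAckRequestProcessor runs, SyncRequestProcessor persists the request; since that works the same as in standalone mode and on the leader, it is not repeated here.
Sends an ACK packet to the leader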
![](https://img.haomeiwen.com/i7310356/a096cd3089b91572.png)
org.apache.zookeeper.server.quorum.Learner#writePacket
![](https://img.haomeiwen.com/i7310356/f28b847b77b02a59.png)
org.apache.zookeeper.server.quorum.Learner#writePacketNow
![](https://img.haomeiwen.com/i7310356/7966bc745cb1b435.png)
How LearnerHandler forwards requests
After FollowerRequestProcessor forwards a request, the leader side receives it as a REQUEST packet and turns it back into a Request
org.apache.zookeeper.server.quorum.LearnerHandler#run
![](https://img.haomeiwen.com/i7310356/265010efdf466988.png)
org.apache.zookeeper.server.quorum.Leader#submitLearnerRequest
![](https://img.haomeiwen.com/i7310356/b568eb428a2d1826.png)
org.apache.zookeeper.server.quorum.LeaderZooKeeperServer#submitLearnerRequest
Forwards the request to the leader's prepRequestProcessor
![](https://img.haomeiwen.com/i7310356/21a8527b02892c14.png)
So a modifying command sent by a client connected to a follower is forwarded to the leader's prepRequestProcessor and goes through the leader's chain
Follower-side 2PC handling
1. run
org.apache.zookeeper.server.quorum.QuorumPeer#run
![](https://img.haomeiwen.com/i7310356/69f4552c03961afe.png)
2. followLeader
org.apache.zookeeper.server.quorum.Follower#followLeader
Keeps reading packets sent by the leader
![](https://img.haomeiwen.com/i7310356/010abd030456e4d4.png)
① The follower receives a PROPOSAL packet
org.apache.zookeeper.server.quorum.Follower#processPacket
![](https://img.haomeiwen.com/i7310356/d96c4ef859126f98.png)
![](https://img.haomeiwen.com/i7310356/4df7457e7b1552db.png)
org.apache.zookeeper.server.quorum.FollowerZooKeeperServer#logRequest
![](https://img.haomeiwen.com/i7310356/6246c6ec61fed193.png)
The request goes to SyncRequestProcessor; once it has been persisted, SendAckRequestProcessor sends the ACK packet to the leader
② The follower receives a COMMIT packet
![](https://img.haomeiwen.com/i7310356/79a0233018aaf9f4.png)
org.apache.zookeeper.server.quorum.FollowerZooKeeperServer#commit
![](https://img.haomeiwen.com/i7310356/38df1f432ca5d2b6.png)
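The request is passed to commitProcessor, which adds it to the committedRequests queue; from there it goes on to FinalRequestProcessor, so a client connected to the follower also receives a response to its modifying operation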
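The follower's side of the protocol can be sketched as a two-case packet handler. In this simplified model (hypothetical FollowerPacketSketch with string packet types), persisting and acknowledging a PROPOSAL are collapsed into one step, and a COMMIT, which only carries a zxid, must match the oldest logged but uncommitted txn, mirroring the pendingTxns queue in FollowerZooKeeperServer.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical, simplified model of Follower#processPacket; not ZooKeeper's API.
class FollowerPacketSketch {
    final Queue<Long> pendingTxns = new ArrayDeque<>();   // logged but not yet committed, in zxid order
    final List<Long> acksSent = new ArrayList<>();        // SyncRequestProcessor -> SendAckRequestProcessor
    final List<Long> committed = new ArrayList<>();       // handed to CommitProcessor

    void processPacket(String type, long zxid) {
        switch (type) {
            case "PROPOSAL":
                pendingTxns.add(zxid);   // logRequest(): remember the txn ...
                acksSent.add(zxid);      // ... persist it and ACK it (one step in this sketch)
                break;
            case "COMMIT":
                Long head = pendingTxns.peek();
                if (head == null || head != zxid) {
                    // commits must arrive in the same order as the proposals
                    throw new IllegalStateException("commit for 0x" + Long.toHexString(zxid)
                            + " does not match the next pending txn");
                }
                committed.add(pendingTxns.poll());
                break;
            default:
                break;                   // PING, SYNC, etc. are not modelled
        }
    }
}
```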
Building the observer's request processor chain
![](https://img.haomeiwen.com/i7310356/2fba0702f571ea45.png)
The processors marked in green (ObserverRequestProcessor, CommitProcessor, SyncRequestProcessor) extend ZooKeeperCriticalThread, so each one runs in its own thread.
org.apache.zookeeper.server.quorum.ObserverZooKeeperServer#setupRequestProcessors
![](https://img.haomeiwen.com/i7310356/035f804ed7a73d2b.png)
Again, two chains are set up:
ObserverRequestProcessor(firstProcessor)---->CommitProcessor----->FinalRequestProcessor
SyncRequestProcessor---->null
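Observers do not take part in the two-phase commit, so there is no ACK after SyncRequestProcessor persists a request. This raises read throughput without slowing down writes. The rest of the chain works like the leader's and follower's and is not repeated here.
Summary:
In a ZooKeeper cluster, the two-phase commit happens for write operations, and its overall logic is implemented in the RequestProcessor chains. A request is committed only after ACKs from a majority of the voting servers have arrived, and the commit itself is implemented in CommitProcessor. CommitProcessor uses several collections and queues, so it helps to learn what each of them means before reading its source.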