zookeeper在启动时,选举步骤大致有以下几个流程
1.第一轮投票
第一轮投票
2.第二轮投票
第二轮投票
3.随从
第三台机器
注:已上选举将Epoch(周期)字段简化了
zookeeper的ZAB选举选票收发核心实现图
image.png
核心代码解析
WorkerSender
- 从sendQueue取ToSend,不是自己的选票转换成SendWorker里面的queueSendMap对象,给到SendWorker使用,自己的选票直接放入recvQueue
WorkerReceiver
- 从RecvWorker的recvQueue取数据,如果收到的选票都是looking状态,接着将选票放入sendQueue,于此同时,收到的选票要放入无界堵塞recvQueue队列,用于选举统计。
SendWorker
- 开启发送线程,一个myid对应一个SenderWorker,通过sid(客户端发送过来的)找到SenderWorker和ArrayBloclkingQueue,容量为1,将队列消息取出,发送。
RecvWorker
- 开启发送线程,一个myid对应一个RecvWorker,从socket中读取数据,封装成Message(msg,sid),放入recvQueue,recvQueue为数组队列,长度100.
FastLeaderElection
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
if(self.getQuorumVerifier().getWeight(newId) == 0){
return false;
}
/*
* We return true if one of the following three cases hold:
* 1- New epoch is higher
* 2- New epoch is the same as current epoch, but new zxid is higher
* 3- New epoch is the same as current epoch, new zxid is the same
* as current zxid, but server id is higher.
*/
return ((newEpoch > curEpoch) ||
((newEpoch == curEpoch) &&
((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}
protected boolean termPredicate(Map<Long, Vote> votes, Vote vote) {
SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
voteSet.addQuorumVerifier(self.getQuorumVerifier());
if (self.getLastSeenQuorumVerifier() != null
&& self.getLastSeenQuorumVerifier().getVersion() > self
.getQuorumVerifier().getVersion()) {
voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
}
/*
* First make the views consistent. Sometimes peers will have different
* zxids for a server depending on timing.
*/
for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
if (vote.equals(entry.getValue())) {
voteSet.addAck(entry.getKey());
}
}
return voteSet.hasAllQuorums();
}
/**
* Verifies if a set is a majority. Assumes that ackSet contains acks only
* from votingMembers
*/
public boolean containsQuorum(Set<Long> ackSet) {
return (ackSet.size() > half);
}
网友评论