Overview
Continue to refer to the diagram 选举-信息流.png (election message flow) throughout the analysis below.
1. Cluster-mode startup: QuorumPeer.start
public synchronized void start() {
// Load the database from snapshot and transaction logs, recovering the last logged zxid (later used for data sync)
loadDataBase();
// Start the server-side socket (client connection factory)
cnxnFactory.start();
// Kick off the leader election flow
startLeaderElection();
super.start();
}
-
startLeaderElection
Starts the leader election process; -
super.start
After the election finishes, determines which state this server ended up in and runs the corresponding processing flow;
2. Creating the election algorithm: QuorumPeer.createElectionAlgorithm
protected Election createElectionAlgorithm(int electionAlgorithm){
Election le=null;
//TODO: use a factory rather than a switch
switch (electionAlgorithm) {
case 0:
le = new LeaderElection(this);
break;
case 1:
le = new AuthFastLeaderElection(this);
break;
case 2:
le = new AuthFastLeaderElection(this, true);
break;
case 3:
qcm = createCnxnManager();
// Listener thread that waits for connection requests
QuorumCnxManager.Listener listener = qcm.listener;
if(listener != null){
listener.start();
// Construct the fast leader election algorithm object
le = new FastLeaderElection(this, qcm);
} else {
LOG.error("Null listener when initializing cnx manager");
}
break;
default:
assert false;
}
return le;
}
-
case 3
The other three election algorithms are deprecated; the fast leader election algorithm (case 3) is the default; -
createCnxnManager()
Creates the QuorumCnxManager, which manages the low-level communication with the other servers; -
listener.start()
Starts the connection listener on the election address (conventionally port 3888, as in server.N=host:2888:3888), waiting for other servers to connect and vote for a leader; -
new FastLeaderElection
Creates the fast leader election algorithm; by default this starts the threads that process sending and receiving votes
3. Listener.run
public void run() {
// retry count
int numRetries = 0;
InetSocketAddress addr;
while((!shutdown) && (numRetries < 3)){
try {
ss = new ServerSocket();
ss.setReuseAddress(true);
// Listen on all network interfaces; requires quorumListenOnAllIPs=true
if (listenOnAllIPs) {
// Map<Long, QuorumPeer.QuorumServer> view :: config.getServers()
int port = view.get(QuorumCnxManager.this.mySid).electionAddr.getPort();
addr = new InetSocketAddress(port);
} else {
addr = view.get(QuorumCnxManager.this.mySid).electionAddr;
}
LOG.info("My election bind port: " + addr.toString());
setName(view.get(QuorumCnxManager.this.mySid).electionAddr.toString());
ss.bind(addr);
while (!shutdown) {
// Accept connections from other servers; the first server to start receives no peer connections yet, so this blocks
Socket client = ss.accept();
setSockOpts(client);
// Whether to accept the connection asynchronously
if (quorumSaslAuthEnabled) {
// Effectively runs receiveConnection(client) in a thread pool
receiveConnectionAsync(client);
} else {
// Accept the connection synchronously
receiveConnection(client);
}
numRetries = 0;
}
}
......
}
}
-
ss.setReuseAddress(true)
In TCP, the side that calls close() first enters TIME_WAIT, and by default the port cannot be rebound for up to two minutes after release. SO_REUSEADDR allows a socket to rebind to a port whose previous socket is still in TIME_WAIT, so a server program should always set SO_REUSEADDR before calling bind(); -
ss.bind(addr)
Binds the listening address, i.e. the election address from the config (conventionally port 3888); -
ss.accept()
Accepts connections from other servers; the first server to start receives no peer connections yet, so this call blocks; -
setSockOpts(client)
Sets tcpNoDelay, keepAlive, and timeout on the Socket; -
receiveConnection(client)
Once a connection is established, reads the request stream from the socket;
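The effect of setReuseAddress can be seen in a small standalone sketch (class and variable names here are illustrative, not from the ZooKeeper source): with the flag set before bind(), a second ServerSocket can immediately rebind the port the first one just released.

```java
import java.net.InetSocketAddress;
import java.net.ServerSocket;

public class ReuseAddrDemo {
    public static void main(String[] args) throws Exception {
        // Bind to an OS-assigned free port, then close the listener
        ServerSocket first = new ServerSocket();
        first.setReuseAddress(true);
        first.bind(new InetSocketAddress(0)); // port 0 = pick any free port
        int port = first.getLocalPort();
        first.close();

        // Rebind the same port immediately; SO_REUSEADDR must be set BEFORE bind()
        ServerSocket second = new ServerSocket();
        second.setReuseAddress(true);
        second.bind(new InetSocketAddress(port));
        System.out.println(second.getLocalPort() == port);
        second.close();
    }
}
```

This mirrors the order in Listener.run: the option is applied to the fresh ServerSocket, and only then is ss.bind(addr) called.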
4. Handling a connection: QuorumCnxManager.handleConnection
private void handleConnection(Socket sock, DataInputStream din) throws IOException {
Long sid = null;
try {
// Read server id
sid = din.readLong();
......
}
......
// A machine with a smaller myid may not keep its connection to us; this reduces the number of connections
if (sid < this.mySid) {
SendWorker sw = senderWorkerMap.get(sid);
if (sw != null) {
sw.finish();
}
// Close the connection the smaller sid opened to us
closeSocket(sock);
// Connect to that smaller sid ourselves instead
connectOne(sid);
} else {
// Thread that sends vote messages to the peer server
SendWorker sw = new SendWorker(sock, sid);
// Thread that receives vote messages from the peer server
RecvWorker rw = new RecvWorker(sock, din, sid, sw);
sw.setRecv(rw);
SendWorker vsw = senderWorkerMap.get(sid);
if(vsw != null)
vsw.finish();
senderWorkerMap.put(sid, sw);
queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
sw.start();
rw.start();
return;
}
}
-
sid < this.mySid
Closes this socket and actively creates a connection from this server to server sid; i.e. a machine with a smaller myid may not connect to us, which reduces the number of connections; -
sw.finish()
If senderWorkerMap already holds a SendWorker for this sid, a connection already exists; close it and terminate the old SendWorker thread; -
closeSocket(sock)
Closes the socket; -
connectOne(sid)
Creates a connection from this server to server sid; -
new SendWorker(sock, sid)
When sid >= this.mySid, constructs the thread that sends this server's vote messages to server sid; -
new RecvWorker(sock, din, sid, sw)
Constructs the thread that receives vote messages sent by server sid; -
queueSendMap.putIfAbsent
Constructs the ArrayBlockingQueue that holds messages destined for server sid, and puts it into queueSendMap; -
sw.start()/rw.start()
Starts the SendWorker/RecvWorker threads;
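The deduplication rule above guarantees exactly one surviving socket per server pair: for any two servers, only the connection initiated by the larger sid is kept. A minimal sketch of that decision (the method name is mine, not from the source):

```java
public class ConnectionPolicy {
    /**
     * For any pair of servers, only the connection initiated by the
     * larger sid survives, so each pair ends up sharing one socket.
     * Returns true if we keep an incoming connection from remoteSid.
     */
    static boolean keepIncoming(long mySid, long remoteSid) {
        // remoteSid >= mySid: accept; otherwise close it and dial back ourselves
        return remoteSid >= mySid;
    }

    public static void main(String[] args) {
        // server 2 receives a connection from server 1: reject, then connectOne(1)
        System.out.println(keepIncoming(2, 1)); // false
        // server 1 receives a connection from server 3: accept, start workers
        System.out.println(keepIncoming(1, 3)); // true
    }
}
```

Both sides evaluate the same predicate with the roles swapped, so they always agree on which of the two sockets to close.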
5. Creating a connection: QuorumCnxManager.connectOne
synchronized public void connectOne(long sid){
// No SendWorker for sid means no connection from this server to server sid yet
if (!connectedToPeer(sid)){
InetSocketAddress electionAddr;
if (view.containsKey(sid)) {
// Election address of server sid
electionAddr = view.get(sid).electionAddr;
} else {
LOG.warn("Invalid server id: " + sid);
return;
}
try {
LOG.debug("Opening channel to server " + sid);
Socket sock = new Socket();
setSockOpts(sock);
// Create the connection to server sid
sock.connect(view.get(sid).electionAddr, cnxTO);
LOG.debug("Connected to server " + sid);
if (quorumSaslAuthEnabled) {
// Initialize the connection asynchronously; this ultimately also calls initiateConnection(sock, sid)
initiateConnectionAsync(sock, sid);
} else {
initiateConnection(sock, sid);
}
}
......
}
-
view.get(sid).electionAddr
Reads server sid's address from the config; -
sock.connect(view.get(sid).electionAddr, cnxTO);
Connects the socket to the target server; -
initiateConnection(sock, sid)
Establishes the connection and sends the initial handshake (this server's sid);
6. Starting a connection: QuorumCnxManager.startConnection
private boolean startConnection(Socket sock, Long sid)
throws IOException {
DataOutputStream dout = null;
DataInputStream din = null;
try {
// Sending id and challenge
dout = new DataOutputStream(sock.getOutputStream());
// Send mySid to server sid
dout.writeLong(this.mySid);
dout.flush();
din = new DataInputStream(new BufferedInputStream(sock.getInputStream()));
} catch (IOException e) {
......
}
-
dout.writeLong(this.mySid)
Sends this server's sid to the remote server; -
new DataInputStream(new BufferedInputStream(sock.getInputStream()))
Waits for messages from the remote server over this socket; -
......
The rest of the flow matches the logic in QuorumCnxManager.handleConnection: once the connection is established, the SendWorker/RecvWorker threads are started;
7. SendWorker.run
public void run() {
threadCnt.incrementAndGet();
try {
ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
// If bq is empty, resend the last vote message sent to sid
if (bq == null || isSendQueueEmpty(bq)) {
ByteBuffer b = lastMessageSent.get(sid);
if (b != null) {
LOG.debug("Attempting to send lastMessage to sid=" + sid);
// Try to resend lastMessage to sid
send(b);
}
}
} catch (IOException e) {
LOG.error("Failed to send last message. Shutting down thread.", e);
this.finish();
}
try {
while (running && !shutdown && sock != null) {
ByteBuffer b = null;
try {
ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
if (bq != null) {
// Take a vote message from bq; blocks up to 1s, then returns null
b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);
} else {
LOG.error("No queue of incoming messages for " + "server " + sid);
break;
}
if(b != null){
// Record it in lastMessageSent
lastMessageSent.put(sid, b);
// Send the vote message
send(b);
}
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for message on queue", e);
}
}
} catch (Exception e) {
LOG.warn("Exception when using channel: for id " + sid
+ " my id = " + QuorumCnxManager.this.mySid + " error = " + e);
}
this.finish();
LOG.warn("Send worker leaving thread");
}
}
-
queueSendMap.get(sid)
Gets the blocking queue holding the messages to be sent to server sid; -
bq == null || isSendQueueEmpty(bq)
If either side closed its connection (and exited the thread) before the last message was read and processed, that message may have been lost; so when bq is empty, the last sent message is resent; -
running && !shutdown && sock != null
Loops until the worker is stopped or the socket is closed; -
pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS)
Takes a message from bq; blocks up to 1s, then returns null; -
send(b)
Sends the ByteBuffer taken from the blocking queue (i.e. the outgoing vote message);
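The resend-on-empty behavior can be condensed into a standalone sketch (illustrative names, not the real SendWorker): before draining the queue, an empty outbox triggers a resend of the last message recorded for that sid, trading a possible duplicate for the guarantee that no vote is silently lost across reconnects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

public class ResendSketch {
    // sid -> last message successfully handed to the socket
    static final ConcurrentHashMap<Long, String> lastMessageSent = new ConcurrentHashMap<>();

    /** Drains the outbox for sid; resends the last message first when the outbox is empty. */
    static List<String> drain(long sid, ArrayBlockingQueue<String> bq) {
        List<String> sent = new ArrayList<>();
        if (bq.isEmpty()) {
            String last = lastMessageSent.get(sid);
            if (last != null) sent.add(last); // possible duplicate, never a loss
        }
        String m;
        while ((m = bq.poll()) != null) {
            lastMessageSent.put(sid, m); // remember before sending
            sent.add(m);
        }
        return sent;
    }

    public static void main(String[] args) {
        ArrayBlockingQueue<String> bq = new ArrayBlockingQueue<>(8);
        bq.add("vote-1");
        System.out.println(drain(3L, bq)); // [vote-1]
        System.out.println(drain(3L, bq)); // outbox empty -> [vote-1] resent
    }
}
```

Duplicates are harmless here because a repeated vote notification is idempotent for the receiver.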
8. RecvWorker.run
public void run() {
threadCnt.incrementAndGet();
try {
while (running && !shutdown && sock != null) {
/**
* Reads the first int to determine the length of the message
* (blocks when no data is available)
*/
int length = din.readInt();
if (length <= 0 || length > PACKETMAXSIZE) {
throw new IOException("Received packet with invalid packet: " + length);
}
byte[] msgArray = new byte[length];
din.readFully(msgArray, 0, length);
// Read the vote message
ByteBuffer message = ByteBuffer.wrap(msgArray);
// Wrap the vote message in a Message and put it into recvQueue
addToRecvQueue(new Message(message.duplicate(), sid));
}
}
......
}
}
-
din.readInt()
din is the socket DataInputStream constructed when the RecvWorker was created; reads the first int to determine the message length, blocking when no data is available; -
ByteBuffer.wrap(msgArray)
Wraps the received vote bytes in a ByteBuffer; -
new Message(message.duplicate(), sid)
Wraps the ByteBuffer in a Message; -
addToRecvQueue
Enqueues the wrapped Message into recvQueue;
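The wire format the two workers agree on is a simple length-prefixed frame: the sender writes a 4-byte length followed by the payload, and RecvWorker reads the length and then readFully the body. A self-contained round-trip (helper names and the size cap value are mine):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class FramingSketch {
    static final int PACKET_MAX = 1024 * 512; // upper bound, in the spirit of PACKETMAXSIZE

    static void writeFrame(DataOutputStream out, byte[] payload) throws IOException {
        out.writeInt(payload.length); // 4-byte length prefix
        out.write(payload);
        out.flush();
    }

    static byte[] readFrame(DataInputStream in) throws IOException {
        int length = in.readInt(); // blocks until the prefix arrives
        if (length <= 0 || length > PACKET_MAX)
            throw new IOException("Received packet with invalid length: " + length);
        byte[] msg = new byte[length];
        in.readFully(msg, 0, length); // blocks until the whole body arrives
        return msg;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        writeFrame(new DataOutputStream(buf), "vote:leader=3".getBytes(StandardCharsets.UTF_8));
        byte[] back = readFrame(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
        System.out.println(new String(back, StandardCharsets.UTF_8)); // vote:leader=3
    }
}
```

readFully matters here: a plain read() may return a partial buffer on a TCP stream, while readFully loops until exactly `length` bytes are consumed.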
This completes the Listener startup flow; we now return to step 2 and analyze how the fast leader election algorithm starts.
9. Starting fast leader election: FastLeaderElection.starter
private void starter(QuorumPeer self, QuorumCnxManager manager) {
this.self = self;
proposedLeader = -1;
proposedZxid = -1;
// Used for sending vote messages or ACKs
sendqueue = new LinkedBlockingQueue<ToSend>();
// Used for receiving vote messages
recvqueue = new LinkedBlockingQueue<Notification>();
this.messenger = new Messenger(manager);
}
Messenger(QuorumCnxManager manager) {
// Thread that processes outgoing vote messages
this.ws = new WorkerSender(manager);
Thread t = new Thread(this.ws, "WorkerSender[myid=" + self.getId() + "]");
t.setDaemon(true);
t.start();
// Thread that processes incoming vote messages
this.wr = new WorkerReceiver(manager);
t = new Thread(this.wr, "WorkerReceiver[myid=" + self.getId() + "]");
t.setDaemon(true);
t.start();
}
-
sendqueue = new LinkedBlockingQueue<ToSend>()
Queue holding outgoing messages wrapped as ToSend; -
recvqueue = new LinkedBlockingQueue<Notification>()
Queue holding messages received from other servers, wrapped as Notification; -
WorkerSender.start()
Starts the thread that processes outgoing vote messages; -
WorkerReceiver.start()
Starts the thread that processes incoming vote messages;
10. WorkerSender.run
public void run() {
while (!stop) {
try {
// Take the head of sendqueue; returns null if nothing arrives within 3 seconds
ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
if(m == null) continue;
// Process the ToSend to be dispatched
process(m);
} catch (InterruptedException e) {
break;
}
}
LOG.info("WorkerSender is down");
}
void process(ToSend m) {
ByteBuffer requestBuffer = buildMsg(m.state.ordinal(),
m.leader,
m.zxid,
m.electionEpoch,
m.peerEpoch);
manager.toSend(m.sid, requestBuffer);
}
-
sendqueue.poll(3000, TimeUnit.MILLISECONDS)
Takes the head element of sendqueue (enqueued during the election in QuorumPeer.run, or in WorkerReceiver.run when a vote is sent back after comparison); returns null if nothing arrives within 3 seconds; -
process(m)
Processes the ToSend to be dispatched; -
buildMsg
Converts the ToSend into a ByteBuffer; -
manager.toSend
Delegates the actual send to QuorumCnxManager;
11. Sending a request: QuorumCnxManager.toSend
public void toSend(Long sid, ByteBuffer b) {
if (this.mySid == sid) {
b.position(0);
// Message to ourselves: enqueue directly into recvQueue
addToRecvQueue(new Message(b.duplicate(), sid));
} else {
// Put the outgoing message into queueSendMap
ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY);
ArrayBlockingQueue<ByteBuffer> bqExisting = queueSendMap.putIfAbsent(sid, bq);
if (bqExisting != null) {
addToSendQueue(bqExisting, b);
} else {
addToSendQueue(bq, b);
}
// Try to establish a connection to the server with id sid
connectOne(sid);
}
}
-
this.mySid == sid
A message from this server to itself bypasses the socket entirely; it is converted to a Message and enqueued directly into recvQueue; -
addToSendQueue
Puts the outgoing ByteBuffer into the ArrayBlockingQueue<ByteBuffer> for sid; SendWorker.run then sends it over the socket;
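The routing rule in toSend — deliver locally when the destination is ourselves, otherwise enqueue for the peer's SendWorker — can be sketched as follows (types simplified to strings, class and field names mine):

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

public class RoutingSketch {
    final long mySid;
    final Queue<String> recvQueue = new ArrayDeque<>();            // local inbox
    final Map<Long, Queue<String>> queueSendMap = new HashMap<>(); // per-peer outbox

    RoutingSketch(long mySid) { this.mySid = mySid; }

    void toSend(long sid, String msg) {
        if (sid == mySid) {
            recvQueue.add(msg); // our own vote skips the socket entirely
        } else {
            queueSendMap.computeIfAbsent(sid, k -> new ArrayDeque<>()).add(msg);
            // the real code would now call connectOne(sid) to ensure a live connection
        }
    }

    public static void main(String[] args) {
        RoutingSketch r = new RoutingSketch(1);
        r.toSend(1, "vote-self");
        r.toSend(2, "vote-peer");
        System.out.println(r.recvQueue);            // [vote-self]
        System.out.println(r.queueSendMap.get(2L)); // [vote-peer]
    }
}
```

The local shortcut is why a single-server ensemble still "receives" its own vote and can complete an election without any network traffic.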
12. WorkerReceiver.run
public void run() {
Message response;
while (!stop) {
// Sleeps on receive
try{
// Vote message sent by another server
response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
if(response == null) continue;
if(!self.getVotingView().containsKey(response.sid)){
Vote current = self.getCurrentVote();
ToSend notmsg = new ToSend(ToSend.mType.notification,
current.getId(),
current.getZxid(),
logicalclock.get(),
self.getPeerState(),
response.sid,
current.getPeerEpoch());
sendqueue.offer(notmsg);
} else {
......
boolean backCompatibility = (response.buffer.capacity() == 28);
response.buffer.clear();
// Instantiate Notification and set its attributes
Notification n = new Notification();
// State of peer that sent this message
QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
switch (response.buffer.getInt()) {
case 0:
ackstate = QuorumPeer.ServerState.LOOKING;
break;
case 1:
ackstate = QuorumPeer.ServerState.FOLLOWING;
break;
case 2:
ackstate = QuorumPeer.ServerState.LEADING;
break;
case 3:
ackstate = QuorumPeer.ServerState.OBSERVING;
break;
default:
continue;
}
n.leader = response.buffer.getLong();
n.zxid = response.buffer.getLong();
n.electionEpoch = response.buffer.getLong();
n.state = ackstate;
n.sid = response.sid;
if(!backCompatibility){
n.peerEpoch = response.buffer.getLong();
} else {
if(LOG.isInfoEnabled()){
LOG.info("Backward compatibility mode, server id=" + n.sid);
}
n.peerEpoch = ZxidUtils.getEpochFromZxid(n.zxid);
}
if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
// If this server is in LOOKING state, enqueue the Notification into recvqueue
recvqueue.offer(n);
// If the sending peer is also LOOKING and its logical clock lags behind, send a notification back
if((ackstate == QuorumPeer.ServerState.LOOKING)
&& (n.electionEpoch < logicalclock.get())){
Vote v = getVote();
ToSend notmsg = new ToSend(ToSend.mType.notification,
v.getId(),
v.getZxid(),
logicalclock.get(), // electionEpoch
self.getPeerState(),
response.sid,
v.getPeerEpoch()); // proposed epoch
sendqueue.offer(notmsg);
}
} else {
// This server is not LOOKING but the peer is: send back who this server believes is leader
Vote current = self.getCurrentVote();
if(ackstate == QuorumPeer.ServerState.LOOKING){
if(LOG.isDebugEnabled()){
LOG.debug("Sending new notification. My id = " +
self.getId() + " recipient=" +
response.sid + " zxid=0x" +
Long.toHexString(current.getZxid()) +
" leader=" + current.getId());
}
ToSend notmsg;
if(n.version > 0x0) {
notmsg = new ToSend(
ToSend.mType.notification,
current.getId(),
current.getZxid(),
current.getElectionEpoch(),
self.getPeerState(),
response.sid,
current.getPeerEpoch());
} else {
Vote bcVote = self.getBCVote();
notmsg = new ToSend(
ToSend.mType.notification,
bcVote.getId(),
bcVote.getZxid(),
bcVote.getElectionEpoch(),
self.getPeerState(),
response.sid,
bcVote.getPeerEpoch());
}
sendqueue.offer(notmsg);
}
}
}
} catch (InterruptedException e) {
System.out.println("Interrupted Exception while waiting for new message" +
e.toString());
}
}
LOG.info("WorkerReceiver is down");
}
}
-
manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS)
Takes a Message from recvQueue; -
!self.getVotingView().containsKey(response.sid)
If the message came from a server outside the voting view (a Learner/Observer), reply directly with this server's current vote; -
ackstate
State of the remote server; -
recvqueue.offer(n)
This server is in LOOKING state: enqueue the Notification into recvqueue; -
ackstate == QuorumPeer.ServerState.LOOKING && (n.electionEpoch < logicalclock.get())
This server is LOOKING, the peer is also LOOKING, and the peer's election round lags behind: build a ToSend from the current proposedLeader, proposedZxid, and proposedEpoch, and enqueue it into sendqueue as a reply; -
ackstate == QuorumPeer.ServerState.LOOKING
This server is not LOOKING but the peer is: reply with who this server believes is leader; this is the Follower-restart flow from the previous section;
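In the backward-compatibility branch above, the peerEpoch is recovered from the zxid itself: a zxid packs the epoch into its high 32 bits and a per-epoch counter into the low 32 bits, which is the arithmetic behind ZxidUtils.getEpochFromZxid. A minimal sketch (my reimplementation, not the original class):

```java
public class ZxidSketch {
    // A zxid is epoch (high 32 bits) + counter (low 32 bits)
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xffffffffL);
    }

    static long epochOf(long zxid)   { return zxid >> 32; }          // getEpochFromZxid
    static long counterOf(long zxid) { return zxid & 0xffffffffL; }  // getCounterFromZxid

    public static void main(String[] args) {
        long zxid = makeZxid(5, 42);
        System.out.println(epochOf(zxid));   // 5
        System.out.println(counterOf(zxid)); // 42
    }
}
```

This packing is why comparing zxids also implicitly compares epochs first, a fact totalOrderPredicate relies on later.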
This completes the startup of the WorkerSender/WorkerReceiver threads, and with it the startLeaderElection() from step 1. Next we analyze super.start() from step 1, i.e. the execution of the QuorumPeer thread.
13. QuorumPeer.run
public void run() {
......
try {
/*
* Main loop
*/
while (running) {
switch (getPeerState()) {
case LOOKING:
......
setBCVote(null);
// lookForLeader() elects the leader, i.e. the leader this server believes in
setCurrentVote(makeLEStrategy().lookForLeader());
......
break;
case OBSERVING:
......
LOG.info("OBSERVING");
setObserver(makeObserver(logFactory));
observer.observeLeader();
......
break;
case FOLLOWING:
......
LOG.info("FOLLOWING");
setFollower(makeFollower(logFactory));
follower.followLeader();
......
break;
case LEADING:
.......
setLeader(makeLeader(logFactory));
leader.lead();
setLeader(null);
.......
break;
}
}
}
......
}
-
switch (getPeerState())
Continuously checks this server's state; before the election completes, the state defaults to LOOKING; -
makeLEStrategy().lookForLeader()
Elects a leader via the fast leader election algorithm;
This section analyzes only the LOOKING state; the states after the election completes will be analyzed separately later
14. FastLeaderElection.lookForLeader
public Vote lookForLeader() throws InterruptedException {
try {
// ballot box
HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
int notTimeout = finalizeWait;
synchronized(this){
// Increment the logical clock
logicalclock.incrementAndGet();
// Update the proposal; if this server's learnerType is PARTICIPANT, proposedLeader is set to its own myid
// i.e. at the start of an election, every server first votes for itself
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
LOG.info("New election. My id = " + self.getId() + ", proposed zxid=0x" + Long.toHexString(proposedZxid));
// Read config.servers and build a ToSend for each server, placing them into sendqueue; this wakes up WorkerSender.run
sendNotifications();
while ((self.getPeerState() == ServerState.LOOKING) && (!stop)){
Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
if(n == null){
if(manager.haveDelivered()){
sendNotifications();
} else {
// Send connection requests to establish connections with all the other servers
manager.connectAll();
}
int tmpTimeOut = notTimeout*2;
notTimeout = (tmpTimeOut < maxNotificationInterval? tmpTimeOut : maxNotificationInterval);
LOG.info("Notification time out: " + notTimeout);
} else if(self.getVotingView().containsKey(n.sid)) {
switch (n.state) {
case LOOKING:
// If notification > current, replace and send messages out
// The peer's electionEpoch is greater than our logical clock (election round)
if (n.electionEpoch > logicalclock.get()) {
// Advance our round to the peer's epoch
logicalclock.set(n.electionEpoch);
// Clear the ballot box
recvset.clear();
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
// Update the proposed leader, zxid, and epoch
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
updateProposal(getInitId(),
getInitLastLoggedZxid(),
getPeerEpoch());
}
// Broadcast the current proposal to all the other servers
sendNotifications();
} else if (n.electionEpoch < logicalclock.get()) {
// The peer's epoch is behind our election round: ignore its vote
if(LOG.isDebugEnabled()){
LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
+ Long.toHexString(n.electionEpoch)
+ ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
}
break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {
// Update the proposed leader, zxid, and epoch
updateProposal(n.leader, n.zxid, n.peerEpoch);
// Broadcast the current proposal to all the other servers
sendNotifications();
}
if(LOG.isDebugEnabled()){
LOG.debug("Adding vote: from=" + n.sid +
", proposed leader=" + n.leader +
", proposed zxid=0x" + Long.toHexString(n.zxid) +
", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
}
// Put the Vote into the ballot box: peer sid -> the vote cast by that sid
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
// Using the ballot box and this server's own vote, check whether the leader in this server's vote has a majority
if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch))) {
// Verify if there is any change in the proposed leader
while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null){
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)){
recvqueue.put(n);
break;
}
}
if (n == null) {
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(proposedLeader,
proposedZxid,
logicalclock.get(),
proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
break;
case OBSERVING:
LOG.debug("Notification from observer: " + n.sid);
break;
case FOLLOWING:
case LEADING:
/*
* Consider all notifications from the same epoch together.
*/
if(n.electionEpoch == logicalclock.get()){
recvset.put(n.sid, new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch));
if(ooePredicate(recvset, outofelection, n)) {
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
}
/*
* Before joining an established ensemble, verify
* a majority is following the same leader.
*/
outofelection.put(n.sid, new Vote(n.version,
n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch,
n.state));
if(ooePredicate(outofelection, outofelection, n)) {
synchronized(this){
logicalclock.set(n.electionEpoch);
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
}
Vote endVote = new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
break;
default:
LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)",
n.state, n.sid);
break;
}
} else {
LOG.warn("Ignoring notification from non-cluster member " + n.sid);
}
}
return null;
} finally {
try {
if(self.jmxLeaderElectionBean != null){
MBeanRegistry.getInstance().unregister(
self.jmxLeaderElectionBean);
}
} catch (Exception e) {
LOG.warn("Failed to unregister with JMX", e);
}
self.jmxLeaderElectionBean = null;
LOG.debug("Number of connection processing threads: {}",
manager.getConnectionThreadCount());
}
}
-
recvset
The ballot box: peer sid -> the vote cast by that sid; -
logicalclock.incrementAndGet()
Increments the logical clock, i.e. the election round; -
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch())
Updates the proposal; at the start of an election, every server first votes for itself; -
sendNotifications()
Reads config.servers and builds a ToSend for each server, placing them into sendqueue; servers that started earlier will then receive the request in Listener.run's accept;
-----LOOKING------
-
recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS)
Takes a Notification from recvqueue; returns null after notTimeout (initially 200ms); -
manager.connectAll()
Sends connection requests to establish connections with all the other servers; -
n.electionEpoch > logicalclock.get()
The peer's election round is ahead of this server's:
1. logicalclock.set(n.electionEpoch)
Advances the current logicalclock to the peer's round;
2. recvset.clear()
The votes collected in the old round are stale, so clear the ballot box;
3. totalOrderPredicate
Vote comparison: compare epoch first, then zxid, then server id;
4. updateProposal
Updates the proposal to the vote of the server that won the comparison;
5. sendNotifications()
Broadcasts the freshly updated proposal to the other servers; -
n.electionEpoch < logicalclock.get()
The peer's election round is behind this server's: ignore its vote; -
n.electionEpoch == logicalclock.get()
Same round, and the peer's vote wins the comparison:
1. updateProposal
Updates the proposal to the vote of the server that won the comparison;
2. sendNotifications()
Broadcasts the freshly updated proposal to the other servers; -
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch))
Constructs a Vote and puts it into the ballot box; -
termPredicate
Using the ballot box and this server's own vote, checks whether the leader in this server's vote has a majority; -
while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null)
Before finishing the election, waits another finalizeWait; if a better vote arrives within this window, the election algorithm continues — this guards against long message delays causing two leaders to be elected; -
n == null
No message arrived within finalizeWait: end the election, clear the queue, and return endVote;
-----OBSERVING------
-
OBSERVING
If a server receives a notification whose n.state is OBSERVING, it indicates that server is a Leader that just went down; the content is heartbeat traffic;
-----FOLLOWING/LEADING------
-
FOLLOWING/LEADING
FOLLOWING or LEADING state messages appear in three scenarios:
1.
A new server (not an Observer) joins a normally running cluster. Its initial state is LOOKING, so it calls lookForLeader() and sends messages recommending itself as leader; the cluster's Leader and followers reply with notifications whose states are LEADING and FOLLOWING respectively;
2.
The current Leader goes down, but not all followers sense it at the same time. The servers that sense it first switch to LOOKING and send messages to the other servers; the ones that have not sensed it yet reply with notifications in the FOLLOWING state;
3.
In the current round, the other servers have already elected a new Leader and changed their states, but this server has not been notified yet; those servers send this server notifications in the LEADING or FOLLOWING state; -
n.electionEpoch == logicalclock.get()
Scenario 3 above: the vote has a majority, so end the election; -
outofelection.put
Scenario where a new server joins a normally running cluster: all states other than LOOKING are stored in the outofelection set; -
ooePredicate(outofelection, outofelection, n)
A new server joining a normally running cluster: once this server has received a majority of votes for the leader, the election ends;
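The two predicates that drive the loop can be condensed into a few lines (an illustrative reimplementation, omitting the weight checks of the real methods): totalOrderPredicate is a lexicographic comparison of (epoch, zxid, sid), and termPredicate checks whether the votes matching the current proposal form a strict majority.

```java
import java.util.HashMap;
import java.util.Map;

public class ElectionPredicates {
    /** True if the new vote (epoch, zxid, sid) beats the current one, compared lexicographically. */
    static boolean totalOrderPredicate(long newEpoch, long newZxid, long newId,
                                       long curEpoch, long curZxid, long curId) {
        return newEpoch > curEpoch
            || (newEpoch == curEpoch && (newZxid > curZxid
            || (newZxid == curZxid && newId > curId)));
    }

    /** True if the proposed leader holds votes from a strict majority of ensembleSize. */
    static boolean termPredicate(Map<Long, Long> votes, long proposedLeader, int ensembleSize) {
        long count = votes.values().stream().filter(l -> l == proposedLeader).count();
        return count > ensembleSize / 2;
    }

    public static void main(String[] args) {
        // Higher zxid wins when epochs tie; higher sid only breaks a full tie
        System.out.println(totalOrderPredicate(1, 200, 1, 1, 100, 3)); // true
        System.out.println(totalOrderPredicate(1, 100, 2, 1, 100, 3)); // false

        Map<Long, Long> ballotBox = new HashMap<>(); // sid -> voted leader
        ballotBox.put(1L, 3L);
        ballotBox.put(2L, 3L);
        System.out.println(termPredicate(ballotBox, 3L, 3)); // 2 of 3 -> true
    }
}
```

The lexicographic order explains the walkthrough's rule of thumb: the server with the freshest data (highest epoch, then zxid) wins, and myid is only a final tiebreaker.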
This completes the analysis of the entire leader election flow
-----------over----------