摘要
在SyncRequestProcessor完成日志记录之后,不同角色服务器需要告知ACK代表是如日志记录完成
在leader端,该处理器就是AckRequestProcessor
在Follower端,该处理器就是SendAckRequestProcessor
在observer端,由于observer并没有投票权,不需要对应的处理器
本节主要讲解
AckRequestProcessor的意义,源码
SendAckRequestProcessor的意义,源码
以及两者的异同
AckRequestProcessor
是leader端的处理器
负责在SyncRequestProcessor完成事务日志记录后,向Proposal的投票收集器发送ACK反馈,以通知投票收集器当前服务器已经完成了对该Proposal的事务日志记录。
源码很简单
class AckRequestProcessor implements RequestProcessor {
private static final Logger LOG = LoggerFactory.getLogger(AckRequestProcessor.class);
Leader leader;
AckRequestProcessor(Leader leader) {
this.leader = leader;
}
/**
* Forward the request as an ACK to the leader
*/
public void processRequest(Request request) {
QuorumPeer self = leader.self;
if(self != null)
leader.processAck(self.getId(), request.zxid, null);
else
LOG.error("Null QuorumPeer");
}
public void shutdown() {
// XXX No need to do anything
}
}
核心就是调用leader.processAck函数
这是因为该类本身只有leader端有,并不需要进行网络IO,直接处理ack即可
SendAckRequestProcessor
是Follower端的处理器
负责在SyncRequestProcessor完成事务日志记录后,向Proposal的投票收集器发送ACK反馈,以通知投票收集器当前服务器已经完成了对该Proposal的事务日志记录。
源码如下
public class SendAckRequestProcessor implements RequestProcessor, Flushable {
private static final Logger LOG = LoggerFactory.getLogger(SendAckRequestProcessor.class);
Learner learner;
SendAckRequestProcessor(Learner peer) {
this.learner = peer;
}
public void processRequest(Request si) {
if(si.type != OpCode.sync){//不是sync请求就处理
QuorumPacket qp = new QuorumPacket(Leader.ACK, si.hdr.getZxid(), null,
null);//生成ACK包
try {
learner.writePacket(qp, false);//发送给leader
} catch (IOException e) {
LOG.warn("Closing connection to leader, exception during packet send", e);
try {
if (!learner.sock.isClosed()) {
learner.sock.close();
}
} catch (IOException e1) {
// Nothing to do, we are shutting things down, so an exception here is irrelevant
LOG.debug("Ignoring error closing the connection", e1);
}
}
}
}
public void flush() throws IOException {
try {
learner.writePacket(null, true);//发送一个空的packet
} catch(IOException e) {
LOG.warn("Closing connection to leader, exception during packet send", e);
try {
if (!learner.sock.isClosed()) {
learner.sock.close();
}
} catch (IOException e1) {
// Nothing to do, we are shutting things down, so an exception here is irrelevant
LOG.debug("Ignoring error closing the connection", e1);
}
}
}
public void shutdown() {
// Nothing needed
}
}
补充
leader端:AckRequestProcessor调用了Leader#processAck
Follower端:SendAckRequestProcessor 发出ACK后,被LearnerHandler处理到,最终也是调用Leader#processAck,之前在leader的代码讲到过(源码解析42)
这里把该函数在列
synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {//针对提议回复ACK的处理逻辑,如果过半验证了就通知所有Learner
if (LOG.isTraceEnabled()) {//log相关
LOG.trace("Ack zxid: 0x{}", Long.toHexString(zxid));
for (Proposal p : outstandingProposals.values()) {
long packetZxid = p.packet.getZxid();
LOG.trace("outstanding proposal: 0x{}",
Long.toHexString(packetZxid));
}
LOG.trace("outstanding proposals all");
}
if ((zxid & 0xffffffffL) == 0) {
/*
* We no longer process NEWLEADER ack by this method. However,
* the learner sends ack back to the leader after it gets UPTODATE
* so we just ignore the message.
*/
return;
}
if (outstandingProposals.size() == 0) {//没有处理当中的提议
if (LOG.isDebugEnabled()) {
LOG.debug("outstanding is 0");
}
return;
}
if (lastCommitted >= zxid) {//提议已经处理过了
if (LOG.isDebugEnabled()) {
LOG.debug("proposal has already been committed, pzxid: 0x{} zxid: 0x{}",
Long.toHexString(lastCommitted), Long.toHexString(zxid));
}
// The proposal has already been committed
return;
}
Proposal p = outstandingProposals.get(zxid);//获取zxid对应的提议
if (p == null) {
LOG.warn("Trying to commit future proposal: zxid 0x{} from {}",
Long.toHexString(zxid), followerAddr);
return;
}
p.ackSet.add(sid);//对应提议的ack集合添加sid记录
if (LOG.isDebugEnabled()) {
LOG.debug("Count for zxid: 0x{} is {}",
Long.toHexString(zxid), p.ackSet.size());
}
if (self.getQuorumVerifier().containsQuorum(p.ackSet)){//过半回复ack
if (zxid != lastCommitted+1) {
LOG.warn("Commiting zxid 0x{} from {} not first!",
Long.toHexString(zxid), followerAddr);
LOG.warn("First is 0x{}", Long.toHexString(lastCommitted + 1));
}
outstandingProposals.remove(zxid);//该proposal已经处理完了
if (p.request != null) {
toBeApplied.add(p);//即将应用的队列 添加该proposal
}
if (p.request == null) {
LOG.warn("Going to commmit null request for proposal: {}", p);
}
commit(zxid);//提交,发给所有参与者
inform(p);//告诉所有观察者
zk.commitProcessor.commit(p.request);//leader自己也提交,调用commitProcessor相关逻辑
if(pendingSyncs.containsKey(zxid)){//处理pending住的同步请求
for(LearnerSyncRequest r: pendingSyncs.remove(zxid)) {
sendSync(r);//发送同步请求给LearnerSyncRequest记录的server
}
}
}
}
就是收集ACK,如果通过过半验证了,就告诉所有参与者commit,告诉所有observer INFORM,然后调用commitProcessor逻辑,下面一节讲。涉及LearnerSyncRequest 部分是处理client端sync请求的,
可以参照源码阅读42理解。
思考
AckRequestProcessor和SendAckRequestProcessor 两个类的异同
区别是:
前者位于leader端,属性就记录Leader,不会进行网络IO
后者位于Follower端,属性就记录Learner,会给leader发送ACK
都代表事务日志记录完毕
共同是:
两者都没有用生产消费模型,shutdown函数就是空的
refer
《paxos到zk》
网友评论