简介
事务投票处理器。Leader服务器事务处理流程的发起者。
对于非事务性请求,ProposalRequestProcessor会直接将请求转发到CommitProcessor处理器,不再做任何处理
对于事务性请求,除了将请求转发到CommitProcessor外,还会根据请求类型创建对应的Proposal提议,并发送给所有的Follower服务器来发起一次集群内的事务投票。同时,ProposalRequestProcessor还会将事务请求交付给SyncRequestProcessor进行事务日志的记录。
源码解析
不长,直接贴出来
public class ProposalRequestProcessor implements RequestProcessor {
private static final Logger LOG =
LoggerFactory.getLogger(ProposalRequestProcessor.class);
LeaderZooKeeperServer zks;//leader角色才有这个请求处理器,这里能拿到LeaderZooKeeperServer
RequestProcessor nextProcessor;//下一个处理器
SyncRequestProcessor syncProcessor;//同步处理器
public ProposalRequestProcessor(LeaderZooKeeperServer zks,
RequestProcessor nextProcessor) {
this.zks = zks;
this.nextProcessor = nextProcessor;//一般是CommitProcessor
AckRequestProcessor ackProcessor = new AckRequestProcessor(zks.getLeader());
syncProcessor = new SyncRequestProcessor(zks, ackProcessor);//syncProcessor的后续是AckRequestProcessor
}
/**
* initialize this processor
*/
public void initialize() {//启动同步处理器
syncProcessor.start();
}
public void processRequest(Request request) throws RequestProcessorException {
// LOG.warn("Ack>>> cxid = " + request.cxid + " type = " +
// request.type + " id = " + request.sessionId);
// request.addRQRec(">prop");
/* In the following IF-THEN-ELSE block, we process syncs on the leader.
* If the sync is coming from a follower, then the follower
* handler adds it to syncHandler. Otherwise, if it is a client of
* the leader that issued the sync command, then syncHandler won't
* contain the handler. In this case, we add it to syncHandler, and
* call processRequest on the next processor.
*/
if(request instanceof LearnerSyncRequest){//如果是client的同步的请求
zks.getLeader().processSync((LearnerSyncRequest)request);//特殊处理,不臫走调用链的,根据lastProposed记录,processAck函数异步处理时时给对应的LearnerHandler发送Sync的消息
} else {
nextProcessor.processRequest(request);//交给CommitProcessor处理
if (request.hdr != null) {//如果请求头不为空(是事务请求)
// We need to sync and get consensus on any transactions
try {
zks.getLeader().propose(request);//leader发出提议,集群进行投票
} catch (XidRolloverException e) {
throw new RequestProcessorException(e.getMessage(), e);
}
syncProcessor.processRequest(request);//事务请求需要syncProcessor进行处理
}
}
}
public void shutdown() {
LOG.info("Shutting down");
nextProcessor.shutdown();//ProposalRequestProcessor后面两个Processor都要关闭
syncProcessor.shutdown();
}
}
这里就注意他有两个后续的处理器,CommitProcessor(默认)和syncProcessor(事务请求才会涉及)
思考
ProposalRequestProcessor和leader的关系
只有leader才有ProposalRequestProcessor
LeaderZooKeeperServer#setupRequestProcessors
所处位置位于
ProposalRequestProcessor在leader请求处理链的位置
processRequest逻辑
如果是LearnerSyncRequest就直接处理掉(参照42节相关讲解)
否则先让commitProcessor处理,如果是事务请求,还要提议,集群投票,然后再让syncProcessor处理
也就是说,“经过了syncProcessor”处理的请求一定经过了"commitProcessor"的处理
网友评论