在zookeeper集群中,分为Leader,Follewer,Observer三种类型的服务器角色,请求是通过各自的请求处理链来处理,所有的请求处理器均实现了RequestProcessor
接口,通过处理链的上一个请求处理器调用该处理器的processRequest
方法将请求传递过来,这个请求的传递过程是由一个线程完成的。
public interface RequestProcessor {
public static class RequestProcessorException extends Exception {
public RequestProcessorException(String msg, Throwable t) {
super(msg, t);
}
}
//请求处理方法
void processRequest(Request request) throws RequestProcessorException;
void shutdown();
}
下面分别看下不同角色的服务器启动时的请求处理链初始化过程。
Leader请求处理链初始化
Leader的主要工作如下:
- 事务请求的唯一调度和处理者,保证集群事务处理的顺序性。
- 集群内部各服务器的调度者。
当leader完成集群间数据的同步时,会启动LeaderZooKeeperServer,初始化请求链。
LeaderZooKeeperServer.setupRequestProcessors
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
commitProcessor = new CommitProcessor(toBeAppliedProcessor,
Long.toString(getServerId()), false,
getZooKeeperServerListener());
commitProcessor.start();
ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
commitProcessor);
proposalProcessor.initialize();
prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
prepRequestProcessor.start();
//设置LeaderZooKeeperServer.firstProcessor
firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);
setupContainerManager();
}
请求处理器都持有下一个请求处理器的引用private final RequestProcessor nextProcessor;
,在上面的构造方法中会设置各自的nextProcessor
,并启动处理器。最后会设置LeaderZooKeeperServer.firstProcessor
为LeaderRequestProcessor
,这个处理器主要是对本地session创建临时节点时的请求预处理,将在=======介绍,它的nextProcessor
为PrepRequestProcessor
。
可大体认为Leader的请求处理链如下:
PrepRequestProcessor
Leader服务器的请求预处理器,进行一些创建请求事务头,事务体,ACL检查和版本检查等的预处理操作。初始化方法为:
public PrepRequestProcessor(ZooKeeperServer zks,
RequestProcessor nextProcessor) {
super("ProcessThread(sid:" + zks.getServerId() + " cport:"
+ zks.getClientPort() + "):", zks.getZooKeeperServerListener());
this.nextProcessor = nextProcessor;
this.zks = zks;
}
主要属性为:
//请求存储队列,该线程启动后会不断从队列中获取请求进行预处理
LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>();
//设置为ProposalRequestProcessor
private final RequestProcessor nextProcessor;
//当前zkServer实例
ZooKeeperServer zks;
ProposalRequestProcessor
Leader服务器的事务投票处理器,也是事务处理流程的发起者。对于非事务请求,它会直接将请求流转到 CommitProcessor
处理器。对于事务请求,除了将请求交给CommitProcessor
处理器外,还会根据请求类型创建对应的Proposal提议,并发送给所有的Follewer服务器来发起一次集群内的事务投票。同时,它还会将事务请求交给SyncRequestProcessor
处理器进行事务日志的记录。
初始化过程为:
public ProposalRequestProcessor(LeaderZooKeeperServer zks,
RequestProcessor nextProcessor) {
this.zks = zks;
this.nextProcessor = nextProcessor;
//初始化SyncRequestProcessor
AckRequestProcessor ackProcessor = new AckRequestProcessor(zks.getLeader());
syncProcessor = new SyncRequestProcessor(zks, ackProcessor);
}
//启动syncProcessor
public void initialize() {
syncProcessor.start();
}
主要属性为:
LeaderZooKeeperServer zks;
//设置为CommitProcessor
RequestProcessor nextProcessor;
SyncRequestProcessor syncProcessor;
SyncRequestProcessor
是事务日志记录处理器,主要用来将事务请求记录到事务日志文件中,同时会根据条件触发zookeeper进行数据快照。
并在数据同步完成后将请求传递给nextProcessor
,
初始化过程为:
public SyncRequestProcessor(ZooKeeperServer zks,
RequestProcessor nextProcessor) {
super("SyncThread:" + zks.getServerId(), zks
.getZooKeeperServerListener());
this.zks = zks;
this.nextProcessor = nextProcessor;
running = true;
}
主要属性为:
private final ZooKeeperServer zks;
//请求存储队列
private final LinkedBlockingQueue<Request> queuedRequests =
new LinkedBlockingQueue<Request>();
//leader服务器中中会设置为AckRequestProcessor
private final RequestProcessor nextProcessor;
//用于异步快照的线程
private Thread snapInProcess = null;
//线程运行状态标志
volatile private boolean running;
AckRequestProcessor
负责在SyncRequestProcessor处理器完成事务日志记录后,向Proposal投票收集器发送ACK反馈,表示当前leader服务器已经完成了对该Proposal的事务日志记录。初始化过程为:
//持有leader引用,调用leader.processAck发送反馈
Leader leader;
AckRequestProcessor(Leader leader) {
this.leader = leader;
}
CommitProcessor
事务提交处理器,对于非事务请求,该处理器会直接将请求交给nextProcessor
处理;对于事务请求,它会等待集群内针对Proposal的投票直到Proposal可被提交,它保证了事务请求的顺序处理。初始化过程为:
//LeaderZooKeeperServer.setupRequestProcessors
commitProcessor = new CommitProcessor(toBeAppliedProcessor,
Long.toString(getServerId()), false,
getZooKeeperServerListener());
commitProcessor.start();
public CommitProcessor(RequestProcessor nextProcessor, String id,
boolean matchSyncs, ZooKeeperServerListener listener) {
super("CommitProcessor:" + id, listener);
this.nextProcessor = nextProcessor;
this.matchSyncs = matchSyncs;
}
主要属性为:
/** Default: numCores */
public static final String ZOOKEEPER_COMMIT_PROC_NUM_WORKER_THREADS =
"zookeeper.commitProcessor.numWorkerThreads";
/** Default worker pool shutdown timeout in ms: 5000 (5s) */
public static final String ZOOKEEPER_COMMIT_PROC_SHUTDOWN_TIMEOUT =
"zookeeper.commitProcessor.shutdownTimeout";
//刚进来的请求存储队列
protected LinkedBlockingQueue<Request> queuedRequests =
new LinkedBlockingQueue<Request>();
//已经提交的请求队列
protected final LinkedBlockingQueue<Request> committedRequests =
new LinkedBlockingQueue<Request>();
//等待Proposal可被提交的请求,key为session id,value为所关联的session's requests.
protected final Map<Long, LinkedList<Request>> pendingRequests =
new HashMap<Long, LinkedList<Request>>(10000);
/** The number of requests currently being processed */
protected final AtomicInteger numRequestsProcessing = new AtomicInteger(0);
//设置为ToBeAppliedProcessor
RequestProcessor nextProcessor;
/** For testing purposes, we use a separated stopping condition for the
* outer loop.*/
protected volatile boolean stoppedMainLoop = true;
protected volatile boolean stopped = true;
private long workerShutdownTimeoutMS;
//处理已提交的请求
protected WorkerService workerPool;
private Object emptyPoolSync = new Object();
/**
* This flag indicates whether we need to wait for a response to come back from the
* leader or we just let the sync operation flow through like a read. The flag will
* be false if the CommitProcessor is in a Leader pipeline.
*/
//leader服务器默认为false
boolean matchSyncs;
start方法主要是初始化workerPool
并启动线程
public void start() {
int numCores = Runtime.getRuntime().availableProcessors();
int numWorkerThreads = Integer.getInteger(
ZOOKEEPER_COMMIT_PROC_NUM_WORKER_THREADS, numCores);
workerShutdownTimeoutMS = Long.getLong(
ZOOKEEPER_COMMIT_PROC_SHUTDOWN_TIMEOUT, 5000);
LOG.info("Configuring CommitProcessor with "
+ (numWorkerThreads > 0 ? numWorkerThreads : "no")
+ " worker threads.");
if (workerPool == null) {
workerPool = new WorkerService(
"CommitProcWork", numWorkerThreads, true);
}
stopped = false;
stoppedMainLoop = false;
super.start();
}
ToBeAppliedProcessor
维护了那些已被CommitProcessor处理过的可被提交的Proposal事务请求的队列leader.toBeApplied
,会将请求交给下一个处理器next
处理,如果是事务请求,next处理完之后需要将请求从toBeApplied
中移除。初始化方法为:
/**
* This request processor simply maintains the toBeApplied list. For
* this to work next must be a FinalRequestProcessor and
* FinalRequestProcessor.processRequest MUST process the request
* synchronously!
*
* @param next
* a reference to the FinalRequestProcessor
*/
ToBeAppliedRequestProcessor(RequestProcessor next, Leader leader) {
if (!(next instanceof FinalRequestProcessor)) {
throw new RuntimeException(ToBeAppliedRequestProcessor.class
.getName()
+ " must be connected to "
+ FinalRequestProcessor.class.getName()
+ " not "
+ next.getClass().getName());
}
this.leader = leader;
this.next = next;
}
主要属性为:
//设置为FinalRequestProcessor
private final RequestProcessor next;
private final Leader leader;
FinalRequestProcessor
最后一个请求处理器,处理请求并构造客户端请求的响应。针对事务事情,会负责将事务应用到内存数据库中。仅有ZooKeeperServer zks
成员变量。
Follewer请求处理链初始化
Follewer服务器的主要工作如下:
- 处理客户端非事务请求,转发事务请求给Leader服务器
- 参与事务请求Proposal的投票
- 参与Leader选举投票
当Follewer完成集群间数据的同步时,会启动FollowerZooKeeperServer,初始化请求链。
FollowerZooKeeperServer.setupRequestProcessors
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
commitProcessor = new CommitProcessor(finalProcessor,
Long.toString(getServerId()), true, getZooKeeperServerListener());
commitProcessor.start();
firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
((FollowerRequestProcessor) firstProcessor).start();
syncProcessor = new SyncRequestProcessor(this,
new SendAckRequestProcessor((Learner)getFollower()));
syncProcessor.start();
}
可以看出第一个请求处理器firstProcessor=new FollowerRequestProcessor(this, commitProcessor);
同leader服务器请求处理链的初始化过程,会初始化每个请求处理器的nextProcessor
,并启动处理器。
Follewer的请求处理链如下:
FollowerRequestProcessor
Follewer服务器的第一个请求处理器,识别当前请求是否是事务请求。如果是事务请求,不仅将请求交给nextProcessor,还会将事务请求转发给Leader服务器。初始化过程为:
public FollowerRequestProcessor(FollowerZooKeeperServer zks,
RequestProcessor nextProcessor) {
super("FollowerRequestProcessor:" + zks.getServerId(), zks
.getZooKeeperServerListener());
this.zks = zks;
this.nextProcessor = nextProcessor;
}
主要属性为:
FollowerZooKeeperServer zks;
//设置为CommitProcessor
RequestProcessor nextProcessor;
LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>();
boolean finished = false;
CommitProcessor
在Follewer服务器中,初始化过程为:
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
commitProcessor = new CommitProcessor(finalProcessor,
Long.toString(getServerId()), true, getZooKeeperServerListener());
commitProcessor.start();
此时,matchSyncs=true
,表示对于OpCode.sync
同步请求,需要等待leader的响应。
SyncRequestProcessor
在Follewer服务器中,nextProcessor
为SendAckRequestProcessor,会接收leader服务器的请求,并将数据强刷到磁盘上,并将请求交给SendAckRequestProcessor处理
SendAckRequestProcessor
承担事务日志记录反馈的角色,在完成强刷事务日志记录后,会向leader服务器发送ACK消息表明自身完成了事务日志的记录工作。
Observer请求处理链初始化
Observer作用是观察zookeeper集群的最新状态并将变更同步过来,原理同Follewer,对于非事务请求,可进行独立处理。对于事务请求,会转发给Leader服务器处理。但是不参与任何形式的投票。
当Observer完成集群间数据的同步时,会启动ObserverZooKeeperServer,初始化请求链。
ObserverZooKeeperServer.setupRequestProcessors
protected void setupRequestProcessors() {
// We might consider changing the processor behaviour of
// Observers to, for example, remove the disk sync requirements.
// Currently, they behave almost exactly the same as followers.
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
commitProcessor = new CommitProcessor(finalProcessor,
Long.toString(getServerId()), true,
getZooKeeperServerListener());
commitProcessor.start();
firstProcessor = new ObserverRequestProcessor(this, commitProcessor);
((ObserverRequestProcessor) firstProcessor).start();
/*
* Observer should write to disk, so that the it won't request
* too old txn from the leader which may lead to getting an entire
* snapshot.
*
* However, this may degrade performance as it has to write to disk
* and do periodic snapshot which may double the memory requirements
*/
if (syncRequestProcessorEnabled) {
syncProcessor = new SyncRequestProcessor(this, null);
syncProcessor.start();
}
}
可以看出在初始化请求处理链的阶段会将SyncRequestProcessor启动,为了能记录事务日志和定期同步快照信息。但是nextProcessor
为null,不需要参与事务请求的ACK回复。第一个请求处理器firstProcessor = new ObserverRequestProcessor(this, commitProcessor);
同FollowerRequestProcessor的作用。
Observer请求处理链为:
感谢您的阅读,我是Monica23334 || Monica2333 。立下每周写一篇原创文章flag的小姐姐,关注我并期待打脸吧~
网友评论