责任链模式实践之Zookeeper责任链模式
一,责任链模式
定义:责任链模式(Chain of Responsibility)使多个对象都有机会处理请求,从而避免请求的发送者和接受者之间的耦合关系。将这些对象连成一条链,并沿着这条链传递该请求,直到有对象能够处理它。
类型:行为类模式
二,RequestProcessor接口
RequestProcessor接口是Zookeeper责任链模式的接口,这个接口定义了2个方法:
// 处理请求
void processRequest(Request request);
// 关闭请求,清空请求队列。
void shutdown();
三,LinkedBlockingQueue
Zookeeper责任链模式使用阻塞队列LinkedBlockingQueue来保存请求。
其实从这个角度来看的话,Zookeeper责任链模式采用了和消息队列类似的设计思路。责任链模式可以看做是一个微型的MQ。
四,初始化Zookeeper责任链
这里我们以单机版Zookeeper的启动为例来进行说明,集群版的没有找到初始化责任链的代码,有知道的,请不吝赐教。
单机版启动的最后阶段,会创建ServerCnxnFactory,然后配置ServerCnxnFactory。
this.cnxnFactory = ServerCnxnFactory.createFactory();
this.cnxnFactory.configure(config.getClientPortAddress, config.getMaxClientCnxn());
接着就会启动ServerCnxnFactory
this.cnxnFactory.startup(zkServer);
ServerCnxnFactory的具体实现有2个:
NIOServerCnxnFactory
NettyServerCnxnFactory
这里以NIOServerCnxnFactory为例来说明:
// 启动线程
this.start();
// 从ZKDatabase加载数据
zks.startdata();
// 启动ZookeeperServer
zks.startup();
初始化责任链的逻辑就在zks.startup()中,
// 开启会话管理器
this.startSessionTracker();
// 初始化责任链
this.setupRequestProcessors();
// 注册JMX
this.registerJMX();
OK,接下来正式进入Zookeeper责任链模式的初始化。
RequestProcessor finalProcessor = new FinalRequestProcessor(zookeeperServer);
RequestProcessor syncProcessor = new SyncRequestProcessor(zookeeperServer, finalProcessor);
synProcessor.start();
// 设置第一个请求处理器
this.firstProcessor = new PrepRequestProcessor(zookeeperServer, syncProcessor);
this.firstProcessor.start();
五,Zookeeper请求处理器
1,PrepRequestProcessor
PrepRequestProcessor是Zookeeper责任链的第一个请求处理器。PrepRequestProcessor首先会判断当前请求是否是事务请求,如果是事务请求,PrepRequestProcessor会对该请求进行一系列的预处理,比如创建请求事务头、事务体,会话检查,ACL检查,版本检查等等。
Request类中定义了19种操作,有些请求是事务操作,有些请求不是事务操作。针对不同的操作,PrepRequestProcessor会执行相应的逻辑。
2,ProposalRequestProcessor
ProposalRequestProcessor处理器是Leader服务器的事务投票处理器,也是leader服务器事务处理流程的发起者。对于非事务请求,ProposalRequestProcessor会直接将请求流转到CommitRequestProcessor。对于事务请求,除了将请求交给CommitRequestProcessor外,还会根据请求类型创建对应的Proposal提议,并发送给所有的Follower服务器进行事务投票,并且还会把事务请求交给SyncRequestProcessor记录事务日志。
3,SyncRequestProcessor
SyncRequestProcessor的主要功能是记录事务日志和快照到ZKDatabase。
4,AckRequestProcessor和SendAckRequestProcessor
AckRequestProcessor是leader特有的处理器。AckRequestProcessor的主要功能是向Proposal的投票收集器发送ACK反馈,以通知投票收集器当前服务器已经完成了对该Proposal的事务日志记录。
SendAckRequestProcessor是follower特有的处理器。SendAckRequestProcessor的主要功能是向Proposal的投票收集器发送ACK反馈,以通知投票收集器当前服务器已经完成了对该Proposal的事务日志记录。
observer不参与投票。
5,CommitRequestProcessor
CommitRequestProcessor主要负责将已经完成本机submit的request和已经在集群中达成commit的request匹配,并将匹配后的request交给nextProcessor(即ToBeAppliedRequestProcessor)处理。
6,ToBeAppliedRequestProcessor
ToBeAppliedRequestProcessor是倒数第二个处理器,之后就是FinalRequestProcessor处理器。ToBeAppliedRequestProcessor处理器的功能是收集已经完成事务处理的请求,存入toBeApplied集合中,然后交给FinalRequestProcessor处理器。
7,FinalRequestProcessor
FinalRequestProcessor处理器是最后一个处理器。主要功能是做一些逻辑校验,然后封装响应信息返回客户端。
网友评论