
RequestProcessor初始化结构
RequestProcessor是请求处理链的接口
初始化是在org.apache.zookeeper.server.ZooKeeperServer#setupRequestProcessors

PrepRequestProcessor的下一个处理器是SyncRequestProcessor,SyncRequestProcessor的下一个处理器是FinalRequestProcessor
PrepRequestProcessor的继承图

其中,PrepRequestProcessor和SyncRequestProcessor的结构一样,都是实现了Thread的一个线程,所以在这里初始化时便启动了这两个线程。
在具体分析请求处理器之前,需要知道,客户端请求放到哪里了
请求数据
在数据接收中最后分析了,对接收客户端请求数据后,便会最终执行到NIOServerCnxn#doIO方法

初始化时,incomingBuffer=lenBuffer=ByteBuffer.allocate(4) 4个字节大小

从SocketChannel socket通道读取4字节大小的数据到incomingBuffer

incomingBuffer.remaining()=0说明已经写到缓冲区的position指针到最大值limit既是这里读取了4字节数据。
为什么会读取4字节数据,4个字节表示从客户端发送的数据包的大小需要占据多大字节数。然后根据前4个字节的值,再读多少就是我们客户端发送的数据
假如create /abc 123命令占据54字节,所以前4个字节表示的值是54

org.apache.zookeeper.server.NIOServerCnxn#readLength

把incomingBuffer缓冲区大小设置为读取的长度
读取请求的数据
org.apache.zookeeper.server.NIOServerCnxn#readPayload

org.apache.zookeeper.server.NIOServerCnxn#readRequest
读取请求

org.apache.zookeeper.server.ZooKeeperServer#processPacket


org.apache.zookeeper.server.ZooKeeperServer#submitRequest

org.apache.zookeeper.server.ZooKeeperServer#enqueueRequest

org.apache.zookeeper.server.RequestThrottler#submitRequest
请求限流器对象RequestThrottler把请求添加到submittedRequests队列中,submittedRequests是LinkedBlockingQueue<Request>队列

所以说,当一个客户端发送一个请求会把数据放入到限流器对象的一个阻塞的队列中submittedRequests
其中,RequestThrottler也是实现了ZooKeeperCriticalThread也是一个线程。
org.apache.zookeeper.server.RequestThrottler#run

其中,submittedRequests.take()获取是一个阻塞方法
限流判断

当maxRequests设置了大于0时,会进行限流。当zks中requestsInProcess比maxRequests小,则直接通过限流规则。如果requestsInProcess比maxRequests最大请求数大说明超过限流数,则会休眠stallTime(默认100ms)

通过ZooKeeperServer zks(zk服务端对象)提交当前请求
org.apache.zookeeper.server.ZooKeeperServer#submitRequestNow

PrepRequestProcessor
对Request的预处理,对于请求并不是所有的都是符合要求的。所以需要进行验证,然后再对Txn和TxnHeader属性值进行赋值,在持久化时,只需要这两个属性值便可以。
org.apache.zookeeper.server.PrepRequestProcessor#run

submittedRequests.take()是一个阻塞方法,最后通过pRequest方法处理请求

org.apache.zookeeper.server.PrepRequestProcessor#pRequest

调用pRequestHelper方法之后,便会把当前请求交给下一个请求处理器
org.apache.zookeeper.server.SyncRequestProcessor#processRequest

把处理后的请求添加到下一个请求处理器的queuedRequests队列
1、pRequestHelper
org.apache.zookeeper.server.PrepRequestProcessor#pRequestHelper

都会调用pRequest2Txn方法,只是传入的CreateRequest类不同。
createContainer、create、create2是CreateRequest。createTTL是createTtlRequest。
deleteContainer、delete是DeleteRequest。setData是SetDataRequest。reconfig是ReconfigRequest。
setACL是SetACLRequest。createSession、closeSession是null。
对于不需要创建Txn的getData、getChildren、ping、checkWatches、removeWatches、addWatch等需要调用会话方法zks.sessionTracker.checkSession
2、pRequest2Txn
org.apache.zookeeper.server.PrepRequestProcessor#pRequest2Txn

这个方法根据请求类型调用不同处理逻辑,这里是create为例。
其他还包括deleteContainer、delete、setData、reconfig、setACL、createSession、closeSession、check等逻辑
3、pRequest2TxnCreate
org.apache.zookeeper.server.PrepRequestProcessor#pRequest2TxnCreate

从序列化后的record对象转换为CreateRequest,并获取请求path路径=/abc,acl=1和data=123数据(这里是以create /abc 123为例)

①、检查ACL


检查父节点acl,是否是创建临时节点isSequential

②、设置持久化txn

③、生成并记录ChangeRecord
这个对象表示修改记录,为什么会出现这个中间对象。
当我们创建一个节点时,因为在这个步骤只是预处理,并没有对数据进行持久化。如果再次修改这个值,但是如果还没有持久化就会出现问题,所以这个对象就是解决这个问题的。
ChangeRecord parentRecord(父节点的ChangeRecord )

ChangeRecord nodeRecord(当前节点的ChangeRecord )

org.apache.zookeeper.server.PrepRequestProcessor#addChangeRecord

SyncRequestProcessor
SyncRequestProcessor是一个对请求持久化的类,也可以打快照(DataTree进行持久化)。SyncRequestProcessor也是实现线程的类,在queuedRequests.take()方法阻塞
org.apache.zookeeper.server.SyncRequestProcessor#run

1、DataTree进行持久化
if(!si.isThrottled() && zks.getZKDatabase().append(si))
只要符合shouldSnapshot()条件就可以执行ZooKeeperThread线程打快照

打快照条件,根据logCount日志数量和logSize日志大小与snapCount快照的比较,具有随机性。
org.apache.zookeeper.server.SyncRequestProcessor#shouldSnapshot

调用takeSnapshot打快照
org.apache.zookeeper.server.ZooKeeperServer#takeSnapshot()

org.apache.zookeeper.server.ZooKeeperServer#takeSnapshot(boolean)
DataTree进行持久化

2、请求Request持久化
if(toFlush.isEmpty())这里为空,说明在下面已经调用了flush刷新方法。对于不符合上面两个if条件的都会把当前请求添加到toFlush队列中。当达到刷新条件便调用flush方法。

org.apache.zookeeper.server.SyncRequestProcessor#shouldFlush

①、flush
org.apache.zookeeper.server.SyncRequestProcessor#flush

②、提交日志txnLog
org.apache.zookeeper.server.ZKDatabase#commit

org.apache.zookeeper.server.persistence.FileTxnSnapLog#commit

③、把请求从toFlush取出,交给下一个请求处理器FinalRequestProcessor处理
nextProcessor.processRequest(i)
所以,这个类主要是对请求的持久化和对DataTree打快照
FinalRequestProcessor
org.apache.zookeeper.server.FinalRequestProcessor#processRequest
1、触发Watch
对Watch单独开一章介绍
2、返回Response

总结:
RequestProcessor是对请求的过滤处理,单机模式有三个请求过滤器。
PrepRequestProcessor:检查ACL、生成改变记录ChangeRecord对象、设置持久化txn
SyncRequestProcessor:Request请求持久化、DataTree打快照
FinalRequestProcessor:触发Watch、返回Response
其中,PrepRequestProcessor和SyncRequestProcessor都是线程类
网友评论