美文网首页Zookeeper
Zookeeper之RequestProcessor链源码分析

Zookeeper之RequestProcessor链源码分析

作者: love111 | 来源:发表于2020-06-11 17:42 被阅读0次
image.png

RequestProcessor初始化结构

RequestProcessor是请求处理链的接口
初始化是在org.apache.zookeeper.server.ZooKeeperServer#setupRequestProcessors


image.png

PrepRequestProcessor的下一个处理器是SyncRequestProcessor,SyncRequestProcessor的下一个处理器是FinalRequestProcessor
PrepRequestProcessor的继承图


image.png
其中,PrepRequestProcessor和SyncRequestProcessor的结构一样,都是实现了Thread的一个线程,所以在这里初始化时便启动了这两个线程。
在具体分析请求处理器之前,需要知道,客户端请求放到哪里了

请求数据

数据接收中最后分析了,对接收客户端请求数据后,便会最终执行到NIOServerCnxn#doIO方法

image.png
初始化时,incomingBuffer=lenBuffer=ByteBuffer.allocate(4) 4个字节大小
image.png
从SocketChannel socket通道读取4字节大小的数据到incomingBuffer
image.png
incomingBuffer.remaining()=0说明已经写到缓冲区的position指针到最大值limit既是这里读取了4字节数据。
为什么会读取4字节数据,4个字节表示从客户端发送的数据包的大小需要占据多大字节数。然后根据前4个字节的值,再读多少就是我们客户端发送的数据
假如create /abc 123命令占据54字节,所以前4个字节表示的值是54
image.png
org.apache.zookeeper.server.NIOServerCnxn#readLength
image.png
把incomingBuffer缓冲区大小设置为读取的长度
读取请求的数据
org.apache.zookeeper.server.NIOServerCnxn#readPayload
image.png
org.apache.zookeeper.server.NIOServerCnxn#readRequest
读取请求
image.png
org.apache.zookeeper.server.ZooKeeperServer#processPacket
image.png
image.png
org.apache.zookeeper.server.ZooKeeperServer#submitRequest
image.png
org.apache.zookeeper.server.ZooKeeperServer#enqueueRequest
image.png
org.apache.zookeeper.server.RequestThrottler#submitRequest
请求限流器对象RequestThrottler把请求添加到submittedRequests队列中,submittedRequests是LinkedBlockingQueue<Request>队列
image.png
所以说,当一个客户端发送一个请求会把数据放入到限流器对象的一个阻塞的队列中submittedRequests
其中,RequestThrottler也是实现了ZooKeeperCriticalThread也是一个线程。
org.apache.zookeeper.server.RequestThrottler#run
image.png
其中,submittedRequests.take()获取是一个阻塞方法
限流判断
image.png
当maxRequests设置了大于0时,会进行限流。当zks中requestsInProcess比maxRequests小,则直接通过限流规则。如果requestsInProcess比maxRequests最大请求数大说明超过限流数,则会休眠stallTime(默认100ms)
image.png
通过ZooKeeperServer zks(zk服务端对象)提交当前请求
org.apache.zookeeper.server.ZooKeeperServer#submitRequestNow
image.png

PrepRequestProcessor

对Request的预处理,对于请求并不是所有的都是符合要求的。所以需要进行验证,然后再对Txn和TxnHeader属性值进行赋值,在持久化时,只需要这两个属性值便可以。
org.apache.zookeeper.server.PrepRequestProcessor#run


image.png

submittedRequests.take()是一个阻塞方法,最后通过pRequest方法处理请求


image.png
org.apache.zookeeper.server.PrepRequestProcessor#pRequest
image.png
调用pRequestHelper方法之后,便会把当前请求交给下一个请求处理器

org.apache.zookeeper.server.SyncRequestProcessor#processRequest


image.png
把处理后的请求添加到下一个请求处理器的queuedRequests队列
1、pRequestHelper

org.apache.zookeeper.server.PrepRequestProcessor#pRequestHelper


image.png

都会调用pRequest2Txn方法,只是传入的CreateRequest类不同。
createContainer、create、create2是CreateRequest。createTTL是createTtlRequest。
deleteContainer、delete是DeleteRequest。setData是SetDataRequest。reconfig是ReconfigRequest。
setACL是SetACLRequest。createSession、closeSession是null。
对于不需要创建Txn的getData、getChildren、ping、checkWatches、removeWatches、addWatch等需要调用会话方法zks.sessionTracker.checkSession

2、pRequest2Txn

org.apache.zookeeper.server.PrepRequestProcessor#pRequest2Txn


image.png

这个方法根据请求类型调用不同处理逻辑,这里是create为例。
其他还包括deleteContainer、delete、setData、reconfig、setACL、createSession、closeSession、check等逻辑

3、pRequest2TxnCreate

org.apache.zookeeper.server.PrepRequestProcessor#pRequest2TxnCreate


image.png

从序列化后的record对象转换为CreateRequest,并获取请求path路径=/abc,acl=1和data=123数据(这里是以create /abc 123为例)


image.png
①、检查ACL
image.png
image.png

检查父节点acl,是否是创建临时节点isSequential


image.png
②、设置持久化txn
image.png
③、生成并记录ChangeRecord
这个对象表示修改记录,为什么会出现这个中间对象。
当我们创建一个节点时,因为在这个步骤只是预处理,并没有对数据进行持久化。如果再次修改这个值,但是如果还没有持久化就会出现问题,所以这个对象就是解决这个问题的。
ChangeRecord parentRecord(父节点的ChangeRecord )
image.png
ChangeRecord nodeRecord(当前节点的ChangeRecord )
image.png

org.apache.zookeeper.server.PrepRequestProcessor#addChangeRecord


image.png

SyncRequestProcessor

SyncRequestProcessor是一个对请求持久化的类,也可以打快照(DataTree进行持久化)。SyncRequestProcessor也是实现线程的类,在queuedRequests.take()方法阻塞
org.apache.zookeeper.server.SyncRequestProcessor#run


image.png
1、DataTree进行持久化

if(!si.isThrottled() && zks.getZKDatabase().append(si))
只要符合shouldSnapshot()条件就可以执行ZooKeeperThread线程打快照


image.png

打快照条件,根据logCount日志数量和logSize日志大小与snapCount快照的比较,具有随机性。
org.apache.zookeeper.server.SyncRequestProcessor#shouldSnapshot


image.png
调用takeSnapshot打快照
org.apache.zookeeper.server.ZooKeeperServer#takeSnapshot()
image.png

org.apache.zookeeper.server.ZooKeeperServer#takeSnapshot(boolean)
DataTree进行持久化


image.png
2、请求Request持久化

if(toFlush.isEmpty())这里为空,说明在下面已经调用了flush刷新方法。对于不符合上面两个if条件的都会把当前请求添加到toFlush队列中。当达到刷新条件便调用flush方法。


image.png

org.apache.zookeeper.server.SyncRequestProcessor#shouldFlush


image.png
①、flush
org.apache.zookeeper.server.SyncRequestProcessor#flush
image.png

②、提交日志txnLog
org.apache.zookeeper.server.ZKDatabase#commit


image.png
org.apache.zookeeper.server.persistence.FileTxnSnapLog#commit
image.png
③、把请求从toFlush取出,交给下一个请求处理器FinalRequestProcessor处理
nextProcessor.processRequest(i)
所以,这个类主要是对请求的持久化和对DataTree打快照

FinalRequestProcessor

org.apache.zookeeper.server.FinalRequestProcessor#processRequest
1、触发Watch
对Watch单独开一章介绍
2、返回Response


image.png

总结:

RequestProcessor是对请求的过滤处理,单机模式有三个请求过滤器。
PrepRequestProcessor:检查ACL、生成改变记录ChangeRecord对象、设置持久化txn
SyncRequestProcessor:Request请求持久化、DataTree打快照
FinalRequestProcessor:触发Watch、返回Response
其中,PrepRequestProcessor和SyncRequestProcessor都是线程类

相关文章

网友评论

    本文标题:Zookeeper之RequestProcessor链源码分析

    本文链接:https://www.haomeiwen.com/subject/mtpbtktx.html