1.请求接受方式
以NettyServerCnxn为例
2.NettyServerCnxnFactory类,通过设置bootstrap.childHandler(new ChannelInitializer<SocketChannel>),将连接,读写,流控等工作成功的交接给了 CnxnChannelHandler 内部类。
4.在读取数据方法中,接受客户端的请求,客户端指其他follower节点。
5.读取到的数据传递给了queuedBuffer,用来缓冲多buffer信息,是为了应对多线程并发的情况。
image.png
6.将数据接着向下传递。bufferQueue中读取了一段字节流。放入bb中。
image.png
7.接着在 (6)步骤的方法中,传递数据给zookeeperServer
image.png
8.流控设置,默认limit=1000时。当 outstandingCount>1000/(size-1)时(1代表leader节点本身),说明一个客户端达到了平均量的上线,所以理论上按照平均值来算: arg(size-1)>1000,就超过1000的限量。分布式系统限流调用统计,可以参考。 假设 limit = 1000/(size-1),leader服务器始终处理的 limit 个请求,但是其他follower节点累积的请求为 (size-1)limit=1000个,最大等待数,指的是follow中没有转发过来的。用户端发出的请求,被阻塞了,不在接收范围内。
最后按照请求类型的不同,需要做不同的处理,非 auth和sasl的请求,发到责任链中做相应处理。
image.png
image.png
9.责任链模式的请求顺序如下图所示。
image.png
10.在ProposalRquestProcessor处理之后,分为事务和非事务操作。针对事务操作传递给了commitProcessor,commitProcessor中有一个相应的queuedRequests队列,接受请求,依次处理。并发控制就在与此。
而针对非事务的请求,直接交给下一级处理器。
网友评论