线程池的创建
org.apache.zookeeper.server.WorkerService.workers的创建
image.png
image.png
如果threadsAreAssignable=true
则workers会有numWorkerThreads=8个执行器ExecutorService对象,其中corePoolSize=1
通过AcceptThread接收连接
org.apache.zookeeper.server.NIOServerCnxnFactory.AcceptThread#run
image.png
org.apache.zookeeper.server.NIOServerCnxnFactory.AcceptThread#select
selector.select()是个阻塞操作,当有客户端连接。这里会往下执行,此时的事件是OP_ACCEPT事件
image.png
org.apache.zookeeper.server.NIOServerCnxnFactory.AcceptThread#doAccept
创建SocketChannel Socket通道
image.png
设置非阻塞configureBlocking,并获取SelectorThread选择器线程
image.png
org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread#addAcceptedConnection
把当前连接添加到SelectorThread#acceptedQueue队列中,并唤醒阻塞。
image.png
这里需要了解唤醒的是哪里的阻塞呢?
SelectorThread中的select()也调用了selector.select()方法,这个线程也是一直运行的。如果有客户端连接,通过AcceptThread线程处理后添加到acceptedQueue队列后,说明有客户端可能需要读写事件,便会这里解阻塞。如何本来就没有客户端连接,这里是一直阻塞的。
连接后添加到那个selectorThreads的队列中,即如何选择selectorThreads线程?
创建AcceptThread对象时,便会获取selectorIterator的迭代器对象
image.png
在添加队列时,便对selectorIterator进行迭代获取。
SelectorThreads数量可以通过numSelectorThreads设置
image.png
SelectorThread注册读事件
org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread#run
如果没有连接事件过来,便会一直阻塞
image.png
能够继续往下执行是由于添加到acceptedQueue队列后,解阻塞了。
这里主要有三步:
1、select()
org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread#select
这里将会一直阻塞在这selector.select();在第一次循环时,由于SelectionKey对象为null,没有读写事件便会调用②的processAcceptedConnections方法处理这个连接。为什么不能继续执行也就是SelectionKey为什么是null,因为在接收到客户端连接后,并没有注册读或写事件。所以会先调用processAcceptedConnections处理这个连接
image.png
注册了读或写事件之后,才会调用到handleIO方法
org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread#run进行while(!stopped)循环。
image.png
当我们在客户端输入create /aaa 123时,这是一个isWritable事件,会调用到handleIO处理读写事件,交给workerPool工作线程池处理
org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread#handleIO
image.png
handleIO方法主要步骤包括:
把当前SelectorThread和当前SelectionKey封装为IOWorkRequest对象、获取当前SelectionKey的NIOServerCnxn对象。
NIOServerCnxn#disableSelectable设置selectable.set(false),key.interestOps(0);不能处理当前连接的其他请求
交给workerPool调度workRequest
org.apache.zookeeper.server.WorkerService#schedule
image.png
ScheduledWorkRequest.run()
org.apache.zookeeper.server.WorkerService.ScheduledWorkRequest
image.png
org.apache.zookeeper.server.NIOServerCnxnFactory.IOWorkRequest#doWork
image.png
org.apache.zookeeper.server.NIOServerCnxn#doIO
服务端接收到客户端的命令,读取数据
image.png
image.png
2、processAcceptedConnections
org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread#processAcceptedConnections
把放入到acceptedQueue队列中接收的连接,取出来注册OP_READ读事件,然后添加NIOServerCnxn对象与当前key绑定。这里就是给当前连接添加附加对象NIOServerCnxn,即每一个连接都会有一个表示当前连接的上下文对象。
image.png
org.apache.zookeeper.server.NIOServerCnxnFactory#addCnxn
把当前连接的NIOServerCnxn(Nio服务端上下文)添加到cnxns集合
image.png
3、processInterestOpsUpdateRequests
从updateQueue队列中获取,并更改当前事件
org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread#processInterestOpsUpdateRequests
image.png
总结:
zookeeper默认的接收数据模式是通过NIO。
如果想更改数据接收模式,可以修改zookeeper.serverCnxnFactory配置是org.apache.zookeeper.server.NettyServerCnxnFactory
通过AcceptThread线程接收连接,然后把当前连接通过SelectorThread线程处理读或写事件。
通过一系列的封装,再把封装后的对象交给work工作线程池处理。最终会调用到NIOServerCnxn的doIO处理读写事件。
读事件就是客户端发送的数据,写事件就是服务端返回(响应)的数据
网友评论