美文网首页Zookeeper
Zookeeper之数据接收源码分析(单机模式)

Zookeeper之数据接收源码分析(单机模式)

作者: loveFXX | 来源:发表于2020-06-09 12:02 被阅读0次
    image.png

    线程池的创建

    org.apache.zookeeper.server.WorkerService.workers的创建


    image.png
    image.png

    如果threadsAreAssignable=true
    则workers会有numWorkerThreads=8个执行器ExecutorService对象,其中corePoolSize=1

    通过AcceptThread接收连接

    org.apache.zookeeper.server.NIOServerCnxnFactory.AcceptThread#run


    image.png

    org.apache.zookeeper.server.NIOServerCnxnFactory.AcceptThread#select
    selector.select()是个阻塞操作,当有客户端连接。这里会往下执行,此时的事件是OP_ACCEPT事件


    image.png
    org.apache.zookeeper.server.NIOServerCnxnFactory.AcceptThread#doAccept
    创建SocketChannel Socket通道
    image.png

    设置非阻塞configureBlocking,并获取SelectorThread选择器线程


    image.png
    org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread#addAcceptedConnection
    把当前连接添加到SelectorThread#acceptedQueue队列中,并唤醒阻塞。
    image.png
    这里需要了解唤醒的是哪里的阻塞呢?
    SelectorThread中的select()也调用了selector.select()方法,这个线程也是一直运行的。如果有客户端连接,通过AcceptThread线程处理后添加到acceptedQueue队列后,说明有客户端可能需要读写事件,便会这里解阻塞。如何本来就没有客户端连接,这里是一直阻塞的。
    连接后添加到那个selectorThreads的队列中,即如何选择selectorThreads线程?
    创建AcceptThread对象时,便会获取selectorIterator的迭代器对象
    image.png
    在添加队列时,便对selectorIterator进行迭代获取。

    SelectorThreads数量可以通过numSelectorThreads设置


    image.png

    SelectorThread注册读事件

    org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread#run
    如果没有连接事件过来,便会一直阻塞


    image.png

    能够继续往下执行是由于添加到acceptedQueue队列后,解阻塞了。
    这里主要有三步:

    1、select()

    org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread#select
    这里将会一直阻塞在这selector.select();在第一次循环时,由于SelectionKey对象为null,没有读写事件便会调用②的processAcceptedConnections方法处理这个连接。为什么不能继续执行也就是SelectionKey为什么是null,因为在接收到客户端连接后,并没有注册读或写事件。所以会先调用processAcceptedConnections处理这个连接


    image.png

    注册了读或写事件之后,才会调用到handleIO方法
    org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread#run进行while(!stopped)循环。


    image.png
    当我们在客户端输入create /aaa 123时,这是一个isWritable事件,会调用到handleIO处理读写事件,交给workerPool工作线程池处理
    org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread#handleIO
    image.png

    handleIO方法主要步骤包括:
    把当前SelectorThread和当前SelectionKey封装为IOWorkRequest对象、获取当前SelectionKey的NIOServerCnxn对象。
    NIOServerCnxn#disableSelectable设置selectable.set(false),key.interestOps(0);不能处理当前连接的其他请求
    交给workerPool调度workRequest
    org.apache.zookeeper.server.WorkerService#schedule


    image.png
    ScheduledWorkRequest.run()
    org.apache.zookeeper.server.WorkerService.ScheduledWorkRequest
    image.png
    org.apache.zookeeper.server.NIOServerCnxnFactory.IOWorkRequest#doWork
    image.png
    org.apache.zookeeper.server.NIOServerCnxn#doIO
    服务端接收到客户端的命令,读取数据
    image.png
    image.png
    2、processAcceptedConnections

    org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread#processAcceptedConnections
    把放入到acceptedQueue队列中接收的连接,取出来注册OP_READ读事件,然后添加NIOServerCnxn对象与当前key绑定。这里就是给当前连接添加附加对象NIOServerCnxn,即每一个连接都会有一个表示当前连接的上下文对象。


    image.png

    org.apache.zookeeper.server.NIOServerCnxnFactory#addCnxn
    把当前连接的NIOServerCnxn(Nio服务端上下文)添加到cnxns集合


    image.png
    3、processInterestOpsUpdateRequests

    从updateQueue队列中获取,并更改当前事件
    org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread#processInterestOpsUpdateRequests


    image.png

    总结:

    zookeeper默认的接收数据模式是通过NIO。
    如果想更改数据接收模式,可以修改zookeeper.serverCnxnFactory配置是org.apache.zookeeper.server.NettyServerCnxnFactory
    通过AcceptThread线程接收连接,然后把当前连接通过SelectorThread线程处理读或写事件。
    通过一系列的封装,再把封装后的对象交给work工作线程池处理。最终会调用到NIOServerCnxn的doIO处理读写事件。
    读事件就是客户端发送的数据,写事件就是服务端返回(响应)的数据

    相关文章

      网友评论

        本文标题:Zookeeper之数据接收源码分析(单机模式)

        本文链接:https://www.haomeiwen.com/subject/uljstktx.html