
Thrift Source Code Analysis (TNonblockingServer)

Author: 番薯和米饭 | Published 2020-02-18 23:59

    TNonblockingServer is one of the five server implementations on the Thrift server side. Like TSimpleServer it works single-threaded, but unlike TSimpleServer it uses NIO, which avoids blocking on any single client: its accept, read, and write events are all registered on the same Selector, and they are handled by an internal SelectAcceptThread. TNonblockingServer extends AbstractNonblockingServer, which wraps a FrameBuffer that buffers the input and output streams and also drives the RPC invocation. FrameBuffer will be covered separately; this article only covers TNonblockingServer's threading and IO model. Let's start with TNonblockingServer's workflow diagram.


    TNonblockingServer workflow diagram

    Advantages of TNonblockingServer:

    Compared with TSimpleServer, the efficiency gain comes mainly from IO multiplexing: TNonblockingServer uses non-blocking IO and monitors the state changes of many sockets at the same time.

    Disadvantages of TNonblockingServer:

    TNonblockingServer still runs the business logic sequentially on a single thread. When handlers are complex or time-consuming (for example an interface method that runs a long database query), this mode is still inefficient, because pending invocations execute one after another.
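To make the sequential bottleneck concrete, here is a small self-contained sketch (plain JDK, not Thrift code): several simulated slow "handlers" submitted to a single worker thread, mirroring how the selector thread runs invocations one after another.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustrative only: a single worker thread, like TNonblockingServer's
// selector thread, runs handlers one after another, so a slow handler
// delays every request queued behind it.
public class SingleThreadBottleneck {

    static long runSequential(int tasks, long handlerMillis) throws InterruptedException {
        ExecutorService selectorThread = Executors.newSingleThreadExecutor();
        long start = System.nanoTime();
        for (int i = 0; i < tasks; i++) {
            selectorThread.submit(() -> {
                try { Thread.sleep(handlerMillis); } catch (InterruptedException ignored) { }
            });
        }
        selectorThread.shutdown();
        selectorThread.awaitTermination(30, TimeUnit.SECONDS);
        return (System.nanoTime() - start) / 1_000_000;  // elapsed ms
    }

    public static void main(String[] args) throws InterruptedException {
        // Five 100 ms "handlers" take at least ~500 ms end to end on one thread.
        System.out.println("elapsed ms: " + runSequential(5, 100));
    }
}
```

With a pool of worker threads (as THsHaServer adds) the same workload would overlap instead of accumulating.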

    A typical server-side setup with TNonblockingServer looks roughly like this:

     public static void main(String[] args) throws TTransportException {
         int port = 9090;
         // Thrift supports several kinds of sockets; this one is non-blocking.
         TNonblockingServerSocket socket = new TNonblockingServerSocket(port);
         TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(socket);
         //---------------Thrift protocols------------------------------
         //1. TBinaryProtocol      binary protocol
         //2. TCompactProtocol     compact protocol; builds on TBinaryProtocol and compresses further for a smaller payload
         //3. TJSONProtocol        JSON protocol
         //4. TSimpleJSONProtocol  write-only JSON that scripting languages can parse easily; rarely used in practice
         //5. TDebugProtocol       human-readable protocol, convenient for tracing data while debugging
         //-----------------------------------------------------------
         //Protocol factory: TCompactProtocol, the compact binary protocol
         serverArgs.protocolFactory(new TCompactProtocol.Factory());

         //---------------Thrift transports------------------------------
         //1. TSocket            blocking socket, the counterpart of Java's ServerSocket
         //2. TFramedTransport   transfers data frame by frame; used by the non-blocking servers
         //3. TFileTransport     transfers data through files
         //4. TMemoryTransport   uses memory for IO; the Java implementation is backed by a simple ByteArrayOutputStream
         //5. TZlibTransport     zlib compression layered over another transport; no Java implementation yet
         //Transport factory: a lower-level concept than the protocol
         serverArgs.transportFactory(new TFramedTransport.Factory());
         //---------------------------------------------------------------

         //Set the processor factory; "processor" is the generated TProcessor
         //instance that already carries the business logic
         serverArgs.processorFactory(new TProcessorFactory(processor));
         TNonblockingServer tNonblockingServer = new TNonblockingServer(serverArgs);
         //Start the server
         tNonblockingServer.serve();
     }
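Since the non-blocking server requires TFramedTransport, it is worth seeing what framing means. The sketch below is not Thrift's implementation, just an illustration of the idea: every message is prefixed with its 4-byte big-endian length, which is how the server later knows when a frame has been fully read.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative sketch of length-prefixed framing (the idea behind TFramedTransport).
public class FrameCodec {

    // Prepend the payload with its length as a 4-byte big-endian int.
    static ByteBuffer encode(byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(4 + payload.length);
        buf.putInt(payload.length);
        buf.put(payload);
        buf.flip();
        return buf;
    }

    // Read the length header, then exactly that many payload bytes.
    static byte[] decode(ByteBuffer frame) {
        int len = frame.getInt();
        byte[] payload = new byte[len];
        frame.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        ByteBuffer wire = encode("rpc-call".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(decode(wire), StandardCharsets.UTF_8));
    }
}
```

Because each frame announces its size up front, the server can accumulate bytes non-blockingly and only dispatch the RPC once a whole frame has arrived.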
    

    Let's first look at TNonblockingServer's class diagram.
    TNonblockingServer extends the abstract class AbstractNonblockingServer, which in turn extends the abstract class TServer.


    TNonblockingServer class diagram

    Let's first look at the source of the abstract class TServer.

    /**
     * Generic interface for a Thrift server.
     *
     */
    public abstract class TServer {
    
      public static class Args extends AbstractServerArgs<Args> {
        //The constructor takes a TServerTransport parameter.
        //Remember: parameters whose names contain "Server" are usually used only on the server side.
        public Args(TServerTransport transport) {
          super(transport);//passed down layer by layer, finally assigned to serverTransport in AbstractServerArgs
        }
      }
    
      public static abstract class AbstractServerArgs<T extends AbstractServerArgs<T>> {
        //Holds the server-side socket, passed in through the constructor
        final TServerTransport serverTransport;
        TProcessorFactory processorFactory;
        //Transport and protocol factories; the instantiations here are the defaults
        TTransportFactory inputTransportFactory = new TTransportFactory();
        TTransportFactory outputTransportFactory = new TTransportFactory();
        TProtocolFactory inputProtocolFactory = new TBinaryProtocol.Factory();
        TProtocolFactory outputProtocolFactory = new TBinaryProtocol.Factory();
    
        public AbstractServerArgs(TServerTransport transport) {
          serverTransport = transport;
        }
         //methods of AbstractServerArgs omitted...
      }
    //methods of TServer omitted...
    }
    

    TServer contains a static class Args, which extends the abstract class AbstractServerArgs; AbstractServerArgs is itself defined inside TServer:

    public static abstract class AbstractServerArgs<T extends AbstractServerArgs<T>> {
        final TServerTransport serverTransport;
        TProcessorFactory processorFactory;
        TTransportFactory inputTransportFactory = new TTransportFactory();
        TTransportFactory outputTransportFactory = new TTransportFactory();
        TProtocolFactory inputProtocolFactory = new TBinaryProtocol.Factory();
        TProtocolFactory outputProtocolFactory = new TBinaryProtocol.Factory();
    
        public AbstractServerArgs(TServerTransport transport) {
          serverTransport = transport;
        }
        
        //Pass in a TProcessorFactory
        public T processorFactory(TProcessorFactory factory) {
          this.processorFactory = factory;
          return (T) this;
        }
    
        //Wrap the given TProcessor in a new TProcessorFactory
        public T processor(TProcessor processor) {
          this.processorFactory = new TProcessorFactory(processor);
          return (T) this;
        }
    
        //Set the transport factory (used for both input and output)
        public T transportFactory(TTransportFactory factory) {
          this.inputTransportFactory = factory;
          this.outputTransportFactory = factory;
          return (T) this;
        }
    
        //Set the input transport factory
        public T inputTransportFactory(TTransportFactory factory) {
          this.inputTransportFactory = factory;
          return (T) this;
        }
    
        //Set the output transport factory
        public T outputTransportFactory(TTransportFactory factory) {
          this.outputTransportFactory = factory;
          return (T) this;
        }
    
        //Set the protocol factory (used for both input and output)
        public T protocolFactory(TProtocolFactory factory) {
          this.inputProtocolFactory = factory;
          this.outputProtocolFactory = factory;
          return (T) this;
        }
    
        //Set the input protocol factory
        public T inputProtocolFactory(TProtocolFactory factory) {
          this.inputProtocolFactory = factory;
          return (T) this;
        }
    
        //Set the output protocol factory
        public T outputProtocolFactory(TProtocolFactory factory) {
          this.outputProtocolFactory = factory;
          return (T) this;
        }
      }
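The `return (T) this;` in the setters above is a self-referential ("curiously recurring") generic builder: each setter returns the concrete subclass type, so fluent chains keep compiling on subclasses like TNonblockingServer.Args. A minimal standalone sketch of the pattern (BaseArgs and ServerArgs are invented names, not Thrift's):

```java
// Minimal sketch of the fluent-builder pattern AbstractServerArgs uses.
abstract class BaseArgs<T extends BaseArgs<T>> {
    String protocol = "binary";

    @SuppressWarnings("unchecked")
    T protocol(String p) {
        this.protocol = p;
        return (T) this;   // the concrete subclass type flows through the chain
    }
}

final class ServerArgs extends BaseArgs<ServerArgs> {
    int port;

    ServerArgs port(int p) {
        this.port = p;
        return this;
    }
}

public class FluentArgsDemo {
    public static void main(String[] args) {
        // protocol(...) returns ServerArgs, not BaseArgs, so .port(...) still compiles
        ServerArgs a = new ServerArgs().protocol("compact").port(9090);
        System.out.println(a.protocol + ":" + a.port);
    }
}
```

Without the `T extends BaseArgs<T>` bound, `protocol(...)` would return the base type and the chain would lose access to subclass setters.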
    

    TServer also has several fields of its own, all of which are initialized from the corresponding args fields in the constructor:

    public abstract class TServer {
        /**
         * Core processor
         */
        protected TProcessorFactory processorFactory_;
    
        /**
         * Server transport
         *
         */
        protected TServerTransport serverTransport_;
    
        /**
         * Input Transport Factory
         */
        protected TTransportFactory inputTransportFactory_;
    
        /**
         * Output Transport Factory
         */
        protected TTransportFactory outputTransportFactory_;
    
        /**
         * Input Protocol Factory
         */
        protected TProtocolFactory inputProtocolFactory_;
    
        /**
         * Output Protocol Factory
         */
        protected TProtocolFactory outputProtocolFactory_;
    
        private volatile boolean isServing;
    
        protected TServerEventHandler eventHandler_;
    
        // Flag for stopping the server
        // Please see THRIFT-1795 for the usage of this flag
        protected volatile boolean stopped_ = false;
    
        protected TServer(TServer.AbstractServerArgs args) {
            //TServer's fields are initialized from the corresponding args fields
            processorFactory_ = args.processorFactory;
            serverTransport_ = args.serverTransport;
            inputTransportFactory_ = args.inputTransportFactory;
            outputTransportFactory_ = args.outputTransportFactory;
            inputProtocolFactory_ = args.inputProtocolFactory;
            outputProtocolFactory_ = args.outputProtocolFactory;
        }
    
        /**
         * The run method fires up the server and gets things going.
         * This is an abstract method; the detailed startup steps are implemented by its subclasses.
         */
        public abstract void serve();
    
        /**
         * Stop the server. This is optional on a per-implementation basis. Not
         * all servers are required to be cleanly stoppable.
         */
        public void stop() {
        }
    
        public boolean isServing() {
            return isServing;
        }
    
        protected void setServing(boolean serving) {
            isServing = serving;
        }
    
        public void setServerEventHandler(TServerEventHandler eventHandler) {
            eventHandler_ = eventHandler;
        }
    
        public TServerEventHandler getEventHandler() {
            return eventHandler_;
        }
    
        public boolean getShouldStop() {
            return this.stopped_;
        }
    
        public void setShouldStop(boolean shouldStop) {
            this.stopped_ = shouldStop;
        }
    }
    

    Next, let's analyze the AbstractNonblockingServer code.

    public abstract class AbstractNonblockingServer extends TServer {
    
    //As usual, AbstractNonblockingServerArgs extends AbstractServerArgs
      public static abstract class AbstractNonblockingServerArgs<T extends AbstractNonblockingServerArgs<T>> extends AbstractServerArgs<T> {
    //One extra field: the maximum read-buffer size
        public long maxReadBufferBytes = 256 * 1024 * 1024;
    
        public AbstractNonblockingServerArgs(TNonblockingServerTransport transport) {
          super(transport);
          transportFactory(new TFramedTransport.Factory());
        }
      }
    
      /**
       * The maximum amount of memory we will allocate to client IO buffers at a
       * time. Without this limit, the server will gladly allocate client buffers
       * right into an out of memory exception, rather than waiting.
       */
      final long MAX_READ_BUFFER_BYTES;
    
      /**
       * How many bytes are currently allocated to read buffers.
       */
      final AtomicLong readBufferBytesAllocated = new AtomicLong(0);
    
      public AbstractNonblockingServer(AbstractNonblockingServerArgs args) {
        super(args);
        MAX_READ_BUFFER_BYTES = args.maxReadBufferBytes;
      }
    
      /**
       * Begin accepting connections and processing invocations.
       */
      public void serve() {
        /**
        * startThreads() starts a thread that accepts client requests and handles
        * them. It has no body here; the concrete implementation is left to the
        * subclasses of AbstractNonblockingServer.
        */
        if (!startThreads()) {
          return;
        }
    
        // start listening, or exit
        //The server starts accepting connections; this method is already implemented in this class
        if (!startListening()) {
          return;
        }
    
        //Mark the server as serving
        setServing(true);
    
        // this will block while we serve
        /**
        * Block while we serve: the thread that started the server parks here
        * until it is interrupted. The concrete implementation is left to the
        * subclasses of AbstractNonblockingServer.
        */
        waitForShutdown();
    
        setServing(false);
    
        // do a little cleanup
        stopListening();
      }
    }
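The MAX_READ_BUFFER_BYTES / readBufferBytesAllocated pair above implements a memory budget: before buffering a client's frame, the server atomically reserves the bytes and backs off if the total would exceed the cap. A simplified standalone sketch of that accounting idea (not Thrift's actual code):

```java
import java.util.concurrent.atomic.AtomicLong;

// Simplified sketch of the read-buffer accounting behind MAX_READ_BUFFER_BYTES.
public class ReadBudget {
    private final long max;
    private final AtomicLong allocated = new AtomicLong(0);

    ReadBudget(long max) { this.max = max; }

    // Reserve bytes atomically; on overshoot, roll back and report failure
    // (the real server would simply try again on a later select pass).
    boolean tryReserve(long bytes) {
        long after = allocated.addAndGet(bytes);
        if (after > max) {
            allocated.addAndGet(-bytes);
            return false;
        }
        return true;
    }

    void release(long bytes) { allocated.addAndGet(-bytes); }

    public static void main(String[] args) {
        ReadBudget budget = new ReadBudget(1024);
        System.out.println(budget.tryReserve(800));  // fits
        System.out.println(budget.tryReserve(800));  // would exceed 1024, refused
        budget.release(800);
        System.out.println(budget.tryReserve(800));  // fits again
    }
}
```

The point of the cap is exactly what the javadoc says: without it, many large in-flight frames would let client buffers grow until an OutOfMemoryError, instead of making some connections wait.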
    

    AbstractNonblockingServer also contains an important abstract inner class, AbstractSelectThread; the select threads (including SelectAcceptThread) all extend it.

    protected abstract class AbstractSelectThread extends Thread {
        protected Selector selector;
    
        // List of FrameBuffers that want to change their selection interests.
        protected final Set<FrameBuffer> selectInterestChanges = new HashSet<FrameBuffer>();
    
        public AbstractSelectThread() throws IOException {
          this.selector = SelectorProvider.provider().openSelector();
        }
    

    It also has two important methods:

         //These two methods are called from the select() loop of the thread started by startThreads().
         /**
         * Do the work required to read from a readable client. If the frame is
         * fully read, then invoke the method call.
         */
        protected void handleRead(SelectionKey key) {
          // Get the FrameBuffer attached to the SelectionKey
          FrameBuffer buffer = (FrameBuffer) key.attachment();
          // Read the client's data; if the read fails, clean up the SelectionKey
          if (!buffer.read()) {
            cleanupSelectionKey(key);
            return;
          }
    
          // if the buffer's frame read is complete, invoke the method.
          // If the frame was fully read, requestInvoke() dispatches the call,
          // i.e. invokes the matching server-side method for the client's request
          if (buffer.isFrameFullyRead()) {
            if (!requestInvoke(buffer)) {
              cleanupSelectionKey(key);
            }
          }
        }
    
        /**
         * Let a writable client get written, if there's data to be written.
         */
        protected void handleWrite(SelectionKey key) {
          FrameBuffer buffer = (FrameBuffer) key.attachment();
          if (!buffer.write()) {
            cleanupSelectionKey(key);
          }
        }
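The key.attachment() trick used by handleRead()/handleWrite() above, stashing a per-connection buffer on the SelectionKey, can be demonstrated with plain java.nio. This sketch uses an in-process Pipe instead of real sockets; the attached ByteBuffer plays the role Thrift's FrameBuffer plays:

```java
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;

// Demonstrates attaching per-connection state to a SelectionKey,
// the same pattern Thrift uses for its FrameBuffer.
public class AttachmentDemo {

    static String readOnce() throws Exception {
        Selector selector = Selector.open();
        Pipe pipe = Pipe.open();                      // in-process channel pair
        pipe.source().configureBlocking(false);
        // Attach a buffer to the key at registration time
        pipe.source().register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(16));

        pipe.sink().write(ByteBuffer.wrap("ping".getBytes(StandardCharsets.UTF_8)));

        selector.select();                            // blocks until OP_READ fires
        String result = "";
        for (SelectionKey key : selector.selectedKeys()) {
            ByteBuffer buf = (ByteBuffer) key.attachment();  // recover the state
            ((Pipe.SourceChannel) key.channel()).read(buf);
            buf.flip();
            result = StandardCharsets.UTF_8.decode(buf).toString();
        }
        selector.close();
        pipe.source().close();
        pipe.sink().close();
        return result;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(readOnce());
    }
}
```

The attachment is what lets one selector thread keep per-connection read/write state without any lookup table keyed by channel.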
    

    Now for TNonblockingServer's own source.
    Besides the familiar Args class, TNonblockingServer adds one key field, selectAcceptThread_, of type SelectAcceptThread. As the name suggests, this class extends Thread and encapsulates the processing logic that the thread runs.

    public static class Args extends AbstractNonblockingServerArgs<Args> {
        public Args(TNonblockingServerTransport transport) {
          super(transport);
        }
      }
    
      private SelectAcceptThread selectAcceptThread_;
    
      public TNonblockingServer(AbstractNonblockingServerArgs args) {
        super(args);
      }
    

    Reading further down, we meet the familiar startThreads() method:

    /**
       * Start the selector thread to deal with accepts and client messages.
       * This method starts the thread that deals with client requests; it overrides
       * the abstract method declared in AbstractNonblockingServer.
       * @return true if everything went ok, false if we couldn't start for some
       * reason.
       */
      @Override
      protected boolean startThreads() {
        // start the selector
        try {
          // Instantiate selectAcceptThread_
          selectAcceptThread_ = new SelectAcceptThread((TNonblockingServerTransport)serverTransport_);
          // Start the thread
          selectAcceptThread_.start();
          return true;
        } catch (IOException e) {
          LOGGER.error("Failed to start selector thread!", e);
          return false;
        }
      }
    

    The SelectAcceptThread class is also defined inside TNonblockingServer. Let's look at its class diagram and source.


    SelectAcceptThread class diagram

    SelectAcceptThread is simply a thread, with the processing logic it needs encapsulated inside.
    It uses a Selector, which lets a single thread serve many client connections: a Selector can detect events on multiple registered channels (many Channels can register with the same Selector), fetch the events that fire, and handle each one in turn. One thread can thus manage many channels, i.e. many connections and requests.

    • This is all standard NIO; if it is unfamiliar, study NIO first.
      /**
       * The thread that will be doing all the selecting, managing new connections
       * and those that still need to be read.
       */
      protected class SelectAcceptThread extends AbstractSelectThread {
    
        // The server transport on which new client transports will be accepted
        // The server-side serverTransport; it is registered with the selector below
        private final TNonblockingServerTransport serverTransport;
    
        /**
         * Set up the thread that will handle the non-blocking accepts, reads, and
         * writes.
         */
        public SelectAcceptThread(final TNonblockingServerTransport serverTransport)
        throws IOException {
          this.serverTransport = serverTransport;
          // Register serverTransport with the selector; the selector is declared in AbstractSelectThread
          serverTransport.registerSelector(selector);
        }
    
        public boolean isStopped() {
          return stopped_;
        }
        /**
         * The work loop. Handles both selecting (all IO operations) and managing
         * the selection preferences of all existing connections.
         */
        public void run() {
          try {
            if (eventHandler_ != null) {
              eventHandler_.preServe();
            }
    
            while (!stopped_) {
              // The select() loop starts here
              select();
              processInterestChanges();
            }
            // After the server stops, clean up the keys left in the selector
            for (SelectionKey selectionKey : selector.keys()) {
              cleanupSelectionKey(selectionKey);
            }
          } catch (Throwable t) {
            LOGGER.error("run() exiting due to uncaught error", t);
          } finally {
            try {
              selector.close();
            } catch (IOException e) {
              LOGGER.error("Got an IOException while closing selector!", e);
            }
            stopped_ = true;
          }
        }
    }
    

    The select() method loops the selector over all registered sockets; each time select() returns, it fetches the sockets that are ready and handles them one by one.
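Outside Thrift, the whole accept/read dispatch that select() performs can be reproduced with plain java.nio. This teaching sketch (not Thrift code) binds a loopback server, has a client send a few bytes, and dispatches on isAcceptable()/isReadable() exactly like the loop below:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

// A minimal accept/read select loop, mirroring SelectAcceptThread.select().
public class MiniSelectLoop {

    static String serveOnce() throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress("127.0.0.1", 0));   // ephemeral port
        server.configureBlocking(false);
        server.register(selector, SelectionKey.OP_ACCEPT);    // like registerSelector()

        // A client connects and writes a small message (blocking, for brevity).
        int port = ((InetSocketAddress) server.getLocalAddress()).getPort();
        SocketChannel client = SocketChannel.open(new InetSocketAddress("127.0.0.1", port));
        client.write(ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)));

        String received = null;
        while (received == null) {
            selector.select();                                // block for IO events
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                if (key.isAcceptable()) {                     // cf. handleAccept()
                    SocketChannel ch = server.accept();
                    ch.configureBlocking(false);
                    // attach per-connection state, like clientKey.attach(frameBuffer)
                    ch.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(64));
                } else if (key.isReadable()) {                // cf. handleRead(key)
                    ByteBuffer buf = (ByteBuffer) key.attachment();
                    ((SocketChannel) key.channel()).read(buf);
                    buf.flip();
                    received = StandardCharsets.UTF_8.decode(buf).toString();
                }
            }
        }
        client.close();
        server.close();
        selector.close();
        return received;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(serveOnce());
    }
}
```

Thrift's real loop adds the isWritable() branch, frame accounting, and requestInvoke(), but the control flow is the same.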

    
        /**
         * Select and process IO events appropriately:
         * If there are connections to be accepted, accept them.
         * If there are existing connections with data waiting to be read, read it,
         * buffering until a whole frame has been read.
         * If there are any pending responses, buffer them until their target client
         * is available, and then send the data.
         */
        private void select() {
          try {
            // wait for io events.
            /**
            * The thread blocks here while the selector monitors all registered IO
            * events; when select() returns, the ready events can be retrieved.
            */
            selector.select();
    
            // process the io events we received
            /**
            * Fetch all the ready IO events and handle each one. Every client socket,
            * once connected and registered with the selector, has a SelectionKey;
            * likewise the server transport registered earlier has its own SelectionKey.
            */
            Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
            while (!stopped_ && selectedKeys.hasNext()) {
              SelectionKey key = selectedKeys.next();
              selectedKeys.remove();
    
              // skip if not valid
              if (!key.isValid()) {
                cleanupSelectionKey(key);
                continue;
              }
    
              // if the key is marked Accept, then it has to be the server transport.
              
              if (key.isAcceptable()) {
                 // The server transport received a client's connection request, so accept()
                handleAccept();
              } else if (key.isReadable()) {
                // deal with reads
                // implemented in AbstractSelectThread
                handleRead(key);
              } else if (key.isWritable()) {
                // deal with writes
                 // implemented in AbstractSelectThread
                handleWrite(key);
              } else {
                LOGGER.warn("Unexpected state in select! " + key.interestOps());
              }
            }
          } catch (IOException e) {
            LOGGER.warn("Got an IOException while selecting!", e);
          }
        }
    
        protected FrameBuffer createFrameBuffer(final TNonblockingTransport trans,
            final SelectionKey selectionKey,
            final AbstractSelectThread selectThread) {
            return processorFactory_.isAsyncProcessor() ?
                      new AsyncFrameBuffer(trans, selectionKey, selectThread) :
                      new FrameBuffer(trans, selectionKey, selectThread);
        }
    
        /**
         * Accept a new connection.
         */
        private void handleAccept() throws IOException {
          SelectionKey clientKey = null;
          TNonblockingTransport client = null;
          try {
            // accept the connection
            // Accept the connection request from the client
            client = (TNonblockingTransport)serverTransport.accept();
            // Register the client with the selector so its subsequent requests can be handled
            clientKey = client.registerSelector(selector, SelectionKey.OP_READ);
    
             // Create a FrameBuffer for this connection
             FrameBuffer frameBuffer = createFrameBuffer(client, clientKey, SelectAcceptThread.this);
    
             // Attach the FrameBuffer to the new client's SelectionKey:
             // handleRead() and handleWrite() fetch data through this FrameBuffer,
             // invoke the server-side method, and send the result back to the client
              clientKey.attach(frameBuffer);
          } catch (TTransportException tte) {
            // something went wrong accepting.
            LOGGER.warn("Exception trying to accept!", tte);
            if (clientKey != null) cleanupSelectionKey(clientKey);
            if (client != null) client.close();
          }
        }
      } // SelectAcceptThread
    


          Original post: https://www.haomeiwen.com/subject/vdxnfhtx.html