美文网首页
Thrift源码分析(THsHaServer)

Thrift源码分析(THsHaServer)

作者: 番薯和米饭 | 来源:发表于2020-02-22 18:00 被阅读0次

    HsHaServer模式(半同步半异步)

    THsHaServer类是TNonblockingServer类的子类,在TNonblockingServer模式中,采用的是用一个线程来完成对所有socket的监听和业务处理,造成了效率的低下,比如某次rpc调用是访问数据库,读取大量数据,那么这个线程就会阻塞在这里,而不能去处理其他客户端的请求,THsHaServer模式的引入则是部分解决了这些问题。THsHaServer模式中,引入一个线程池来专门进行业务处理,如下图所示:


    THsHaServer

    从流程图就可以看出HsHaServer是将业务处理交给了专门的线程池来处,理,从而减小了监听线程的压力。

    THsHaServer里的Args类源码

    当你大致读懂了TNonblockingServer类和FrameBuffer类的源码后,THsHaServer的源码相对而言比较简单。下面我们先来看看THsHaServer类里的Args类:

    • Args类里面就新加了实例化工作线程池时候所需要的参数,这些参数有默认值,也可以自己设置。
    • 工作线程可以先实例化后在传进去,也可以不设置,实例化THsHaServer的时候会在构造函数里实例化一个工作线程池
    public static class Args extends AbstractNonblockingServerArgs<Args> {
        // 这里是在设置工作线程池的默认参数,这些参数都在后面初始化线程池用得到
        public int minWorkerThreads = 5;
        public int maxWorkerThreads = Integer.MAX_VALUE;
        private int stopTimeoutVal = 60;
        private TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
        // 工作线程池
        private ExecutorService executorService = null;
    
        public Args(TNonblockingServerTransport transport) {
          super(transport);
        }
        /**
         * Sets the min and max threads.
         *
         * @deprecated use {@link #minWorkerThreads(int)} and {@link #maxWorkerThreads(int)}  instead.
         */
        @Deprecated
        public Args workerThreads(int n) {
          minWorkerThreads = n;
          maxWorkerThreads = n;
          return this;
        }
    
        /**
         * @return what the min threads was set to.
         * @deprecated use {@link #getMinWorkerThreads()} and {@link #getMaxWorkerThreads()} instead.
         */
        @Deprecated
        public int getWorkerThreads() {
          return minWorkerThreads;
        }
    
        //设置线程池最少线程
        public Args minWorkerThreads(int n) {
          minWorkerThreads = n;
          return this;
        }
    
        //设置线程池最多线程
        public Args maxWorkerThreads(int n) {
          maxWorkerThreads = n;
          return this;
        }
    
        public int getMinWorkerThreads() {
          return minWorkerThreads;
        }
    
        public int getMaxWorkerThreads() {
          return maxWorkerThreads;
        }
    
        public int getStopTimeoutVal() {
          return stopTimeoutVal;
        }
    
        public Args stopTimeoutVal(int stopTimeoutVal) {
          this.stopTimeoutVal = stopTimeoutVal;
          return this;
        }
    
        public TimeUnit getStopTimeoutUnit() {
          return stopTimeoutUnit;
        }
    
        public Args stopTimeoutUnit(TimeUnit stopTimeoutUnit) {
          this.stopTimeoutUnit = stopTimeoutUnit;
          return this;
        }
    
        public ExecutorService getExecutorService() {
          return executorService;
        }
    
        /**
         * THsHaServer 的工作线程可以自己先初始化一个工作线程池在传进来,也可以不初始化
         * 只设定相关参数后,交由THsHaServer自己实例化工作线程
         */
    
        /**
         * @param executorService
         * @return
         */
    
        // 这里是获取传进来的工作线程
        public Args executorService(ExecutorService executorService) {
          this.executorService = executorService;
          return this;
        }
      }
    

    THsHaServer源码

    THsHaServer类里面新增了一个ExecutorService类型的参数invoker,用来指向工作线程池的,好方便后面调用工作线程池。

      // 实例化THsHaServer的时候,会把工作线程池赋值给invoker,从而方便选择器(selector)方便的调用工作线程
      private final ExecutorService invoker;
    
      private final Args args;
    
      /**
       * Create the server with the specified Args configuration
       */
      public THsHaServer(Args args) {
        super(args);
    
        // 实例化THsHaServer的时候,如果args.executorService存在,也就是工作线程在外面已经实例化了
        // 然后从外面传进来了,则直接赋值给THsHaServer类的invoker参数,如果不是
        // 则在这里调用createInvokerPool()方法实例化工作线程池
        invoker = args.executorService == null ? createInvokerPool(args) : args.executorService;
        this.args = args;
      }
    
      /**
       * {@inheritDoc}
       */
    
      @Override
      protected void waitForShutdown() {
        // 阻塞主线程
        joinSelector();
        gracefullyShutdownInvokerPool();
      }
    
      /**
       * Helper to create an invoker pool
       * 实例化工作线程池
       */
      protected static ExecutorService createInvokerPool(Args options) {
        int minWorkerThreads = options.minWorkerThreads;
        int maxWorkerThreads = options.maxWorkerThreads;
        int stopTimeoutVal = options.stopTimeoutVal;
        TimeUnit stopTimeoutUnit = options.stopTimeoutUnit;
    
        LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
        ExecutorService invoker = new ThreadPoolExecutor(minWorkerThreads,
          maxWorkerThreads, stopTimeoutVal, stopTimeoutUnit, queue);
    
        return invoker;
      }
      }
    

    关闭线程池操作

    THsHaServer既然多了个工作线程池,那么当selector线程关闭时,还需要关闭工作线程池,gracefullyShutdownInvokerPool()方法主要是用来关闭线程池的,使用shutdown()的方法来关闭线程,不会立即终止线程池,首先将线程池的状态设置成STOP, 然后尝试停止所有的正在执行或暂停任务的线程。此时,则不能再往线程池中添加任何新的任务,否则将会抛出RejectedExecutionException异常。但是,此时线程池不会立刻退出,直到添加到线程池中的任务都已经处理完成,才会退出。

     // 用来阻塞 调用THsHaServer的线程
      @Override
      protected void waitForShutdown() {
        joinSelector();
       //逐渐清空工作线程池
        gracefullyShutdownInvokerPool();
      }
    
      protected ExecutorService getInvoker() {
        return invoker;
      }
    
        // 用来关闭工作线程池
        protected void gracefullyShutdownInvokerPool() {
            // try to gracefully shut down the executor service
            // 使用shutdown()的方法来关闭线程池,如果线程正在处理任务,则让其处理完
            invoker.shutdown();
    
            // Loop until awaitTermination finally does return without a interrupted
            // exception. If we don't do this, then we'll shut down prematurely. We want
            // to let the executorService clear it's task queue, closing client sockets
            // appropriately.
            long timeoutMS = args.stopTimeoutUnit.toMillis(args.stopTimeoutVal);
            long now = System.currentTimeMillis();
            while (timeoutMS >= 0) {
                try {
                    // waitTermination会一直等待,直到线程池状态为TERMINATED或者,等待的时间到达了指定的时间。
                    invoker.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
                    break;
                } catch (InterruptedException ix) {
                    // 等待的过程中出错了则重新计算等待的时间
                    long newnow = System.currentTimeMillis();
                    timeoutMS -= (newnow - now);
                    now = newnow;
                }
            }
        }
                             
    

    重头戏来了, 读到这里读者最好已经阅读了TNonblockingServer和FrameBuffer的源码,、THsHaServer通过重写了requestInvoke()方法,将业务处理交给工作线程池处理,这个方法和下个getRunnable()方法一起看, 获取用户rpc请求后,是通过调用frameBuffer.invoke()方法来调用相应的业务处理, THsHaServer里是将frameBuffer.invoke()方法封装在了Invocation类的run()方法里面, Invocation继承了Runnable类,等于将frameBuffer.invoke()方法封装在了一个Runnable类里面,让后给线程池去处理。

    /**
       * We override the standard invoke method here to queue the invocation for
       * invoker service instead of immediately invoking. The thread pool takes care
       * of the rest.
       */
      @Override
      protected boolean requestInvoke(FrameBuffer frameBuffer) {
        try {
          // 通过getRunnable()获取要执行的业务处理
          Runnable invocation = getRunnable(frameBuffer);
          // 将业务处理传给工作线程池去处理
          invoker.execute(invocation);
          return true;
        } catch (RejectedExecutionException rx) {
          LOGGER.warn("ExecutorService rejected execution!", rx);
          return false;
        }
      }
    
      protected Runnable getRunnable(FrameBuffer frameBuffer){
        // 实例化个Invocation
        return new Invocation(frameBuffer);
      }
    

    Invocation 类源码

    /**
     * An Invocation represents a method call that is prepared to execute, given
     * an idle worker thread. It contains the input and output protocols the
     * thread's processor should use to perform the usual Thrift invocation.
     */
    class Invocation implements Runnable {
      private final FrameBuffer frameBuffer;
    
      public Invocation(final FrameBuffer frameBuffer) {
        this.frameBuffer = frameBuffer;
      }
    
      public void run() {
        frameBuffer.invoke();
      }
    }
    

    总结

    • THsHaServer的优点
      THsHaServer与TNonblockingServer模式相比,THsHaServer在完成数据读取之后,将业务处理过程交由一个线程池来完成,主线程直接返回进行下一次循环操作,效率大大提升。

    • THsHaServer的缺点
      主线程仍然需要完成所有socket的监听接收、数据读取和数据写入操作。当并发请求数较大时,且发送数据量较多时,监听socket上新连接请求不能被及时接受,瓶颈在监听socket的主线程上,因为主线程就只有一个。

    相关文章

      网友评论

          本文标题:Thrift源码分析(THsHaServer)

          本文链接:https://www.haomeiwen.com/subject/hzslqhtx.html