美文网首页
dubbo线程池

dubbo线程池

作者: SparkOnly | 来源:发表于2021-03-29 21:51 被阅读0次

    类型

    dubbo的线程池类型通过SPI定义在org.apache.dubbo.common.threadpool.ThreadPool文件里
    总共有以下4类

    • fixed(默认)
      org.apache.dubbo.common.threadpool.support.fixed.FixedThreadPool
      复用固定数量的线程
    /**
     * Builds the "fixed" pool: core size and maximum size are the same value,
     * so the number of worker threads never grows beyond it.
     *
     * <p>Queue selection depends on the configured capacity: a negative value
     * gives an unbounded {@code LinkedBlockingQueue}, zero gives a
     * {@code SynchronousQueue}, and a positive value gives a
     * {@code LinkedBlockingQueue} bounded to that capacity.
     *
     * @param url URL carrying the thread-name, threads and queues parameters
     * @return the configured executor
     */
    public Executor getExecutor(URL url) {
        // Thread name prefix; the original notes give the default as "Dubbo".
        final String threadName = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        // Pool size; the original notes give the default as 200.
        final int poolSize = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
        // Queue capacity; the original notes give the default as 0.
        final int queueCapacity = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
        return new ThreadPoolExecutor(
                poolSize,
                poolSize,
                0,
                TimeUnit.MILLISECONDS,
                queueCapacity < 0
                        ? new LinkedBlockingQueue<Runnable>()
                        : (queueCapacity == 0
                                ? new SynchronousQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queueCapacity)),
                new NamedInternalThreadFactory(threadName, true),
                new AbortPolicyWithReport(threadName, url));
    }
    
    • cached
      org.apache.dubbo.common.threadpool.support.cached.CachedThreadPool
      空闲线程默认在1分钟后被回收,新请求到来时再按需创建新线程
    /**
     * Builds the "cached" pool: the maximum size defaults to
     * {@code Integer.MAX_VALUE} and threads above the core size are released
     * after the configured keep-alive time.
     *
     * <p>Queue selection depends on the configured capacity: a negative value
     * gives an unbounded {@code LinkedBlockingQueue}, zero gives a
     * {@code SynchronousQueue}, and a positive value gives a
     * {@code LinkedBlockingQueue} bounded to that capacity.
     *
     * @param url URL carrying the thread-name, core-threads, threads, queues
     *            and alive parameters
     * @return the configured executor
     */
    public Executor getExecutor(URL url) {
        // Thread name prefix; the original notes give the default as "Dubbo".
        final String threadName = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        // Core size; the original notes give the default as 0.
        final int coreSize = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
        final int maxSize = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);
        // Queue capacity; the original notes give the default as 0.
        final int queueCapacity = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
        // Keep-alive in milliseconds; the original notes give the default as 60000 (one minute).
        final int keepAliveMillis = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);
        return new ThreadPoolExecutor(
                coreSize,
                maxSize,
                keepAliveMillis,
                TimeUnit.MILLISECONDS,
                queueCapacity < 0
                        ? new LinkedBlockingQueue<Runnable>()
                        : (queueCapacity == 0
                                ? new SynchronousQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queueCapacity)),
                new NamedInternalThreadFactory(threadName, true),
                new AbortPolicyWithReport(threadName, url));
    }
    
    • limited
      org.apache.dubbo.common.threadpool.support.limited.LimitedThreadPool
      需要的时候就创建线程,直到达到上限。线程池不会自动收缩
    /**
     * Builds the "limited" pool: threads are created on demand up to the
     * configured maximum. The keep-alive time is {@code Long.MAX_VALUE}
     * milliseconds, so idle threads are effectively never reclaimed and the
     * pool does not shrink.
     *
     * <p>Queue selection depends on the configured capacity: a negative value
     * gives an unbounded {@code LinkedBlockingQueue}, zero gives a
     * {@code SynchronousQueue}, and a positive value gives a
     * {@code LinkedBlockingQueue} bounded to that capacity.
     *
     * @param url URL carrying the thread-name, core-threads, threads and
     *            queues parameters
     * @return the configured executor
     */
    public Executor getExecutor(URL url) {
        final String threadName = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        final int coreSize = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
        final int maxSize = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
        final int queueCapacity = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
        return new ThreadPoolExecutor(
                coreSize,
                maxSize,
                Long.MAX_VALUE,
                TimeUnit.MILLISECONDS,
                queueCapacity < 0
                        ? new LinkedBlockingQueue<Runnable>()
                        : (queueCapacity == 0
                                ? new SynchronousQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queueCapacity)),
                new NamedInternalThreadFactory(threadName, true),
                new AbortPolicyWithReport(threadName, url));
    }
    
    • eager
      org.apache.dubbo.common.threadpool.support.eager.EagerThreadPool
      核心线程都在使用的时候,对于新任务优先创建新线程,而不是放入阻塞队列;只有线程数达到最大值后,新任务才会进入队列
    /**
     * Builds the "eager" pool: an {@code EagerThreadPoolExecutor} backed by a
     * {@code TaskQueue} that holds a reference back to the executor.
     *
     * <p>The queue capacity is clamped to at least 1, so a zero or negative
     * "queues" setting still produces a queue of capacity 1.
     *
     * @param url URL carrying the thread-name, core-threads, threads, queues
     *            and alive parameters
     * @return the configured executor
     */
    public Executor getExecutor(URL url) {
        final String threadName = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        final int coreSize = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
        final int maxSize = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);
        final int requestedCapacity = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
        final int keepAliveMillis = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);

        // The queue and the executor reference each other, so the queue is
        // created first and wired to the executor after construction.
        final TaskQueue<Runnable> workQueue = new TaskQueue<Runnable>(Math.max(requestedCapacity, 1));
        final EagerThreadPoolExecutor pool = new EagerThreadPoolExecutor(
                coreSize,
                maxSize,
                keepAliveMillis,
                TimeUnit.MILLISECONDS,
                workQueue,
                new NamedInternalThreadFactory(threadName, true),
                new AbortPolicyWithReport(threadName, url));
        workQueue.setExecutor(pool);
        return pool;
    }
    

    EagerThreadPoolExecutor核心过程
    记录已提交的任务数;如果super.execute抛出RejectedExecutionException,会尝试把任务重新放入队列,若队列已满则仍然拒绝该任务

    /**
     * Submits a task while tracking the number of submitted tasks.
     *
     * <p>The counter is incremented before delegating to the parent
     * executor. If the parent rejects the task, one retry is made to put it
     * directly into the task queue. On every failure path the counter is
     * decremented again before the exception propagates.
     * NOTE(review): the decrement for successfully executed tasks is not
     * visible in this block; presumably it happens when a task finishes.
     *
     * @param command the task to run; must not be null
     * @throws NullPointerException if {@code command} is null
     * @throws RejectedExecutionException if the task is rejected and the
     *         retry also fails, or if the thread is interrupted during the
     *         retry (the interrupt status is restored in that case)
     */
    public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException();
        }
        // do not increment in method beforeExecute!
        submittedTaskCount.incrementAndGet();
        try {
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            // retry to offer the task into queue.
            final TaskQueue queue = (TaskQueue) super.getQueue();
            try {
                if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
                    submittedTaskCount.decrementAndGet();
                    throw new RejectedExecutionException("Queue capacity is full.", rx);
                }
            } catch (InterruptedException x) {
                submittedTaskCount.decrementAndGet();
                // Restore the interrupt status: the InterruptedException is
                // translated into a RejectedExecutionException, so without
                // this the caller would never learn the thread was interrupted.
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException(x);
            }
        } catch (Throwable t) {
            // decrease any way
            submittedTaskCount.decrementAndGet();
            throw t;
        }
    }
    

    TaskQueue的核心过程

    /**
     * Decides whether a task is queued or whether the executor should start a
     * new worker instead.
     *
     * <ul>
     *   <li>Fewer submitted tasks than pool threads: a worker is free, so the
     *       task is queued.</li>
     *   <li>Otherwise, pool below its maximum size: returns {@code false} so
     *       the executor creates a new worker.</li>
     *   <li>Otherwise (pool at maximum): the task is queued.</li>
     * </ul>
     *
     * @param runnable the task being offered
     * @return the result of the underlying queue's offer, or {@code false}
     *         to request a new worker
     * @throws RejectedExecutionException if no executor has been set
     */
    public boolean offer(Runnable runnable) {
        if (executor == null) {
            throw new RejectedExecutionException("The task queue does not have executor!");
        }
        final int poolSize = executor.getPoolSize();
        // No idle worker and the pool can still grow: refuse the offer so
        // that the executor spawns another worker for this task.
        if (executor.getSubmittedTaskCount() >= poolSize
                && poolSize < executor.getMaximumPoolSize()) {
            return false;
        }
        // Either a worker is idle or the pool is already at its maximum size.
        return super.offer(runnable);
    }
    

    初始化过程

    1. NettyClient(消费端)或NettyServer(服务端)在初始化时,
      会通过调用DefaultExecutorRepository#createExecutorIfAbsent来初始化线程池。

    下面两张图分别为NettyClient和NettyServer初始化时候的设值


    消费端
    服务端
    • 消费端
      默认采用cached方式
      AbstractClient类:
    //String THREADPOOL_KEY = "threadpool";
    //String DEFAULT_CLIENT_THREADPOOL = "cached";
    /**
     * Prepares the client-side executor: sets the client thread name on the
     * URL, adds the default client thread-pool type if the URL does not
     * already specify one, and obtains the executor from the repository.
     *
     * @param url the client URL used to configure the executor
     */
    private void initExecutor(URL url) {
        final URL namedUrl = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
        // Only applies when the URL carries no explicit thread-pool setting.
        final URL poolUrl = namedUrl.addParameterIfAbsent(THREADPOOL_KEY, DEFAULT_CLIENT_THREADPOOL);
        executor = executorRepository.createExecutorIfAbsent(poolUrl);
    }
    
    • 服务端
      默认采用fixed方式
      从SPI注解上获取到类型
    /**
     * Extension point for creating thread pools.
     * The {@code @SPI} annotation carries the value "fixed".
     * NOTE(review): presumably this names the default extension - confirm
     * against the SPI loader.
     */
    @SPI("fixed")
    public interface ThreadPool {
        /**
         * Creates the thread pool for the given URL.
         *
         * <p>The method is marked {@code @Adaptive} with THREADPOOL_KEY.
         * NOTE(review): presumably the URL's value for that key selects the
         * implementation at call time - confirm against the SPI loader.
         *
         * @param url URL contains thread parameter
         * @return thread pool
         */
        @Adaptive({THREADPOOL_KEY})
        Executor getExecutor(URL url);
    }
    

    DefaultExecutorRepository#createExecutor
    会动态生成ThreadPool$Adaptive对象;当URL中没有指定threadpool参数时,默认使用@SPI注解上的值(即fixed)

    /**
     * Creates an executor for the URL through the adaptive {@code ThreadPool}
     * extension.
     *
     * @param url URL whose parameters configure the thread pool
     * @return the created executor, cast to {@code ExecutorService}
     */
    private ExecutorService createExecutor(URL url) {
        final ThreadPool adaptivePool =
                ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension();
        final Executor created = adaptivePool.getExecutor(url);
        return (ExecutorService) created;
    }
    

    相关文章

      网友评论

          本文标题:dubbo线程池

          本文链接:https://www.haomeiwen.com/subject/xqsbhltx.html