美文网首页
源码修炼笔记之Dubbo线程池策略

源码修炼笔记之Dubbo线程池策略

作者: 花醉霜寒 | 来源:发表于2020-07-27 15:21 被阅读0次

    FixedThreadPool

        public Executor getExecutor(URL url) {
            //获取指定的线程名称
            String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
            //获取指定的线程数量
            int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
            //获取queue来判定创建阻塞队列
            int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
            //通过ThreadPoolExecutor来创建线程池
            return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                    queues == 0 ? new SynchronousQueue<Runnable>() :
                            (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                    : new LinkedBlockingQueue<Runnable>(queues)),
                    new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
        }
    

    FixThreadPool内部是通过ThreadPoolExecutor来创建线程,核心线程数和最大线程数都是上下文中指定的线程数量threads,因为不存在空闲线程所以keepAliveTime为0,
    当queues=0,创建SynchronousQueue阻塞队列;
    当queues<0,创建无界的阻塞队列LinkedBlockingQueue;
    当queues>0,创建有界的阻塞队列LinkedBlockingQueue。
    采用dubbo自己实现的线程工厂NamedInternalThreadFactory,将线程置为守护线程(Demon)
    拒绝策略为AbortPolicyWithReport,策略为将调用时的堆栈信息保存到本地文件中,并抛出异常RejectedExecutionException

    CachedThreadPool

        public Executor getExecutor(URL url) {
            String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
            int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
            int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);
            int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
            int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);
            return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
                    queues == 0 ? new SynchronousQueue<Runnable>() :
                            (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                    : new LinkedBlockingQueue<Runnable>(queues)),
                    new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
        }
    

    CachedThreadPool与FixedThreadPool的区别是核心线程数和最大线程数不相等,通过alive来控制空闲线程的释放

    LimitedThreadPool

        public Executor getExecutor(URL url) {
            String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
            int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
            int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
            int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
            return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
                    queues == 0 ? new SynchronousQueue<Runnable>() :
                            (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                    : new LinkedBlockingQueue<Runnable>(queues)),
                    new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
        }
    

    LimitedThreadPool与CachedThreadPool的区别是空闲线程的超时时间为Long.MAX_VALUE,相当于线程数量不会动态变化了,创建的线程不会被释放。

    EagerThreadPool

        public Executor getExecutor(URL url) {
            String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
            int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
            int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);
            int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
            int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);
    
            TaskQueue<Runnable> taskQueue = new TaskQueue<Runnable>(queues <= 0 ? 1 : queues);
            EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,
                    threads,
                    alive,
                    TimeUnit.MILLISECONDS,
                    taskQueue,
                    new NamedInternalThreadFactory(name, true),
                    new AbortPolicyWithReport(name, url));
            taskQueue.setExecutor(executor);
            return executor;
        }
    
    

    与上述三种线程池不同,EagerThreadPool并非通过JUC中的ThreadPoolExecutor来创建线程池,而是通过EagerThreadPoolExecutor来创建线程池,EagerThreadPoolExecutor继承自ThreadPoolExecutor,实现自定义的execute方法,采用的阻塞队列是TaskQueue,TaskQueue继承自LinkedBlockingQueue。

        //构造方法
        public EagerThreadPoolExecutor(int corePoolSize,
                                       int maximumPoolSize,
                                       long keepAliveTime,
                                       TimeUnit unit, TaskQueue<Runnable> workQueue,
                                       ThreadFactory threadFactory,
                                       RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        }
    
        //execute方法
        public void execute(Runnable command) {
            if (command == null) {
                throw new NullPointerException();
            }
            // do not increment in method beforeExecute!
            submittedTaskCount.incrementAndGet();
            try {
                //
                super.execute(command);
            } catch (RejectedExecutionException rx) {
                // retry to offer the task into queue.
                final TaskQueue queue = (TaskQueue) super.getQueue();
                try {
                    if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
                        submittedTaskCount.decrementAndGet();
                        throw new RejectedExecutionException("Queue capacity is full.", rx);
                    }
                } catch (InterruptedException x) {
                    submittedTaskCount.decrementAndGet();
                    throw new RejectedExecutionException(x);
                }
            } catch (Throwable t) {
                // decrease any way
                submittedTaskCount.decrementAndGet();
                throw t;
            }
        }
    
    

    execute方法首先调用ThreadPoolExecutor的execute方法,如果执行失败会重新放入TaskQueue进行重试。

    实现自定义的ThreadPool

    ThreadPool被定义为一个扩展点,如下所示,

    @SPI("fixed")
    public interface ThreadPool {
    
        /**
         * Thread pool
         *
         * @param url URL contains thread parameter
         * @return thread pool
         */
        @Adaptive({THREADPOOL_KEY})
        Executor getExecutor(URL url);
    
    }
    

    其默认实现是FixedThreadPool,可以通过实现该扩展来实现自定义的线程池策略。

    相关文章

      网友评论

          本文标题:源码修炼笔记之Dubbo线程池策略

          本文链接:https://www.haomeiwen.com/subject/clvzlktx.html