美文网首页
源码修炼笔记之Dubbo线程池策略

源码修炼笔记之Dubbo线程池策略

作者: 花醉霜寒 | 来源:发表于2020-07-27 15:21 被阅读0次

FixedThreadPool

    public Executor getExecutor(URL url) {
        //获取指定的线程名称
        String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        //获取指定的线程数量
        int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
        //获取queue来判定创建阻塞队列
        int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
        //通过ThreadPoolExecutor来创建线程池
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

FixThreadPool内部是通过ThreadPoolExecutor来创建线程,核心线程数和最大线程数都是上下文中指定的线程数量threads,因为不存在空闲线程所以keepAliveTime为0,
当queues=0,创建SynchronousQueue阻塞队列;
当queues<0,创建无界的阻塞队列LinkedBlockingQueue;
当queues>0,创建有界的阻塞队列LinkedBlockingQueue。
采用dubbo自己实现的线程工厂NamedInternalThreadFactory,将线程置为守护线程(Demon)
拒绝策略为AbortPolicyWithReport,策略为将调用时的堆栈信息保存到本地文件中,并抛出异常RejectedExecutionException

CachedThreadPool

    public Executor getExecutor(URL url) {
        String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
        int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);
        int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
        int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);
        return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

CachedThreadPool与FixedThreadPool的区别是核心线程数和最大线程数不相等,通过alive来控制空闲线程的释放

LimitedThreadPool

    public Executor getExecutor(URL url) {
        String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
        int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
        int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
        return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

LimitedThreadPool与CachedThreadPool的区别是空闲线程的超时时间为Long.MAX_VALUE,相当于线程数量不会动态变化了,创建的线程不会被释放。

EagerThreadPool

    public Executor getExecutor(URL url) {
        String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
        int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);
        int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
        int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);

        TaskQueue<Runnable> taskQueue = new TaskQueue<Runnable>(queues <= 0 ? 1 : queues);
        EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,
                threads,
                alive,
                TimeUnit.MILLISECONDS,
                taskQueue,
                new NamedInternalThreadFactory(name, true),
                new AbortPolicyWithReport(name, url));
        taskQueue.setExecutor(executor);
        return executor;
    }

与上述三种线程池不同,EagerThreadPool并非通过JUC中的ThreadPoolExecutor来创建线程池,而是通过EagerThreadPoolExecutor来创建线程池,EagerThreadPoolExecutor继承自ThreadPoolExecutor,实现自定义的execute方法,采用的阻塞队列是TaskQueue,TaskQueue继承自LinkedBlockingQueue。

    //构造方法
    public EagerThreadPoolExecutor(int corePoolSize,
                                   int maximumPoolSize,
                                   long keepAliveTime,
                                   TimeUnit unit, TaskQueue<Runnable> workQueue,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    //execute方法
    public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException();
        }
        // do not increment in method beforeExecute!
        submittedTaskCount.incrementAndGet();
        try {
            //
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            // retry to offer the task into queue.
            final TaskQueue queue = (TaskQueue) super.getQueue();
            try {
                if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
                    submittedTaskCount.decrementAndGet();
                    throw new RejectedExecutionException("Queue capacity is full.", rx);
                }
            } catch (InterruptedException x) {
                submittedTaskCount.decrementAndGet();
                throw new RejectedExecutionException(x);
            }
        } catch (Throwable t) {
            // decrease any way
            submittedTaskCount.decrementAndGet();
            throw t;
        }
    }

execute方法首先调用ThreadPoolExecutor的execute方法,如果执行失败会重新放入TaskQueue进行重试。

实现自定义的ThreadPool

ThreadPool被定义为一个扩展点,如下所示,

@SPI("fixed")
public interface ThreadPool {

    /**
     * Thread pool
     *
     * @param url URL contains thread parameter
     * @return thread pool
     */
    @Adaptive({THREADPOOL_KEY})
    Executor getExecutor(URL url);

}

其默认实现是FixedThreadPool,可以通过实现该扩展来实现自定义的线程池策略。

相关文章

  • 源码修炼笔记之Dubbo线程池策略

    FixedThreadPool FixThreadPool内部是通过ThreadPoolExecutor来创建线程...

  • 信号量用法:锁

    用于dubbo线程池满的拒绝策略

  • 安卓面试笔记

    OKhttp线程池策略,线程池丢弃策略,jsbridge原理与native交互过程,okhttp源码,子线程之间通...

  • Dubbo 线程池策略和线程模型

    开篇  这篇文章的目的主要是分析下Dubbo当中关于线程池的策略和线程模型,主要从源码角度出发并结合网上一些现成的...

  • Dubbo线程池

    Dubbo的线程模型与线程池策略 Dubbo默认的底层网络通讯使用的是Netty,服务提供方NettyServer...

  • Dubbo学习笔记之SPI

    Dispatcher dubbo的Dispatcher策略: all 所有消息都派发到线程池,包括请求,响应,连接...

  • 从线程池走到dubbo源码

    title: dubbo源码阅读线程池感想 date: 2018-05-28 22:51:10 tags: - J...

  • 源码修炼笔记之线程池ThreadPoolExecutor详解

    多线程编程过程中,用new Thread()的方式需要频繁的创建和销毁线程,线程的创建存在系统调用clone(),...

  • 线程池再探

    线程池源码之execute execute: 总体来说就是: 如果线程的数量小于线程池的核心线程数直接创建线程执行...

  • 多线程juc线程池

    java_basic juc线程池 创建线程池 handler是线程池拒绝策略 排队策略 线程池状态 RUNNIN...

网友评论

      本文标题:源码修炼笔记之Dubbo线程池策略

      本文链接:https://www.haomeiwen.com/subject/clvzlktx.html