美文网首页
无聊到看了眼java线程池源码

无聊到看了眼java线程池源码

作者: 都有米 | 来源:发表于2018-07-03 23:50 被阅读24次

    一、线程池框架

    线程池本质上就是一个任务执行器。我们在使用线程池时使用的实现类就是ThreadPoolExecutorScheduledThreadPoolExecutor。他们之间的关系如下图所示,ScheduledThreadPoolExecutor是继承了ThreadPoolExecutor,并实现了任务调度接口。所以在使用线程池时推荐使用ScheduledThreadPoolExecutor来实现,因为它的功能更丰富。

    线程池

    1、Executor:任务执行器的顶层接口,就简单的一个execute方法,接收一个要执行的任务,没有强制要求异步执行,具体的逻辑由实现类去完成。

    public interface Executor {
        void execute(Runnable command);
    }
    

    2、ExecutorService:执行器服务接口。它继承了接口Executor,在此基础上增加了两类功能接口,1、关闭任务执行器;2、提交任务的方法返回一个Future对象,用来跟踪任务执行的结果。ThreadPoolExecutor是该接口的一个实现类,可以通过实例化ThreadPoolExecutor来使用线程池。

    public interface ExecutorService extends Executor {
        // 关闭执行者,关闭之前添加的任务还会执行,关闭后不再接受新增任务了。
        void shutdown();
        // 执行带返回值的任务
        <T> Future<T> submit(Callable<T> task);
        Future<?> submit(Runnable task);
    }
    

    3、Scheduled:调度接口,将任务列入计划(或时间)表。ScheduledThreadPoolExecutor是在ThreadPoolExecutor基础上实现了调度接口的线程池。任务调度的功能是高频需求,所有我们项目中封装线程池工具时推荐使用ScheduledThreadPoolExecutor来实现。

    public interface ScheduledExecutorService extends ExecutorService {
        // 延时执行任务
        public ScheduledFuture<?> schedule(Runnable command, 
                                                     long delay, 
                                                     TimeUnit unit);
        // 周期性地执行任务
        public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                      long initialDelay,
                                                      long period,
                                                      TimeUnit unit);
    }
    

    二、线程池实现

    虽然我推荐大家使用ScheduledThreadPoolExecutor,但是从类图中可以看出它继承自ThreadPoolExecutor,关键的任务执行逻辑都在ThreadPoolExecutor中。所以要了解线程池执行任务的逻辑还得去看ThreadPoolExecutor类的实现。

    线程池

    首先我们看下ThreadPoolExecutor线程池的使用。大致如下所示,创建一个ThreadPoolExecutor实例,然后调用它的execute(runnable)方法来添加任务即可。

        // 定义相关常量
        final int CORE_POOL_SIZE = 2 * CPU_CORE_NUM;
        final int MAX_POOL_SIZE = 3 * CPU_CORE_NUM;
        final int KEEP_ALIVE_TIME = 1;
    
        // 创建任务队列
        final ArrayBlockingQueue<Runnable> arrayBlockingQueue = new ArrayBlockingQueue<Runnable>(50);
    
        // 创建线程创建工厂
        final AtomicInteger mCount = new AtomicInteger(1);
        ThreadFactory threadFactory = new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "Pool-Thread-"+mCount.getAndIncrement());
            }
        };
    
        // 创建线程池,注意传递的这几个参数,很重要。
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                CORE_POOL_SIZE, //核心线程数
                MAX_POOL_SIZE, //最大线程数
                KEEP_ALIVE_TIME,  //空闲线程存活时间
                TimeUnit.SECONDS,  // KEEP_ALIVE_TIME数字的时间单位
                arrayBlockingQueue, // 任务队列
                threadFactory, //线程创建工厂
                new ThreadPoolExecutor.DiscardPolicy() // 任务拒绝策略处理器
        );
    
        // 向线程池中添加任务
        threadPoolExecutor.execute(new Runnable() {
            @Override
            public void run() {
                // do sth
                SDKLogger.d("do sth");
            }
        });
    

    线程池构造参数

    • CORE_POOL_SIZE:正常情况下线程池所拥有的线程数量。当任务数大于该值时,就把任务添加到任务队列中等待。核心线程执行完当前线程中的任务后再去任务队列中获取下一个任务。
    • MAX_POOL_SIZE:当任务特别多,核心线程满了,任务队列也满了,这时线程池就会创建更多的线程来执行任务。但是线程也不能无止境的创建,这个值就是线程池所允许的线程最大数量。
    • KEEP_ALIVE_TIME:创建大量线程把任务执行完了以后,没有新的任务需要执行了,为了节省资源,线程池会保留核心线程,然后在等待该值时间后关闭多余的空闲线程。allowCoreThreadTimeOut方法可以设置,使得线程池也用同样的策略关闭空闲的核心线程,默认是会保留核心线程等待新的任务。
    • TimeUnit.SECONDS: KEEP_ALIVE_TIME数字的时间单位
    • arrayBlockingQueue:存放等待被执行的任务的队列
    • threadFactory:线程创建工厂,需要创建线程时就会调用该类方法来创建
    • new ThreadPoolExecutor.DiscardPolicy():如果线程数量达到最大值了,队列也满了,又来了新的任务就会走拒绝策略。这个策略可以自行实现,直接丢弃任务或者延时重新发送任务都可以。

    接下来重点看下线程池的execute方法。线程池执行任务的关键策略都在这里。线程池添加任务三部曲。

        public void execute(Runnable command) {
            if (command == null) {
                throw new NullPointerException();
            }
    
            /*
             * Proceed in 3 steps:
             *
             * 1. 如果当前运行的线程数量小于核心线程数量(corePoolSize), 
             *  就尝试创建一个新线程来运行新增任务。
             *
             * 2. 如果当前运行的线程数量大于等于核心线程数量(corePoolSize),
             *  就把新增任务添加到任务队列中去。
             *
             * 3. 如果任务队列满了,就创建新线程来执行新增任务。
             *  如果线程数量已经大于最大线程数(maximumPoolSize),就执行拒绝新增任务逻辑。
             */
            int c = ctl.get();
            // workerCountOf方法可以获取当前线程池中的线程数量
           // 第1步:threadSize < corePoolSize
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
    
            // 第2步:  threadSize >= corePoolSize 把任务添加到队列中
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                // 任务添加到队列后,double check下当前线程池状态
                if (!isRunning(recheck) && remove(command)) {
                    reject(command);
                } else if (workerCountOf(recheck) == 0) {
                    addWorker(null, false);
                }
            // 第3步:  如果队列满了,就创建更多的线程
            //        如果threadSize >= maximumPoolSize  就拒绝接受任务
            } else if (!addWorker(command, false)) {
                reject(command);
            }
        }
    

    三、线程池使用推荐

    官方为了方便大家使用线程池提供了一个执行器工具类Executors,该类提供了一系列工厂方法用于创先线程池。但是我并不推荐大家使用这个工具类来创建线程池。我们来看下它的实现。

    public class Executors {
        // 主要问题:没有设置任务队列容量,默认容量是 Integer.MAX_VALUE ,
        // 堆积的任务可能会耗费非常大的内存,甚至OOM。
        public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
        // 同上
        public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
        // 主要问题:线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。
        public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
        // 同上
        public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
            return new ScheduledThreadPoolExecutor(corePoolSize);
        }
    
        ...
    }
    

    所以线程池不建议使用Executors去创建,而是通过ThreadPoolExecutor或者ScheduledThreadPoolExecutor的方式创建,这样的处理方式可以让大家更加明确线程池的运行规则,规避资源耗尽的风险。 Executors各个方法的弊端:

    1. newFixedThreadPool和newSingleThreadExecutor:
        主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM。
    2. newCachedThreadPool和newScheduledThreadPool:
        主要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。

    下面AsyncTask中创建线程池的例子,创建线程池的方式和一些数据经验值,大家都可以借鉴下:

    // 获取虚拟机可用的处理器数量
    private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
    // 核心线程数量至少2个,最多4个;
    // 另外设置核心线程数比处理器数量少一个,是为了避免CPU在后台饱和工作。
    private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
    private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
    private static final int KEEP_ALIVE_SECONDS = 30;
    
    private static final ThreadFactory sThreadFactory = new ThreadFactory() {
        private final AtomicInteger mCount = new AtomicInteger(1);
    
        public Thread newThread(Runnable r) {
            return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
        }
    };
    
    // 设置任务队列容量128
    private static final BlockingQueue<Runnable> sPoolWorkQueue =
            new LinkedBlockingQueue<Runnable>(128);
    
    public static final Executor THREAD_POOL_EXECUTOR;
    static {
        // 使用ThreadPoolExecutor创建线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
                sPoolWorkQueue, sThreadFactory);
        threadPoolExecutor.allowCoreThreadTimeOut(true);
        THREAD_POOL_EXECUTOR = threadPoolExecutor;
    }
    

    四、分而治之:Fork/Join框架

    补充完整图、使用、实现

    相关文章

      网友评论

          本文标题:无聊到看了眼java线程池源码

          本文链接:https://www.haomeiwen.com/subject/pjfjuftx.html