美文网首页
juc-executors 执行器框架

juc-executors 执行器框架

作者: hehehehe | 来源:发表于2020-11-16 16:42 被阅读0次
    三个核心接口来满足使用者的需求:

    Executor:提交普通的可执行任务
    ExecutorService:提供对线程池生命周期的管理、异步任务的支持
    ScheduledExecutorService:提供对任务的周期性执行支持


    image.png image.png
    ThreadPoolExecutor简介

    ThreadPoolExecutor并没有自己直接实现ExecutorService接口,因为它只是其中一种Executor的实现而已,所以Doug Lea把一些通用部分封装成一个抽象父类——AbstractExecutorService,供J.U.C中的其它执行器继承。如果读者需要自己实现一个Executor,也可以继承该抽象类。


    image.png
    AbstractExecutorService

    AbstractExecutorService提供了 ExecutorService 接口的默认实现——主要实现了 submit、invokeAny 、invokeAll这三类方法,如果读者看过上一篇综述文章,就应该知道,ExecutorService的这三类方法几乎都是返回一个Future对象。而Future是一个接口,AbstractExecutorService既然实现了这些方法,必然要实现该Future接口,我们来看下AbstractExecutorService实现的submit方法:

    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }
    
    image.png
    线程池

    ThreadPoolExecutor中只有一种类型的线程,名叫Worker,它是ThreadPoolExecutor定义的内部类,同时封装着Runnable任务和执行该任务的Thread对象,我们称它为【工作线程】,它也是ThreadPoolExecutor唯一需要进行维护的线程;
    【核心线程池】【非核心线程池】都是逻辑上的概念,ThreadPoolExecutor在任务调度过程中会根据corePoolSize和maximumPoolSize的大小,判断应该如何调度任务.
    ThreadPoolExecutor一共定义了5种线程池状态:
    RUNNING : 接受新任务, 且处理已经进入阻塞队列的任务
    SHUTDOWN : 不接受新任务, 但处理已经进入阻塞队列的任务
    STOP : 不接受新任务, 且不处理已经进入阻塞队列的任务, 同时中断正在运行的任务
    TIDYING : 所有任务都已终止, 工作线程数为0, 线程转化为TIDYING状态并准备调用terminated方法
    TERMINATED : terminated方法已经执行完成

    image.png
    Worker

    Worker被定义为ThreadPoolExecutor的内部类,实现了AQS框架,ThreadPoolExecutor通过一个HashSet来保存工作线程:

    /**
     * 工作线程集合.
     */
    private final HashSet<Worker> workers = new HashSet<Worker>();
    
    /**
     * Worker表示线程池中的一个工作线程, 可以与任务相关联.
     * 由于实现了AQS框架, 其同步状态值的定义如下:
     * -1: 初始状态
     * 0:  无锁状态
     * 1:  加锁状态
     */
    private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
     
        /**
         * 与该Worker关联的线程.
         */
        final Thread thread;
        /**
         * Initial task to run.  Possibly null.
         */
        Runnable firstTask;
        /**
         * Per-thread task counter
         */
        volatile long completedTasks;
     
     
        Worker(Runnable firstTask) {
            setState(-1); // 初始的同步状态值
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
     
        /**
         * 执行任务
         */
        public void run() {
            runWorker(this);
        }
     
        /**
         * 是否加锁
         */
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
     
        /**
         * 尝试获取锁
         */
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
     
        /**
         * 尝试释放锁
         */
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
     
        public void lock() {
            acquire(1);
        }
     
        public boolean tryLock() {
            return tryAcquire(1);
        }
     
        public void unlock() {
            release(1);
        }
     
        public boolean isLocked() {
            return isHeldExclusively();
        }
     
        /**
         * 中断线程(仅任务非初始状态)
         */
        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }
    

    通过Worker的定义可以看到,每个Worker对象都有一个Thread线程对象与它相对应,当任务需要执行的时候,实际是调用内部Thread对象的start方法,而Thread对象是在Worker的构造器中通过getThreadFactory().newThread(this)方法创建的,创建的Thread将Worker自身作为任务,所以当调用Thread的start方法时,最终实际是调用了Worker.run()方法,该方法内部委托给runWorker方法执行任务,这个方法我们后面会详细介绍。

    线程工厂
    public static ThreadFactory defaultThreadFactory() {
        return new DefaultThreadFactory();
    }
    
    /**
     * 默认的线程工厂.
     */
    static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;
     
        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
        }
     
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }
    

    为什么需要用ThreadFactory来创建线程,而不是直接通过new Thread()的方式。这个问题在executors框架概述中已经谈过了,这样做的好处是:一来解耦对象的创建与使用,二来可以批量配置线程信息(优先级、线程名称、是否守护线程等),以自由设置池子中所有线程的状态。

    线程池的调度流程

    ExecutorService的核心方法是submit方法——用于提交一个待执行的任务,如果读者阅读ThreadPoolExecutor的源码,会发现它并没有覆写submit方法,而是沿用了父类AbstractExecutorService的模板,然后自己实现了execute方法:

    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }
    
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
     
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {              // CASE1: 工作线程数 < 核心线程池上限
            if (addWorker(command, true))             // 添加工作线程并执行
                return;
            c = ctl.get();
        }
     
        // 执行到此处, 说明工作线程创建失败 或 工作线程数≥核心线程池上限
        if (isRunning(c) && workQueue.offer(command)) {     // CASE2: 插入任务至队列
     
            // 再次检查线程池状态
            int recheck = ctl.get();
            if (!isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        } else if (!addWorker(command, false))        // CASE3: 插入队列失败, 判断工作线程数 < 总线程池上限
            reject(command);    // 执行拒绝策略
    }
    
    • 如果工作线程数小于核心线程池上限(CorePoolSize),则直接新建一个工作线程并执行任务;
    • 如果工作线程数大于等于CorePoolSize,则尝试将任务加入到队列等待以后执行。如果加入队列失败了(比如队列已满的情况),则在总线程池未满的情况下(CorePoolSize ≤ 工作线程数 < maximumPoolSize)新建一个工作线程立即执行任务,否则执行拒绝策略。
    工作线程的创建
    /**
     * 添加工作线程并执行任务
     *
     * @param firstTask 如果指定了该参数, 表示将立即创建一个新工作线程执行该firstTask任务; 否则复用已有的工作线程,从工作队列中获取任务并执行
     * @param core      执行任务的工作线程归属于哪个线程池:  true-核心线程池  false-非核心线程池
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (; ; ) {
            int c = ctl.get();
            int rs = runStateOf(c);             // 获取线程池状态
     
            /**
             * 这个if主要是判断哪些情况下, 线程池不再接受新任务执行, 而是直接返回.总结下, 有以下几种情况:
             * 1. 线程池状态为 STOP 或 TIDYING 或 TERMINATED: 线程池状态为上述任一一种时, 都不会再接受任务,所以直接返回
             * 2. 线程池状态≥ SHUTDOWN 且 firstTask != null: 因为当线程池状态≥ SHUTDOWN时, 不再接受新任务的提交,所以直接返回
             * 3. 线程池状态≥ SHUTDOWN 且 队列为空: 队列中已经没有任务了, 所以也就不需要执行任何任务了,可以直接返回
             */
            if (rs >= SHUTDOWN &&
                    !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
                return false;
     
            for (; ; ) {
                int wc = workerCountOf(c);      // 获取工作线程数
     
                /**
                 * 这个if主要是判断工作线程数是否超限, 以下任一情况属于属于超限, 直接返回:
                 * 1. 工作线程数超过最大工作线程数(2^29-1)
                 * 2. 工作线程数超过核心线程池上限(入参core为true, 表示归属核心线程池)
                 * 3. 工作线程数超过总线程池上限(入参core为false, 表示归属非核心线程池)
                 */
                if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
     
                if (compareAndIncrementWorkerCount(c))  // 工作线程数加1
                    break retry;                        // 跳出最外层循环
     
                c = ctl.get();
                if (runStateOf(c) != rs)                // 线程池状态发生变化, 重新自旋判断
                    continue retry;
            }
        }
     
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);                  // 将任务包装成工作线程
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // 重新检查线程池状态
                    int rs = runStateOf(ctl.get());
                    if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive())               
                            throw new IllegalThreadStateException();
                        workers.add(w);                 // 加入工作线程集合
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (!workerStarted)     // 创建/启动工作线程失败, 需要执行回滚操作
                addWorkerFailed(w);
        }
        return workerStarted;
    }
    

    第一部分是一个自旋操作,主要是对线程池的状态进行一些判断,如果状态不适合接受新任务,或者工作线程数超出了限制,则直接返回false。

    这里需要注意的就是core参数,为true时表示新建的工作线程在逻辑上归属于核心线程池,所以需要判断条件 工作线程数 < corePoolSize 是否满足;core为false时表示在新增的工作线程逻辑上属于非核心线程池,所以需要判断条件 工作线程数 < maximumPoolSize是否满足。

    经过第一部分的过滤,第二部分才真正去创建工作线程并执行任务:
    首先将Runnable任务包装成一个Worker对象,然后加入到一个工作线程集合中(名为workers的HashSet),最后调用工作线程中的Thread对象的start方法执行任务,其实最终是委托到Worker的下面方法执行:

    /**
     * 执行任务
     */
    public void run() {
        runWorker(this);
    }
    
    工作线程的执行

    runWoker用于执行任务,整体流程如下:

    • while循环不断地通过getTask()方法从队列中获取任务(如果工作线程自身携带着任务,则执行携带的任务);
      线程池的线程在这里得以复用
    • 控制执行线程的中断状态,保证如果线程池正在停止,则线程必须是中断状态,否则线程必须不是中断状态;
    • 调用task.run()执行任务;
    • 处理工作线程的退出工作。
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();     // 执行任务的线程
        Runnable task = w.firstTask;            // 任务, 如果是null则从队列取任务
        w.firstTask = null;
        w.unlock();                             // 允许执行线程被中断
        boolean completedAbruptly = true;       // 表示是否因为中断而导致退出
        try {
            while (task != null || (task = getTask()) != null) {    // 当task==null时会通过getTask从队列取任务
                w.lock();
     
                /**
                 * 下面这个if判断的作用如下:
                 * 1.保证当线程池状态为STOP/TIDYING/TERMINATED时,当前执行任务的线程wt是中断状态(因为线程池处于上述任一状态时,均不能再执行新任务)
                 * 2.保证当线程池状态为RUNNING/SHUTDOWN时,当前执行任务的线程wt不是中断状态
                 */
                if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                    wt.interrupt();
     
                try {
                    beforeExecute(wt, task);            // 钩子方法,由子类自定义实现
                    Throwable thrown = null;
                    try {
                        task.run();                     // 执行任务
                    } catch (RuntimeException x) {
                        thrown = x;
                        throw x;
                    } catch (Error x) {
                        thrown = x;
                        throw x;
                    } catch (Throwable x) {
                        thrown = x;
                        throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);     // 钩子方法,由子类自定义实现
                    }
                } finally {
                    task = null;
                    w.completedTasks++;     // 完成任务数+1
                    w.unlock();
                }
            }
     
            // 执行到此处, 说明该工作线程自身既没有携带任务, 也没从任务队列中获取到任务
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);    // 处理工作线程的退出工作
        }
    
    }
    

    正常情况下,工作线程会存活着,不断从任务队列获取任务执行,如果获取不到任务了(getTask返回null),会置completedAbruptly 为false,然后执行清理工作——processWorkerExit(worker,false);
    异常情况下,工作线程在执行过程中被中断或出现其它异常,会置completedAbruptly 为true,也会执行清理工作——processWorkerExit(worker,true);

    工作线程的清理
    image.png

    processWorkerExit的作用就是将该退出的工作线程清理掉,然后看下线程池是否需要终止。

    任务的获取

    getTask方法的主要作用就是:通过自旋,不断地尝试从阻塞队列中获取一个任务,如果获取失败则返回null。

    拒绝策略

    ThreadPoolExecutor在以下两种情况下会执行拒绝策略:

    • 当核心线程池满了以后,如果任务队列也满了,首先判断非核心线程池有没满,没有满就创建一个工作线程(归属非核心线程池), 否则就会执行拒绝策略;
    • 提交任务时,ThreadPoolExecutor已经关闭了。

    1.AbortPolicy(默认)
    AbortPolicy策略其实就是抛出一个RejectedExecutionException异常:

    public static class AbortPolicy implements RejectedExecutionHandler {
        public AbortPolicy() {
        }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                    " rejected from " +
                    e.toString());
        }
    }
    

    2.DiscardPolicy
    DiscardPolicy策略其实就是无为而治,什么都不做,等任务自己被回收:

    public static class DiscardPolicy implements RejectedExecutionHandler {
        public DiscardPolicy() {
        }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }
    

    3.DiscardOldestPolicy
    DiscardOldestPolicy策略是丢弃任务队列中的最近一个任务,并执行当前任务:

    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        public DiscardOldestPolicy() {
        }
     
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {      // 线程池未关闭(RUNNING)
                e.getQueue().poll();    // 丢弃任务队列中的最近任务
                e.execute(r);           // 执行当前任务
            }
        }
    }
    

    4.CallerRunsPolicy
    CallerRunsPolicy策略相当于以自身线程来执行任务,这样可以减缓新任务提交的速度。

    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        public CallerRunsPolicy() {
        }
     
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {  // 线程池未关闭(RUNNING)
                r.run();            // 执行当前任务
            }
        }
    }
    
    image.png

    如果任务是 CPU 密集型(需要进行大量计算、处理),则应该配置尽量少的线程,比如 CPU 个数 + 1,这样可以避免出现每个线程都需要使用很长时间但是有太多线程争抢资源的情况;
    如果任务是 IO密集型(主要时间都在 I/O,CPU 空闲时间比较多),则应该配置多一些线程,比如 CPU 数的两倍,这样可以更高地压榨 CPU。

    相关文章

      网友评论

          本文标题:juc-executors 执行器框架

          本文链接:https://www.haomeiwen.com/subject/pxhzbktx.html