美文网首页
ThreadPoolExecutor

ThreadPoolExecutor

作者: NirvanalI | 来源:发表于2020-04-09 12:10 被阅读0次

    线程池解决的问题
    1.为执行大量数据提供改进的性能异步任务
    2.提供了一种限制和管理资源的方法, 也包括执行任务集合时消耗的线程
    3.每个ThreadPoolExecutor 还维护了一些基本的统计数据, 比如完成的任务的数量

    当提交新任务[execute(Runnable)]
    1.当前运行中的线程数少于 corePoolSize, 会创建新线程处理任务[即使其他工作线程是空闲状态]
    2.当前运行汇总的线程数大于 corePoolSize 且少于maximumPoolSize, 并且任务队列[BlockingQueue<Runnable> workQueue]满了, 才会创建新的线程
    3.设置corePoolSizemaximumPoolSize 相同, 则创建了一个固定大小的线程池
    4.设置 maximumPoolSize=Integer.MAX_VALUE, 则表示线程池容纳任意值并发任务的数量
    5.corePoolSizemaximumPoolSize是在构造方法中设置的, 但是也可以通过 setCorePoolSize ,setMaximumPoolSize 动态调整
    6.默认情况线程池核心线程创建和开始在有任务提交. 如果需要预先启动线程可以重写prestartCoreThread, prestartAllCoreThreads方法.
    7.线程通过 ThreadFactory[Executors#defaultThreadFactory]创建, 创建的线程在同一个线程组[ThreadGroup]、同一个优先级、非守护状态。通过提供一个自定义的ThreadFactory, 可以更改线程名、线程组、优先级、守护状态等等。
    8.如果通过ThreadFactory.newThread() 方法创建线程失败得到null,Executor会继续, 但是不会执行任何任务。

    保活时间
    1.如果池中当前线程超过 corePoolSize, 超出corePoolSize 的线程的空闲时间超过 keepAliveTime, 那么这些线程将会终结.
    2.通过设置keepAliveTime=Long.MAX_VALUE 取消空闲线程的关闭
    3.一般情况keepAliveTime 是控制超出corePoolSize 的线程保活时间, 如果设置allowCoreThreadTimeOut()keepAliveTime也会作用在corePoolSize

    任务队列[BlockingQueue]
    1.如果超过corePoolSize 线程在运行, 当有新任务请求, 线程池更倾向将任务加到queue
    2.如果当前运行的线程数超过 maximumPoolSize, 并且队列满了, 则新提交的任务将会被拒绝

    拒绝策略:
    当提交的线程数过多, 并且线程数达到 maximumPoolSize, 则会执行拒绝策略.通过调用RejectedExecutionHandler#rejectedExecution(Runnable, ThreadPoolExecutor)方法

    四种提供的拒绝策略:
    1.ThreadPoolExecutor.AbortPolicy: 抛出一个运行时异常 RejectedExecutionException[默认拒绝策略]
    2.ThreadPoolExecutor.CallerRunsPolicy: 用执行task的线程执行超出的任务
    3.ThreadPoolExecutor.DiscardPolicy: 饱和task不会被执行, 直接遗弃
    4.ThreadPoolExecutor.DiscardOldestPolicy: 将队列头的任务遗弃[最老的任务抛弃]

    查看ThreadPoolExecutor 源代码:
    构造方法[单纯的属性赋值]:
    其中默认 threadFactory = Executors.defaultThreadFactory(), handler=new AbortPolicy()

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
    

    参数解析:

    // 11100000000000000000000000000000 [高3 位表示线程池状态, 低29 位表示worker数量]
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // 29
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // 536870911
    // 0001 1111 1111 1111 1111 1111 1111 1111
    // 允许的最大线程数, 即worker 数量
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    
    // runState is stored in the high-order bits
    // 线程池的5种状态: 
    // 1110 0000 0000 0000 0000 0000 0000 0000
    private static final int RUNNING    = -1 << COUNT_BITS;
    // 0
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    // 0010 0000 0000 0000 0000 0000 0000 0000
    private static final int STOP       =  1 << COUNT_BITS;
    // 0100 0000 0000 0000 0000 0000 0000 0000
    private static final int TIDYING    =  2 << COUNT_BITS;
    // 0110 0000 0000 0000 0000 0000 0000 0000
    private static final int TERMINATED =  3 << COUNT_BITS;
    
    // Packing and unpacking ctl
    // 获取线程池状态, 按位与
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    // 获取worker 的数量
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    // 根据线程池状态和线程池worker 数量, 生成ctl 值
    private static int ctlOf(int rs, int wc) { return rs | wc; }
    

    任务提交方法:

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        // 工作中的线程小于corePoolSize 时, 直接创建worker 执行任务
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // worker 数量超过核心线程数, 任务直接进入队列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
    
            // 线程状态不是RUNNING, 说明执行过shutdown 命令, 需要对新加入的任务执行reject 
            // 这边为什么需要recheck, 因为任务入队列前后, 线程池的状态可能会发生变化
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // 判断0值, 主要是在线程池构造方法中, 核心线程数允许为0
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 
        else if (!addWorker(command, false))
            reject(command);
    }
    

    addWorker() 方法: java.util.concurrent.ThreadPoolExecutor#addWorker

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        // 外层自旋
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            
            // Check if queue empty only if necessary.
            // (rs > SHUTDOWN) || 
            // (rs >= SHUTDOWN && firstTask != null) ||
            // (rs >= SHUTDOWN && workQueue.isEmpty())
            // 1. 线程池状态大于SHUTDOWN, 直接返回false
            // 2. 线程池状态 等于SHUTDOWN, 且firstTask 不为空, 直接返回false
            // 3. 线程池状态等于SHUTDOWN, 且队列为空, 直接返回false
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            
            // 内层自旋
            for (;;) {
                int wc = workerCountOf(c);
                // worker 数量超过容量或超过指定线程容量, 直接返回
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 使用cas 方法增加worker 数量
                if (compareAndIncrementWorkerCount(c))
                    // 执行成功跳出外层自旋
                    break retry;
                c = ctl.get();  // Re-read ctl
                // 线程池状态发生变化, 对外层循环进行自旋
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                // worker 添加必须串行, 加锁
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    // 重新检查线程状态
                    int rs = runStateOf(ctl.get());
                    
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        // 线程已经运行
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // worker 创建并添加worker
                        workers.add(w);
                        int s = workers.size();
                        // 更新 largestPoolSize 变量
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // 启动worker 线程
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // worker 启动失败, 说明线程池状态发生了变化, 关闭操作被执行。
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
    

    runWorker(): java.util.concurrent.ThreadPoolExecutor#runWorker

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
    

    相关文章

      网友评论

          本文标题:ThreadPoolExecutor

          本文链接:https://www.haomeiwen.com/subject/ijapphtx.html