美文网首页并发
01 java线程池-初始线程池-ThreadPoolExecu

01 java线程池-初始线程池-ThreadPoolExecu

作者: 花神子 | 来源:发表于2019-05-20 16:06 被阅读76次

    java线程池

    为什么要用线程池?

    1. 降低系统资源消耗。

    2. 提高线程可控性。

    如何创建使用线程池?

    JDK8提供了五种创建线程池的方法:

      1. 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    
      1. (JDK8新增)会根据所需的并发数来动态创建和关闭线程。能够合理的使用CPU进行对任务进行并发操作,所以适合使用在很耗时的任务。注意返回的是ForkJoinPool对象。
    public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }
    

    什么是ForkJoinPool:

    使用一个无限队列来保存需要执行的任务,可以传入线程的数量;不传入,则默认使用当前计算机中可用的cpu数量;使用分治法来解决问题,使用fork()和join()来进行调用。
    
    public ForkJoinPool(int parallelism,
                            ForkJoinWorkerThreadFactory factory,
                            UncaughtExceptionHandler handler,
                            boolean asyncMode) {
            this(checkParallelism(parallelism),
                 checkFactory(factory),
                 handler,
                 asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
                 "ForkJoinPool-" + nextPoolId() + "-worker-");
            checkPermission();
        }
    
      1. 创建一个可缓存的线程池,可灵活回收空闲线程,若无可回收,则新建线程.
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    
    • 4.创建一个单线程的线程池。
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    
    • 5.创建一个定长线程池,支持定时及周期性任务执行。
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    

    Executor 源码结构

    executor-architect.png

    1. Executor

    public interface Executor {
        void execute(Runnable command);
    }
    

    2. ExecutorService

    扩展了Executor接口。添加了一些用来管理执行器生命周期和任务生命周期的方法

    ExecutorService.png

    3. AbstractExecutorService

    对ExecutorService接口的抽象类实现。

    ThreadPoolExecutor源码分析

    线程池状态的转换模型:
    ThreadPoolStatus.png

    注意的是状态值越大线程越不活跃

    // AtomicInteger是原子类  ctlOf()返回值为RUNNING;
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // 高3位表示线程状态
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // 低29位表示workerCount容量
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    
    // runState is stored in the high-order bits
    // 能接收任务且能处理阻塞队列中的任务
    private static final int RUNNING    = -1 << COUNT_BITS;
    // 不能接收新任务,但可以处理队列中的任务。
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    // 不接收新任务,不处理队列任务。
    private static final int STOP       =  1 << COUNT_BITS;
    // 所有任务都终止
    private static final int TIDYING    =  2 << COUNT_BITS;
    // 什么都不做
    private static final int TERMINATED =  3 << COUNT_BITS;
    
    // 存放任务的阻塞队列
    private final BlockingQueue<Runnable> workQueue;
    

    构造器

    public ThreadPoolExecutor(int corePoolSize,//线程池初始启动时线程的数量
                              int maximumPoolSize,//最大线程数量
                              long keepAliveTime,//空闲线程多久关闭?
                              TimeUnit unit,// 计时单位
                              BlockingQueue<Runnable> workQueue,//放任务的阻塞队列
                              ThreadFactory threadFactory,//线程工厂
                              RejectedExecutionHandler handler// 拒绝策略) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
    

    向线程池提交任务时,会通过两个方法:execute和submit。

    execute方法:

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        // clt记录着runState和workerCount
        int c = ctl.get();
        //workerCountOf方法取出低29位的值,表示当前活动的线程数
        //然后拿线程数和 核心线程数做比较
        if (workerCountOf(c) < corePoolSize) {
            // 如果活动线程数<核心线程数
            // 添加到addWorker中
        //第二个参数表示限制添加线程的数量是根据corePoolSize来判断还是maximumPoolSize来判断
            if (addWorker(command, true))
                // 如果成功则返回
                return;
            // 如果失败则重新获取 runState和 workerCount
            c = ctl.get();
        }
        // 如果当前线程池是运行状态并且任务添加到队列成功
        if (isRunning(c) && workQueue.offer(command)) {
            // 重新获取 runState和 workerCount
            int recheck = ctl.get();
            // 如果不是运行状态并且 
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                //第一个参数为null,表示在线程池中创建一个线程,但不去启动
                // 第二个参数为false,将线程池的有限线程数量的上限设置为maximumPoolSize
                addWorker(null, false);
        }
        //再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize
        else if (!addWorker(command, false))
            //如果失败则拒绝该任务
            reject(command);
    }
    

    总结如下:

      1. workerCount < corePoolSize,创建线程执行任务。
      1. workerCount > corePoolSize&&阻塞队列workQueue未满,把新的任务放入阻塞队列。
      1. workQueue已满,并且workerCount >= corePoolSize,并且workerCount < maximumPoolSize,创建线程执行任务。
      1. 当workQueue已满,workerCount >= maximumPoolSize,采取拒绝策略,默认拒绝策略是直接抛异常。
    execute-flow.png

    addWorker方法

    主要工作是在线程池中创建一个新的线程并执行

    • firstTask the task the new thread should run first (or null if none). (指定新增线程执行的第一个任务或者不执行任务)

    • core if true use corePoolSize as bound, else maximumPoolSize.(core如果为true则使用corePoolSize绑定,否则为maximumPoolSize。 (此处使用布尔指示符而不是值,以确保在检查其他状态后读取新值)。)

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
    
            int c = ctl.get();
            //  获取运行状态
            int rs = runStateOf(c);
    
            // Check if queue empty only if necessary.
            // 如果状态值 >= SHUTDOWN (不接新任务&不处理队列任务)
            // 并且 如果 !(rs为SHUTDOWN 且 firsTask为空 且 阻塞队列不为空)
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                // 返回false
                return false;
    
            for (;;) {
                //获取线程数wc
                int wc = workerCountOf(c);
                // 如果wc大与容量 || core如果为true表示根据corePoolSize来比较,否则为maximumPoolSize
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 增加workerCount(原子操作)
                if (compareAndIncrementWorkerCount(c))
                    // 如果增加成功,则跳出
                    break retry;
                // wc增加失败,则再次获取runState
                c = ctl.get();  // Re-read ctl
                // 如果当前的运行状态不等于rs,说明状态已被改变,返回重新执行
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
    
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 根据firstTask来创建Worker对象
            w = new Worker(firstTask);
            // 根据worker创建一个线程
            final Thread t = w.thread;
            if (t != null) {
                // new一个锁
                final ReentrantLock mainLock = this.mainLock;
                // 加锁
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    // 获取runState
                    int rs = runStateOf(ctl.get());
                    // 如果rs小于SHUTDOWN(处于运行)或者(rs=SHUTDOWN && firstTask == null)
                    // firstTask == null证明只新建线程而不执行任务
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        // 如果t活着就抛异常
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // 否则加入worker(HashSet)
                        //workers包含池中的所有工作线程。仅在持有mainLock时访问。
                        workers.add(w);
                        // 获取工作线程数量
                        int s = workers.size();
                        //largestPoolSize记录着线程池中出现过的最大线程数量
                        if (s > largestPoolSize)
                            // 如果 s比它还要大,则将s赋值给它
                            largestPoolSize = s;
                        // worker的添加工作状态改为true    
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // 如果worker的添加工作完成
                if (workerAdded) {
                    // 启动线程
                    t.start();
                    // 修改线程启动状态
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        // 返回线启动状态
        return workerStarted;
    
    为什么需要持有mainLock?

    因为workers是HashSet类型的,不能保证线程安全。

    w = new Worker(firstTask); 如何理解呢?
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    

    可以看到它继承了AQS并发框架还实现了Runnable。证明它还是一个线程任务类。那我们调用t.start()事实上就是调用了该类重写的run方法。

    Worker为什么不使用ReentrantLock来实现呢?

    tryAcquire方法它是不允许重入的,而ReentrantLock是允许重入的。对于线程来说,如果线程正在执行是不允许其它锁重入进来的。

    线程只需要两个状态,一个是独占锁,表明正在执行任务;一个是不加锁,表明是空闲状态。

    public void run() {
        runWorker(this);
    }
    
    final void runWorker(Worker w) {
        // 拿到当前线程
        Thread wt = Thread.currentThread();
        // 拿到当前任务
        Runnable task = w.firstTask;
        // 将Worker.firstTask置空 并且释放锁
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // 如果task或者getTask不为空,则一直循环
            while (task != null || (task = getTask()) != null) {
                // 加锁
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                //  return ctl.get() >= stop 
                // 如果线程池状态>=STOP 或者 (线程中断且线程池状态>=STOP)且当前线程没有中断
                // 其实就是保证两点:
                // 1. 线程池没有停止
                // 2. 保证线程没有中断
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    // 中断当前线程
                    wt.interrupt();
                try {
                    // 空方法
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 执行run方法(Runable对象)
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    // 执行完后, 将task置空, 完成任务++, 释放锁
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // 退出工作
            processWorkerExit(w, completedAbruptly);
        }
    
    在runWorker方法中,为什么要在执行任务的时候对每个工作线程都加锁呢?

    shutdown方法与getTask方法存在竞态条件.(这里不做深入,建议自己深入研究,对它比较熟悉的面试官一般会问)

    总结一下runWorker方法的执行过程:

      1. while循环中,不断地通过getTask()方法从workerQueue中获取任务
      1. 如果线程池正在停止,则中断线程。否则调用3.
      1. 调用task.run()执行任务;
      1. 如果task为null则跳出循环,执行processWorkerExit()方法,销毁线程workers.remove(w);
    流程图:
    runWorker-flow.png

    相关文章

      网友评论

        本文标题:01 java线程池-初始线程池-ThreadPoolExecu

        本文链接:https://www.haomeiwen.com/subject/xpikzqtx.html