美文网首页
线程池工作原理

线程池工作原理

作者: 行云流水_SuTong | 来源:发表于2019-11-08 15:44 被阅读0次

    原文地址 https://blog.csdn.net/fengluoye2012/article/details/87914330

    前言

    线程池大家都听说过或者使用过,线程池的工作原理:

    线程池在创建之后,线程池内没有一个线程,在添加任务之后;

    • 只有在运行的线程数小于核心线程数时,创建核心线程,执行任务;
    • 当达到最大核心线程数,但是队列未满的情况,则往队列中添加,等待有空闲的核心线程之后,从队列中取出任务执行;
    • 当队列也满了之后,并且在运行的线程未达到最大线程数,再创建普通线程(在任务完成一段时间内没有任务复用该线程,就会被销毁),执行线程;
    • 在执行的线程数也达到最大值后,再继续有新任务执行,则执行拒绝策略;

    一般情况下,核心线程在被创建之后,在线程池不被销毁的情况下,就一直存活,除非设置了 allowCoreThreadTimeOut=true(允许为核心线程设置存活时间),才会像普通线程一样在,一段时间内,没有被复用,就被销毁;普通线程(非核心线程)在执行完任务一定时间内,没有被复用,就会被销毁。

    线程池的优点:

    • 线程的创建和销毁由线程池维护,一个线程在完成任务后并不会立即销毁,而是由后续的任务复用这个线程,从而减少线程的创建和销毁,节约系统的开销;
    • 线程池旨在线程的复用,这就可以节约我们用以往的方式创建线程和销毁所消耗的时间,减少线程频繁调度的开销,从而节约系统资源,提高系统吞吐量;
    • 在执行大量异步任务时提高了性能;
    • Java 内置的一套 ExecutorService 线程池相关的 api,可以更方便的控制线程的最大并发数、线程的定时任务、单线程的顺序执行等;

    线程池的创建

    创建线程池的主要方法:1)Executors 的工厂方法创建线程池,本质上也是创建 ThreadPoolExecutor 对象;2)直接通过 ThreadPoolExecutor 类创建;

    Executors 的 newFixedThreadPool() 方法为例;

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    

    ThreadPoolExecutor 的构造函数;

    private final HashSet<Worker> workers = new HashSet<>();
    
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime,
       TimeUnit unit, BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,
        RejectedExecutionHandler handler) {
        //corePoolSize:核心线程数;
        //maximumPoolSize:线程池允许的最大线程数;
        //keepAliveTime:保持活动时间,空闲的线程(普通线程)在超过keepAliveTime时间内没有被复用,就被销毁;
        //unit:时间单位;
        //workQueue:任务队列,用来存储已经被提交,即将被执行的任务;
        //threadFactory:线程工厂,用来创建线程池中的线程;
        //handler:拒绝策略,线程池关闭,或者最大线程数和队列已经饱和的情况下,抛出RejectedExecutionException异常;   
    }
    

    ThreadPoolExecutor 类的变量及方法;参考:进制和位运算

    //原子类 ctlOf()返回值为RUNNING;
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;//COUNT_BITS = 29;
    //CAPACITY的二进制为:0001 0000 0000 0000  0000 0000 0000 0001;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;//1*2^29-1
    
    //RUNNING的二进制为:1110 0000 0000 0000 0000 0000 0000 0000;
    private static final int RUNNING    = -1 << COUNT_BITS; //-1 * 2^29;
    private static final int SHUTDOWN   =  0 << COUNT_BITS; //0;
    private static final int STOP       =  1 << COUNT_BITS; // 1*2^29;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
    
    //~CAPACITY的二进制:1110 1111 1111 1111 1111 1111 1111 1110;
    //RUNNING&CAPACITY之后的值:0000 0000 0000 0000 0000 0000 0000 0000;即转换为十进制为0;
    //RUNNING & ~CAPACITY之后的值:1110 0000 0000 0000 0000 0000 00000 0000;即转换为十进制为负数;
    
    //由于~CAPACITY 为负数,c是负数则返回值就是负数;
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    //只要c的值比RUNNING大,返回值就大于0;
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    //只要wc的值为0,返回值就是rs;
    private static int ctlOf(int rs, int wc) { return rs | wc; }
    

    任务的执行过程

    execute() 方法,是线程池最常用的方法,往线程池中添加任务:经过分析可知,线程池的执行原理;

    • 只有在运行的线程数小于核心线程数时,在 addWorker(command, true) 方法中创建 worker 对象,其内部创建线程 thread(核心线程);并添加到 workers 中,执行线程;
    • 当达到最大核心线程数,但是队列未满的情况,则往队列 workQueue 中添加,等待有空闲的主线程,从队列中取出任务执行;
    • 当队列也满了之后,并且在运行的线程未达到最大线程数,再通过 addWorker(command, false) 方法中创建 worker 对象,其内部创建线程 thread(普通线程,在任务完成一段时间内没有任务复用该线程,则销毁);并添加到 workers 中,执行线程;
    • 在执行的线程数也达到最大值后,在继续有新任务执行,则执行拒绝策略;
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        //ctl.get()获取的值为初始化时设置的value,即为RUNNING=-1 * 2^29;
        int c = ctl.get();
        //workerCountOf()的返回值为0;由位运算&计算获得;
        //执行的任务数小于核心线程数,添加到workers中;
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        
        //达到最大核心线程数,向workQueue队列中插入任务;offer()当队列已满,插入失败;
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //线程池被终止,并且从workQueue中删除;
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //只有当recheck的值为RUNNING时,workerCountOf()的返回值为0;
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //队列已经满了的情况下;
        else if (!addWorker(command, false))
            reject(command);
    }
    

    java retry 的使用详解可知 break retry 跳出外层循环;continue retry 跳出内层循环;

    ThreadPollExecutor 的内部类 Worker 是一个 Runnable 对象,主要是通过 getThreadFactory().newThread() 方法创建线程;

    addWorker():向 HashSet 对象 workers 添加 Worker 对象 w,并且执行 Worker 的 thread 的 start() 方法,进而执行 Worker 的 run() 方法;核心线程在默认情况下,没有任务需要执行的情况下,getTask() 会一直被阻塞,无返回值;普通线程在没有任务的情况下,getTask() 在一段时间之后,返回 null,结束循环;

    //core:true 表示创建核心线程;firstTask:可能为Null;
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            //c的初始值为即为RUNNING=-1 * 2^29;每调用compareAndIncrementWorkerCount(),c的值加1;
            int c = ctl.get();
            //c 为负数,返回值rs为负数;
            int rs = runStateOf(c);
            
            //SHUTDOWN为0;
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
                return false;
    
            for (;;) {
                //正在执行任务的线程数;只要c的值比RUNNING大,wc就大于0;wc的返回值就是c-RUNNING的差;
                int wc = workerCountOf(c);
                //判断正在同时运行的线程数是否达到限制值;
                if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //原子整形变量c+1,原子整形更新成功后,跳出外层循环;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();
                //跳出内循环;
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }
    
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //创建Worker对象w,Worker会创建线程;
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();//加锁
                try {
                    //ctl.get()的值为负数,则rs为负数;
                    int rs = runStateOf(ctl.get());
                    if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                        //向HashSet对象workers中添加Worker对象w;
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        //将标识位改为true;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                //调用Worker中的Thread的start();进而执行Worker的run()方法;核心线程在默认情况下,
                //没有任务需要执行的情况下,getTask()会一直被阻塞,无返回值;
                //普通线程在没有任务的情况下,getTask()在一段时间之后,返回null,结束循环;
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
        }
        return workerStarted;
    }
    

    Worker 类的 run() 方法:执行 run() 进而调用 runWorker();

    private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
      
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            //通过ThreadFactory的newThread()方法创建thread();
            this.thread = getThreadFactory().newThread(this);
        }
    
        /** Delegates main run loop to outer runWorker. */
        public void run() {
            runWorker(this);
        }
    }
    

    runWorker():执行线程池 execute() 方法传入任务或者从 getTask() 方法中获取的任务的 run() 方法;

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock();
        boolean completedAbruptly = true;
        try {
            //worker不仅执行自己对象内保存的任务,同时还不断的从wokeQueue中取出的任务;
            //getTash()方法不断从workQueue中取出任务;
            while (task != null || (task = getTask()) != null) {
                w.lock();
                
                try {
                    beforeExecute(wt, task);
                    try {
                        //执行对应任务的run()方法;
                        task.run();
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    //任务执行完之后,就将task制null,继续循环;
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
    

    核心线程的秘密

    getTask() :workQueue 队列不为空时,每次从 workQueue 中取出需要执行的任务,来执行;

    LinkedBlockingQueue 源码分析可知 LinkedBlockingQueue 的 poll():从队尾删除,队列为空,阻塞一段时间后,直接返回 null;不为空,返回删除的节点的值;take():如果队列为空,线程阻塞;不为空,直接返回队列头结点对应的值。

    timed:由 allowCoreThreadTimeOut(允许核心线程被销毁,默认为 false)和 corePoolSize 来决定,判断是调用 poll() 还是 take() 方法。

    • 默认情况下,核心线程调用 take() 方法,在队列为空的情况下,会阻塞线程,没有返回值,所以核心线程会一直在运行,不会被销毁;
    • 普通线程调用 poll() 方法,在队列为空的情况下,在一段时间后,会返回 null, 跳出 runWorker() 的循环,线程会自动被销毁。

    所以,上述内容,就是线程池的核心线程不被回收(核心线程保活)的原因

    private Runnable getTask() {
        //上一次从队列中通过poll()方法删除元素,是否超时;
        boolean timedOut = false; 
    
        for (;;) {
            //获取ctl存储的值;
            int c = ctl.get();
            //由于~CAPACITY 为负数,c是负数则返回值就是负数;
            int rs = runStateOf(c);
    
            //需要销毁正在运行的线程,返回null;
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            
            //获取正在运行的的线程数;
            int wc = workerCountOf(c);
    
            //当allowCoreThreadTimeOut为true表示核心线程设置保活事件,
            //或者在执行的线程数超过核心线程数,timed为true;
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            //需要销毁正在运行的线程,返回null;
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                //c-1,并且更新c的值;
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
    
            try {
                //当队列workQueue不为空,会直接返回对应的元素;为空时,poll()会阻塞一段时间,返回null;
                //take()方法会一直阻塞,直到队列中有新的任务添加,返回对应的任务;
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                //如果任务r不为null,则返回,否则继续从队列中取出任务,timedOut = true;
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
    

    线程池优化

    根据根据 Android-26 的 AsyncTask 对线程池的设置可知:

    //CPU的数量;
    private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
    //核心线程池的数量在2-4的范围内;
    private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
    //最大线程数量;
    private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
    //线程保活事件30秒;
    private static final int KEEP_ALIVE_SECONDS = 30;
    //threadFactory创建线程;
    private static final ThreadFactory sThreadFactory = new ThreadFactory() {
        private final AtomicInteger mCount = new AtomicInteger(1);
        public Thread newThread(Runnable r) {
            return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
        }
    };
    //队列;
    private static final BlockingQueue<Runnable> sPoolWorkQueue =
                new LinkedBlockingQueue<Runnable>(128);
    
    public static final Executor THREAD_POOL_EXECUTOR;
    
    //线程的初始化
    static {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
            sPoolWorkQueue, sThreadFactory);
        //允许为核心线程设置存活时间;核心线程在一段时间内没有被复用,也会被销毁;
        threadPoolExecutor.allowCoreThreadTimeOut(true);
        THREAD_POOL_EXECUTOR = threadPoolExecutor;
    }
    

    以上就是线程池的主要内容,如有不足之处,请多指点,谢谢!

    相关文章

      网友评论

          本文标题:线程池工作原理

          本文链接:https://www.haomeiwen.com/subject/ptqwbctx.html