美文网首页
线程池原理

线程池原理

作者: gczxbb | 来源:发表于2018-05-08 17:04 被阅读9次

    线程池,关注如何缩短频繁线程创建和销毁的时间,通过线程复用技术,减少非核心任务的时间损耗(创建和销毁的时间),提高程序性能。它的主要原理是采用阻塞任务队列实现线程复用的方案。ThreadPoolExecutor是线程池的具体实现类,它的继承关系图。

    ThreadPoolExecutor继承关系图.jpg

    线程池类型

    源码提供了三种类型的线程池,通过不同的参数设置,缓存线程池、单线程线程池、固定数量线程的线程池。ThreadPoolExecutor的构造方法。

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
    

    这几个参数决定线程池类型,核心参数。
    corePoolSize,核心线程数。
    maximumPoolSize,线程池的允许的最大线程,超过会报异常。
    keepAliveTime,非核心线程允许存留时间,(保持活跃性的时间)。
    TimeUnit,时间度量参数。
    BlockingQueue<Runnable>,任务队列,可以使配置成无限或有限队列或栈。
    ThreadFactory,线程工厂,创建线程优先级以及统计线程数量等。

    线程池的对任务的控制流程

    • 线程数量<corePoolSize,新建线程,处理任务。
    • 线程数量>=corePoolSize,将任务放入workQueue队列,若有核心线程空闲,从workQueue队列取任务处理。
    • 当任务队列有限且已满,再次新建线程处理任务,这时,要保证总量不超过最大允许值,否则,导致RejectedExecutionHandler异常。
    • 再次新建的线程是非核心线程,空闲时最大存留keepAliveTime时间。

    三种线程池是Executors的静态方法创建。

    缓存线程池
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
    }
    

    核心线程数是0,允许的最大线程无限,任务队列SynchronousQueue不做具体存储,线程活跃时间60秒。
    每个任务先到SynchronousQueue队列,它其实一个管道,不保存,若不存在空闲线程则新建,新建线程数量永远不会超过允许的最大值。
    若一开始并发任务较多,会创建不少线程,每个线程任务完成后,变空闲线程,空闲时间未达到60s,可重用空闲线程处理新进任务,线程最大数量不限。即可以保障任务繁重时,空闲线程可复用,又能办证在没有任务时,保持一定时间后消失。

    单线程线程池
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
    }
    

    核心线程是1,允许的最大线程是1,任务队列LinkedBlockingQueue,不设空闲时间,无界队列。
    只有一个核心线程处理任务,新任务入队列。若队列是有界的,多并发任务状态下队列总有满的时候,若队列满了就得新建临时线程,肯定会超允许的最大线程,报异常,因此,队列必须无界。

    固定数量线程的线程池

    Executors#newFixedThreadPool方法创建固定数量线程的线程池。

    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory 
                        threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>(),
                                          threadFactory);
    }
    

    核心线程与最大线程数量自己配置,任务队列LinkedBlockingQueue,不设空闲时间,无界队列。
    任务队列无界,因此,不会创建临时工线程,只有核心线程工作,keepAlieveTime无需设置。

    综上所述

    允许的最大线程数量与队列必须保证一个是无界的,否则,在高并发条件下导致异常。根据特定的业务场景灵活配置。


    工作原理

    当线程池ThreadPoolExecutor创建后,我们一般调用它的execute方法,向线程池派发任务,将任务交给线程池,不需要自己再管理,会自动执行。

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {//小于核心线程
            if (addWorker(command, true))//创建核心线程
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {//加入队列
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))//队列满了,创建非核心临时线程
            reject(command);
    }
    

    首先,workerCountOf(c)方法,返回工作线程数量,若小于核心线程数量,调用addWorker方法,创建核心线程(设标志位参数),该任务将由新建的核心线程处理。
    然后,若工作线程数量>=核心线程,isRunning(c)方法,表示c<0,将任务加入队列,offer方法,加入后返回成功标志。
    因为offer是非阻塞方法,也就是说,如果队列已满,将直接返回失败,这时,将调用addWorker方法,创建非核心线程(不设标志位参数)。该任务将由新建的非核心线程处理。

    以下是上面代码用到的一些变量与方法。
    //AtomicInteger确保高并发整型值自增时线程安全。
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;//32-3=29
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;//536870911
    
    private static final int RUNNING    = -1 << COUNT_BITS;//-536870912
    private static final int SHUTDOWN   =  0 << COUNT_BITS;//0
    private static final int STOP       =  1 << COUNT_BITS;//536870912
    private static final int TIDYING    =  2 << COUNT_BITS;//1073741824
    private static final int TERMINATED =  3 << COUNT_BITS;//1610612736
    
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }
    
    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }
    
    ctl的初始值与RUNNING相同,值是-536870912,即11100000 00000000 00000000 00000000。
    -1存储的补码是1111...(32个1),向左移动29位,即右边加29个0。
    
    workerCountOf方法将c与CAPACITY进行与操作。
    runStateOf方法将c与CAPACITY的取反进行与操作。
    
    CAPACITY是536870911,即00011111 11111111 11111111 11111111。
    CAPACITY的取反是11100000 00000000 00000000 00000000。
    

    workerCountOf方法,第一次调用时,ctl初始值与CAPACITY与操作结果是0,若ctl不断自增,与CAPACITY操作的值不断自增1,工作线程初始是0,每增加一个工作线程,ctl自增一次。
    runStateOf方法,第一次调用时,ctl初始值与反CAPACITY与操作结果为反CAPACITY。
    总之,workerCountOf方法返回工作线程数量,runStateOf方法返回ctl的初始值-536870912。
    五个运行状态,RUNNING状态<0,isRunning方法,判断c是否<0,每次新建线程后c值自增1,c初始值-536870912,一般情况下,不足以使达到c>=0,因此,isRunning(c)方法一般返回true。

    下面分析一下addWorker创建线程方法。

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            ...
            for (;;) {
                int wc = workerCountOf(c);//工作线程数量
                if (wc >= CAPACITY ||// 已经等于最大值。
                        wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }
    
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //创建线程,绑定任务。
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();//上锁
                try {
                    int rs = runStateOf(ctl.get());
                    if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive())
                            throw new IllegalThreadStateException();
                        workers.add(w);//任务放入集合
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true; //后续根据此标志启动线程。
                    }
                } finally {
                    mainLock.unlock();//解锁
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
    

    两层死循环,若工作线程>CAPACITY容量,或>=允许的最大值(创建核心线程>=核心线程数量),返回失败。自增ctl,跳出循环。创建一个Worker对象,构造方法。

    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
    

    它是Runnable类型,封装派发给线程池的Runnable任务,ThreadFactory的newThread方法,创建一个新线程,将Worker类作为任务主体。该类的结构关系图。

    线程池的Worker任务关系图.jpg 我们再回到addWorker方法,runStateOf(ctl.get())<0时(一般<0),将Worker任务放入HashSet集合,设置workerAdded标志。然后,根据该标志,调用Thread的start方法,启动线程,设置启动标志workerStarted,表示线程已启动执行任务。最后,返回启动标志。
    新线程执行Worker任务,触发Worker#run方法。该方法调用runWorker方法。它是外部类ThreadPoolExecutor的方法,入参就是该Worker。
    final void runWorker(Worker w) {//处理Worker内的派发任务,在循环中进一步访问任务队列。
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock();
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                ....
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();//执行任务
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
    

    该方法中,新建线程借助Worker任务,开始为线程池工作,内部Runnable是用户需要完成的任务。在while循环中,第一个优先处理execute派送,Worker内部的线程池任务,完成后,线程也不会结束,而是getTask方法,继续从任务队列中获取务,如果getTask返回空,结束线程,否则,继续执行,该方法可能会阻塞。

    private Runnable getTask() {
        boolean timedOut = false; 
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            ...//队列空时,返回null。
            int wc = workerCountOf(c);
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            try {
                Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
    

    如果工作线程数量wc>核心线程,设置timed标志。从队列取任务采用阻塞等待一定时间的poll方式,等待时间设置线程存活时间keepAliveTime,因此,即使队列没有任务,线程仍然存活,(保证任务来到可立即开始工作)。
    如果当工作线程数量wc<核心线程,采用一直阻塞的take方案,队列是空时线程一直阻塞,核心线程不会死亡。
    如果队列是空,poll时间已到,设置timeOut超时标志,进入下次循环,这时,如果再发生工作线程数量wc>核心线程,会使得timed和timedOut标志同时存在,此时,工作线程数量自减,返回空,退出while循环,线程结束。如果工作线程数量wc<核心线程(仅有核心线程),工作线程不会自减,for循环继续,阻塞在take查询任务。
    如果设置核心线程TimeOut,也会采用poll方式,存活时间一到,队列无任务,即使wc数量<核心线程,线程也会退出,允许核心线程死亡。
    每一个新建的线程在该方法的执行逻辑是相同。根据当前线程数量和超时标志决定从任务队列的获取方法是否阻塞。


    总结

    线程池的本质,每一个线程在完成派发任务后,并未结束,继续访问任务队列。根据当前线程数量,利用任务队列的阻塞特性,实现线程的存留时间。通过留存工作线程消费,不新建线程,实现任务的线程复用。


    任重而道远

    相关文章

      网友评论

          本文标题:线程池原理

          本文链接:https://www.haomeiwen.com/subject/ogkvrftx.html