美文网首页
Java线程池简介

Java线程池简介

作者: Cris_Ma | 来源:发表于2019-05-27 15:10 被阅读0次

    ThreadPoolExecutor解析

    Java里线程池的基本接口是 Executor:

    public interface Executor {
            void execute(Runnable command);
    }
    

    实现线程池的类是ThreadPoolExecutor,最主要的构造方法如下:

        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    
    

    参数解析:

    • corePoolSize 核心线程数
    • maximumPoolSize 最大线程数
    • keepAliveTime 线程空闲以后存活时间。通常在线程数大于核心线程数时才生效,直到存活线程数等于核心线程数
    • unit 时间单位
    • workQueue 存储任务的阻塞队列
    • threadFactory 用来创建线程的线程工厂
    • handler 拒绝任务时的策略,通常有以下几种取值:
    ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 
    ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 
    ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
    ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务 
    

    成员变量:

        private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    

    ctl是原子的整数值,它用来记录线程池的状态 和 当前线程池中线程 数量,初始值为RUNNING状态,线程数为0

    private static final int COUNT_BITS = Integer.SIZE - 3;
    

    Integer.SIZE表示int类型数据字节数(32),COUNT_BITS表示线程数量占据的位数(29)

        //线程池最大容量(线程数) 00011111 11111111 11111111 11111111
        private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    
    //线程池状态
        // runState is stored in the high-order bits
        //接受新任务并且处理阻塞队列里的任务
        private static final int RUNNING    = -1 << COUNT_BITS;
        //拒绝新任务但是处理阻塞队列里的任务
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
        //拒绝新任务并且抛弃阻塞队列里的任务同时会中断正在处理的任务
        private static final int STOP       =  1 << COUNT_BITS;
        //所有任务都执行完(包含阻塞队列里面任务)当前线程池活动线程为0,将要调用terminated方法
        private static final int TIDYING    =  2 << COUNT_BITS;
        //终止状态。terminated方法调用完成以后的状态
        private static final int TERMINATED =  3 << COUNT_BITS;
    
        private static int ctlOf(int rs, int wc) { return rs | wc; }
    

    可以看到,表示状态的数据全部左移29位,存储在int数值的前三位,然后通过ctlOf方法,将状态和线程数两个值位或操作结合起来,这样就得到了ctl值,也就是说上面的ctl后29位用来存储线程数,前3位存储线程池状态。

    执行过程:

    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
    
            //获取当前线程池的状态
            int c = ctl.get();
    
            //判断线程数是否小于核心线程数
            if (workerCountOf(c) < corePoolSize) {
                //尚未达到核心线程数,直接添加线程执行任务
                if (addWorker(command, true))
                    return;
                //如果添加线程失败,重新获取线程池状态
                c = ctl.get();
            }
            //已经达到核心线程数,或者添加线程失败,继续执行:
            //判断线程池是否处于Running状态,如果是,添加任务到队列中
    
            if (isRunning(c) && workQueue.offer(command)) {
                //再次检查线程池状态
                int recheck = ctl.get();
                
                if (! isRunning(recheck) && remove(command))
                //如果不是处于Running状态,就移除任务,移除成功以后执行拒绝策略
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    //如果是处于非Running状态,但是任务移除失败,或者处于Running状态,但是线程数为0,新建线程
                    addWorker(null, false);
            }
            //不是Running状态,或者任务添加失败了(队列已满),进入addWorker,执行command
            else if (!addWorker(command, false))
            //添加线程失败则拒绝任务
                reject(command);
        }
    
        private static int workerCountOf(int c)  { return c & CAPACITY; }
    

    addWorker方法接受两个参数,command为待执行的任务对象,coretrue表示添加核心线程,false表示不需要使用核心线程。

    execute执行流程如下:

    1. 如果commad为空,直接抛出异常

    2. 获取线程池状态,如果线程数小于核心线程数,直接调用addWorker方法,创建核心线程来执行任务

    3. 核心线程创建成功,则直接返回。如果核心线程创建失败,说明当前线程池核心线程已经满了或者线程池被关闭了等异常情况,任务没有办法执行,那么重新获取线程池状态,执行下一步。

    4. 再次判断线程池是否处于Running状态。如果是,添加command到任务队列中,执行下面的步骤5;如果不是Running状态或者添加command失败,跳到步骤9

    5. 任务添加成功以后,等待线程来执行它,此时会再次检查状态,进入下面6,7,8三个步骤

    6. 如果此时线程池不是Running状态了,把刚刚添加的任务移除掉,执行拒绝策略,execute流程结束,任务执行失败。

    7. 如果线程池仍然是Running状态,或者线程池不是Running状态,但是移除任务失败(Shutdown时,不接受新任务,但是该任务还没有执行完,移除不掉),此时仍然有任务需要执行,但是池内的线程数为0,则调用addWorker方法添加一个null对象,创建一个非核心线程来执行任务,execute流程结束,任务会被执行,但是线程池可能会在任务执行完毕之后结束。

    8. 如果线程池仍然是Running状态,而且线程池内的线程数也不是0,那么execute流程到此结束,任务添加到workQueue中,线程池状态正常。

    9. 线程池处于非Running状态,或者任务添加失败(比如队列已满),调用addWorker方法,通过非核心线程来处理command

    addWorker的执行步骤:

    private boolean addWorker(Runnable firstTask, boolean core) {
    
        //该循环用来更新ctl的值(自增),addWorker可能会被多线程同时调用,所以更新ctl可能会失败,如果失败则重新读取,继续更新ctl值,更新成功则跳出循环,开始创建线程
            retry:
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
    //检查线程池状态,分为几种情况:
    //1.当前状态大于Shutdown,直接返回false,说明线程池已经被关闭了
    //2.当前状态等于Shutdown,说明线程池正在关闭中,此时来判断传入的commad是否为null,并且! workQueue.isEmpty(),如果三项有一项不满足,直接返回false。这种情况对应的是execute中的addWorker(null, false);这一句指令,线程在关闭过程中,但是还有任务在等待执行的情况。
    
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                    int wc = workerCountOf(c);
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
    //到这里说明线程池状态更新成功了,开始创建线程执行任务
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                //创建一个持有当前任务的worker
                w = new Worker(firstTask);
                //Worker里的线程是通过制定的 ThreadFactory 来创建的
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    //此处对workers对象进行操作,并发访问的时候必须加锁,保证不会重复添加线程
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int rs = runStateOf(ctl.get());
    
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    //线程添加成功,开始执行任务
                    if (workerAdded) {
                        t.start();
                        workerStarted = true;
                    }
                }
                
            } finally {
                //最终会走到这里,判断任务有没有成功执行,未成功的话说明线程没有成功运行起来,回滚ctl的状态。
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
    

    该方法主要分为两个步骤,第一步是对ctl状态更新,第二步是状态更新完成以后,创建线程并执行任务。

    第一步主要为两个循环,外层的循环用来获取线程池状态,并判断是否满足创建线程的条件,如果满足,进入内部的循环,尝试对ctl进行自增操作(线程数+1)

    考虑到并发问题,自增可能会失败,失败以后,再一次自增之前,线程池的状态可能发生变化,所以需要再次获取线程池状态,如果没有变化再次尝试自增,如果已经变了,回到外部的循环,重新判断是否满足自增条件。

    满足自增条件有几下几种情况:

    1. 线程池状态 > shutdown: 说明线程池已经被关闭了,直接返回false,不再创建线程

    2. 线程池 = shutdown,此时再判断 firstTask == null && !workQueue.isEmpty(),其实对应的是execute方法中的addWorker(null, false);语句,说明线程池正在关闭,但是还有未执行的任务,此时也需要创建线程。传入的command不是null,或者任务队列为空,也会直接返回false,不再创建线程。

    确定需要创建线程并且自增操作成功,线程成状态更新完成以后,开始真正的线程创建和任务执行工作

    需要注意的是workers变量,它存储的对象为Worker,实际上是对任务和线程的包装,workers是一个HashSet,是线程不安全的,对它的操作需要加锁。

    第二步线程创建和执行的步骤如下:

    1. 新建包含当前任务的Worker对象,获取到该对象包含的线程(在Worker的构造函数中,通过指定的线程工厂创建,创建对象时已经生成了)。

    2. 如果线程t不为空,说明创建线程成功,开始获取锁,检查线程池和线程状态,更新workers变量最后执行t.start开始执行任务

    关键的t.start方法是怎样执行任务的,还要看接下来的Worker对象:

    private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable
        {
             /** Thread this worker is running in.  Null if factory fails. */
             final Thread thread;
            /** Initial task to run.  Possibly null. */
            Runnable firstTask;
    
            Worker(Runnable firstTask) {
                setState(-1); // inhibit interrupts until runWorker
                this.firstTask = firstTask;
                this.thread = getThreadFactory().newThread(this);
            }
    
            public void run() {
                runWorker(this);
            }
        }
    

    上边的源码只列出了几个关键部分,可以看到Worker继承了Runnable,并重写了run方法,而构造函数中创建线程时传入的对象就是Worker本身,所以t.start方法首先执行的是Workerrun方法,run方法里只有一句runWorker(this),实际上最终执行任务是通过runWorker来执行的。

    下面是runWorker的源码:

    final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
                while (task != null || (task = getTask()) != null) {
                    w.lock();
                    // If pool is stopping, ensure thread is interrupted;
                    // if not, ensure thread is not interrupted.  This
                    // requires a recheck in second case to deal with
                    // shutdownNow race while clearing interrupt
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
            }
        }
    
    
         private Runnable getTask() {
            boolean timedOut = false; // Did the last poll() time out?
    
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    decrementWorkerCount();
                    return null;
                }
    
                int wc = workerCountOf(c);
    
                // Are workers subject to culling?
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
                if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                    if (compareAndDecrementWorkerCount(c))
                        return null;
                    continue;
                }
    
                try {
                    Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                    if (r != null)
                        return r;
                    timedOut = true;
                } catch (InterruptedException retry) {
                    timedOut = false;
                }
            }
        }
    

    简单来说,runWorker的作用如下:

    1. 如果是新创建的worker,第一次启动会执行当前worker内的任务,执行完之后会依次从workQueue中取出任务执行。

    2. 如果workQueue为空,那么等待keepAliveTime时间,workQueue仍然为空,结束循环,线程也就结束了。

    Executors类

    Executors是Java线程池的工具类,它内部实现了4中常用的线程池:

    newCachedThreadPool

        public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    

    可缓存线程池:
    没有核心线程,可以创建无上限的普通线程,如果某个线程超过60s没有任务,结束该线程

    任务队列为SynchronousQueue,它内部不存储数据,前一个数据被取走之后,后一个才能存进来。也就是说,execute方法执行的时候,如果command添加到队列失败,说明SynchronousQueue中的任务没有被取走,直接新开一个线程运行任务,如果添加成功,任务会在队列中等待空闲进程来取走它。

    newFixedThreadPool

        public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    

    固定长度线程池:

    核心线程和最大线程数都是指定数值,不设置超时(设置也没有意义,核心线程不会退出,除非设置了allowCoreThreadTimeOut),任务大于线程数时放在LinkedBlockingQueue中排队。

    newScheduledThreadPool

    
       public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
            return new ScheduledThreadPoolExecutor(corePoolSize);
        }
    
        public ScheduledThreadPoolExecutor(int corePoolSize) {
            super(corePoolSize, Integer.MAX_VALUE,
                  DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
                  new DelayedWorkQueue());
        }
    

    可指定核心线程数,最大线程数无上限,有默认的超时时间,可以执行周期性的任务,它执行任务主要调用的是schedule方法:

    • public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)

    • public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)

    • public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)

    • public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)

    newSingleThreadExecutor

    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>(),
                                        threadFactory));
        }
    
    

    创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序执行.

    相关文章

      网友评论

          本文标题:Java线程池简介

          本文链接:https://www.haomeiwen.com/subject/zzshtctx.html