美文网首页源码分析
java线程池-ThreadPoolExecutor源码分析

java线程池-ThreadPoolExecutor源码分析

作者: millions_chan | 来源:发表于2017-03-16 00:07 被阅读1684次

线程是程序执行流的最小单元,合理的使用线程可以充分利用系统资源、提高吞吐率以及加快响应时间。然而创建线程的消耗很大,为了节约系统资源,方便管理和监控,我们通常会使用线程池。java 1.5中引入了线程池框架executors,给我们提供了一个开箱即用的线程池实现。

案例1


public class ExecutorCase {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        for (int i = 0; i < 10; i++) {
            executorService.execute(new Task());
        }
    }

    private static class Task implements Runnable {

        @Override
        public void run() {
            try {
                TimeUnit.MILLISECONDS.sleep(300);
                System.out.println(Thread.currentThread().getName());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

运行结果为:

pool-1-thread-2
pool-1-thread-1
pool-1-thread-3
pool-1-thread-2
pool-1-thread-1
pool-1-thread-3
pool-1-thread-1
pool-1-thread-3
pool-1-thread-2
pool-1-thread-1

上面的用例生成了一个大小为3的线程池,并提交了若干任务。查看newFixedThreadPool可见其代码如下,可见实际上它创建了一个ThreadPoolExecutor的实例。其实上,Executors的各种静态方法中,除了与ScheduledExecutorService有关的都是以不同的参数创建了一个ThreadPoolExecutor的实例。

  public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

参数说明


ThreadPoolExecutor的构建函数为:

 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

每个参数的说明如下:

  • corePoolSize
    线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行。这些线程将一直存活,除非设置了allowCoreThreadTimeOut

  • maximumPoolSize
    线程池中的最大线程数。当阻塞队列已满但客户端还继续向线程池提交任务时,若corePoolSize小于maximumPoolSize,则会新建线程加入线程池中

  • keepAliveTime
    线程池中的线程的存活时间

  • TimeUnit
    keepAliveTime的时间单位

  • workQueue
    某种blockingQueue的实现,用于在没有空闲线程时暂时存储需要处理的任务

  • **threadFactory **
    用于自定义线程的创建,比如给线程一些标识度比较高的名字。

  • **handler **
    线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须进行某种处理,默认的处理方式是抛出RejectedExecutionException异常。

了解了上面这些参数的含义,对Executors中的各种预定义线程池的特性应当就会比较了解了。

ThreadPoolExecutor的内部状态


其内部状态如下,用ctl的低29位存储线程数量,用高三位存储线程的运行状态,CAPACITY 头三位为0,其他位为1,可以方便的获取状态和线程数。

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }

提交方式


有两种提交方式

  1. Executor.execute()

  2. ExecutorService.submit()

前者简单的提交一个线程,后者返回一个FutureTask,可以获取提交的计算结果。区别留待后续讨论。

execute代码


翻译一下代码中的注释:

  1. 如果线程数小于corePoolSize,则向线程池中添加一个新线程。
  2. 否则将其加入等待执行队列中,如果加入成功,则需要再检查一下是否需要添加新线程或是线程池状态已经发生了变化。
  3. 如果队列已满,则再尝试添加新的线程,不过调用参数core会发生改变,说明不是添加的“核心线程”
   /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);

addWorker的实现

        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&  ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

这部分代码首先判断线程池的状态,如果线程池的状态值大于或等SHUTDOWN,则不处理提交的任务(有一种特殊情况,就是状态处于SHUTDOWN 但是已提交了一个任务或是任务队列里还有待完成的队列),直接返回;然后通过参数core判断当前需要创建的线程是否为核心线程,如果core为true,且当前线程数小于corePoolSize,则跳出循环,开始创建新的线程:

boolean workerStarted   = false;
boolean workerAdded = false;
Worker  w       = null;
try {
    w = new Worker( firstTask );
    final Thread t = w.thread;
    if ( t != null )
    {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            /*
             * Recheck while holding lock.
             * Back out on ThreadFactory failure or if
             * shut down before lock acquired.
             */
            int rs = runStateOf( ctl.get() );

            if ( rs < SHUTDOWN ||
                 (rs == SHUTDOWN && firstTask == null) )
            {
                if ( t.isAlive() ) /* precheck that t is startable */
                    throw new IllegalThreadStateException();
                workers.add( w );
                int s = workers.size();
                if ( s > largestPoolSize )
                    largestPoolSize = s;
                workerAdded = true;
            }
        } finally {
            mainLock.unlock();
        }
        if ( workerAdded )
        {
            t.start();
            workerStarted = true;
        }
    }
} finally {
    if ( !workerStarted )
        addWorkerFailed( w );
}
return(workerStarted);

这里的核心逻辑就是生成一个新Worker,将其加入到workers中,并且启动与该worker相关的线程。

Worker简介

查看Worker类的源码,我们可以发现其继承了AbstractQueuedSynchronizer,可以方便的实现工作线程的中止操作;并实现了Runnable,可以将自身作为一个任务在工作线程中执行;

   /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

在构造方法中,先将state设置为-1,并且使用线程工厂创建线程。

   /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }

在run方法中,将所有任务全部委托给了runWorker。

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

首先,调用unlock将AQS标志设置为0表示开始接收中断,然后保证在线程池即将停止时确定线程被中断,而没有被停止时清除中断信号。注意,Thread.interrupted()会清除中断标志。而后,调用task的run方法,在完成了第一个任务后,会使用getTask方法从阻塞队列中获取等待的任务,如果队列中没有任务,getTask方法会被阻塞并挂起,不会占用cpu资源:

getTask方法


 private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

注意在getTask方法中,会根据allowCoreThreadTimeOut 以及线程数与corePoolSize的关系给timed赋值,循环的从workQueue中获取Runnable。注意当线程数大于最大线程数或timed为true且获取任务超时时,会直接返回null,从而使相关的Worker runTask的线程退出。

submit的实现

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

可以看出先将Callable包装成了FutureTask(实现了RunnableFuture)然后进行execute。其内部状态为:

    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    /** The underlying callable; nulled out after running */
    private Callable<V> callable;
    /** The result to return or exception to throw from get() */
    private Object outcome; // non-volatile, protected by state reads/writes
    /** The thread running the callable; CASed during run() */
    private volatile Thread runner;
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters;

其get方法为:

   public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }

可以看到核心为调用awaitDone方法:

    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }

这段代码的逻辑如下:

  1. 如果主线程被中断,则抛出中断异常;
  2. 判断FutureTask当前的state,如果大于COMPLETING,说明任务已经执行完成,则直接返回;
  3. 如果当前state等于COMPLETING,说明任务已经执行完,这时主线程只需通过yield方法让出cpu资源,等待state变成NORMAL;
  4. 通过WaitNode类封装当前线程WaitNode() { thread = Thread.currentThread(); },为了提高效率通过UNSAFE的CAS将自己添加到waiters链表;
  5. 最终通过LockSupport的park或parkNanos挂起线程;

FutureTask的run实现

  public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

这里首先查看状态必须为NEW,并且通过CAS将当前线程设置为Future的runner,然后调用Callable的call方法,并将结果或异常存储,使用Unsafe修改state的状态,并调用finishCompletion方法。

  private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }

这里读取waiters队列(注意每次获取都使用了CAS),获取以后处理队列中所有节点。还记得在awaitDone中我们调用LockSupport.parkNanos(this, nanos)LockSupport.parkNanos(this)将线程挂起么?在finishCompletion方法中我们会调用LockSupport.unpark(t) 唤醒在等待的线程。

ScheduledThreadPoolExecutor


ScheduledThreadPoolExecutor使用ScheduledFutureTask作为自己的task,其run方法中在调用run后又重新设置了自己的状态,而后更新下次运行时间,将自己重新加入workQueue中。

       public void run() {
            boolean periodic = isPeriodic();
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            else if (!periodic)
                ScheduledFutureTask.super.run();
            else if (ScheduledFutureTask.super.runAndReset()) {
                setNextRunTime();
                reExecutePeriodic(outerTask);
            }
        }

在schedule方法中,组装了RunnableScheduledFuture,而后调用delayedExecute。

    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay,
                                           TimeUnit unit) {
        if (callable == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<V> t = decorateTask(callable,
            new ScheduledFutureTask<V>(callable,
                                       triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }

delayedExecute中,调用了ensurePrestart,从而使用addWorker添加了工作进程:

   private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown())
            reject(task);
        else {
            super.getQueue().add(task);
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }

而ScheduledThreadPoolExecutor的workQueue是一个delayQueue,可以在指定时间后才获取到队列中的元素,这样就实现了定时反复运行。

相关文章

网友评论

    本文标题:java线程池-ThreadPoolExecutor源码分析

    本文链接:https://www.haomeiwen.com/subject/wbernttx.html