java线程池实现分析

作者: msrpp | 来源:发表于2018-08-31 17:58 被阅读6次

线程池可以降低系统线程的重复创建销毁频率,给用户提供了很方便的多任务执行方式。

ThreadPoolExecutor

java中线程池的核心实现是ThreadPoolExecutor。
ThreadPoolExecutor的继承关系如下

image.png

ThreadPoolExecutor的构造函数有很多,但是最后都是调用到了这个。

public ThreadPoolExecutor(int corePoolSize,                
                          int maximumPoolSize,             
                          long keepAliveTime,              
                          TimeUnit unit,                   
                          BlockingQueue<Runnable> workQueue
                          ThreadFactory threadFactory,     
                          RejectedExecutionHandler handler)

参数的含义依次是:

  • corePoolSize ,核心线程的个数
  • maximumPoolSize,线程池能开启的线程最大值
  • keepAliveTime,unit 额外线程超时多久不工作,将销毁,如果调用了对象的allowCoreThreadTimeOut方法,将允许所有线程超时退出。
  • workQueue 任务队列,用来缓存消息的
  • threadFactory 线程创建工厂,可以用来设置线程的一些属性,比如线程组,名字,优先级,是否守护等。
  • handler 处理不了的消息将会调用这个对象的rejectedExecution方法来执行拒绝策略,可以看到ThreadPoolExecutor提供了几种默认实现。
    1.AbortPolicy 默认的处理方式,抛出异常。
    2.DiscardPolicy 啥也不干
    3.rejectedExecution 将缓存队列的头部(poll方法)取出,重新尝试一次
    4.CallerRunsPolicy 直接使用调用方的线程执行任务的run操作。

我们主要关注这几个方法:execute/submit,shutdown/shutdownNow。首先是 execute

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

可以看到初步的逻辑,如果当前的线程数量没达到核心线程数, 则直接创建线程执行任务,否则尝试调用队列的offer方法,将任务塞入队列,如果塞入失败,则执行拒绝策略。

进一步看看addWorker方法。

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

此函数做的事情有:1.判断线程是否超限,是则退出,否则用cas的方式给工作线程数量自增。2.创建Worker对象,绑定一个新建线程和首个任务并start该线程。Worker是ThreadPoolExecutor实现了Runnable方法的内部类(可以直接保存外部类对象,所以可以直接访问外部类的方法)。

线程启动以后执行如下方法:

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
  • 1.代码比较清晰,获取首个任务/ThreadPoolExecutor对象的缓存队列里拿任务,并执行。看一下getTask方法。

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

判断线程池是否被shutdown,随时准备退出。基于核心线程数量和用户是否允许核心线程退出,判断当前线程是否需超时退出。分别执行poll或take方法。这两个方法是阻塞的,等到任务或者超时了才返回。如果超时了还没等到任务,这个线程将退出。

Executor中默认提供的线程池实现

Executor中提供了几个默认的线程池实现,实际都是调用了ThreadPoolExecutor来创建的,我们先看看会使用到的任务队列。

  • 1.LinkedBlockingQueue 如其名,内部实现是一个FIFO的单向链表,比较简单,略过。

  • 2.SynchronousQueue 同步队列,生产者和消费者需要同时触发生产和消费的动作时,才能成功交换数据。SynchronousQueue 的poll,take,offer,put等方法操作实际是由内部的Transferer接口的transfer来完成的,这个接口有两种实现,TransferStack(FILO)和TransferQueue(FIFO)。下面以TransferStack为例作分析。

        E transfer(E e, boolean timed, long nanos) {
            /*
             * Basic algorithm is to loop trying one of three actions:
             *
             * 1. If apparently empty or already containing nodes of same
             *    mode, try to push node on stack and wait for a match,
             *    returning it, or null if cancelled.
             *
             * 2. If apparently containing node of complementary mode,
             *    try to push a fulfilling node on to stack, match
             *    with corresponding waiting node, pop both from
             *    stack, and return matched item. The matching or
             *    unlinking might not actually be necessary because of
             *    other threads performing action 3:
             *
             * 3. If top of stack already holds another fulfilling node,
             *    help it out by doing its match and/or pop
             *    operations, and then continue. The code for helping
             *    is essentially the same as for fulfilling, except
             *    that it doesn't return the item.
             */

            SNode s = null; // constructed/reused as needed
            int mode = (e == null) ? REQUEST : DATA;

            for (;;) {
                SNode h = head;
                if (h == null || h.mode == mode) {  // empty or same-mode
                    if (timed && nanos <= 0) {      // can't wait
                        if (h != null && h.isCancelled())
                            casHead(h, h.next);     // pop cancelled node
                        else
                            return null;
                    } else if (casHead(h, s = snode(s, e, h, mode))) {
                        SNode m = awaitFulfill(s, timed, nanos);
                        if (m == s) {               // wait was cancelled
                            clean(s);
                            return null;
                        }
                        if ((h = head) != null && h.next == s)
                            casHead(h, s.next);     // help s's fulfiller
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    }
                } else if (!isFulfilling(h.mode)) { // try to fulfill
                    if (h.isCancelled())            // already cancelled
                        casHead(h, h.next);         // pop and retry
                    else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                        for (;;) { // loop until matched or waiters disappear
                            SNode m = s.next;       // m is s's match
                            if (m == null) {        // all waiters are gone
                                casHead(s, null);   // pop fulfill node
                                s = null;           // use new node next time
                                break;              // restart main loop
                            }
                            SNode mn = m.next;
                            if (m.tryMatch(s)) {
                                casHead(s, mn);     // pop both s and m
                                return (E) ((mode == REQUEST) ? m.item : s.item);
                            } else                  // lost match
                                s.casNext(m, mn);   // help unlink
                        }
                    }
                } else {                            // help a fulfiller
                    SNode m = h.next;               // m is h's match
                    if (m == null)                  // waiter is gone
                        casHead(h, null);           // pop fulfilling node
                    else {
                        SNode mn = m.next;
                        if (m.tryMatch(h))          // help match
                            casHead(h, mn);         // pop both h and m
                        else                        // lost match
                            h.casNext(m, mn);       // help unlink
                    }
                }
            }
        }
  • 第一个参数e为空的话,则是获取。以take为例,如果没有生产者,即当前队列为空,新建一个节点放入栈顶,同时调用awaitFulfill(内部会将线程信息放入节点,调用UnSafe类的park方法将当前线程挂起),后续还有消费者进来的话也是阻塞等待唤醒,直到有个生产者进来查询栈顶的waiter信息并唤醒它。

  • 第一个参数有值的话, 则是插入,过程和上面类似,有put的poll方法。

  • newCachedThreadPool
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

核心线程数为0,最大线程数未限制,用的是同步队列,所以特点是:任务消耗完一分钟之后,线程就全部销毁;如果在线程就绪的时候,塞入任务,可以马上被就绪线程消耗掉,如果没有空闲的线程,则开启新的线程来执行,所以会有线程开辟过多的风险。

  • newFixedThreadPool
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

固定核心线程数量,但是队列是无限大的。任务过多且来不及处理可能会造成oom。

  • newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }

newFixedThreadPool 的特殊情况,只有一个线程。

  • newScheduledThreadPool

    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory);
    }

固定核心线程,最大线程数量不做限制,不同的是,这个方法返回的是个ScheduledExecutorService,ScheduledExecutorService继承于ExecutorService,所以有了更多的功能,可以调用scheduleAtFixedRate来完成延时任务的执行,这里有疑问,DelayedWorkQueue队列大小是没有做限制的,创建线程池时用的最大线程数量Integer.MAX_VALUE好像无实际作用。

相关文章

网友评论

    本文标题:java线程池实现分析

    本文链接:https://www.haomeiwen.com/subject/stlrwftx.html