Analysis of the Java Thread Pool Implementation

Author: msrpp | Published 2018-08-31 17:58

    A thread pool reduces how often the system has to create and destroy threads, and gives users a convenient way to run many tasks.

    ThreadPoolExecutor

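    In Java, the core thread pool implementation is ThreadPoolExecutor.
    Its inheritance hierarchy (shown as a diagram in the original post) is: ThreadPoolExecutor extends AbstractExecutorService, which implements ExecutorService, which in turn extends Executor.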

    ThreadPoolExecutor has several constructors, but they all end up calling this one:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
    

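    The parameters, in order, mean:

    • corePoolSize: the number of core threads.
    • maximumPoolSize: the maximum number of threads the pool may create.
    • keepAliveTime, unit: how long a thread beyond the core size may stay idle before it is terminated. If allowCoreThreadTimeOut(true) is called on the executor, core threads are subject to the same timeout, so every thread may exit.
    • workQueue: the task queue, which holds tasks waiting to be executed.
    • threadFactory: the factory used to create threads; it can set thread properties such as thread group, name, priority and daemon status.
    • handler: a task the pool cannot accept is passed to this object's rejectedExecution method, which applies the rejection policy. ThreadPoolExecutor provides several built-in implementations:
      1. AbortPolicy: the default; throws RejectedExecutionException.
      2. DiscardPolicy: does nothing, silently dropping the task.
      3. DiscardOldestPolicy: removes the task at the head of the queue (via poll) and retries execute once, unless the executor has been shut down.
      4. CallerRunsPolicy: runs the task's run method directly in the calling thread, unless the executor has been shut down.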
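    To see the four built-in policies side by side, here is a minimal sketch (class and method names are made up for illustration) that saturates a pool with one thread and a queue of capacity 1, then submits a third task. It also plugs in a ThreadFactory that names the thread; the name "pool-worker" is arbitrary. The returned string lists which tasks eventually ran.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectionDemo {
    // Blocks until the latch opens; used to keep the single worker busy.
    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Saturates a pool (1 thread, queue capacity 1), submits a third task,
    // and reports which tasks actually ran (or "exception" if execute threw).
    static String outcome(RejectedExecutionHandler handler) {
        CountDownLatch release = new CountDownLatch(1);
        List<String> ran = Collections.synchronizedList(new ArrayList<>());
        // Hypothetical thread name, only to show where the ThreadFactory plugs in.
        ThreadFactory named = r -> new Thread(r, "pool-worker");
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1), named, handler);

        pool.execute(() -> awaitQuietly(release)); // occupies the only thread
        pool.execute(() -> ran.add("queued"));     // fills the only queue slot
        boolean threw = false;
        try {
            pool.execute(() -> ran.add("third:"
                    + ("pool-worker".equals(Thread.currentThread().getName()) ? "worker" : "caller")));
        } catch (RejectedExecutionException e) {
            threw = true;
        }

        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return threw ? "exception" : String.join(",", ran);
    }

    public static void main(String[] args) {
        System.out.println(outcome(new ThreadPoolExecutor.AbortPolicy()));         // exception
        System.out.println(outcome(new ThreadPoolExecutor.DiscardPolicy()));       // queued
        System.out.println(outcome(new ThreadPoolExecutor.DiscardOldestPolicy())); // third:worker
        System.out.println(outcome(new ThreadPoolExecutor.CallerRunsPolicy()));    // third:caller,queued
    }
}
```

    With CallerRunsPolicy the submitting thread is busy running the task itself, which also throttles how fast it can submit more work.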

    We mainly focus on these methods: execute/submit and shutdown/shutdownNow. First, execute:

        public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            /*
             * Proceed in 3 steps:
             *
             * 1. If fewer than corePoolSize threads are running, try to
             * start a new thread with the given command as its first
             * task.  The call to addWorker atomically checks runState and
             * workerCount, and so prevents false alarms that would add
             * threads when it shouldn't, by returning false.
             *
             * 2. If a task can be successfully queued, then we still need
             * to double-check whether we should have added a thread
             * (because existing ones died since last checking) or that
             * the pool shut down since entry into this method. So we
             * recheck state and if necessary roll back the enqueuing if
             * stopped, or start a new thread if there are none.
             *
             * 3. If we cannot queue task, then we try to add a new
             * thread.  If it fails, we know we are shut down or saturated
             * and so reject the task.
             */
            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            else if (!addWorker(command, false))
                reject(command);
        }
    

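    So the basic logic is: if fewer than corePoolSize threads are running, create a new thread that runs the task directly. Otherwise, try to put the task into the queue with offer; if that succeeds, re-check the state, removing and rejecting the task if the pool is no longer running, or starting a thread if none are left. If offer fails (the queue is full), try to start a non-core thread, up to maximumPoolSize, and only if that also fails apply the rejection policy.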
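    The three steps can be observed from outside with a small experiment. This is a sketch, not JDK code: the class name is hypothetical, and it assumes corePoolSize=1, maximumPoolSize=2 and an ArrayBlockingQueue of capacity 1, with tasks that block until released so that no thread becomes free during the experiment.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecutePathDemo {
    // Submits four tasks that block until released to a pool with
    // corePoolSize=1, maximumPoolSize=2 and a queue of capacity 1,
    // recording the pool state after each execute call.
    static String trace() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        StringBuilder log = new StringBuilder();
        try {
            for (int i = 1; i <= 4; i++) {
                log.append("task").append(i).append(": ");
                try {
                    pool.execute(blocker);
                    log.append("threads=").append(pool.getPoolSize())
                       .append(" queued=").append(pool.getQueue().size());
                } catch (RejectedExecutionException e) {
                    log.append("rejected");
                }
                log.append('\n');
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.print(trace());
        // task1: threads=1 queued=0   (step 1: below corePoolSize, new core thread)
        // task2: threads=1 queued=1   (step 2: offer succeeds)
        // task3: threads=2 queued=1   (step 3: queue full, addWorker(command, false))
        // task4: rejected             (queue full and maximumPoolSize reached)
    }
}
```

    Note that the queue fills up before any thread beyond corePoolSize is created; maximumPoolSize only matters once offer fails.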

    Next, let's look at addWorker.

        private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                    int wc = workerCountOf(c);
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
    
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int rs = runStateOf(ctl.get());
    
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
    

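    This method does two things. 1. It checks the pool state and whether the thread limit (corePoolSize or maximumPoolSize, depending on the core flag) has been reached; if so it returns false, otherwise it increments the worker count with a CAS, retrying on contention. 2. It creates a Worker that binds a newly created thread to the first task, registers it in the workers set while holding mainLock, and starts the thread. Worker is an inner class of ThreadPoolExecutor that implements Runnable (and extends AbstractQueuedSynchronizer to act as a simple lock); because it is a non-static inner class, it holds a reference to the enclosing executor and can call its methods directly.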
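    The ctl field used throughout execute, addWorker and getTask is a single AtomicInteger that packs the run state and the worker count, which is why both can be checked and updated with one CAS. The sketch below reproduces the bit layout of the JDK 8 private constants (COUNT_BITS, CAPACITY, RUNNING and so on) outside the JDK; the class name is hypothetical, and tryIncrementWorkerCount is a hypothetical, simplified stand-in for the counting loop at the top of addWorker (it ignores the SHUTDOWN special case).

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CtlSketch {
    // Same layout as the JDK 8 private fields: the high 3 bits hold the run
    // state and the low 29 bits hold the worker count.
    static final int COUNT_BITS = Integer.SIZE - 3;
    static final int CAPACITY = (1 << COUNT_BITS) - 1;
    static final int RUNNING = -1 << COUNT_BITS;
    static final int SHUTDOWN = 0 << COUNT_BITS;
    static final int STOP = 1 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

    // Hypothetical helper modeled on the counting half of addWorker:
    // check state and limit, then CAS the count up by one, retrying if
    // another thread changed ctl in between.
    boolean tryIncrementWorkerCount(int limit) {
        for (;;) {
            int c = ctl.get();
            if (runStateOf(c) != RUNNING)  // simplified: only RUNNING admits new workers
                return false;
            if (workerCountOf(c) >= limit) // limit = corePoolSize or maximumPoolSize
                return false;
            if (ctl.compareAndSet(c, c + 1))
                return true;
            // CAS failed: ctl changed concurrently, re-read and retry
        }
    }

    public static void main(String[] args) {
        CtlSketch s = new CtlSketch();
        System.out.println(s.tryIncrementWorkerCount(2)); // true
        System.out.println(s.tryIncrementWorkerCount(2)); // true
        System.out.println(s.tryIncrementWorkerCount(2)); // false, limit reached
        System.out.println(workerCountOf(s.ctl.get()));   // 2
        s.ctl.set(ctlOf(STOP, workerCountOf(s.ctl.get())));
        System.out.println(s.tryIncrementWorkerCount(5)); // false, not RUNNING
    }
}
```

    Because RUNNING is negative and the other states are increasing non-negative values, comparisons such as rs >= SHUTDOWN in the JDK code work directly on the packed value.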

    Once started, the thread runs Worker.run(), which simply calls the following method:

        final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
                while (task != null || (task = getTask()) != null) {
                    w.lock();
                    // If pool is stopping, ensure thread is interrupted;
                    // if not, ensure thread is not interrupted.  This
                    // requires a recheck in second case to deal with
                    // shutdownNow race while clearing interrupt
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
            }
        }
    
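    The code is fairly clear: the worker first runs its firstTask and then keeps fetching tasks from the executor's work queue via getTask, wrapping each run in the beforeExecute/afterExecute hooks. When getTask returns null, the loop ends and processWorkerExit removes the worker. Now let's look at getTask.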
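    The beforeExecute/afterExecute calls in runWorker are protected hooks (empty by default) that a subclass can override. A minimal sketch with a hypothetical subclass: when a task passed to execute throws, runWorker hands the exception to afterExecute before rethrowing it, so the hook sees it even though the worker thread then dies and is replaced. (Tasks passed to submit are wrapped in a FutureTask, which catches the exception itself, so afterExecute would see null for them.)

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class HookDemo {
    // Hypothetical subclass that records the Throwable passed to afterExecute.
    static class RecordingPool extends ThreadPoolExecutor {
        final AtomicReference<Throwable> lastThrown = new AtomicReference<>();
        final CountDownLatch finished = new CountDownLatch(2);

        RecordingPool() {
            super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), r -> {
                Thread t = new Thread(r);
                // runWorker rethrows the task's exception after afterExecute;
                // swallow it here so the demo does not print a stack trace.
                t.setUncaughtExceptionHandler((th, ex) -> { });
                return t;
            });
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            if (t != null) lastThrown.set(t);
            finished.countDown();
        }
    }

    static Throwable run() {
        RecordingPool pool = new RecordingPool();
        pool.execute(() -> { throw new IllegalStateException("boom"); }); // worker dies and is replaced
        pool.execute(() -> { });                                          // afterExecute sees t == null
        try {
            pool.finished.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdown();
        return pool.lastThrown.get();
    }

    public static void main(String[] args) {
        System.out.println(run()); // java.lang.IllegalStateException: boom
    }
}
```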
    
        private Runnable getTask() {
            boolean timedOut = false; // Did the last poll() time out?
    
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    decrementWorkerCount();
                    return null;
                }
    
                int wc = workerCountOf(c);
    
                // Are workers subject to culling?
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
                if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                    if (compareAndDecrementWorkerCount(c))
                        return null;
                    continue;
                }
    
                try {
                    Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                    if (r != null)
                        return r;
                    timedOut = true;
                } catch (InterruptedException retry) {
                    timedOut = false;
                }
            }
        }
    
    

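    getTask first checks whether the pool has been shut down and, if so, returns null so that the worker exits (in the SHUTDOWN state only once the queue is empty, in the STOP state immediately). It then decides whether the current thread may time out: timed is true if allowCoreThreadTimeOut is set or the worker count exceeds corePoolSize. A timed worker calls workQueue.poll(keepAliveTime, ...), which blocks for at most keepAliveTime; an untimed worker calls workQueue.take(), which blocks until a task arrives. If poll times out without a task, the next iteration decrements the worker count with a CAS and returns null (as long as this is not the last worker while tasks remain queued), and the thread exits.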
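    The timeout rules can be watched through getPoolSize(). A sketch (hypothetical class name; the 100 ms keepAliveTime and the polling helper are arbitrary choices for the demo): a pool with corePoolSize=1 and maximumPoolSize=2 is forced up to two threads, then shrinks back to one once the extra worker's poll times out, and down to zero after allowCoreThreadTimeOut(true) makes the last worker timed as well.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class KeepAliveDemo {
    // Polls getPoolSize() until it equals `expected` or the timeout elapses.
    static void waitForPoolSize(ThreadPoolExecutor pool, int expected, long timeoutMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (pool.getPoolSize() != expected && System.nanoTime() < deadline) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    // Returns {size at peak, size after non-core timeout, size after core timeout}.
    static int[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 100L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        for (int i = 0; i < 3; i++)
            pool.execute(blocker);         // core thread, queue slot, then a non-core thread
        int peak = pool.getPoolSize();
        release.countDown();               // all tasks finish; both workers go back to getTask

        waitForPoolSize(pool, 1, 3000);    // wc > corePoolSize: timed poll() expires, one worker exits
        int afterNonCore = pool.getPoolSize();

        pool.allowCoreThreadTimeOut(true); // the last worker is now timed as well
        waitForPoolSize(pool, 0, 3000);
        int afterCore = pool.getPoolSize();

        pool.shutdown();
        return new int[] {peak, afterNonCore, afterCore};
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run())); // [2, 1, 0]
    }
}
```

    The surviving worker in the middle phase is blocked in take(); allowCoreThreadTimeOut(true) interrupts idle workers so that it re-evaluates timed and switches to poll.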

    Default thread pools provided by Executors

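    The Executors class provides several ready-made thread pools; the ones discussed here are all created through ThreadPoolExecutor (or its subclass ScheduledThreadPoolExecutor). First, let's look at the task queues they use.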

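    • 1. LinkedBlockingQueue: as the name suggests, it is a FIFO queue backed by a singly linked list. It is fairly simple, so we skip the details; note only that when it is created without a capacity, as in the factory methods below, its capacity is Integer.MAX_VALUE.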

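    • 2. SynchronousQueue: a queue with no capacity. A handoff succeeds only when a producer and a consumer meet, so each insert must wait for a matching remove and vice versa. Its poll, take, offer and put methods are all implemented by the transfer method of the internal Transferer interface. In the JDK 8 source quoted here it has two implementations: TransferStack (LIFO, used in the default non-fair mode) and TransferQueue (FIFO, used in fair mode). The analysis below uses TransferStack.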

            E transfer(E e, boolean timed, long nanos) {
                /*
                 * Basic algorithm is to loop trying one of three actions:
                 *
                 * 1. If apparently empty or already containing nodes of same
                 *    mode, try to push node on stack and wait for a match,
                 *    returning it, or null if cancelled.
                 *
                 * 2. If apparently containing node of complementary mode,
                 *    try to push a fulfilling node on to stack, match
                 *    with corresponding waiting node, pop both from
                 *    stack, and return matched item. The matching or
                 *    unlinking might not actually be necessary because of
                 *    other threads performing action 3:
                 *
                 * 3. If top of stack already holds another fulfilling node,
                 *    help it out by doing its match and/or pop
                 *    operations, and then continue. The code for helping
                 *    is essentially the same as for fulfilling, except
                 *    that it doesn't return the item.
                 */
    
                SNode s = null; // constructed/reused as needed
                int mode = (e == null) ? REQUEST : DATA;
    
                for (;;) {
                    SNode h = head;
                    if (h == null || h.mode == mode) {  // empty or same-mode
                        if (timed && nanos <= 0) {      // can't wait
                            if (h != null && h.isCancelled())
                                casHead(h, h.next);     // pop cancelled node
                            else
                                return null;
                        } else if (casHead(h, s = snode(s, e, h, mode))) {
                            SNode m = awaitFulfill(s, timed, nanos);
                            if (m == s) {               // wait was cancelled
                                clean(s);
                                return null;
                            }
                            if ((h = head) != null && h.next == s)
                                casHead(h, s.next);     // help s's fulfiller
                            return (E) ((mode == REQUEST) ? m.item : s.item);
                        }
                    } else if (!isFulfilling(h.mode)) { // try to fulfill
                        if (h.isCancelled())            // already cancelled
                            casHead(h, h.next);         // pop and retry
                        else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                            for (;;) { // loop until matched or waiters disappear
                                SNode m = s.next;       // m is s's match
                                if (m == null) {        // all waiters are gone
                                    casHead(s, null);   // pop fulfill node
                                    s = null;           // use new node next time
                                    break;              // restart main loop
                                }
                                SNode mn = m.next;
                                if (m.tryMatch(s)) {
                                    casHead(s, mn);     // pop both s and m
                                    return (E) ((mode == REQUEST) ? m.item : s.item);
                                } else                  // lost match
                                    s.casNext(m, mn);   // help unlink
                            }
                        }
                    } else {                            // help a fulfiller
                        SNode m = h.next;               // m is h's match
                        if (m == null)                  // waiter is gone
                            casHead(h, null);           // pop fulfilling node
                        else {
                            SNode mn = m.next;
                            if (m.tryMatch(h))          // help match
                                casHead(h, mn);         // pop both h and m
                            else                        // lost match
                                h.casNext(m, mn);       // help unlink
                        }
                    }
                }
            }
    
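    • If the first argument e is null, the call is a request (a consumer). For example, when take is called and there is no producer (the stack is empty or holds only other requests), a new REQUEST node is pushed onto the top of the stack and awaitFulfill is called. awaitFulfill records the current thread in the node and, possibly after spinning briefly, parks it with LockSupport.park (LockSupport.parkNanos for timed calls), which delegates to Unsafe.park. Consumers that arrive later likewise push nodes and block, until a producer arrives, finds the complementary waiter at the top of the stack, matches it (tryMatch) and wakes it up.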

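    • If the first argument is non-null, the call is an insert (DATA mode, used by put and offer); the process mirrors the one above. Note that offer without a timeout calls transfer(e, true, 0): with no consumer waiting it takes the "can't wait" branch and returns null immediately, so offer returns false.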
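    The observable behaviour of these modes can be checked through the public SynchronousQueue API. A minimal sketch (hypothetical class name): offer with no waiting consumer fails immediately, a timed poll with no producer returns null after the timeout (the same kind of call an idle worker makes in getTask), and put/take complete only as a pair.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class HandoffDemo {
    // offer(e) without a timeout is transfer(e, true, 0): no waiting consumer, so it fails at once.
    static boolean offerWithoutConsumer() {
        SynchronousQueue<String> q = new SynchronousQueue<>();
        return q.offer("task");
    }

    // poll(timeout) is a timed request: with no producer it gives up and returns null.
    static String pollWithoutProducer() {
        SynchronousQueue<String> q = new SynchronousQueue<>();
        try {
            return q.poll(50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    // put() and take() block until they meet; the element is handed over directly.
    static String putThenTake() {
        SynchronousQueue<String> q = new SynchronousQueue<>();
        Thread producer = new Thread(() -> {
            try {
                q.put("task");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            String got = q.take(); // matches the producer, whichever side arrived first
            producer.join();
            return got + ", size=" + q.size(); // size() of a SynchronousQueue is always 0
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(offerWithoutConsumer()); // false
        System.out.println(pollWithoutProducer());  // null
        System.out.println(putThenTake());          // task, size=0
    }
}
```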

    • newCachedThreadPool
        public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    

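    The core size is 0, the maximum is effectively unlimited (Integer.MAX_VALUE), and the queue is a SynchronousQueue. Hence: every thread that stays idle for 60 seconds is destroyed, so once the work dries up the pool eventually has no threads at all; a task submitted while a thread is idle waiting in poll is handed to that thread immediately; if no thread is idle, offer fails and a new thread is started to run the task. A burst of slow tasks can therefore create an excessive number of threads.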
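    A sketch of the thread-growth risk, assuming tasks that block until released (class name hypothetical; the pool is built with the same arguments as newCachedThreadPool so that getPoolSize() is reachable without a cast): since no worker is ever idle, every submission starts a new thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CachedPoolDemo {
    // Same arguments that Executors.newCachedThreadPool() passes, built directly
    // so that getPoolSize() can be called without a cast.
    static ThreadPoolExecutor newCachedLikePool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
    }

    // Submits n tasks that all block and returns how many threads the pool started.
    static int threadsForBlockingBurst(int n) {
        ThreadPoolExecutor pool = newCachedLikePool();
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < n; i++) {
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // No worker is ever idle in poll(), so every offer() fails and
            // execute falls through to addWorker(command, false): one thread per task.
            return pool.getPoolSize();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(threadsForBlockingBurst(50)); // 50
    }
}
```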

    • newFixedThreadPool
        public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>(),
                                          threadFactory);
        }
    

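    The thread count is fixed (core = max = nThreads), but the LinkedBlockingQueue is created without a capacity, so it is effectively unbounded (Integer.MAX_VALUE). If tasks arrive faster than they can be processed, the queue keeps growing and may eventually cause an OutOfMemoryError.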
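    The fixed pool shows the opposite trade-off. In this sketch (hypothetical names; same arguments as newFixedThreadPool without the custom ThreadFactory), blocking tasks never raise the thread count above n; every further task goes into the queue, which is exactly the backlog that can grow until memory runs out.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class FixedPoolDemo {
    // Same arguments as Executors.newFixedThreadPool(n), minus the custom ThreadFactory.
    static ThreadPoolExecutor newFixedLikePool(int n) {
        return new ThreadPoolExecutor(n, n, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
    }

    // Submits `tasks` blocking tasks; returns {threads started, tasks waiting in the queue}.
    static int[] backlog(int threads, int tasks) {
        ThreadPoolExecutor pool = newFixedLikePool(threads);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // The thread count stops at n; everything else piles up in the queue.
            return new int[] {pool.getPoolSize(), pool.getQueue().size()};
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(backlog(2, 1000))); // [2, 998]
    }
}
```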

    • newSingleThreadExecutor
        public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>(),
                                        threadFactory));
        }
    

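    A special case of newFixedThreadPool with exactly one thread. The difference is that the ThreadPoolExecutor is wrapped in FinalizableDelegatedExecutorService, which exposes only the ExecutorService methods, so a caller cannot cast it back and reconfigure it (for example with setCorePoolSize). Tasks are therefore guaranteed to run sequentially, in submission order.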
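    Both points can be checked with a small sketch (hypothetical class name). It relies only on the returned object not being a ThreadPoolExecutor, since the wrapper's class name differs between JDK versions.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SingleThreadDemo {
    // newFixedThreadPool(1) hands back the ThreadPoolExecutor itself, so a caller could cast it
    // and call setCorePoolSize/setMaximumPoolSize; newSingleThreadExecutor hides it behind a wrapper.
    static boolean isThreadPoolExecutor(ExecutorService service) {
        try {
            return service instanceof ThreadPoolExecutor;
        } finally {
            service.shutdown();
        }
    }

    // Runs n tasks on a single-thread executor and returns the order in which they ran.
    static List<Integer> runOrder(int n) {
        ExecutorService single = Executors.newSingleThreadExecutor();
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < n; i++) {
            final int id = i;
            single.execute(() -> seen.add(id));
        }
        single.shutdown();
        try {
            single.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) {
        System.out.println(isThreadPoolExecutor(Executors.newFixedThreadPool(1)));     // true
        System.out.println(isThreadPoolExecutor(Executors.newSingleThreadExecutor())); // false
        System.out.println(runOrder(5)); // [0, 1, 2, 3, 4]
    }
}
```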

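    • newScheduledThreadPool (it simply returns new ScheduledThreadPoolExecutor(corePoolSize, threadFactory), whose constructor is shown below)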
    
        public ScheduledThreadPoolExecutor(int corePoolSize,
                                           ThreadFactory threadFactory) {
            super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
                  new DelayedWorkQueue(), threadFactory);
        }
    

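    The core thread count is fixed and the maximum is set to Integer.MAX_VALUE. The difference is that this method returns a ScheduledExecutorService, which extends ExecutorService with scheduling methods such as schedule, scheduleAtFixedRate and scheduleWithFixedDelay for delayed and periodic tasks. One question I had here: DelayedWorkQueue is unbounded, so the Integer.MAX_VALUE maximum passed when creating the pool seems to have no practical effect. The ScheduledThreadPoolExecutor Javadoc confirms this: because it acts as a fixed-sized pool using corePoolSize threads and an unbounded queue, adjustments to maximumPoolSize have no useful effect.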
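    A sketch of both observations, using ScheduledThreadPoolExecutor directly (class and method names are hypothetical, and the delays are arbitrary): with corePoolSize=2 and five blocking tasks, the pool never grows past two threads and the remaining tasks wait in DelayedWorkQueue; the second method runs a task with scheduleAtFixedRate and cancels it through the returned ScheduledFuture.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ScheduledDemo {
    // Schedules `tasks` blocking tasks on `core` threads; returns {largest pool size, tasks still queued}.
    static int[] saturate(int core, int tasks) {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(core);
        CountDownLatch started = new CountDownLatch(core);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.schedule(() -> {
                    started.countDown();
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }, 0, TimeUnit.MILLISECONDS);
            }
            started.await(5, TimeUnit.SECONDS); // every core thread is now busy
            // Extra tasks wait in DelayedWorkQueue instead of spawning threads beyond corePoolSize.
            return new int[] {pool.getLargestPoolSize(), pool.getQueue().size()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return new int[] {-1, -1};
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    // Returns true if a fixed-rate task fires `times` times within five seconds.
    static boolean firesAtFixedRate(int times) {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        CountDownLatch fired = new CountDownLatch(times);
        ScheduledFuture<?> handle =
                pool.scheduleAtFixedRate(fired::countDown, 10, 10, TimeUnit.MILLISECONDS);
        try {
            return fired.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            handle.cancel(false); // stop the periodic task
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(saturate(2, 5))); // [2, 3]
        System.out.println(firesAtFixedRate(3));                       // true
    }
}
```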


        Article link: https://www.haomeiwen.com/subject/stlrwftx.html