美文网首页
Java线程池详解(二)

Java线程池详解(二)

作者: java部落 | 来源:发表于2017-10-31 16:49 被阅读0次

    三、ThreadPoolExecutor解析

    上文中描述了Java中线程池相关的架构,了解了这些内容其实我们就可以使用java的线程池为我们工作了,使用其提供的线程池我们可以很方便的写出高质量的多线程代码,本节将分析ThreadPoolExecutor的实现,来探索线程池的运行原理。下面的图片展示了ThreadPoolExecutor的类图:

    Paste_Image.png

    ThreadPoolExecutor的类图
    下面是几个比较关键的类成员:

    private final BlockingQueue<Runnable> workQueue;  // 任务队列,我们的任务会添加到该队列里面,线程将从该队列获取任务来执行
     
     private final HashSet<Worker> workers = new HashSet<Worker>();//任务的执行值集合,来消费workQueue里面的任务
      
     private volatile ThreadFactory threadFactory;//线程工厂
          
     private volatile RejectedExecutionHandler handler;//拒绝策略,默认会抛出异异常,还要其他几种拒绝策略如下:
      
     1、CallerRunsPolicy:在调用者线程里面运行该任务
     2、DiscardPolicy:丢弃任务
     3、DiscardOldestPolicy:丢弃workQueue的头部任务
      
    private volatile int corePoolSize;//最下保活work数量
     
    private volatile int maximumPoolSize;//work上限
    

    我们尝试执行submit方法,下面是执行的关键路径,总结起来就是:如果Worker数量还没达到上限则继续创建,否则提交任务到workQueue,然后让worker来调度运行任务。

    step 1: <ExecutorService>
    Future<?> submit(Runnable task); 
     
    step 2:<AbstractExecutorService>
        public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
     
    step 3:<Executor>
    void execute(Runnable command);
     
    step 4:<ThreadPoolExecutor>
     public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) { //提交我们的额任务到workQueue
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false)) //使用maximumPoolSize作为边界
            reject(command); //还不行?拒绝提交的任务
    }
     
    step 5:<ThreadPoolExecutor>
    private boolean addWorker(Runnable firstTask, boolean core)
     
     
    step 6:<ThreadPoolExecutor>
    w = new Worker(firstTask); //包装任务
    final Thread t = w.thread; //获取线程(包含任务)
    workers.add(w);   // 任务被放到works中
    t.start(); //执行任务
    

    上面的流程是高度概括的,实际情况远比这复杂得多,但是我们关心的是怎么打通整个流程,所以这样分析问题是没有太大的问题的。观察上面的流程,我们发现其实关键的地方在于Worker,如果弄明白它是如何工作的,那么我们也就大概明白了线程池是怎么工作的了。下面分析一下Worker类。

    Paste_Image.png

    worker类图
    上面的图片展示了Worker的类关系图,关键在于他实现了Runnable接口,所以问题的关键就在于run方法上。在这之前,我们来看一下Worker类里面的关键成员:

    final Thread thread;
     
    Runnable firstTask; //我们提交的任务,可能被立刻执行,也可能被放到队列里面
    

    thread是Worker的工作线程,上面的分析我们也发现了在addWorker中会获取worker里面的thread然后start,也就是这个线程的执行,而Worker实现了Runnable接口,所以在构造thread的时候Worker将自己传递给了构造函数,thread.start执行的其实就是Worker的run方法。下面是run方法的内容:

      public void run() {
            runWorker(this);
        }
         
        final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
    

    我们来分析一下runWorker这个方法,这就是整个线程池的核心。首先获取到了我们刚提交的任务firstTask,然后会循环从workQueue里面获取任务来执行,获取任务的方法如下:

    private Runnable getTask() {
           boolean timedOut = false; // Did the last poll() time out?
     
           for (;;) {
               int c = ctl.get();
               int rs = runStateOf(c);
     
               // Check if queue empty only if necessary.
               if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                   decrementWorkerCount();
                   return null;
               }
     
               int wc = workerCountOf(c);
     
               // Are workers subject to culling?
               boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
     
               if ((wc > maximumPoolSize || (timed && timedOut))
                   && (wc > 1 || workQueue.isEmpty())) {
                   if (compareAndDecrementWorkerCount(c))
                       return null;
                   continue;
               }
     
               try {
                   Runnable r = timed ?
                       workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                       workQueue.take();
                   if (r != null)
                       return r;
                   timedOut = true;
               } catch (InterruptedException retry) {
                   timedOut = false;
               }
           }
       }
    

    其实核心也就一句:

    Runnable r = timed ?
               workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
               workQueue.take();
    

    我们再回头看一下execute,其实我们上面只走了一条逻辑,在execute的时候,我们的worker的数量还没有到达我们设定的corePoolSize的时候,会走上面我们分析的逻辑,而如果达到了我们设定的阈值之后,execute中会尝试去提交任务,如果提交成功了就结束,否则会拒绝任务的提交。我们上面还提到一个成员:maximumPoolSize,其实线程池的最大的Worker数量应该是maximumPoolSize,但是我们上面的分析是corePoolSize,这是因为我们的private boolean addWorker(Runnable firstTask, boolean core)的参数core的值来控制的,core为true则使用corePoolSize来设定边界,否则使用maximumPoolSize来设定边界。直观的解释一下,当线程池里面的Worker数量还没有到corePoolSize,那么新添加的任务会伴随着产生一个新的worker,如果Worker的数量达到了corePoolSize,那么就将任务存放在阻塞队列中等待Worker来获取执行,如果没有办法再向阻塞队列放任务了,那么这个时候maximumPoolSize就变得有用了,新的任务将会伴随着产生一个新的Worker,如果线程池里面的Worker已经达到了maximumPoolSize,那么接下来提交的任务只能被拒绝策略拒绝了。可以参考下面的描述来理解:

    * When a new task is submitted in method {@link #execute(Runnable)},
    * and fewer than corePoolSize threads are running, a new thread is
    * created to handle the request, even if other worker threads are
    * idle.  If there are more than corePoolSize but less than
    * maximumPoolSize threads running, a new thread will be created only
    * if the queue is full.  By setting corePoolSize and maximumPoolSize
    * the same, you create a fixed-size thread pool. By setting
    * maximumPoolSize to an essentially unbounded value such as {@code
    * Integer.MAX_VALUE}, you allow the pool to accommodate an arbitrary
    * number of concurrent tasks. Most typically, core and maximum pool
    * sizes are set only upon construction, but they may also be changed
    * dynamically using {@link #setCorePoolSize} and {@link
    * #setMaximumPoolSize}.
    

    在此需要说明一点,有一个重要的成员:keepAliveTime,当线程池里面的线程数量超过corePoolSize了,那么超出的线程将会在空闲keepAliveTime之后被terminated。可以参考下面的文档:

    * If the pool currently has more than corePoolSize threads,
    * excess threads will be terminated if they have been idle for more
    * than the keepAliveTime (see {@link #getKeepAliveTime(TimeUnit)}).
    

    欢迎加入学习交流群569772982,大家一起学习交流。

    相关文章

      网友评论

          本文标题:Java线程池详解(二)

          本文链接:https://www.haomeiwen.com/subject/kbibpxtx.html