美文网首页
线程池的内部实现

线程池的内部实现

作者: topshi | 来源:发表于2019-05-04 23:02 被阅读0次

    核心线程池的内部实现

    对于核心的几个线程池,newFixedThreadPoolnewSingleThreadExecutornewCachedThreadPool,其内部都使用了ThreadPoolExecutor实现。

    public static ExecutorService newFixedThreadPool(int var0) {
          return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, 
                                                    new LinkedBlockingQueue());
     }
    public static ExecutorService newSingleThreadExecutor() {
         return new Executors.FinalizableDelegatedExecutorService(
                    new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, 
                                                  new LinkedBlockingQueue())
                                                                  );
    }
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, 
                                                   new SynchronousQueue());
    }
    

    ThreadPoolExecutor构造函数:

    public ThreadPoolExecutor( int corePoolSize, 
                               int maximumPoolSize, 
                               long keepAliveTime, 
                               TimeUnit unit, 
                               BlockingQueue<Runnable> workQueue, 
                               ThreadFactory threadFactory, 
                               RejectedExecutionHandler handler)
    

    参数含义如下:

    • corePoolSize:线程池中的线程数量。
    • maximumPoolSize:线程池中的最大线程数量。
    • keepAliveTime:当线程池中线程数量超过corePoolSize,多余空闲线程的存活时间。
    • unit:keepAliveTime的单位。
    • workQueue:任务队列,被提交但是尚未执行的任务。
    • threadFactory:线程工厂,用于创建线程,一般使用默认。
    • handler:拒绝策略。当任务太多来不及处理,如何拒绝任务。

    以上参数重点:workQueue和handler。

    workQueue

    workQueue指被提交但尚未被执行的任务队列,它是一个BlockingQueue接口的对象,用于存放Runnable对象。
    ThreadPoolExecutor构造函数可使用的几种BlockingQueue

    • 直接提交的队列:该功能由SynchronousQueue对象提供。SynchronousQueue没有容量,每个插入操作都要等待一个相应的删除操作,每个删除操作都要等待一个对应的插入操作。SynchronousQueue并不会保存提交的任务,而总是将新提交的任务马上执行(进队马上出队,像通道),如果没有空闲线程,则尝试新建线程,如果线程数量达到最大值,则执行拒绝策略。因此使用SynchronousQueue时,maximumPoolSize要设得很大,否则很容易执行拒绝策略。
    • 有界的任务队列:有界的任务队列可使用ArrayBlockingQueue实现。ArrayBlockingQueue的构造函数接受一个容量参数,指定队列的容量。若有新任务需要执行,如果线程池的实际线程小于corePoolSize,则优先创建新的线程,若大于corePoolSize,则将新任务加入到等待队列。如果队列已满导致无法加入,若总线程数小于maximumPoolSize,则创建新的线程执行任务,若总线程数大于maximumPoolSize,则执行拒绝策略。
    • 无界的任务队列:无界任务队列可通过LinkedBlockingQueue类实现。与有界队列不同,除非系统资源耗尽,否则不存在任务入队失败的情况。当新任务提交,若线程数小于corePoolSize,线程池会创建新的线程执行任务,若线程数达到corePoolSize,不再会创建新线程,新任务会被加入到队列。
    • 优先任务队列:优先任务队列是带有执行优先级的队列。通过PriorityBlockingQueue实现,可以控制任务的执行先后顺序,是一个特殊的无界队列(非FIFO)。

    几种线程池使用的workQueue

    • newFixedThreadPool返回一个corePoolSizemaximumPoolSize相同,workQueueLinkedBlockingQueue的线程池。因为线程数固定,不存在动态变化,因此maximumPoolSize可以随便设定。因为使用了无界任务队列,如果任务提交频繁处理不过来,任务队列会膨胀,从而耗尽系统资源。
    • newSingleThreadExecutor返回一个单个线程的线程池。它和newFixedThreadPool基本一样,只不过将线程数设置为1而已。
    • newCachedThreadPool返回一个corePoolSize为0,maximumPoolSize为无穷大,workQueueSynchronousQueue的线程池。该线程池若无任务提交,池中则无线程,若有任务提交,则创建线程执行任务。SynchronousQueue是直接提交队列,线程池总是会创建线程来执行新提交的任务。当所有的任务都执行完,由于corePoolSize = 0,也就是池中所有线程都是多余的空闲线程了,那么线程池会在指定时间内(60s)销毁所有线程。

    ThreadPoolExecutor线程池核心调度

       public void execute(Runnable var1) {
            if (var1 == null) {
                throw new NullPointerException();
            } else {
                int var2 = this.ctl.get();
                //线程总数小于corePoolSize,创建线程执行任务
                if (workerCountOf(var2) < this.corePoolSize) {
                    if (this.addWorker(var1, true)) {
                        return;
                    }
                    var2 = this.ctl.get();
                }
                //线程总数大于corePoolSize,将任务加入队列
                if (isRunning(var2) && this.workQueue.offer(var1)) {
                    int var3 = this.ctl.get();
                    if (!isRunning(var3) && this.remove(var1)) {
                        this.reject(var1);
                    } else if (workerCountOf(var3) == 0) {
                        this.addWorker((Runnable)null, false);
                    }
                //入队失败(有界队列已满,或使用了SynchronousQueue)
                } else if (!this.addWorker(var1, false)) {
                    //线程数达到最大,执行拒绝策略
                    this.reject(var1);
                }
            }
        }
    

    addWorker(var1, true/false)的第二个参数为true时表示线程总数小于corePoolSize,否则大于corePoolSize

    拒绝策略

    当线程池的线程用完了,同时等待队列也塞满了线程,无法再创建新的线程来执行任务时,需要有一套机制合理地处理这个问题。
    JDK内置提供了四种拒绝策略:

    • AbortPolicy策略:该策略直接抛出异常,阻止系统正常工作。
    • CallerRunsPolicy策略:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。这样做不会真正的丢弃任务,但任务提交线程的性能极有可能急剧下降。
    • DiscardOldestPolicy策略:该策略将丢弃最老的请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。
    • DiscardPolicy策略:该策略默默地丢弃无法处理的任务,不予任何处理。

    以上内置的策略均实现了RejectedExecutionHandler接口,若以上策略无法满足实际应用,可自己扩展该接口。

    相关文章

      网友评论

          本文标题:线程池的内部实现

          本文链接:https://www.haomeiwen.com/subject/qdwxoqtx.html