线程池

作者: 码农崛起 | 来源:发表于2018-05-14 10:01 被阅读0次
    Executor.png

    Executor的主要作用是解耦任务提交和任务执行(包括如何使用线程,如何调度)

    class DirectExecutor implements Executor {
       public void execute(Runnable r) {
        r.run();
      }
     }
    

    Executor本身并不表示使用线程

    ExecutorService提供了关闭机制以及提交任务返回Future对象用于追踪任务执行进度或取消任务。

    FutureTask.png

    先分析一下AbstractExecutorService实现中用到的Future的实现类FutureTask的实现机制

    状态.png

    如果当前的任务是Runnable,通过RunnableAdapter转为Callable

    RunnableAdapter.png run.png

    FutureTask自身实现了Runnable,包装内部的Callable或Runnable

    get.png awaitDone.png report.png

    Future#get实现机制:如果任务还没开始,调用线程加入任务的等待队列,等待任务完成或取消时被唤醒,否则等待任务到达最终状态,正常执行返回结果或异常时抛出ExecutionException异常

    cancel.png finishCompletion.png

    Future#cancle实现机制:如果任务还没开始,状态改为INTERRUPTING或CANCELLED,如果支持中断,打断当前线程(参考run方法,先设置runner线程,再修改状态,所以可能当前状态是NEW,但是runner已经设置了),然后唤醒所有之前等待的线程。

    AbstractExecutorService的实现机制:任务的具体执行都委托给从Executor继承的execute方法,主要实现了submit和invokeAll,invokeAny方法。

    invokeAll.png ExecutorCompletionService.png

    任务的执行都委托给Executor,所有提交的任务都用QueueingFuture包装,任务执行完加入内部的BlockingQueue。

    invokeAny.png

    invokeAny:先提交一个任务,然后循环检查ExecutorCompletionService的阻塞队列是否有已完成的任务,有就返回,没有就再提交一个新任务,直到任务都提交完,然后阻塞。第一个任务完成后,cancel所有可以cancel的任务。

    AbstractExecutorService有两个具体的子类:ThreadPoolExecutor和ForkJoinPool,ScheduledThreadPoolExecutor又继承了ThreadPoolExecutor

    ThreadPoolExecutor:

    线程池运行状态.png 线程池参数.png

    workQueue表示任务队列,workers表示当前执行任务的线程集合。

    Worker.png

    Worker继承了AbstractQueuedSynchronizer,自身就是一个简单的互斥锁,实现了Runnable,Worker在构造时内部会利用ThreadFactory产生一个线程,线程启动时,执行Worker自身的run方法。

    runWorker.png

    Worker执行过程中,会通过getTask获取任务,每次执行任务之前都会获取worker自身的互斥锁

    getTask.png

    getTask通过返回null(线程池stop,或shutdown之后任务队列为空,或者动态调整参数之后线程太多,或者获取任务超时(说明任务太少了,不需要那么多线程)),控制Worker结束循环

    processWorkerExit.png

    Worker循环结束有两种原因:执行的任务抛出异常,getTask返回null。
    如果是后者,再次检查以确保目前的线程数不低于最低要求,线程数不够时添加worker线程。因异常而结束任务循环也会添加新的worker线程。

    addWorker-1.png addWorker-2.png

    添加worker失败的原因有三:线程池stop;shutdown之后任务队列为空;当前线程数超过最大线程数。worker添加成功之后,启动内部的线程,开始循环处理任务。

    execute.png

    关键点在于,核心线程全部启动之后,任务会先加入任务队列,只有任务队列是有界队列,且队列满了才会启动非核心线程!!!

    shutdown.png interruptIdleWorkers.png tryTerminate.png

    shutdown之后,修改状态为SHUTDOWN,然后打断所有idle线程,所谓idle,就是可以获取worker的互斥锁,说明worker当前在等待任务而不是执行任务,参考runWorker方法。如果当前所有worker正巧都在等待任务,所有worker都会被打断(processWorkerExit方法会在worker退出循环时调用,根据情况再添加worker)。tryTerminate中会先检查如果当前状态是SHUTDOWN但是任务队列不为空,不能进入terminal状态,如果当前是shutdown且任务队列为空且线程数为空,修改状态为过渡状态TIDYING,然后修改为最终状态TERMINATED。

    shutdownNow.png

    打断所有已经启动的worker,返回所有还未执行的任务。

    awaitTermination.png

    shutdown之后线程池并不一定关闭!!!所以正确的做法是shutdown之后调用awaitTermination等待所有任务执行完后所有线程被打断。

    ThreadPoolExecutor.png

    ThreadPoolExecutor可控制参数:
    corePoolSize:核心线程数,worker数量小于corePoolSize时每次提交任务都启动一个core线程,可以使用set方法在运行时调整。
    maximumPoolSize:最大线程数,包括core和非core线程,从上面的源码分析可以直到只有任务队列为有界队列时才会启动非core线程。
    workQueue:任务队列,只有任务队列为有界队列时才会启动非core线程。
    keepAliveTime:worker在指定时间内获取不到任务,说明此时人浮于事,需要裁员,getTask会返回null,结束获取任务超时的worker。
    threadFactory:定义如何产生线程,默认直接new Thread。
    handler:提交任务时任务队列满了或线程池shutdown之后的行为,默认抛出RejectedExecutionException异常,可选策略包括忽略(DiscardPolicy),在提交任务的线程中执行(CallerRunsPolicy),移除任务队列里最前面的任务(DiscardOldestPolicy)。
    keepAliveTime:如果通过set设置了值,如果一个worker超过指定时间未获得任务就会timeout而结束循环,如果当前线程数超过了corePoolSize,不会再添加新的worker,默认不支持timeout。
    allowCoreThreadTimeOut:默认线程数小于corePoolSize,timeout之后就会添加新的worker,如果设置了allowCoreThreadTimeOut,只有当前线程为0时才会添加新的worker。

    下面分析一下ThreadPoolExecutor的子类ScheduledThreadPoolExecutor的实现机制:

    ScheduledThreadPoolExecutor.png

    从构造上看,主要是任务队列使用了DelayedWorkQueue,DelayedWorkQueue是一个简单的基于二叉堆实现的优先级阻塞无界队列,所有任务按触发时刻排序,keepAliveTime为0,不支持worker超时。从上文的分析可知,使用无界队列时是不会启动非core线程的,maximumPoolSize设置成了Integer.MAX_VALUE而不是corePoolSize,避免运行时修改corePoolSize时还要修改maximumPoolSize。

    ScheduledFutureTask.png

    所有提交的任务都会用ScheduledFutureTask包装

    compareTo.png

    任务先按触发时刻排序,同时触发的任务按提交顺序排序

    run.png setNextRunTime.png triggerTime.png

    如果是重复任务,任务执行完,计算下次触发时刻,重新加入任务队列。此处有一个细节:就算是fixed-rate的任务,也是上次执行完之后才会再次加入任务队列。

    onShutdown.png

    shutdown之后不允许提交新任务,如果是之前提交的延迟任务还没到时间或者是周期性任务,根据参数决定是否还能继续执行,默认运行继续等待执行延迟任务,不允许执行周期任务。

    ForkJoinPool:日后补充!!!

    下面来分析一下Executors里的静态方法构造的都是什么线程:

    newFixedThreadPool.png

    无界队列,不支持timeout,固定线程数。

    newSingleThreadExecutor.png

    newSingleThreadExecutor = newFixedThreadPool(1)

    newCachedThreadPool.png

    使用特殊的队列SynchronousQueue,相当于容量为1的阻塞队列,只有这样,如果已经有任务在等待执行了,再次提交任务时才会启动非core线程。

    newScheduledThreadPool.png

    相关文章

      网友评论

          本文标题:线程池

          本文链接:https://www.haomeiwen.com/subject/lntprftx.html