Executor框架
Executor总览
Executor Interface
该接口出现的意义(以应对无限制创建线程的不足):
1.线程生命周期开销非常高
在启动多线程时,一般是在循环中为每一个任务new一个Thread,线程的创建与
销毁是有代价的,并且为每一个任务都请求一个新的线程将会消耗大量资源。
2.资源消耗
如果可运行的线程数量多于可用的处理器数量,有些线程将会闲置,大量闲置的线程会占用许多内存,而且大量线程在竞争CPU资源时还将产生其他的性能开销,如果当前已经有足够多的线程使所有CPU保持忙碌状态,那么创建更多的线程反而会降低性能。
3.稳定性
在可创建线程数上存在一个限制,这个限制将随平台的不同而不同,且受多个因素制约,如果打破了这些限制,虚拟机将会抛出OutOfMemoryError。
如上图所示,Executor为异步任务执行提供了基础,提供了一种标准的方法将任务的提交过程与执行过程解耦开来。 当看到new Thread(Runnable).start()时,用executor代替。ExectuorService
它的子类ExectuorService提供了更多功能(可以更好的管理任务的终止。并且返还Future类,更好的实现任务的异步)
官方文档对其主要方法的介绍,主要介绍了他的两个shutdown,两个shutdown被调用后都会拒绝接受新的任务,但是区别在于,shutdown方法会平缓的过度,即让先前提交的任务继续执行,全部执行完毕后才会terminate,但是shutdownNow则更加暴力,直接terminate,并且直接结束当前正在运行的任务,但是会返回一个list,包含了还未执行的任务。
两个shutdown方法如下
其处理任务的核心方法是submit,该方法比父接口中的execute强在可以返回一个Future类,可以更好的实现异步
ThreadPoolExecutor
该子类即为线程池,定义如下
该类的构造函数比较复杂多变,一般由工具类Executors的工厂方法来生成,也可自己定制。
文档中介绍其主要有点如下
1.减少了每个任务调用的开销,执行大量异步任务时提供了更好的性能
2.提供了管理资源(比如线程)的途径
Core and maximum pool sizes
该类中两个比较关键的state
corePoolSize和MaximumPoolSize这两个变量在文档中的解释上面图片红色蓝色对应两种情况
1.当有新任务提交,若当前线程数小于corePoolSize,那么即使此时有其他空闲的线程,线程池仍然会new一个新的线程来处理该任务。
2.当有新任务提交,若当前线程数大于corePoolSize但小于maximumPoolSize时,任务会被加入到等待队列中,若等待队列已满,线程池才会new一个新的线程来处理。
KeepAlive属性
当线程数量大于corePoolSize时,闲置时间超过keepAliveTime的线程将会被收回。
Queue的比较
对于这三种BlockingQueue的选择,这篇文章 有分析在实际场景中的应用
这里稍微总结下该文章的几个观点
线程池有个问题,就是当新任务提交时,他倾向与加入队列(即使线程池内有空闲的进程),而不是直接处理,SynchronousQueue因为其特殊性可以解决该问题。
在jdk7中,加入了一个新的BlockingQueue------LinkedTransferQueue,实现了TransferQueue
JDK中队TransferQueue的简述TransfefQueue和BlockingQueue的主要区别如下
TransferQueue可等待消费者前来“消费” 两者从设计概念上讲,TQ更细化TransferQueue和SynQueue的比较
LinkedTransferQueue和SynchronousQueue和类似,可以看作前者是后者的升级版,都采用CAS无锁算法,并且都使用了Dual data structure,但是LTQ可以缓存,所以效率会高很多
这里附上我对这几个队列的总结:
我在某个深夜在手机记事本上随手写的总结线程池的选择
---newFixedThreadPool (corepoolsize=maxpoolsize,LinkedBlockingQueue)
将创建一个固定长度的线程池,每当提交一个任务时就创建一个线程,直到达到线程池的最大数量,这时线程池的规模将不再变化(如果某个线程由于发生了未预期的Exception而结束,那么线程池会补充一个新的线程)。
---newCachedThreadPool (corepoolsize=0,maxpoolsize=INTEGER.MAX,SynchronousQueue)
将创建一个可缓存的线程池,如果线程池的当前规模超过了处理需求时,那么将回收空闲的线程,而当需求增加时,则可以添加新的线程,线程池的规模不存在任何限制。
---newSingleThreadExecutor (LinkedBlockingQueue)
是一个单线程的Executor,它创建单个工作者线程来
执行任务,如果这个线程异常结束,会创建另一个线程来替代。
newSingleThreadExecutor能确保依照任务在队列中的顺序来串行执行(例如
FIFO、LIFO、优先级)。
---newScheduledThreadPool创建了一个固定长度的线程池,而且以延迟或定时的
方式来执行任务,类似于Timer。
饱和策略
·CallerRunsPolicy:不抛弃任务也不抛出异常,而是将任务退回调用者(有疑问?)
·AbortPolicy: 默认策略,抛出RejectedExecutionException
·DiscardOldestPolicy:丢弃队列最近的一个任务(优先级最低),并执行当前任务。
·DiscardPolicy:不处理,丢弃掉。
·当然也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化不能处理的任务
Callable和Future/FutureTask
Callable就是一个可以返回结果可以抛出异常的runnable
Future表示一个任务的生命周期,并提供方法来判断是否已经完成或取消以及获取任务的结果和取消任务,其隐含意义是,任务的生命周期只能前进,不能后退,当某个任务完成后,他就永远停留在完成的状态上。
JDK中队Future的描述FutureTask实现了Future和Runnable接口
JDK中队FutureTask的描述对Future的方法总结:
Sample Usage
官方给的例子实质上,在AbstractExecutorService中的submit方法,就是把传进来的runnable包装成FutureTask,再调用execute方法
最后补充下futuretask的get方法
其中awaitDone用的是CAS的无锁算法,在一个无限循环中判断是否完成计算,最后report方法把结果返回。
这个outcome就是
get方法有个可以代时间参数的版本,若超时会抛出TimeoutException。
CompletionService:
CompletionService将Executor和BlockingQueue的功能融合在一起,如果向Executor提交了一组计算任务,并且希望在计算完成后获取结果,一种方式是保留与每个任务相连的Future,然后反复get,同时将参数timeout指定为0,从而通过轮训来判断任务是否完成。另一种更好的方式就是使用CompletionService。这两种方式对比如下图:
CompletionService中主要方法,注意poll和take的区别
若队列为空poll返回null,而take会一直等待ExecutorCompletionService
这个类的思想,就是把Executor处理好返回的Future放到一个BlockingQueue中(默认是LinkedBlockingQueue),并提供方法访问这个队列(take,poll因为这两个方法是调用BlockingQueue的对应方法,所以都是线程安全的)。
下面是官方文档中给出的例子,总而言之,这个方法就是更方便的提供操作一系列已提交的任务的功能。
ExecutorCompletionServic的官方demo最后附上Stackoverflow上对这个粗率的描述
ScheduledExecutorService
如文档所述,这是一个可以延迟或周期性执行任务的ExecutorService
主要方法如下
前两个方法就是延迟一定时间后执行,后面两个方法,用于周期性的执行
--scheduleAtFixedRate(Runnable command,long initialDelay, long period, TimeUnit unit)
首先延迟initialDelay,然后每隔period便执行一次(如果该command执行时间超过period,那么会等这个command执行完后再执行,而不会并发的同时与两个command执行)
--scheduleWithFixedDelay(Runnable command,long initialDelay, long delay, TimeUnit unit)
首先延迟initialDelay,然后执行完毕后,再延迟delay,再执行
ScheduledThreadPoolExecutor
因为这是一个“周期执行任务”的线程池,所以在设计上,应该就是要保持线程数不变,jdk文档里也说了,它更像是一个fixed-sized pool,也不建议将corePoolSize设置为0,因为这导致线程池内无可用线程去处理其他任务(我的理解是,因为corePoolSize是0,任务进来后会直接创建线程去执行,任务多后,线程创建过多,可能无法去执行新的任务)
这个类的submit方法就是去调用自己的无delay的schedule方法,
这也是这个类最核心的方法,其中的decorateTask方法就是把原来的传进来的runnable包装成一个ScheduleFutureTask,然后执行。
DelayedExecute方法如下
简单的说就是判断加执行,再看下RunableScheduleFututre
ScheduleFututre
该类是一个内部类
主要就是多了一些关于延时,周期性执行的方法。
以上就是自己对Exectuor系列的总结,在网上找了很多资料,摘取了一些自己认为重要的部分,加上自己的理解,写下了这篇文章,因为水平有限,如有错误,请多指教。
网友评论