Java的一大优势是能完成多线程任务,对线程的封装和调度非常好,那么它又是如何实现的呢?
jdk的包下和线程相关类的类图。
线程相关类图从上图可以看出Java线程池的实现类主要有两个类ForkJoinPool和ThreadPoolExecutor。
ThreadPoolExecutor :标准线程池
ScheduledThreadPoolExecutor: 支持延迟任务的线程池
ForkJoinPool :类似于ThreadPoolExecutor ,但是使用work-stealing模式,其会为线程池中每个线程创建一个队列,从而用work-stealing(任务窃取)算法使得线程可以从其他线程队列里窃取任务来执行。即如果自己的任务处理完成了,则可以去忙碌的工作线程那里窃取任务来执行
使用Executors来创建线程池。
1、创建单线程的线程池。
public static ExecutorServicenewSingleThreadExecutor(){
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
2、创建固定数量的线程池
public static ExecutorServicenewFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
3、创建可缓冲的线程池
public static ExecutorServicenewCachedThreadPool(){
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
其中ThreadPoolExecutor的构造函数中有几个参数,现在介绍这些参数,是理解线程池工作原理的重要方式:
1、第一个参数:int corePoolSIze,核心池大小,也就是线程池中会维持不被释放的线程数量。我们可以看到FixedThreadPool中这个参数值就是设定的线程数量,而SingleThreadExcutor中就是1,newCachedThreadPool中就是0,不会维持,只会缓存60L。但需要注意的是,在线程池刚创建时,里面并没有建好的线程,只有当有任务来的时候才会创建(除非调用方法prestartAllCoreThreads()与prestartCoreThread()方法),在corePoolSize数量范围的线程在完成任务后不会被回收。
2、第二个参数:int maximumPoolSize,线程池的最大线程数,代表着线程池中能创建多少线程池。超出corePoolSize,小于maximumPoolSize的线程会在执行任务结束后被释放。此配置在CatchedThreadPool中有效。
3、第三个参数:long keepAliveTime,刚刚说到的会被释放的线程缓存的时间。我们可以看到,正如我们所说的,在CachedThreadPool()构造过程中,会被设置缓存时间为60s(时间单位由第四个参数控制)。
4、第四个参数:TimeUnit unit,设置第三个参数keepAliveTime的时间单位。
5、第五个参数:存储等待执行任务的阻塞队列,有多种选择,分别介绍:
SynchronousQueue——直接提交策略,适用于CachedThreadPool。它将任务直接提交给线程而不保持它们。如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求最大的 maximumPoolSize 以避免拒绝新提交的任务(正如CachedThreadPool这个参数的值为Integer.MAX_VALUE)。当任务以超过队列所能处理的量、连续到达时,此策略允许线程具有增长的可能性。吞吐量较高。
LinkedBlockingQueue——无界队列,适用于FixedThreadPool与SingleThreadExcutor。基于链表的阻塞队列,创建的线程数不会超过corePoolSizes(maximumPoolSize值与其一致),当线程正忙时,任务进入队列等待。按照FIFO原则对元素进行排序,吞吐量高于ArrayBlockingQueue。
ArrayBlockingQueue——有界队列,有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。
ThreadPoolExcutor的构造方式不仅有这一种,总共有四种,还可以在最后加入一个参数以控制线程池任务超额处理策略:
当用来缓存待处理任务的队列已满时,又加入了新的任务,那么这时候就该考虑如何处理这个任务。
可以通过实现RejectedExceptionHandler接口,实现rejectedException(ThreadPoolExecutor e, Runnable r)方法自定义操作。但通常我们使用JDK提供了4种处理策略,在ThreadPoolExecutor构造时以参数传入:
ThreadPoolExcutor.AbortPolicy()——直接抛出异常,默认操作
ThreadPoolExcutor.CallerRunsPolicy()——只用调用者所在线程来运行任务
ThreadPoolExcutor.DiscardOldersPolicy()——丢弃队列里最近的一个任务,并执行当前任务
ThreadPoolExcutor.DiscardPolicy()——不处理,直接丢掉
接着,我们看一下线程池中比较重要的execute方法,该方法用于向线程池中添加一个任务。
execute方法核心模块用红框标记了。
第一个红框:workerCountOf方法根据ctl的低29位,得到线程池的当前线程数,如果线程数小于corePoolSize,则执行addWorker方法创建新的线程执行任务;
第二个红框:判断线程池是否在运行,如果在,任务队列是否允许插入,插入成功再次验证线程池是否运行,如果不在运行,移除插入的任务,然后抛出拒绝策略。如果在运行,没有线程了,就启用一个线程。
第三个红框:如果添加非核心线程失败,就直接拒绝了。
接下来,我们看看如何添加一个工作线程的?
添加worker线程
从方法execute的实现可以看出:addWorker主要负责创建新的线程并执行任务,代码如下(这里代码有点长,没关系,也是分块的,总共有5个关键的代码块):
第一个红框:做是否能够添加工作线程条件过滤:
判断线程池的状态,如果线程池的状态值大于或等SHUTDOWN,则不处理提交的任务,直接返回;
第二个红框:做自旋,更新创建线程数量:
通过参数core判断当前需要创建的线程是否为核心线程,如果core为true,且当前线程数小于corePoolSize,则跳出循环,开始创建新的线程
有人或许会疑问 retry 是什么?这个是java中的goto语法。只能运用在break和continue后面。
接着看后面的代码:
第一个红框:获取线程池主锁。
线程池的工作线程通过Woker类实现,通过ReentrantLock锁保证线程安全。
第二个红框:添加线程到workers中(线程池中)。
第三个红框:启动新建的线程。
接下来,我们看看workers是什么。
一个hashSet。所以,线程池底层的存储结构其实就是一个HashSet。
worker线程处理队列任务
第一个红框:是否是第一次执行任务,或者从队列中可以获取到任务。
第二个红框:获取到任务后,执行任务开始前操作钩子。
第三个红框:执行任务。
第四个红框:执行任务后钩子。
这两个钩子(beforeExecute,afterExecute)允许我们自己继承线程池,做任务执行前后处理。
到这里暂时结束
网友评论