随意使用而不加以管理,只会让好事变坏事。 ----佚名
线程因为thread类和runnable/callable接口而很容易创建。但是并发同步操作,绝不是你成功创建了线程,成功执行了线程就可以放心使用的了。因为线程资源是比较珍贵的系统资源,可以说用一个,就少一个。无视线程的管理,到最后极有可能会捡了芝麻丢了西瓜。通常管理线程的方法,就是线程池。java提供了一个成熟的线程池框架。如下图。
image.png
该框架的根本是执行器。与之对应还有同步器。
执行器只有一个执行方法execute,它可以运行Runnable。继承它的ExecutorService则扩展了shutdown这样的结束方法。而最基本的线程池基类ThreadPoolExecutor则建立在这两个接口上。所以线程池可以调度线程对象。
首先我们要看下ThreadPoolExecutor核心的代表有效线程数和线程池状态的一个变量ctl。
/**
* The main pool control state, ctl, is an atomic integer packing
* two conceptual fields
* workerCount, indicating the effective number of threads
* runState, indicating whether running, shutting down etc
*
* In order to pack them into one int, we limit workerCount to
* (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2
* billion) otherwise representable. If this is ever an issue in
* the future, the variable can be changed to be an AtomicLong,
* and the shift/mask constants below adjusted. But until the need
* arises, this code is a bit faster and simpler using an int.
*
* The workerCount is the number of workers that have been
* permitted to start and not permitted to stop. The value may be
* transiently different from the actual number of live threads,
* for example when a ThreadFactory fails to create a thread when
* asked, and when exiting threads are still performing
* bookkeeping before terminating. The user-visible pool size is
* reported as the current size of the workers set.
*
* The runState provides the main lifecycle control, taking on values:
*
* RUNNING: Accept new tasks and process queued tasks
* SHUTDOWN: Don't accept new tasks, but process queued tasks
* STOP: Don't accept new tasks, don't process queued tasks,
* and interrupt in-progress tasks
* TIDYING: All tasks have terminated, workerCount is zero,
* the thread transitioning to state TIDYING
* will run the terminated() hook method
* TERMINATED: terminated() has completed
*
* The numerical order among these values matters, to allow
* ordered comparisons. The runState monotonically increases over
* time, but need not hit each state. The transitions are:
*
* RUNNING -> SHUTDOWN
* On invocation of shutdown()
* (RUNNING or SHUTDOWN) -> STOP
* On invocation of shutdownNow()
* SHUTDOWN -> TIDYING
* When both queue and pool are empty
* STOP -> TIDYING
* When pool is empty
* TIDYING -> TERMINATED
* When the terminated() hook method has completed
*
* Threads waiting in awaitTermination() will return when the
* state reaches TERMINATED.
*
* Detecting the transition from SHUTDOWN to TIDYING is less
* straightforward than you'd like because the queue may become
* empty after non-empty and vice versa during SHUTDOWN state, but
* we can only terminate if, after seeing that it is empty, we see
* that workerCount is 0 (which sometimes entails a recheck -- see
* below).
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
然后我们看下线程池整体运行图
image.png
由上图我们可知,提交到线程池的线程需要worker这个对象进行执行。查看源码发现该对象实际是继承AbstractQueuedSynchronizer(AQS)实现了个非重入的互斥锁。它的类注释如下
* Provides a framework for implementing blocking locks and related
* synchronizers (semaphores, events, etc) that rely on
* first-in-first-out (FIFO) wait queues. This class is designed to
* be a useful basis for most kinds of synchronizers that rely on a
* single atomic {@code int} value to represent state.
可以知道它是一个抽象的同步器队列,同步器参考知识
image.png
不过一般我们不会直接使用这个基类创建线程池。java已经提供了好几种不同用处的线程池。
newFixedThreadPool()方法:该方法返回一个固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理任务队列中的任务。
newSingleThreadExecutor()方法:该方法返回一个只有一个线程的线程池。若多余一个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。
newCachedThreadPool()方法:该方法返回一个可根据实际情况调整线程数量的线程池。线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用。
newSingleThreadScheduledExecutor()方法:该方法返回一个ScheduledExecutorService对象,线程池大小为1。ScheduledExecutorService接口在ExecutorService接口之上扩展了在给定时间执行某任务的功能,如在某个固定的延时之后执行,或者周期性执行某个任务。
newScheduledThreadPool()方法:该方法也返回一个ScheduledExecutorService对象,但该线程池可以指定线程数量。
我们可以根据自己的实际情况选择不同的生成器方法生成需要的线程池。当然,无论是哪种线程池都是基于ThreadPoolExecutor类的封装。更具体的说,是对于其构造函数的不同使用。
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters, the default thread factory and the default rejected
* execution handler.
*
* <p>It may be more convenient to use one of the {@link Executors}
* factory methods instead of this general purpose constructor.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue)
等待队列是线程池技术的重要组成部分。一般线程池执行的流程如下
-
如果工作线程数量小于核心线程数量,则创建新的线程处理提交的任务。
-
如果工作线程数量大于等于核心线程数量,且没有超过最大线程数,则将新提交的任务,加入工作队列中等待执行。
-
如果工作线程数量大于等于核心线程数量,且工作队列已满,工作线程数量又于小最大线程数量,则创建新的线程处理提交的任务。
-
如果工作线程数量大于最大线程数量或者线程池是不在运行,执行拒绝策略。
而不同的线程池会使用的不同的队列行实现自己需要的对待队列。队列类型有:直接提交队列SynchronousQueue;有界队列ArrayBlockingQueue;无界队列LinkedBlockingQueue;优先级队列 PriorityBlockingQueue。
例如比较常用的newFixedThreadPool就是使用了无界队列LinkedBlockingQueue实现自己的等待队列。理论上只要系统资源支持,它可以一直增大。
线程池创建时其实还有个重要的参数,拒绝策略。它一般会在线程池工作线程数大于最大线程数时执行。其中jdk内置的策略有
image.png
最后要说的是,其实哪怕是线程池已经是比线程更进一步的好用了,我们其实也不一定要直接通过jdk创建。像hutool这样的工具,已经提供了比较好的封装ThreadUtil。尤其是它给了一个自定义线程池构造器ExecutorBuilder。
image.png
网友评论