了解了线程相关的知识,我们回到我们的系统开发中,怎么在实际的系统的开发中使用到线程,如果我们需要使用线程来实现群发邮件的效果,从消息队列中取到邮件相关的信息,然后通过邮件发送服务器发送邮件。实现如下,当有一个消息需要发送就需要new一个线程,那么当系统中需要有几千个或几万个邮件需要发送时,那么就会出现线程数量太多而造成资源消耗严重,甚至是难以想象的异常。那么怎么控制创建多个线程,怎么管理多个线程,Java并发编程提供了一个非常重要的类--线程池。
优点:
1.降低资源消耗:线程池可以重复使用已创建的线程,降低了重新创建线程和销毁线程的好处。
2.提高响应速度:当任务到达需要执行时,任务不需要等待线程的创建而直接执行任务
3.提高线程的可靠性:使用线程池可以对任务做到统一分配,调优和监控。
关键类:
线程池关键类和方法图1. Executor: 线程池最顶层接口,只提供了execute(Runnable command)方法。之后的类都是在此在接口上进行扩展。
2.ExecutorService: 对顶层接口扩展的接口,提供关闭(shutDown),立即关闭(shutDownNow),是否关闭(isShutDown),是否终止(isTerminated),提交Callable任务的抽象方法。
3.AbstractExecutorService: 对ExecutorService接口的扩展,提供了newTaskFor(),主要是将Runnable、Callable的任务封装为一个实现了Future和Runnable的对象,能够实现对结果的返回。
4. Executors :线程池提供的一个工具类,可以用于创建固定大小的线程池、缓存线程池、单线程线程池等。
后边会详细介绍剩余的两个类:ThreadPoolExecutor和SchedulePoolExecutor
ThreadPoolExecutor详解:
创建线程池构造方法图1 创建线程池构造方法图2 创建线程池构造方法图3主要参数:
1. corePoolSize: 核心线程池线程数量,
2. maximumPoolSize: 最大线程数量,
3. keepAliveTime: 线程空闲时间
4.unit:空闲时间单位
5.workQueue: 工作队列,主要有四种实现方式 。
a. ArrayBlockingQueue: 创建固定大小的阻塞队列, 采用的是数组的结构方式
b. LinkedBlockingQueue: 创建固定大小的阻塞队列,如果为传入参数,则会创建Integer.MaxValue大小的队列
c. SynchronousQueue: 创建一个不存储元素的阻塞队列,每一个元素的插入都必须等待一个元素的移除操作,不然会一直阻塞。
d. PriorityBlockingQueue: 一个具有优先级的无限阻塞队列。
6. ThreadFactory: 用于设置线程的工厂,通过改类可以给提交的线程创建更有意义的名字。
实现原理:
1.当一个任务被提交时,线程池会判断当前核心线程是否已经达到最大,如果没有则会创建一个新的工作线程执行任务,该步骤会进行加锁; 如果已满,则会 执行步骤2。
2.尝试将任务进入到阻塞队列中,如果成功添加到阻塞队列中,之后当有工作线程执行完毕之后就会从工作队列中获取一个任务继续执行;如果加入到阻塞队列失败,则会执行步骤3。
3.判断运行的线程数量是否已达到最大的设置的线程数,如果未达到,则创建新的工作线程来执行任务,该步骤会进行加锁;如果已达到则执行用户设置的执行策略,转到步骤4。
4.线程池根据以下4种RejectExecutionHandle执行策略来进行当不能执行任务时具体的做法。
(1). AbortPolicy: 实现了该接口,但是在实现的方法中直接抛出一个异常。默认不执行,直接返回异常。
(2).DiscardPolicy: 实现方法没有做任何操作。
(3).CallerRunsPolicy: 判断当前线程池是否是未关闭的状态,如果是未关闭的状态则直接调用任务的run方法。
CallerRunsPolicy实现方法图(4).CallerOldestPolicy: 如果线程池处于未关闭的状态,则获取线程池中的队列,丢弃队列最近的一个任务,并执行当前任务。
CallerOldestPolicy实现方法图(5). 用户可以自己实现RejectExecutionHandler 方法,然后在实例化ThreadPoolExecutor时,将其作为参数传入。
源码分析:
1. 在进行源码分析之前,先看下ThreadPoolExecutor中几个非常重要的静态常量和静态方法。
ThreadPoolExecutor常量和方法(1) ctl: 一个在线程池整个运行过程中都非常重要的一个常量,高3位表示运行的状态,低29未表示线程池中运行的线程数量。
(2)COUNT_BITS: 将Interger的大小32-3, 即为29。
(3)CAPACITY: 将1左移29位,然后再减掉1,即低29位都为1.
(4)RUNNING: 将-1转换为2进制转换位,然后将转换后二进制为往左移动29位,移动后高3位即为110。
(5)SHUTDOWN: 同(2),移动后高3位000。
(6)STOP: 同(2),移动后高3位001。
(7)TIDYING: 同(2),移动后高3位010。
(8)TERMINATED: 同(2), 移动后高3位101。
(9)runStateOf(int c): c为当前的ctl值,将其和CAPACITY的值取反(即低29位为0,高3位为1),然后位与之后只即为当前线程池的状态。
(10)workCountof(int c): c为当前ctl的值, 将其和CAPACITY的值进行&之后,获取的即是当前线程池线程的数量。
2. 进入到ThreadPoolExecutor的入口方法executor(Runnable command),代码如下:
ThreadPoolExecutor入口executor()方法图(1) 首先获取当前ctl的值,之前已经说过ctl可以保存当前线程池的状态和线程数量。
(2)判断线程池中的数量是否小于核心线程数,如果小于,则将当前的任务加入到工作线程中。若不小于核心线程数,则执行步骤3.
(3)判断线程池的状态是否处于正在运行中,如果处于的话则将当前的任务加入到工作队列中。我之前一直怀疑为什么在为什么在addWorker方法中会加锁机制,而加入队列时未加锁机制,在看了BlockingQueue的源码之后发现,它的插入、删除操作也使用重入锁机制,这样就保证了并发加入队列的操作是安全的。若执行加入队列成功,则执行步骤4,若不成功则执行步骤6.
(4)重新检查当前线程池的状态,如果当前线程池的状态是在运行中,是则移除之前加入的任务,并执行异常策略。默认的是AbortPolicy。否则执行步骤5.
(5)如果当前线程的数量是0,则表明当前处于shutDown状态,并且线程池中没有线程则运行,这时像工作线程中添加一个空的任务。
(6)继续像工作线程中添加任务,此时传入false表示创建的不是核心的线程。如果不成功,则执行异常策略。
3. 上述execute 方法中多次调用了addWorker方法,接下来我们看下addWorker方法,代码如下:
(1)采用循环+CAS操作来更新工作线程的数量,首先获取当前的ctl值,然后获取其状态rs。
(2)检查当前的状态,当状态满足大于关闭状态或者处于关闭状态,当前任务为空以下条件则直接返回:
(3)获取当前的工作线程的数量,如果工作的线程数量大于CAPACITY或创建核心线程大于配置的核心线程数或创建最大的线程数大于配置的最大线程数量则直接返回false.
(4)使用CAS操作对工作线程数量+1,如果新增成功则跳出循环,进入到下图2;否则继续执行
(5)获取当前的ctl值,判断ctl和之前开始取的值是否一致,这里主要是为了防止在做新增线程数量时,ctl的内存存储的值已发生了改变,这样做可以规避,让其重新获取内存值。
addWorker图1--增加工作线程数量图(6)将当前的任务封装为一个Work,Work的主要包含firstTask和thread变量。firstWork即是我们提交的任务。
(7)判断线程是否为空,若不为null,则进行加锁,这里是为了防止多个任务提交时,出现竞争导致works的大小不准确。
(8)获取线程池状态,若是处于运行中或是处于关闭状态,判断工作线程是否还存活,若存活,则抛出异常。
(9)向集合中添加该Worker,并判断当前线程的大小是否大于线程池中出现最多线程数量,如果多余则替换,之后将启动Worker的标志设为true。
(10)最后在finally方法中判断启动worker的标志是否为true,若为true则启动,并且释放掉锁。
至此向线程池提交任务的源码基本就完成了,当然如果启动Worker的标志为false,就会执行addWorkfailed方法,将线程数量-1,并且执行异常策略。
addWorker图2--启动工作线程4. 我们发现上述一直会判断是否处于关闭状态,并且对关闭状态做了一些处理,那么线程池是如何实现关闭的呢?
(1)shutDown(): 关闭线程池状态,线程池不再接受任务, 并且尝试中断掉那些不处于运行中的任务。
shutDown方法图 interruptIdleWorker方法图(2)shutDownNow(): 关闭线程池状态,线程池不再接受任务,并且尝试中断掉存在的任务。与shutDown不同的是,它不管线程是不是处于运行中,都会去进行中断。shutDownNow()方法只是调用处理线程的方法不一样,如下:不会获取到Work对象的锁,而是如果该线程未中断,就尝试中断。
shutdowNow方法调用的interruptWorkers图Executors工具类:
1. newFixedThreadPool(int core):创建固定大小的线程池,其内部实现调用了ThreadPoolExecutor(core, core, 0L, TimeUnit.MILLSECONDS,new LinkedBlockingQueue)的方法。即核心线程数和最大线程数想等,队列使用无界队列,这样子线程池的核心线程不会超过最大的线程,等待的线程都在工作队列上等待。若不调用shutDown()或shutDownNow()方法,则不会执行异常策略。
2.newSingleThreadPollExecutor(): 创建一个只有一个工作线程的线程池,其内部实际调用了ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLSECONDS,new LinkedBlockingQueue)。这个和固定线程池非常相似,固定线程池核心线程由传入的参数决定,而单线程池则默认核心线程数为1。使用无界队列作为工作队列,其效果和固定线程池一样。
3. newCachedThreadPool: 创建一个最大线程数为int类型最大值的线程池,内部调用了ThreadPoolExecutor(0, Interger.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchroncousQueue)方法。没来一个任务则创建一个工作线程来进行运行,但是空闲线程的时间设置为60S ,如果空闲的线程超过60s,则会关闭该线程。该线程池采用了SynchroncousQueue作为工作队列,队列本身不存储任务。
4.newScheduledThreadPool(int nThreads): 创建一个线程数量为nThreads的线程池。可以延迟、周期线的执行任务。
什么情况使用什么队列?
(1) 速度快,任务小,newCachedThreadPool没有错
(2)任务重,消耗大,newFixedThreadPool顶呱呱
(3)顺序执行用newSingleThreadPollExecutor
(4)想延迟,有周期,那你就得用newScheduledThreadPool。
总结:
线程池在实际的开发中起着至关重要的作用,本文也只是对其中的一些简单的知识点和代码实现做了一个讲述,想代码中使用到的锁和AbstractSynchronousQueue(队列同步器)都没有做做介绍,之后写下锁和队列同步器的相关知识。 最后,谢谢各位大佬的阅读!
网友评论