JDK1.5中提供了Executor
接口,处于java.util.concurrent
包下,写这篇文章是因为我的Netty源码分析系列文章中需要用到该接口,因此打算以单篇文章的形式来学习一下,也算是对并发编程的深入学习吧
Executor
该接口提供了一种优雅的方式去
解耦
任务处理机制中的任务提交
和任务如何运行
(也包含线程的使用,调度)
重点是解耦,解耦,解耦,重要的事情说三遍,哈哈
使用场景
通常用于非显示的创建线程
,那么什么是显示的创建线程
?
new Thread(new RunnableTask()).start()
那么什么是非显示创建线程
呢?
// 1. 创建执行器实例
Executor executor = new MyExecutor();
// 2. 调用执行器的executor方法执行任务
executor.execute(new RunnableTask());
class MyExecutor implements Executor {
public void execute (Runnable r) {
new Thread(r).run();
}
}
大家比较一下两者有什么差别?很明显的可以看出来非显示创建线程的方式更优雅,我只要关心一点就是我
把任务交给了执行人(器)
,剩下的事情就是执行人(器)考虑的问题了,至于执行人会把我的任务安排一个人做
还是交给多个人做
,那是他的事情,具体的活怎么干我不关心,总结下来就是我们不用关心执行器内部是如何运行任务的细节
执行器调度方案
这里先打个比方,我们将
执行人(器)
比喻成一家工厂的调度人员
,把需要执行的任务
比作生产任务
,而将执行任务的线程
比作流水线
,而递交生产任务的是客户
当我们向调度人员递交生产任务时,调度人员有以下几种调度方案:
-
客户每有一个生产任务,我就开一条流水线去生产
-
客户的所有生产任务排队等待,只开一条流水线
-
流水线机器不能一直启动,需要每隔3小时开一次,以保证机器运行良好
-
客户递交了生产任务后,是让客户
一直在等待生产结果
还是立即给客户
一个取生产出来的产品的凭据(Future)
-
当有多个客户递交生产任务时,我如何保证先递交生产任务的客户先拿到产品?
-
如果接受客户生产任务的工厂是一家
皮包公司
呢?只接受生产任务,将客户的生产任务包装一层成自己公司的生产任务,并放入一个队列中,最后将这些包装好的生产任务再递交给真正生产客户产品的工厂去生产,这就是一个复合执行器
根据以上调度人员的调度方案不同,就会衍生出多种不同的调度器
下面根据源码文档给出一个简单的复合执行器示例
,该执行器会将客户提交的任务以序列化的方式再递交给第二个执行器
class MySerialExecutor impliments Executor {
// 一个用于存放客户递交的任务的队列
final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
// 真实执行任务的执行器
final Executor executor;
// 激活任务(即客户提交的生产任务)
Runnable active;
// 构造函数
MySerialExecutor(Executor executor) {
this.executor = executor;
}
// 覆写接收客户任务方法,加入synchronized保证多线程下递交任务的安全
public synchronized void execute(final Runnable r) {
// 将任务重新包装一层,主要目的就是下面的finally,为了一直调度下一个任务
// 推进任务队列tasks中
tasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
}
});
// 第一个任务被提交后,会触发首次scheduleNext方法
if (active == null) {
scheduleNext();
}
}
// 同样以synchronized修饰,防止真实执行任务的executor内部是多线程的方式执行,引起线程安全问题
// 当任务队列中的任务不为空时,取出任务再交给第二个执行器执行
protected synchronized void scheduleNext() {
if ((active = tasks.poll()) != null) {
executor.execute(active);
}
}
}
以上是官方文档给出的一个复合执行器的示例
,给出示例的目的是为了告诉大家执行器会有很多中不同的实现方式,也就会衍生出各种各样的执行器
ExecutorService
同样是
JDK1.5并发包
下提供的一个接口
,该接口提供了多个方法
去管理终止(所谓管理终止即执行器可以被关闭)
和跟踪一个或多个异步任务的进度(Future)
下面列举一下该接口中几个重要的方法
- /
一个ExecutorService能够被关闭(shut down)
,ExecutorService提供了两种不同的方式
去shutting down
,不管哪种方式,都会拒绝新的任务提交
,
- shutdown
void shutdown();
在terminating终止之前
,允许早前已提交的任务继续执行
- shutdownNow
List<Runnable> shutdownNow();
尝试去停止正在执行的任务
以及阻止等待执行任务的启动
,返回从未开始执行的任务列表
<T> Future<T> submit(Callable<T> task);
对基础提交任务方法execute的扩展,创建并返回一个Future对象
,通过该对象能够去取消任务执行
或等待任务的完成
- /
提供提交执行批量任务
的能立,注意该方法是同步的
- invokeAny
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
同步执行给定的批量任务,返回任意一个已成功完成的任务结果
(没有抛出异常的情况下)或异常返回
正常返回或异常返回,任务集合中其他未完成的任务都会被取消
注意一下如果在该方法执行期间,任务集合被修改了
会造成该方法返回结果为undefined
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
也提供了超时时间的重载方法,规定了该方法最大的超时等待时间
,避免出现无限阻塞等待的情况,注意超时的话会抛出TimeoutException
,(即给定的超时时间之内没有任何成功结果返回或其他异常返回)
- invokeAll
执行提交的任务列表,当所有任务
都是完成
状态(并不一定都是completed successfully
)才返回,返回值是一个Future集合
,每个Future对象
代表了每个任务执行的结果和状态
,注意Future集合的顺序和提交的任务列表的顺序是一致的
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
同理,invokeAll也提供了超时等待的重载方法,
如果该方法执行没有超时
,每个任务将会以完成状态的Future返回
,如果该方法执行超时
,那么这里不是抛出异常
,而是其中一些任务将没有完成
,仍然返回Future集合
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
ScheduledExecutorService
该接口继承了ExecutorService接口,自身扩展了能够让命令(任务)执行被
计划运行
所谓计划运行,就是增加了时间相关的能立,比如延迟多少时间后再执行
,亦或任务需要周期性执行
等,下面分析其源码,看其定义了哪些方法
- UML类图
- 方法
该方法用于创建和执行一个延迟指定时间
的一次执行
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
参数command
为需要被调度执行的命令(任务),delay
为从此刻开始需要被延迟执行的时间,init
则为时间的单位,返回值是一个 Future
,代表了任务等待完成的结果
,可以通过Future对象的get方法
获取任务执行结果或通过cancel方法
取消任务,若为null,则代表未完成
另外schedule方法还有一个重载的方法,参数command变为了Callable<V> 类型的callable,其余参数以及返回值不变,至于Runnable和Callable的区别,请移步我的另一篇文章你该知道的Runnable和Callable的区别
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
该方法创建和执行一个周期性
的行为,当到达执行的延迟时间参数initialDelay
后会执行第一次执行,之后会根据指定的period周期参数
进行后续周期性的调度执行,注意参数unit
时间单位是同时作用于initialDelay和period的,返回值同样是一个Future对象
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
该方法也是创建和执行一个周期性的任务,在initialDelay
后首次执行,之后的每一次执行终止和下一次执行开始之间
都会有delay参数
的延迟
简单描述一下scheduleWithFixedDelay
和scheduleAtFixedRate方法
的区别,这也是很多面试官喜欢问的点
二者区别的关键就在于你的任务执行本身需要花费的时间
上,下面用一张图来说明
二者的区别总结一下就是:
scheduleWithFixedDelay
方法和任务本身执行时间
相关,即下一次任务执行的开始时间
永远和上一次任务执行的结束时间之间
有一个delay的时间间隔
而scheduleAtFixedRate
方法于任务本身执行时间
无关,但是需要注意一点的是当任务本身执行时间超过了period参数
指定的时间,则会导致下一次的任务在上一次任务执行完后立即执行(这里不会并发执行任务)
AbstractExecutorService
一个抽象类,提供了
ExecutorService接口相应执行方法的默认实现
,包括submit,invokeAny,invokeAll等方法
- UML类图
- 抽象类设计原则
抽象类的作用就是抽象一些子类都有的公共步骤,再在公共步骤中调用其接口的抽象方法,即由其子类去实现的真正逻辑
设计模式中的模板方法模式
就是基于抽象类
去做的
- 默认实现方法
- \
ExecutorService接口中关于submit方法的定义是需要返回Future对象的,那么AbstractExecutorService方法实现的内部就是去new一个Future对象,再转而调用execute方法就好了,至于execute方法到底做了什么,那就由其子类去实现
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
// new一个RunnableFuture对象
RunnableFuture<Void> ftask = newTaskFor(task, null);
// 调用execute方法,具体实现有子类自己实现
execute(ftask);
return ftask;
}
这里看下newTaskFor方法做了什么
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
可以看到这里是自身定义的一个protected
方法,就是根据传入的命令(任务)和返回值value参数,new了一个FutureTask对象
,FutureTask是RunnableFuture接口
的实现类,而Runnable接口同时继承
了Runnable接口
和Future接口
这里不去细说,继续看AbstractExecutorService实现了哪些方法
- \
上面分析了invokeAny接口的定义,这里看下抽象类中时如何实现的
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
try {
return doInvokeAny(tasks, false, 0);
} catch (TimeoutException cannotHappen) {
assert false;
return null;
}
}
这里转而去调用了doInvokeAny方法,重点分析下这个方法
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
if (tasks == null)
throw new NullPointerException();
int ntasks = tasks.size();
if (ntasks == 0)
throw new IllegalArgumentException();
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
ExecutorCompletionService<T> ecs =
new ExecutorCompletionService<T>(this);
// For efficiency, especially in executors with limited
// parallelism, check to see if previously submitted tasks are
// done before submitting more of them. This interleaving
// plus the exception mechanics account for messiness of main
// loop.
try {
// Record exceptions so that if we fail to obtain any
// result, we can throw the last exception we got.
ExecutionException ee = null;
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Iterator<? extends Callable<T>> it = tasks.iterator();
// Start one task for sure; the rest incrementally
futures.add(ecs.submit(it.next()));
--ntasks;
int active = 1;
for (;;) {
Future<T> f = ecs.poll();
if (f == null) {
if (ntasks > 0) {
--ntasks;
futures.add(ecs.submit(it.next()));
++active;
}
else if (active == 0)
break;
else if (timed) {
f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
if (f == null)
throw new TimeoutException();
nanos = deadline - System.nanoTime();
}
else
f = ecs.take();
}
if (f != null) {
--active;
try {
return f.get();
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}
if (ee == null)
ee = new ExecutionException();
throw ee;
} finally {
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
该方法首先进行参数校验(任务集合),然后根据提交任务集合数量,new了一个相同数量Future的集合
接着new了一个ExecutorCompletionService,该类就是提供了一个成功任务的队列,以及持有了真正执行execute方法的执行器引用,这里传入了this
下面就是做一些是否需要超时逻辑的判断,再for循环提交任务执行期间先开始了第一个任务的提交执行,后续的增量执行,for循环内部就是进行不断提交,以及是否已超时的判断,当获取到队列队首已完成的任务时,跳出循环并在finally中将其他任务强制取消掉
- \
而invokeAll方法
的实现则相对简单,就是将集合任务循环遍历
进行提交执行
,再利用一个for循环去获取每个任务的执行结果是否是done,如果不是就调用其get方法同步阻塞等待任务执行完成
(抛出异常也会返回)下面是源码部分
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks) {
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
execute(f);
}
for (int i = 0, size = futures.size(); i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) {
try {
f.get();
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
}
}
}
done = true;
return futures;
} finally {
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
最后我们以concurrent包
下提供的线程池执行器ThreadPoolExecutor
为例,讲解如果我们自己实现一个简单的线程池执行器,应该思考哪些方面
ThreadPoolExecutor
线程池执行器,被用来解决两个不同的问题:
1:执行大量异步任务
的性能提升
2:提供了一种限制和管理资源
的方式,比如线程
,同时,也包含一些基础统计
,如已执行完成的任务数
等
要能够被广泛使用,该类提供了很多可调参数以及可扩展的钩子,然而程序员被要求使用更为便利的执行器工厂Executors
的方法们,如:
-
newCachedThreadPool(...)
不限制的线程池执行器 -
newFixedThreadPool(...)
固定线程数量的线程池执行器 -
newSingleThreadExecutor(...)
单线程数量的线程池执行器
以上工厂方法为大部分场景做了预配置,实际上内部都是调用了newThreadPoolExecutor(...)的构造函数,我们在使用的时候直接使用即可,另外如果需要手动配置和调优ThreadPoolExecutor,可以参考以下指导:
手动配置调优
- Core and maximum pool sizes
一个线程池执行器会自动调节
池中线程数量根据corePoolSize属性
和maximumPoolSize属性
,同时也提供了相应的get方法获取这两个属性值
当通过execute方法提交一个任务
的时候,如果当前
线程池中的线程数量少于corePoolSize的数量
,那么一个新的线程会被创建
去处理这个请求,即使此时有其他的工作线程是idle状态
如果此时的线程数量超过了corePoolSize但是少于maximumPoolSize的话,新线程只有在queue队列满的情况下才会被创建
通过设置相同大小的corePoolSize 和 maximumPoolSize,你可以创建一个固定大小的线程池,当然你可以将maximumPoolSize的值设置为Integer.MAX_VALUE
,以允许最大数量的并发任务
最典型的情况是,corePoolSize 和 maximumPoolSize参数通过构造函数设置,但是它们也可以被动态的改变
(通过相应的set函数
实现)
- 线程怎么创建?
线程被创建是使用一个ThreadFactory
,如果不特别指定,会使用
一个默认的线程工厂去创建线程,默认的创建线程具有相同的优先级,线程组,以及非守护线程状态
,一般建议的做法
是提供自定义的线程工厂
,从而修改线程名,线程组,优先级以及是否后台线程等属性,需要注意的是如果我们自定义的线程工厂的newThread方法返回null,会导致不能执行任何的任务
- 线程存活时间 keep-alive
如果当前池中有超过corePoolSize的线程数量的线程,如果他们的idle时间超过了keepAliveTime,那么这些线程会被终止
这样提供了一种方式去避免
了线程池没有被积极使用下的资源浪费
,如果后面线程池又被积极利用了,那么新的线程会被创建
keepAliveTime参数也是可以被动态改变的,通过set方法即可
同时这种超时终止策略也可以应用于corePoolSize线程
,通过方法allowCoreThreadTimeOut(true)
,可以实现当corePoolSize线程idle超时时也被终止掉
- 排队 Queuing?
一个阻塞队列被用于转移和持有已提交的任务,需要注意的是任务队列和poolSize线程的相互作用
-
如果
少于corePoolSize线程数量的线程在运行
,执行器
总是去新建一个线程
而不是任务排队(任务进入阻塞队列) -
如果线程数量
超过了corePoolSize单少于maximumPoolSize
,执行器
会将请求任务加入队列排队
而不是直接新建一个线程 -
如果
queue任务已满
,此时会创建新线程直到数量达到maximumPoolSize
,若queue仍满
的情况下,则提交的任务会被拒绝
那说到阻塞队列,又分为以下几种情况:
- 直接传递 Direct handoffs
对应Executors.newCachedThreadPool(...)
所谓直接传递就是不去持有任务,即采用一个SynchronousQueue,当没有可用线程来执行任务时,会返回错误
需要注意的是如果任务的处理速度小于任务的提交速度,那么池中的线程数会一直增加
官方给的策略是使用newCachedThreadPool
时需要将maximumPoolSizes参数
设置为Integer.MAX_VALUE
,这样能尽量避免任务提交失败
此种场景适用于任务需要被立即执行的,在实际开发中,可以根据业务需求设置maximumPoolSizes的值
- 无界队列 Unbounded queues
对应Executors.newSingleThreadExecutor(...)
或Executors.newFixedThreadPool(...)
LinkedBlockingQueue
,没有预定义的大小,如果所有的corePoolSize线程是忙的状态下,将导致新任务的提交会进入到队列中等待被执行,这将会导致永远不会创建超过corePoolSize数量的线程(因为队列不可能满啊)即maximumPoolSize参数无效
此种场景适用于任务之间彼此无关联,不需要被立即执行,比如web 页面服务请求处理
注意的是这种情况下如果任务提交速度高于任务处理速度,当导致无界队列大小无限制增加,即可能出现内存不足
- 有界队列 Bounded queues
有界队列的代表是ArrayBlockingQueue
,可以防止资源耗尽
此种方式很难做到调整和控制,为什么这样说呢?
队列大小和maximumPoolSizes会互相影响,比如大队列,小maximumPoolSize值
会造成人为的低吞吐量
,虽然对于CPU,OS资源,上下文切换方面会少很多
。
而如果使用小队列
,通常需要大的线程池数量
,从而保持CPU繁忙(较高的CPU利用率
)但是上下文切换如果过度,也会造成吞吐量的下降,处理速度降低
因此具体的业务场景需要进行不同的参数配置调整
- 拒绝任务方面 Rejected tasks
当Executor被关闭
,或当执行器同时使用了有界队列
和有界maximum threads
并且达到了饱和状态
时,新提交的任务
会被拒绝
即execute方法会调用RejectedExecutionHandler的rejectedExecution(Runnable, ThreadPoolExecutor)方法
提供了四种预定义的处理策略:
-
默认
处理器是AbortPolicy
,该处理器会抛出
一个运行时异常RejectedExecutionException
-
CallerRunsPolicy
处理器,该策略处理器会在执行器未关闭的情况下,使用调用者线程去执行该拒绝的任务
注意若执行器此时已关闭,那么任务会丢弃掉,什么也不做
-
DiscardPolicy
策略处理器,丢弃任务,不做任何事情 -
DiscardOldestPolicy
策略处理器,如果执行器未关闭,那么在队列头部的任务
会被丢弃
(即丢弃最老的任务),继续提交该任务
若执行器此时已关闭,则丢弃任务,什么也不做
- 钩子函数 Hook methods
该类提供了protected类型
的可覆写
的beforeExecute函数
和afterExecute函数
,terminated函数
等,这些方法会在任务被执行前和执行后,以及执行器终止时调用
可以利用这些函数来操作执行环境
,比如:
重新初始化threadlocal
、收集统计信息
或添加日志
条目,亦或者当执行器完全关闭后
做一些特殊的处理
注意钩子函数如果抛出异常,可能会导致内部工作线程直接失败并终止
- 队列维护 Queue maintenance
方法 getQueue()
允许访问工作队列用于监控
和调试
,注意强烈反对将此方法用于任何其他目的
另外方法remove(Runnable) 和 purge被用于当存在大量任务被取消时协助仓库回收
- 最后,当线程池在程序中没有引用指向以及没有剩余线程时将会被自动关闭
如果你想确保
在用户忘记关闭没有引用的线程池
的情况下也可以自动关闭的话,你需要安排那些未使用的线程最终被干掉,可以通过适当的设置实现:
设置合适的keep-alive times
以及使用下限为0的核心线程数
和设置allowCoreThreadTimeOut
为true
线程池执行器状态流转图
线程池执行器状态流转图.png源码分析
源码分析部分,我会以问答的方式来和大家一起学习,这样会更有针对性,效果也会更好
1:ThreadPoolExecutor如何保存自身状态值
?
基于以上状态流转,我们知道线程池执行器中包含了五种状态,分别是:RUNNING
,SHUTDOWN
,STOP
,TIDYING
以及TERMINATED
,那么一定就需要有字段标识这几种状态
ThreadPoolExecutor不是采用一个单独的字段标识自身状态的,而是采用了一个AtomicInteger类型
的字段ctl
,用这一个字段,不仅标识了自身状态
而且同时还表示了
线程池执行器自身的另一个重要属性workerCount
,下面我们结合源码看看它是怎么做到的
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// integer类型占用的bit位,这里是32,即COUNT_BITS 为29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 采用位运算的方式,将1的二进制表示法左移29位,左移后即为2的29次方,再减去1得到workerCount的最大值
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
// runState状态被存储在以下值的二进制表示法的高三位
// 11100000 00...00,RUNNING 状态的高三位比特为111
private static final int RUNNING = -1 << COUNT_BITS;
// 00000000 00...00,SHUTDOWN 状态的高三位比特为000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 00100000 00...00,STOP 状态的高三位比特为001
private static final int STOP = 1 << COUNT_BITS;
// 01000000 00...00,TIDYING 状态的高三位比特为010
private static final int TIDYING = 2 << COUNT_BITS;
// 01100000 00...00,TERMINATED 状态的高三位比特为011
private static final int TERMINATED = 3 << COUNT_BITS;
不熟悉Java位运算的同学可以参考清浅池塘博主的这篇文章
利用这样的机制,非常巧妙的将一个32位整型,同时表示了两个重要属性runState
(高三位比特)和 workerCount
(低29位比特)
同是也说明了我们得线程池最大得workerCount不能超过
229-1个 (大概5亿多),文档中表达了若将来这里出了issue,可以将
ctl的类型改为AtomicLong
下面看其提供了哪些方法来操作这个ctl属性的打包和拆包的
-
ctlOf(int runState, int workerCount)
打包ctl
该方法传入两个参数,采用Java 位运算中的位或运算符
,打包
成ctl的值返回
private static int ctlOf(int rs, int wc) { return rs | wc; }
- 运算规则:两个数都转为二进制,然后从高位开始比较,两个数只要有一个为1则为1,否则就为0。
这里拿ctlOf(RUNNING, 0)
举例
参数 | 值 | 二进制表示法 |
---|---|---|
rs | -1 << 29 |
111 00000 00000000 00000000 00000000 |
wc | 0 |
000 00000 00000000 00000000 00000000 |
运算结果 | rs | wc |
111 00000 00000000 00000000 00000000 |
可以看到,当wc值为0时,这里的位或运算符结果就是参数rs的二进制表示法
runStateOf(int ctl)
该方法为拆包ctl方法
,即返回
ctl值中runState部分的值
,用到了Java位运算符中的位与运算符
和位非运算符
-
位与运算符运算规则:两个数都转为二进制,然后从高位开始比较,如果两个数都为1则为1,否则为0
-
位非运算符运算规则:如果位为0,结果是1,如果位为1,结果是0
private static int runStateOf(int c) { return c & ~CAPACITY; }
这里拿runStateOf(ctlOf(RUNNING, 0))
举例
参数 | 值 | 二进制表示法 |
---|---|---|
ctl | ctlOf(RUNNING, 0) |
111 00000 00000000 00000000 00000000 |
CAPACITY | (1 << 29) - 1 |
000 11111 11111111 11111111 11111111 |
~CAPACITY | ~((1 << 29) - 1) |
111 00000 00000000 00000000 00000000 |
运算结果 | c & ~CAPACITY |
111 00000 00000000 00000000 00000000 |
可见,通过该方法运算后就是为了取出ctl中高三位比特的值
,也就是runState属性
的值
workerCountOf(int ctl)
该方法为拆包ctl方法
,即返回
ctl值中workerCount部分的值
,用到了Java的位运算符中的位与运算符
private static int workerCountOf(int c) { return c & CAPACITY; }
这里拿workerCountOf(ctlOf(RUNNING, 0))
举例
参数 | 值 | 二进制表示法 |
---|---|---|
ctl | ctlOf(RUNNING, 0) |
111 00000 00000000 00000000 00000000 |
CAPACITY | (1 << 29) - 1 |
000 11111 11111111 11111111 11111111 |
运算结果 | c & CAPACITY |
000 00000 00000000 00000000 00000000 |
这里利用与运算提取出了ctl中低29位的比特,为0代表此时还没有任何的工作线程
简单总结一下提取比特位的规则:
想提取ctl哪个部位的比特数,就让ctl与(被提取部位比特位全是1且非提取部位的比特位全是0的二进制数,假设为A)做位与运算即可
- 增加workerCount数
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
就是利用CAS对ctl做自增
- 减少workerCount数
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
2:ThreadPoolExecutor任务保存在哪里
?
ThreadPoolExecutor利用一个final修饰
的BlockingQueue<Runnable> workQueue
来持有提交的任务和交给工作者线程,需要注意的是判断任务队列是否为空不能依靠workQueue.poll()返回是否为null来判断
,而必须
要通过workQueue.isEmpty()方法判断
private final BlockingQueue<Runnable> workQueue;
3:ThreadPoolExecutor工作线程保存在哪里
?
ThreadPoolExecutor使用了一个final修饰
的HashSet<Worker> workers
来保存工作者线程
private final HashSet<Worker> workers = new HashSet<Worker>();
4:ThreadPoolExecutor中的工作线程由谁来创建
?
ThreadPoolExecutor中有一个volatile修饰的ThreadFactory threadFactory属性,用来创建所有的工作线程
5:ThreadPoolExecutor中的还有哪些重要属性
?
属性列表
参数 | 类型 | 作用 |
---|---|---|
corePoolSize | volatile int | 核心池大小:即最小数量的存活工作线程(不允许核心线程超时的情况下),如果设置了允许核心线程超时,则最小数量的存活工作线程值会为0 |
maximumPoolSize | volatile int | 最大池大小:即池中工作线程数量的上限值,注意该值是内部有界的(229-1个) |
handler | volatile RejectedExecutionHandler | 拒绝执行处理器,即当池中任务饱和或shutdown时,会被调用 |
keepAliveTime | volatile long | 工作线程等待工作的超时时间,若allowCoreThreadTimeOut为true,则只要超过该时间还没有干活的线程会被干掉,否则,只有超过corePoolSize的线程会被干掉 |
allowCoreThreadTimeOut | volatile boolean | 默认false,即使核心工作线程超过了keepAliveTime时间也会处于存活状态,若设为true,则代表核心工作线程也会因为超过keepAliveTime时间没干活而被干掉 |
6:ThreadPoolExecutor如何实例化?
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
以上构造函数就是初始化各参数,实际开发过程中使用的最多的还是通过Executors这个执行器工厂类
提供的静态方法
实例化线程池执行器实例
比如:
- Executors.newFixedThreadPool(int nThreads)
该方法new一个固定大小线程数量
并且采用无上限数量LinkedBlockingQueue的任务队列
的线程池执行器,nThreads参数
同时指定了ThreadPoolExecutor的corePoolSize
和maximumPoolSize
值
- Executors.newSingleThreadExecutor()
该方法new一个单个工作线程
且无上限任务队列LinkedBlockingQueue
的执行器
7:ThreadPoolExecutor任务如何提交?
任务提交执行分两种,execute方式
和submit方式
二者的唯一区别是submit提交任务的方式可以通过后续的get方法拿到任务执行的结果,而execute方式提交的任务拿不到
其中submit方法定义在了其抽象父类AbstractExecutorService中,内部实际上还是调用子类ThreadPoolExecutor的execute方法,我们这里只分析execute方法
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 1.
int c = ctl.get();
// 2.
if (workerCountOf(c) < corePoolSize) {
// 2.1.
if (addWorker(command, true))
return;
c = ctl.get();
}
// 3.
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 4.
else if (!addWorker(command, false))
reject(command);
}
这里我们分4步去分析提交一个任务时,执行器到底做了什么
-
首先获取ctl属性的值,注意这里可能从获取到的那一刻起该值就已经被其他线程更改了,先不用管
-
调用拆包方法从ctl中获取到当前workerCount工作线程数量(可能不是最新的了),判断当前工作线程数是否小于核心线程数corePoolSize
2.1. 小于的情况下,调用addWorker方法增加核心工作线程数成功,直接return,execute方法执行完毕,若增加核心工作线程数失败(失败原因我们在addWorker方法源码时再分析),则重新获取最新的ctl值
- 如果当前执行器仍处于
RUNNING状态
下,就将此次提交的任务offer进任务队列中,若添加任务队列失败,则直接进入4. 步再去创建
非核心工作线程(不能超过maximumPoolSize数量的线程),若再次创建非工作线程失败,则执行拒绝任务提交逻辑
若RUNNING状态下添加任务到任务队列成功,则再次重新获取ctl值,赋给recheck变量,并再次检查执行器状态,如果已关闭,则移除任务,并执行拒绝任务提交逻辑,若状态正常,则检查workerCount数量是否为0,为0就增加工作线程,否则什么也不做
这里给出一张流程图来描述整个execute方法执行的过程
execute.png接下来我们分析addWorker方法
具体做了什么
- 方法定义描述
检查是否一个新的工作线程可以被创建和添加到当前执行器的线程池中
,(根据当前池的状态state和给定的corePoolSize和maximumPoolSize参数)
如果允许的话,则将参数firstTask作为其第一个任务直接运行
方法返回false的话,意味着当前执行器状态已停止或者符合关闭的条件,亦或者线程工厂创建线程失败(可能返回null或thread.start时抛出OutOfMemoryError异常)
- 参数
(Runnable firstTask, boolean core)
firstTask:表示该新线程应该首次执行的任务,也可为null
当首次任务通过方法execute提交时且此时池中工作线程数量小于corePoolSize时,工作者线程会绕过队列而直接被创建,或者当队列时满的时候,在这种情况下,我们必须绕过队列
最初,空闲线程通常是通过prestartCoreThread
创建的,或者用来替换其他垂死的工作线程
core:true代表是绑定到corePoolSize,false则为maximumPoolSize
-
返回值:true代表成功,false代表失败
-
源码
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { // 1. 获取当前ctl值 int c = ctl.get(); // 2. 根据当前ctl值,获取当前执行器运行状态 int rs = runStateOf(c); // Check if queue empty only if necessary. // 2.1. 若执行器运行状态为非RUNNING,则返回false if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; // 3. 这里主要是判断当前工作线程数量是否大于等于最大线程容量,超过的话直接返回false // 没有超过则再根据core参数,决定是否可以新建线程 for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 满足新增工作线程条件,则利用CAS更新当前工作线程数量+1,并跳出循环 // 若更新当前工作线程数量+1失败,说明有新的工作线程被新增并成功+1,这里就是单纯的继续循环 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } // 4. 程序执行到这里,说明当前工作线程数量+1已成功,接下来就是真实的新建工作线程以及启动线程的活了 // 利用两个变量workerStarted和workerAdded标识该工作线程已启动和已添加到工作线程集合中 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 4.1. 调用Worker构造函数新建一个工作者线程 w = new Worker(firstTask); // 4.2. 取得该工作者中线程引用 final Thread t = w.thread; if (t != null) { // 4.3. 这里需要将该工作线程加入到workers集合中,需要获取main锁 final ReentrantLock mainLock = this.mainLock; // 4.4. 需要获取main锁 mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); // 4.5. 再次检查执行器运行状态 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // 4.6. 加入工作线程集合,设置 workerAdded值为true workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { // 4.6. 释放main锁 mainLock.unlock(); } // 4.7. 工作线程加入workers集合成功,则启动该线程 if (workerAdded) { t.start(); // 注意这里如果启动线程抛出异常,则会导致workerStarted参数还是false,也就是会进入finally块的if语句内 workerStarted = true; } } } finally { // 如果该工作线程未启动完成,则调用新增工作线程失败方法,回滚整个工作线程的创建(包括workerCount值,workers集合,) if (! workerStarted) addWorkerFailed(w); } // 5. 返回工作线程是否启动标识 return workerStarted; }
新增工作线程失败方法
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
// 从workers集合移除该工作者
workers.remove(w);
// 减去工作者线程数量
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}
8:ThreadPoolExecutor中的任务如何被执行的?
ThreadPoolExecutor内部有一个内部类Worker (工作者)其类定义如下
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
// 每个工作者持有一个线程
final Thread thread;
// 工作者被初始化时指定的任务,可能为null
Runnable firstTask;
// 每个工作者完成任务的数量
volatile long completedTasks;
// 其重要的覆写Runnable的run方法,调用了ThreadPoolExecutor类的runWorker方法
public void run() {
runWorker(this);
}
// ...
}
我们直接看runWorker方法的源码
final void runWorker(Worker w) {
// 1. 获取当前线程
Thread wt = Thread.currentThread();
// 1.1. 获取当前工作者手里的初始化任务
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
// 2. 当初始化任务不为空,则直接执行或初始化任务为空,则调用getTask()方法从workQueue中获取任务
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 2.1. 执行钩子函数 - 任务执行前
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 2.2. 任务执行
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 2.3. 执行钩子函数 - 任务执行后
afterExecute(task, thrown);
}
} finally {
// 2.4. 执行完毕,任务清除,增加该工作者完成任务数,以及释放锁
task = null;
w.completedTasks++;
w.unlock();
}
}
// 注意这里一旦while循环异常退出,则会跳过这里直接执行finally方法,
// 否则若由于while条件为false正常退出的话,会执行这里将completedAbruptly 设为false,即代表该工作者不是被打断完成的
completedAbruptly = false;
} finally {
// 3. 无论该工作者是如何完成的,只要退出while循环,就会执行这里处理工作者退出工作
processWorkerExit(w, completedAbruptly);
}
}
这里我们只看我标注了步骤好的几行代码,分析其主要逻辑即可,描绘一下runWorker的执行逻辑图
runWorker.png接下来我们看另一个重要的方法getTask
- 方法定义描述
执行阻塞获取任务或超时获取任务,以下任一情况会返回null(意味着这个工作者必须退出EXIT):
1:可能由于setMaximumPoolSize函数被调用导致的最大池线程变小了,即此时超线程了
2:执行器池已停止
3:执行器池shutdown且queue为空
4:工作者等待获取任务超时(当(allowCoreThreadTimeOut || workerCount > corePoolSize)
)
-
源码
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { // 死循环,每次循环都获取当前ctl int c = ctl.get(); // 1. 拆包获取当前池运行状态 int rs = runStateOf(c); // Check if queue empty only if necessary. // 2. 池已停止(>= STOP)或SHUTDOWN下workQueue为空 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 2.1. CAS循环减少当前工作者数量,并返回null,强制调用线程退出 decrementWorkerCount(); return null; } // 3. 获取当前工作者数量 int wc = workerCountOf(c); // Are workers subject to culling? // 4. 判断是否允许核心线程超时或当前工作者数量已超设定的核心数量,即是否满足被Exit的前置条件 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 5. 若当前工作者数量已大于最大数量(可能由于setMaximumPoolSize函数被调用,重新修改了MaximumPoolSize的值) // 或当工人超时获取任务失败即 6.2. 步满足 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { // 5.1. 满足条件就执行CAS减少当前工作者数量,并返回null,CAS失败则continue继续下一次循环 if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 6. 重要的来了,如果前置超时条件满足,就调用阻塞队列的poll超时获取方法,否则直接阻塞获取直到可以获取到任务为止 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); // 6.1. 若获取到任务,直接返回任务,getTask方法结束 if (r != null) return r; // 6.2. 否则超时获取任务失败,将timedOut超时标识设为true,此时会在下一次的while循环时,导致5. 步条件满足,从而返回null,强制工人退出 timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
还是用流程图描述getTask方法的流程
getTask.png注意一下这里如果当前线程超时获取任务失败,会在下一次的while循环内
满足条件返回null
哦
至此整个线程池执行器从初始化,到提交任务,再到任务执行流程我们都分析完了,最后把其shutdown方法分析一下,做到有始有终
9:ThreadPoolExecutor 的 shutdown和shutdownNow有什么区别,都做了哪些事?
- shutdown()
启动一个有序关闭,早先提交的任务仍会被执行,新提交的任务会被拒绝,如果已经关闭了,调用该方法不会有其他的额外影响。
此方法不会阻塞等待早先提交的任务执行完成
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 1. 校验是否有shutdown权限,主要看调用者是否有针对workers集合内每个工作者包含的线程是否有
// new RuntimePermission("modifyThread")权限
checkShutdownAccess();
// 2. 提升运行状态为SHUTDOWN
advanceRunState(SHUTDOWN);
// 3. 中断闲置的工作者
interruptIdleWorkers();
// 4. 提供的关闭钩子函数
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 5. 尝试终止执行器
tryTerminate();
}
我们挑选出其中主要的几个步骤来细说
-
advanceRunState(int targetState)
提升执行器状态private void advanceRunState(int targetState) {
for (;;) {
// 1. 获取当前ctl值
int c = ctl.get();// 2. 这里先执行runStateAtLeast,若当前状态至少是给定的targetState了,直接break跳出循环返回,说明可能是二次执行shutdown方法 // 若runStateAtLeast返回false,则利用ctl的CAS特性,更新ctl包装值(更新了其中的高三位bit) if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) break; }
}
-
interruptIdleWorkers()
中断闲置的工作者private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
这里转而调用了重载的interruptIdleWorkers方法
,并传入了false
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
// 1.
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
该方法我们需要重点分析,不然会不理解什么是中断空闲线程
,也就无法理解整个shutdown的精髓的
方法开头获取mainLock的原因是因为要操作workers 这个HashSet集合,获取到mainLock后执行了for循环遍历集合内的所有工作者
接着获取工作者持有的线程,注意这里
if (!t.isInterrupted() && w.tryLock()) {...}
这里进入if内需要有两个条件同时满足
-
线程没有被中断
-
worker.tryLock()成功
重点是第二个获取worker锁成功,我们知道Worker内部类由于继承了AbstractQueuedSynchronizer
,本身就是一把锁
,既然这里当前发起shutdown的线程需要调用尝试获取worker锁,那么说明也有其他地方在调用,存在锁竞争关系
我们看下tryLock方法内部做了什么
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
这里就是利用CAS将0设置为1,并设置当前线程独占该锁,且该锁是不可重入的,那么为什么你shutdown方法的调用线程需要获取到每个工作者线程的锁呢?答案是允许工作线程中断
我们再次看下Worker类内的run方法
runWorker.png到这里是否已经明白了worker锁的用途呢?
总结下
interruptIdleWorkers()
方法内if条件内执行线程中断之前
需要先worker.tryLock()
获取worker锁,意味着以下情况的worker锁是获取不到的即从w.lock() ---> w.unlock()
,也就是正在运行的worker,因为worker锁是排他锁
这也就是为什么叫做中断空闲工作线程
,也即shutdown方法只能对那些空闲的worker线程(刚启动的或阻塞在getTask()方法上的线程)发送中断信号
- 被中断的工作者做了什么?
阻塞在getTask()获取任务的worker在被中断后,会抛出InterruptedException,不再阻塞获取任务
getTask.png捕获中断异常后,将继续循环到getTask()内while循环最开始的判断线程池状态的逻辑,当线程池是shutdown状态
,且workQueue.isEmpty
时,return null,进行worker线程退出逻辑
这也是为什么shutdown方法被称作温柔关闭
且为什么那些已提交的任务还可以被执行掉的原因,因为内部shutdown调用interruptIdleWorkers的那一刻,那些运行中的工作线程没有接收到中断信号,会一直从queue中获取任务执行,直到workQueue为空
-
tryTerminate()
方法
- 方法描述
转换执行器状态为TERMINATED
,如果满足以下任一一种情况,则转换成功
1:当前状态为SHUTDOWN且池和队列均为空
2:当前状态为STOP且池为空
-
源码
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 1.
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;// 2. if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } // 3. final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 4. if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS // 5. CAS失败,说明ctl有变化,再次循环尝试 }
}
下面分步骤描述此方法
- 如果以下3种情况任一为true,return,不进行终止操作
-
状态为RUNNING
-
状态是TIDYING、或 TERMINATED,说明已经终止过了,不进行再次终止
-
状态是SHUTDOWN 且 workQueue不为空
-
能执行到这里,说明只有shutdown状态 且 workQueue为空,或者 为stop状态,如果此时线程池还有线程(正在运行任务,正在等待任务)就
中断一个
正在等任务的空闲worker -
能执行到这里,说明符合Terminate的条件了(状态是SHUTDOWN,workQueue也为空了,正在运行的worker也没有了)
-
CAS尝试更新ctl内高三位状态值为TIDYING,低29位workerCount数量置为0, 成功则执行钩子函数
terminated()
-
最终将状态更新为TERMINATED,并唤醒调用了方法
awaitTermination()
等待线程池终止的线程
到了这里其实还有一个疑问:
假设这些没有接收到中断信号的运行线程很多,当他们把workQueue中的任务吃完以后怎么办?他们可是会一直阻塞在getTask方法上啊,这些工作线程是如何被释放的呢?
Doug Lea大神巧妙的在所有可能导致线程池产终止的地方安插了tryTerminated()尝试线程池终止的逻辑,该方法也在processWorkerExit内部被调用了,即如果有多个核心线程没有收到中断信号,那就先在tryTerminated内interruptIdleWorkers(ONLY_ONE);先中断一个,再利用processWorkerExit方法循环调用,直到所有的核心线程都被Exit掉,最终会执行上面的步骤5,结束整个shutdown方法
下面用一张图来描述整个shutdown内部执行流程
shutdown.png而shutdownNow这里就不在分析了,二者主要区别有三点
1:shutdownNow直接CAS设置执行器状态为STOP
,而shutdown设为SHUTDOWN
2:shutdownNow中断所有工作线程
,包括正在运行任务的
3:shutdownNow会返回workQueue中未执行的任务集合
shutdownNow直接调用中断所有已启动的工作线程,需要注意的是,对于运行中的工作线程调用Thread.interrupt()并不能保证工作线程被终止,如果提交的任务的run方法内部捕获了InterruptException,没有上抛的话,即不会执行processWorkerExit方法
,就会导致该线程一直无法结束
今天母亲节,祝愿我的妈妈节日快乐,福寿安康!!!
也祝天下所有母亲节日快乐!!!
网友评论