灵魂疑问:服务下线时,Spring线程池会调用shutdown()还是shutdownNow()去关闭线程池?
1. 线程池的关闭策略
- JDK原始的线程池在服务下线的时候,不会调用shutdown()或者shutdownNow()相关API来销毁线程池;
- Spring的线程池在服务下线的时候,会调用
org.springframework.scheduling.concurrent.ExecutorConfigurationSupport#destroy
方法去调用shutdown()或者shutdownNow()方法关闭线程池。
@Bean("asyncServiceExecutor")
public Executor asyncServiceExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
//核心线程数
threadPoolTaskExecutor.setCorePoolSize(1);
threadPoolTaskExecutor.setAllowCoreThreadTimeOut(true);
//最大线程数
threadPoolTaskExecutor.setMaxPoolSize(1);
//配置队列大小
threadPoolTaskExecutor.setQueueCapacity(50);
//服务关闭时,线程池的终止策略。
threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
threadPoolTaskExecutor.setAwaitTerminationSeconds(10);
//配置线程池前缀
threadPoolTaskExecutor.setThreadNamePrefix("async-service-");
return threadPoolTaskExecutor;
}
Spring线程池提供了setWaitForTasksToCompleteOnShutdown
和setAwaitTerminationSeconds
配置,来决定服务下线时,是否要调用shutdown方法,以及等待shutdown方法的进行终止的事件。默认情况下:Spring提供的线程池采用的是shutdownNow()的策略来关闭线程池。
2. 提供一种优雅的服务关闭机制
2.1 Spring提供的服务关闭机制
Spring提供了org.springframework.beans.factory.DisposableBean#destroy
方法,可以在容器销毁的时候进行回调。
本质上是调用的org.springframework.context.support.AbstractApplicationContext#registerShutdownHook
方法来进行注册事件,以便在JVM关闭时来进行回调shutdownHook
方法。
public void registerShutdownHook() {
if (this.shutdownHook == null) {
// No shutdown hook registered yet.
this.shutdownHook = new Thread() {
@Override
public void run() {
synchronized (startupShutdownMonitor) {
doClose();
}
}
};
Runtime.getRuntime().addShutdownHook(this.shutdownHook);
}
}
2.2 ShutdownHook实现服务关闭回调
JDK提供了Java.Runtime.addShutdownHook(Thread hook)方法,允许用户注册一个JVM关闭的钩子。这个钩子可以在以下几种场景被调用:
- 程序正常退出;
- 使用System.exit();
- 终端使用Ctrl+C触发的终端;
- 系统关闭;
- 使用kill pid命令干掉进程;
一般地发布系统会通过kill命令来停止服务。这个时候服务可以接收到关闭信号并执行钩子程序进行清理工作。
在使用ShutdownHook的时候,我们往往控制不了钩子的执行顺序。java.Runtime.addShutdownHook是对外公开的API接口。在前述场景里面,假若是独立注册钩子,在更复杂的项目里面是不是就没办法保证执行的顺序呢?曾在实际场景中遇到过这样的问题,从kafka队列消费消息,交给内部线程池去处理,我们自定义了线程池的拒绝策略为一直等待(为了保证消息确实处理),然后就会偶尔出现服务无法关闭的问题。原因正是线程池先被关闭,kafka队列却还在消费消息,导致消费线程一直在等待。
测试代码:
@Slf4j
@RestController
public class ExecutorController {
ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000));
{
Runtime.getRuntime().addShutdownHook(new Thread(() -> shutdownAndAwaitTermination(threadPoolExecutor, 1L, TimeUnit.HOURS)));
}
@RequestMapping(value = "/e/t2")
public String test2() {
for (int i = 0; i < 10; i++) {
CompletableFuture.runAsync(() -> {
sleepWithNoException(2000);
log.info("打印数据");
}, threadPoolTaskExecutor);
}
return "success";
}
private void sleepWithNoException(long time) {
try {
Thread.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
注意引入guava的MoreExecutors
的shutdownAndAwaitTermination
方法来控制shutdown()方法执行的最大时间。
2.3 signal实现服务关闭回调
Java同时提供了signal信号机制,我们的服务也可以接收到关闭信号。
使用Signal机制有以下原因:
- ShutdownHook执行顺序无法保障,第三方组件也可能注册,导致业务自定义的退出流程依赖的资源会被提前关闭和清理;
- Signal是非公开API,第三方组件基本很少使用,我们可以在内部托管服务关闭的执行顺序;
- 在完成清理工作后可以执行exit调用,保证资源清理不会影响ShutdownHook的退出清理逻辑;
这里核心的原因还是希望能完全保证服务关闭的顺序,避免出现问题。
核心工具类代码:
@Slf4j
public class MyTermHelper {
private static final int SLEEP_BEFORE_EXIT = 5;
private static AtomicBoolean signalTriggered = new AtomicBoolean(false);
private static AtomicBoolean stopping = new AtomicBoolean(false);
private static AtomicBoolean registeredHolder = new AtomicBoolean(false);
//维护一个双端队列
private static Deque<Runnable> terms = new ConcurrentLinkedDeque<>();
private static CountDownLatch stoppedLatch = new CountDownLatch(1);
private static AtomicBoolean sleepBeforeExit = new AtomicBoolean(false);
/**
* 注册term队列
*/
private static void registerIfNeeded() {
boolean previousRegistered = registeredHolder.getAndSet(true);
if (!previousRegistered) {
registerTermSignalIfNeeded();
}
}
/**
* 监听kill or kill -15的命令
*/
private static void registerTermSignalIfNeeded() {
//可以监听很多kill的事件。
Signal termSignal = new Signal("TERM");
//进一步获取原先的term signal handler,
Signal.handle(termSignal, signal -> {
boolean previous = signalTriggered.getAndSet(true);
log.info("调试....{}", termSignal.getName());
//已经注册了
if (previous) {
log.warn("Signal TERM has been triggered.");
return;
}
try {
//执行任务
doTermWithLog();
//系统以0作为信号量退出
System.exit(0);
} catch (Exception e) {
System.exit(1);
}
});
}
public static void doTermWithLog() {
Thread current = Thread.currentThread();
String threadName = current.getName();
current.setName(threadName + "(退出线程)");
try {
log.info("force term cleanup....");
doTerm();
log.info("TermHelper processed success");
} catch (Throwable e) {
log.info("TermHelper processed error", e);
throw e;
} finally {
current.setName(threadName);
}
}
/**
* 在并发状态下,强制的执行term的任务。
*/
public static void doTerm() {
boolean previousStopping = stopping.getAndSet(true);
if (previousStopping) {
log.warn("Term routine already running, wait until done!");
Uninterruptibles.awaitUninterruptibly(stoppedLatch);
return;
}
//串行执行双端队列维护的所有任务。
for (Runnable runnable : terms) {
try {
log.info("execute term runnable : " + runnable);
//服务被关闭时,强制执行等待的任务。
runnable.run();
} catch (Throwable e) {
log.error("fail to exec {}", runnable, e);
}
}
if (sleepBeforeExit.get()) {
Uninterruptibles.sleepUninterruptibly(SLEEP_BEFORE_EXIT, TimeUnit.SECONDS);
}
stoppedLatch.countDown();
}
public static void addTerm(Runnable runnable) {
registerIfNeeded();
terms.addLast(runnable);
}
public static void addFirstTerm(Runnable runnable) {
registerIfNeeded();
terms.addFirst(runnable);
}
/**
* 类似kafka producer,有buffer要被输出的。
* <p>
* 设置此变量,则会被阻塞5秒。
*/
public static void setSleepBeforeExit() {
registerIfNeeded();
sleepBeforeExit.set(true);
}
}
注意点Signal termSignal = new Signal("TERM");
这个是监听的kill or kill -15
的事件。
可以运行该命令查询到事件名:
$ kill -l
HUP INT QUIT ILL TRAP ABRT EMT FPE KILL BUS SEGV SYS PIPE ALRM TERM URG STOP TSTP CONT CHLD TTIN TTOU IO XCPU XFSZ VTALRM PROF WINCH INFO USR1 USR2
# 下面是常用的信号。
# 只有第9种信号(SIGKILL)才可以无条件终止进程,其他信号进程都有权利忽略。
HUP 1 终端挂断
INT 2 中断(同 Ctrl + C)
QUIT 3 退出(同 Ctrl + \)
KILL 9 强制终止
TERM 15 终止
CONT 18 继续(与STOP相反,fg/bg命令)
STOP 19 暂停(同 Ctrl + Z)
网友评论