足以让你软件稳定运行,躺着把钱赚了的细节!
去年我的小团伙,帮一家公司开发一个终端推送服务系统。在刚上线不久出现了稳定性问题,716台终端数据不推送(如果无法解决,将对该公司造成716*2000的经济损失),经过几轮测试,发现问题很有可能在于定时器部分,然后我们阅读大量文献,基本上定位到问题在于定时器遇到exception的关闭与挂起。
Timer的缺陷
Timer被设计成支持多个定时任务,通过源码发现它有一个任务队列用来存放这些定时任务,并且启动了一个线程来处理
通过这种单线程的方式实现,在存在多个定时任务的时候便会存在问题:若任务B执行时间过长,将导致任务A延迟了启动时间!
还存在另外一个问题,应该是属于设计的问题:若任务线程在执行队列中某个任务时,该任务抛出异常,将导致线程因跳出循环体而终止,即Timer停止了工作!
同样是举个栗子:
public static void main(String[] args) {
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
System.out.println(sdf.format(new Date()) + " A: do task");
}
}, 0, 5*1000);
timer.schedule(new TimerTask() {
@Override
public void run() {
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
System.out.println(sdf.format(new Date()) + " B: sleep");
try {
Thread.sleep(20*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, 10*1000, 5000);
timer.schedule(new TimerTask() {
@Override
public void run() {
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
System.out.println(sdf.format(new Date()) + " C: throw Exception");
throw new RuntimeException("test");
}
}, 30*1000, 5000);
}
通过以上程序发现:一开始,任务A能正常每隔5秒运行一次。在任务B启动后,由于任务B运行时间需要20秒,导致任务A要等到任务B执行完才能执行。更可怕的是,任务C启动后,抛了个异常,定时任务挂了!
不过这种单线程的实现也有优点:线程安全!
ScheduledThreadPoolExecutor简介
ScheduledThreadPoolExecutor可以说是Timer的多线程实现版本,连JDK官方都推荐使用ScheduledThreadPoolExecutor替代Timer。它是接口ScheduledExecutorService的子类
ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,所以本质上说ScheduledThreadPoolExecutor还是一个线程池(可参考《Java线程池ThreadPoolExecutor简介》)。它也有coorPoolSize和workQueue,接受Runnable的子类作为任务。
特殊的地方在于它实现了自己的工作队列DelayedWorkQueue,该任务队列的作用是按照一定顺序对队列中的任务进行排序。比如,按照距离下次执行时间的长短的升序方式排列,让需要尽快执行的任务排在队首,“不那么着急”的任务排在队列后方,从而方便线程获取到“应该”被执行的任务。除此之外,ScheduledThreadPoolExecutor还在任务执行结束后,计算出下次执行的时间,重新放到工作队列中,等待下次调用。
上面通过一个程序说明了Timer存在的问题!这里我将Timer换成了用ScheduledThreadPoolExecutor来实现,注意TimerTask也是Runnable的子类。
public static void main(String[] args) {
int corePoolSize = 3;
ScheduledExecutorService pool = Executors.newScheduledThreadPool(corePoolSize);
pool.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
System.out.println(sdf.format(new Date()) + " A: do task");
}
}, 0 ,5, TimeUnit.SECONDS);
pool.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
System.out.println(sdf.format(new Date()) + " B: sleep");
try {
Thread.sleep(20*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, 10, 5, TimeUnit.SECONDS);
pool.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
System.out.println(sdf.format(new Date()) + " C: throw Exception");
throw new RuntimeException("test");
}
}, 30, 5, TimeUnit.SECONDS);
}
由于有3个任务需要调度,因此我将corePoolSize设置为3。通过控制台打印可以看到这次任务A一直都在正常运行(任务时间间隔为5秒),并不受任务B的影响。任务C抛出异常后,虽然本身停止了调度,但没有影响到其他任务的调度。可以说ScheduledThreadPoolExecutor解决Timer存在的问题!
那要是将corePoolSize设置为1,变成单线程跑呢?结果当然是和Timer一样,任务B会导致任务A延迟执行,不过比较好的是任务C抛异常不会影响到其他任务的调度。
可以说ScheduledThreadPoolExecutor适用于大部分场景,甚至就算timer提供的Date参数类型的开始时间也可以通过自己转的方式来实现。任务调度框架Quatz也是在ScheduledThreadPoolExecutor基础上实现的。
一般我们都使用单线程版的ScheduledThreadPoolExecutor居多,推荐通过以下方式来构建(构建后其线程数就不可更改)
ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
Timer异常后,任务就终止
如果Timer报错的时候还要继续执行任务,解决方法:在异常处理中加代码。
另一种解决办法:java.util.concurrent.ScheduledExecutorService;
ScheduledExecutorService异常后,任务会被挂起,解决方法:在异常处理中加代码。
public static ScheduledExecutorService executorService = Executors
.newScheduledThreadPool(1);
/**
* 1分钟执行一次
*/
public static void runTimer() {
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
.....
} catch (Exception e) {
e.printStackTrace();
......
}
}
}, 0, 60, TimeUnit.SECONDS);
}
RocketMQ源码分析:
public void start() {
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
if (timeDelay != null) {
this.timer.schedule(new DeliverScanJobTimerTask(level), FIRST_DELAY_TIME);
}
}
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
// log.info("scheduleAtFixedRate");
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}
}, 0, 2000);
}
public synchronized void persist() {
try {
Thread.sleep(1000);
} catch (Exception e) {
log.error("persist file [{}] exception", e);
}
}
public void shutdown() {
log.info("Shutdown");
this.timer.cancel();
}
public boolean parseDelayLevel() {
HashMap<String, Long> timeUnitTable = new HashMap<String, Long>(32);
timeUnitTable.put("s", 1000L);
timeUnitTable.put("m", 1000L * 60);
timeUnitTable.put("h", 1000L * 60 * 60);
timeUnitTable.put("d", 1000L * 60 * 60 * 24);
String levelString = this.store.getBrokerConfig().getMessageDelayLevel();
try {
String[] levelArray = levelString.split(" ");
for (int i = 0; i < levelArray.length; i++) {
String value = levelArray[i];
String ch = value.substring(value.length() - 1);
Long tu = timeUnitTable.get(ch);
int level = i + 1;
if (level > this.maxDelayLevel) {
this.maxDelayLevel = level;
}
long num = Long.parseLong(value.substring(0, value.length() - 1));
long delayTimeMillis = tu * num;
this.delayLevelTable.put(level, delayTimeMillis);
}
} catch (Exception e) {
log.error("parseDelayLevel exception", e);
log.info("levelString String = {}", levelString);
return false;
}
return true;
}
public Long getLevelDelayTime(int delayLevel) {
return delayLevelTable.get(delayLevel);
}
class DeliverScanJobTimerTask extends TimerTask {
private final int delayLevel;
public DeliverScanJobTimerTask(int delayLevel) {
this.delayLevel = delayLevel;
}
@Override
public void run() {
try {
//
TarminalManager tarminalManager = ScheduleScanJobService.this.store.getTarminalManager();
List<TarminalPO> tarminals = tarminalManager.scan(delayLevel);
for (TarminalPO tarminalPO : tarminals) {
this.executeOnTimeup(tarminalPO);
}
} catch (Exception e) {
// XXX: warn and notify me
log.error("ScheduleMessageService, executeOnTimeup exception", e);
ScheduleScanJobService.this.timer.schedule(new DeliverScanJobTimerTask(
this.delayLevel), DELAY_FOR_A_PERIOD);
} finally {
ScheduleScanJobService.this.timer.schedule(
new DeliverScanJobTimerTask(this.delayLevel), ScheduleScanJobService.this.getLevelDelayTime(delayLevel));
}
}
public void executeOnTimeup(final TarminalPO tarminalPO) {
ScheduleScanJobService.this.store.getSendMessageExecutor().submit(new Runnable() {
@Override
public void run() {
ScheduleScanJobService.log.info("Send Message Executor : {} => {} : {}", delayLevel, ScheduleScanJobService.this.getLevelDelayTime(delayLevel), tarminalPO.getCode());
TarminalSet tarminalSet = null;
byte[] data = null;
{
// TODO 通过接口获取消息和终端信息
}
{
//发送内容
String content = "这是一个特殊时刻,左手右手一个慢动…作!一起摇摆";
//屏参数设置
tarminalSet = new TarminalSet();
tarminalSet.setInfoModelNormal(5);
tarminalSet.setInfoSpeed(0x00);
tarminalSet.setPropertyWidth(256);
tarminalSet.setPropertyHeight(64);
tarminalSet.setInfoTimeStay(4);
//bytes
data = LedManager.getInstance().sendOntimeMessage(tarminalPO.getCode(), content, tarminalSet, null);
}
// 发送
SendFactory.factory(tarminalSet).doSend(tarminalPO.getCode(), data);
}
});
}
}
网友评论