前言
最近团队里面的小同学搞出了一个线上故障。在一个离线给商家发送消息的场景,由于定时任务程序Bug导致部分商家没有收到相应的消息。
由于定时任务问题导致线上故障,在以往的工作中也是常见的。在此,我结合多年的经验,对于定时任务设计关键点做了简单的总结。
关键点一:要分批处理数据,避免出现性能问题
定时任务往往用于离线数据处理。基本处理流程分为三个阶段:获取待处理数据、逐条数据处理、保存处理结果。代码示例如下:
// 获取待处理数据。SQL执行语句:
// select * from record where is_sended=false;
List<Item> list = RecordDao.query();
// 逐条数据处理
for (Item item : list) {
}
// 保存处理结果
请思考一下,上面的代码是否有问题?在业务发展初期,需要处理数据量级比较小,定时任务能正常运行。但是,随着业务发展,待处理的数据会变的很大。这时候,待处理数据阶段的SQL语句执行就非常耗时,甚至会执行超时。上面我提到的消息没发送成功的Case,就是这个原因导致的。
解决的方法也比较简单,核心思路是分批获取数据。代码示例如下:
bool isFinished = false;
int lastId = 0;
while (isFinished) {
// 获取待处理数据。SQL执行语句:
// select * from record where is_sended=false and id>lastId order by id asc limit 10;
List<Item> list = RecordDao.query(lastId);
// 获取的数据为空,则代表要处理的数据已经处理完毕
if (list.isEmpty()) {
isFinished = true;
}
// 逐条数据处理
for (Item item : list) {
// 记录最后一条处理的数据 id
lastId = item.getId();
}
}
// 保存处理结果
通过上面的代码优化,无论待处理的数据量级有多大,整体性能不会下降很明显。
关键点二:要有容错机制,避免出现异常终断
一次定时任务的执行往往需要处理多条数据,而且数据之间可能没有关联关系。如果没有控制好异常,将会导致一条数据处理异常,导致整个定时任务执行失败。代码示例如下:
bool isFinished = false;
int lastId = 0;
while (isFinished) {
List<Item> list = RecordDao.query(lastId);
if (list.isEmpty()) {
isFinished = true;
}
for (Item item : list) {
lastId = item.getId();
// 处理单条数据
deal(item);
}
}
// 保存结果
如果 deal 方法执行过程中抛出了一个异常,将会导致定时任务终止,后续数据无法处理。这个时候,我们需要增加 try catch,避免一个老鼠坏了一锅粥。代码示例如下:
bool isFinished = false;
int lastId = 0;
while (isFinished) {
List<Item> list = RecordDao.query(lastId);
if (list.isEmpty()) {
isFinished = true;
}
for (Item item : list) {
lastId = item.getId();
// 处理单条数据
try {
deal(item);
} catch (Exception e) {
// 记录错误日志
}
}
}
// 保存结果
关键点三:要有完善的日志,方便跟踪定位
当遇到现实问题时,我们往往需要通过日志来定位分析根因。比如,要包含整体处理进度、处理成功失败、执行耗时情况等。代码示例如下:
bool isFinished = false;
int lastId = 0;
int total = 0; // 整体处理条数
int success = 0; // 成功条数
int fail = 0; // 失败条数
long start = System.currentTimeMillis(); // 耗时
while (isFinished) {
List<Item> list = RecordDao.query(lastId);
if (list.isEmpty()) {
isFinished = true;
}
for (Item item : list) {
total++;
lastId = item.getId();
try {
deal(item);
success++;
} catch (Exception e) {
fail++;
// 记录错误日志
log.error("信息处理异常,信息id{}", item.getId(), e);
}
}
}
// 保存结果
// 整体处理结果日志
long end = System.currentTimeMillis();
log.error("整体处理{}条, 成功{}条, 失败{}条,耗时{}", total, success, fail, end-start);
关键点四:要有降级能力,方便应急止损
所谓的降级能力,就是能随时终止任务执行的能力。比如,我们发现定时任务代码有bug,需要立即终止任务执行。如果是单机,你还可以快速登录机器,kill 掉执行定时任务的进程。但是,在一个大的集群中执行定时任务,查找到低在哪些机器正在执行任务,逐一登录上进行止损操作,就会比较好使。因此,我们就需要在代码中增加降级的能力。代码示例如下:
bool isFinished = false;
int lastId = 0;
int total = 0;
int success = 0;
int fail = 0;
long start = System.currentTimeMillis();
while (isFinished) {
List<Item> list = RecordDao.query(lastId);
if (list.isEmpty()) {
isFinished = true;
}
for (Item item : list) {
// 增加降级判断
if (isDegrade()) {
isFinished = true;
break;
}
total++;
lastId = item.getId();
try {
deal(item);
success++;
} catch (Exception e) {
fail++;
log.error("信息处理异常,信息id{}", item.getId(), e);
}
}
}
// 保存结果
long end = System.currentTimeMillis();
log.error("整体处理{}条, 成功{}条, 失败{}条,耗时{}", total, success, fail, end-start);
代码中的 isDegrade() 方法可以接受外部的降级指令。
关键点五:要具备幂等性,方便任务重复执行
具备幂等性是指多次执行定时任务,执行的结果依然是正确的。比如,上面给商家发消息的定时任务,不能重复执行多次,就多次给商家发消息。而是要做到无论执行多少次,都按预期的只给商家发一条信息。代码示例如下:
bool isFinished = false;
int lastId = 0;
int total = 0;
int success = 0;
int fail = 0;
long start = System.currentTimeMillis();
while (isFinished) {
List<Item> list = RecordDao.query(lastId);
if (list.isEmpty()) {
isFinished = true;
}
for (Item item : list) {
if (isDegrade()) {
isFinished = true;
break;
}
// 判断数据是否需要处理
if (isNeedProcess(item) == false) {
continue;
}
total++;
lastId = item.getId();
try {
deal(item);
success++;
} catch (Exception e) {
fail++;
log.error("信息处理异常,信息id{}", item.getId(), e);
}
}
}
// 保存结果
long end = System.currentTimeMillis();
log.error("整体处理{}条, 成功{}条, 失败{}条,耗时{}", total, success, fail, end-start);
当然,如果在获取数据阶段已经做了相关条件判断,并且每处理一条数据,就对相应数据进行了保存,isNeedProcess() 方法也就不需要了。
网友评论