为在工作中用到的数据库排他锁select ... for update的使用来写一个总结。
我的需求背景简化来说,一个定时任务触发一个事件的产生(定时器和事件是一一对应的关系),一个事件可以有多个决策对应,多个决策在执行的时候不一定会都成功,所以得有一个决策记录来记录决策的执行结果,并且需要做幂等判断,防止事件在重试的过程中,绑定的多个决策中已经成功的决策再次执行,这个幂等判断条件我就用事件记录id和决策方案id。
因为事件由定时任务触发并且失败了会发起重试,这个重试也是借助定时器完成。定时任务的超时时间设置为比如说5分钟,当第一次定时任务触发事件后,决策执行发生异常(失败都用异常来传递),事件记录的状态不更新。定时器再次触发时,因为下一次触发时间没有更新,则会再次触发事件执行决策,当决策全部成功后,则更新事件状态为成功,并更新该定时器的下次触发时间;如果决策一直未成功,则事件的状态一直未更新,那么当定时器再次触发时,就会发现此时这个事件的触发已经超时,这时候则报警提示事件处理超时,修改定时器的下次触发事件,并且进行一些后续处理,比如说将该事件的状态置为超时。
在这个过程中, 事件记录是非常重要的,为了防止因网络或者其他问题而带来的并发问题,我们需要对事件记录加锁,这里使用了排他锁select ... for update。
//事件开始执行
public void starEventProcess(final EventScheme eventScheme, final String uniqueCode, final Map<String, Object> dataMap) {
transactionTemplateRequiresNew.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
EventLogDO eventLogDO = lockEventLog(uniqueCode, eventScheme);
if (eventLogDO == null) {
throw BaseException.valueOf(ExceptionEnum.UNKNOWN_EXCEPTION,
"锁定eventLog失败,eventSchemeId={},uniqueCode={}", eventScheme.getId(), uniqueCode);
}
//幂等判断,如果状态以为SUCCESS,则无须再执行,
if (EventLogStatusEnum.SUCCESS.getCode().equals(eventLogDO.getEventLogStatus())) {
return;
}
//调用决策
EventInfo eventInfo = new EventInfo(eventScheme.getTntInstId(), eventScheme.getId(), eventLogDO.getId());
decisionService.triggerDecisionSchemeByEvent(eventInfo);
//更新触发记录状态
eventLogDO.setEventLogStatus(EventLogStatusEnum.SUCCESS.getCode());
eventLogRepository.updateLogStatus(eventLogDO);
}
});
}
/**
* 锁定一条触发记录
*
* @param uniqueCode
* @param eventScheme
* @return
*/
private EventLogDO lockEventLog(final String uniqueCode, final EventScheme eventScheme) {
//锁定记录,select ... for update
EventLogDO eventLog = eventLogRepository.queryByEventAndUniqueForUpdate(uniqueCode, eventScheme.getId());
if (eventLog != null) {
return eventLog;
}
//尝试创建记录
transactionTemplateRequiresNew.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
EventLogDO eventLogDO = new EventLogDO();
eventLogDO.setEventLogStatus(EventLogStatusEnum.INIT.getCode());
//防止并发创建,主键id采用自增方式
try{
long eventLogId = eventLogRepository.insert(eventLogDO);
eventLogDO.setId(eventLogId);
}catch (DataIntegrityViolationException e){
//插入异常时,判断记录是否存在。如果记录已经存在说明并发冲突。
EventLogDO eventLog = eventLogRepository.queryByEventAndUnique(uniqueCode, eventScheme.getId());
if (eventLog == null) {
throw BaseException.valueOf(ExceptionEnum.DB_EXCEPTION,
"TriggerService.process 异常,获取事件记录失败,参数信息:triggerId={},dateTime={}", eventScheme.getId(), uniqueCode);
}
}
}
});
eventLog = eventLogRepository.queryByEventAndUniqueForUpdate(uniqueCode, eventScheme.getId());
return eventLog;
}
这段处理逻辑以及排他锁的使用方法还是比较清晰的。select... for update是在事务中才会生效,并且排他锁是其他事物的读写均不可执行。
做一个实验:
在事务中对同一条数据进行select ... for udpate操作
事务A.PNG
第一个事务完美查询,此时不提交commit,开启另一个事务,发现读取不到并且会提示超时。
事务B.PNG
如果此时改成 select * from t for update nowait,那么不等待行锁释放,直接提示锁冲突,不返回结果。
如果在联调与测试的情况需要验证加锁是否正确,可以在debug的模式下进行。
网友评论