背景
对于服务化的系统,一个请求涉及多个服务的调用是司空见惯的。如果是涉及到多个服务写的请求,如何保证多个服务写状态的一致性一直是服务化系统面对的一个技术难题。本文会介绍笔者之前参与设计的一种解决该问题的方案-异步任务框架。
框架思想
框架主要思想是异步+足够次数的重试+补偿。
异步:指请求处理流程异步化。多服务之间的调用都是通过,网络调用可能超时,由超时到终态(成功或失败)这个转换过程的时间并不能得到保证。而每次请求时长都是有限制的,同步的话请求时长不能得到保证了。
足够次数的重试:出现超时时,一般我们会设置重试次数以求得到确定的结果。其实这儿是有风险的,一般我们重试是针对网络超时的,还有一种是上游服务代码性能很差(也许超时时间不合理)或者系统资源不足导致服务端响应不及时,这个时候如果系统没有设置服务降级和隔离策略,重试会导致系统负载更高,最终导致雪崩。本框架假设业务开发人员已掌握相关知识。本框架的重试策略也是针对网络超时,并且乐观的认定重试足够次数(默认无限次)肯定可以得到一个终态。网络超时短时间内重试是没有意义的,所以重试间隔是指数级增长。
补偿:一个请求会涉及到多个服务的写操作。如果当前一步执行失败了,之前的操作就需要回滚。称之为补偿。
框架核心设计
架构图,略了,要现画,还要洗澡睡觉觉呢。
该框架也是以SDK的方式实现,需要业务方建一张辅助表,表结构参照下方AsyncTaskDTO的属性。接下来以下单请求为例,介绍一下本框架。
一个简单的下单流程如下:更新限购(1,1)-> 预占库存(1,2)->订单数据落地(1,3)->通知支付(1,4)。
本框架把这一个请求抽象成一个流程,一个流程的多个写步骤抽象成流程的一个任务,就如上面小括号标记的。每一步需要实现框架定义的任务处理器接口:
/**
* 异步任务处理器接口
*
* @author jacky
* @created 2017/5/9
*/
public interface IAsyncTaskProcessor {
/**
* 处理对应的任务, 也就是业务方这一步具体要做的事情,该方法要能保证幂等
*
* @param task
* @return
*/
public AsyncTaskRunResult process(AsyncTaskParam task);
/**
* 当前任务信息
*
* @return
*/
public TaskTaskDTO currentTaskInfo();
/**
* 获取运行的线程池, 返回空时使用默认的
*
* @return
*/
public default Executor getExecutor() {
return null;
}
}
process方法参数AsyncTaskParam:
// 关联的业务数据,建议只存储id
private String bizData = StringUtils.EMPTY;
// 流程id
private int flowId;
// 任务id
private int taskId;
// 可执行时间
private Date executeTime = new Date();
返回值AsyncTaskRunResult:
// 结果
private AsyncTaskRunResultEnum result;
// 失败或重试的中止原因
private String terminateReason;
// 后续步骤
private List<AsyncTaskDTO> nextTask;
AsyncTaskRunResultEnum:
SUC(1, "成功"), FAIL(2, "失败"), RETRY(3, "重试");
表结构参照AsyncTaskDTO:
// 任务表id
private long id;
// 业务参数,建议只存储id
private String bizData = StringUtils.EMPTY;
// 流程id
private int flowId;
// 任务id
private int taskId;
// 尝试次数
private int tryNum = 0;
// 执行时间
private Date executeTime = new Date();
// 状态
private int status = AsyncTaskStatusEnum.WAITING.getValue();
// 终止原因
private String terminateReason = StringUtils.EMPTY;
// 处理节点,目前是ip
private String node = StringUtils.EMPTY;
// 机器分组,目前无用
private String group = StringUtils.EMPTY;
// 扩展信息
private String extInfo = StringUtils.EMPTY;
// 添加时间
private Date addTime;
// 修改时间
private Date modTime;
AsyncTaskStatusEnum:
WAITING(1, "待处理"), PROCESSING(2, "处理中"), SUC(3, "处理成功"), FAIL(4, "处理失败");
当服务启动时,异步任务框架会扫描该服务中的业务任务处理器Bean。详细点就是通过Spring的ApplicationContext,拿到IAsyncTaskProcessor类型的所有Bean。然后借助currentTaskInfo方法,将bean缓存。外层key是flowId,内层key是taskId,value是这一步对应的processor。
/**
* 任务处理器映射
*/
private Map<Integer, Map<Integer, IAsyncTaskProcessor>> processorMap = Maps.newConcurrentMap();
任务的触发分为两种。一种是业务方主动调用。一种是定时扫描异步任务表执行。
业务方主动调用,框架根据业务方传入的flowId和taskId找到相应的bean执行其process方法。process方法的返回值要告诉框架两个信息:
-
这一步任务的执行结果。失败,成功还是重试。重试的话,定时任务1ms后会再次调起该处理器。
-
下一步要执行哪个流程的哪一个任务。
服务启动时也会扫描表里待执行和未执行的任务,触发相应的处理器。
execute核心代码如下:
/**
* 提交一个任务至线程池中执行
*
* @param taskId
*/
public void execute(long taskId) {
executorPool.execute(new Runnable() {
@Override
public void run() {
AsyncTaskDTO task = asyncTaskService.queryById(taskId);
if (null == task) {
return;
}
// 未到可执行时间
if (null != task.getExecuteTime() && new Date().before(task.getExecuteTime())) {
return;
}
// 查找任务对应的处理器bean
IAsyncTaskProcessor processor = queryTaskProcessor(task);
if (null == processor) {
LOG.error("任务查找不到对应的处理器,task id={}", taskId);
return;
}
// 优先使用执行器里指定的线程池
if (null == processor.getExecutor()) {
startAndRunTask(processor, task);
} else {
processor.getExecutor().execute(new Runnable() {
@Override
public void run() {
startAndRunTask(processor, task);
}
});
}
}
});
}
异步任务表的数据会定期删除。
回顾
可能有读者会问为啥不用现成的分布式事务框架呢?
根据当时的测试异步任务框架性能要比当时公司用自研的分布式任务框架性能要高,而且据说和业务代码耦合也很严重。
这个异步任务框架依然很粗糙,就实践来讲存在如下问题:
-
随着业务迭代,会存在processor爆炸的问题
-
每个任务和机器是绑定的。如果涉及到迁机器,相应的需要迁移数据。如果服务长时间宕机,绑定该服务节点的任务无法执行。
这个框架本可以做的更好,但是由于时间和精力问题一直没有迭代和维护。其实挺遗憾的。
网友评论