一、背景
成熟的系统都会支持webhook回调,在本业务实体发生变更的时候,异步/同步触发回调订阅方。
具体的实现方案,首选http api接口,第二种方案是采用mq的方式,发布一个mq消息,由需要订阅该事件变更的服务去订阅消息。
本文主要是讲述前种方案。至于后者,本人不是很建议,它存在一定的技术依赖和网络要求。
之前我写过关于延时任务通知的文章,需要注意的是,本文是实时任务通知,并不实现延时的功能。
二、参考UI
image.png image.png image.png这里我省去画UI的功夫,参考了禅道的界面,希望能帮助理解webhook的设计。
三、目标
- 1、回调支持重试,指数级的重试策略
- 2、支持数据归档,减少当前回调任务的数据量
四、数模设计
image.pngCREATE TABLE `webhook_config` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`task_code` varchar(64) NOT NULL COMMENT '任务编号',
`callback_url` varchar(255) NOT NULL COMMENT '回调地址',
`created_date` datetime DEFAULT NULL COMMENT '创建时间',
`modified_date` datetime DEFAULT NULL COMMENT '更新时间',
`created_by` varchar(64) DEFAULT NULL COMMENT '创建人员',
`modified_by` varchar(64) DEFAULT NULL COMMENT '更新人员',
`remark` varchar(128) DEFAULT NULL COMMENT '备注',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='业务回调配置表';
CREATE TABLE `webhook_task` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`task_code` varchar(64) NOT NULL COMMENT '任务编号',
`notify_params` varchar(4000) DEFAULT NULL COMMENT '通知内容Json',
`notify_url` varchar(255) NOT NULL COMMENT '回调地址',
`finish_time` datetime DEFAULT NULL COMMENT '完成时间',
`is_finish` int(11) NOT NULL DEFAULT 0 COMMENT '是否完成',
`last_time` datetime DEFAULT NULL COMMENT '最后重试时间',
`next_time` datetime DEFAULT NULL COMMENT '下次尝试时间',
`retry_times` int(11) DEFAULT 0 COMMENT '重试次数',
`created_date` datetime DEFAULT NULL COMMENT '创建时间',
PRIMARY KEY (`id`) USING BTREE,
KEY `IDX_REF_TASK_nextTime` (`next_time`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='通知补偿任务表';
-- 表webhook_tasks_history和表webhook_tasks的设计一致
CREATE TABLE `webhook_tasks_history` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`task_code` varchar(64) NOT NULL COMMENT '任务编号',
`notify_params` varchar(4000) DEFAULT NULL COMMENT '通知内容Json',
`notify_url` varchar(255) NOT NULL COMMENT '回调地址',
`finish_time` datetime DEFAULT NULL COMMENT '完成时间',
`is_finish` int(11) NOT NULL DEFAULT 0 COMMENT '是否完成',
`last_time` datetime DEFAULT NULL COMMENT '最后重试时间',
`next_time` datetime DEFAULT NULL COMMENT '下次尝试时间',
`retry_times` int(11) DEFAULT 0 COMMENT '重试次数',
`created_date` datetime DEFAULT NULL COMMENT '创建时间',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='通知补偿任务归档表';
五、其他设计
还有一种的设计思路是,首次回调如果是成功,就不写入到webhoook_task表,只有当回调失败时,才持久化到任务表进行重试处理。
六、源码实现
image.png6.1、核心类
import cn.hutool.http.ContentType;
import cn.hutool.http.HttpResponse;
import cn.hutool.http.HttpUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.nio.charset.Charset;
import java.util.List;
/**
* 通知任务
*
* @author xxx
**/
@Slf4j
@Service
public class NotifyTasksService {
private NotifyTaskScheduler notifyTaskScheduler;
@Autowired
private NotifyConfigRepository notifyConfigRepository;
/**
* 连接超时
*/
private static final int connectionTimeout = 10000;
/**
* 读取超时
*/
private static final int readTimeout = 5000;
public void schedule(String taskCode, String notifyUrl, String remark) {
notifyTaskScheduler.schedule(taskCode, notifyUrl, remark);
}
public NotifyTasksService(NotifyTaskScheduler notifyTaskScheduler) {
this.notifyTaskScheduler = notifyTaskScheduler;
notifyTaskScheduler.registerHandler(Constants.TaskCode.NOTIFY_POINTS_ACCOUNT_FLOW, new NotifyTaskScheduler.NotifyTaskHandler() {
@Override
public boolean run(NotifyTasks task) {
boolean success = execute(Constants.TaskCode.NOTIFY_POINTS_ACCOUNT_FLOW, task.getNotifyUrl(), task.getNotifyParams(), true);
return success;
}
});
//TODO 注册其他任务
}
// 暂时没用上,适用于在接口中指定了回调url。
public void notify(String taskCode, String notifyUrl, String requestJson) {
if (StringUtils.isEmpty(taskCode)) {
return;
}
if (StringUtils.isNotEmpty(notifyUrl)) {
execute(taskCode, notifyUrl, requestJson, false);
}
this.notify(taskCode, requestJson);
}
public void notify(String taskCode, String requestJson) {
List<NotifyConfig> bizNotifyConfigs = notifyConfigRepository.findByTaskCode(taskCode);
if (CollectionUtils.isEmpty(bizNotifyConfigs)) {
return;
}
bizNotifyConfigs.forEach(bizNotifyConfig -> {
// 通知
String callbackUrl = bizNotifyConfig.getCallbackUrl();
execute(taskCode, callbackUrl, requestJson, false);
});
}
// 具体指定回调,这里使用的是hutool框架,会打印调用日志。
private boolean execute(String taskCode, String callbackUrl, String requestJson, boolean taskExecute) {
try {
HttpResponse httpResponse = HttpUtil.createPost(callbackUrl).body(requestJson, ContentType.build(ContentType.JSON.getValue(), Charset.defaultCharset()))
.setConnectionTimeout(connectionTimeout).setReadTimeout(readTimeout).execute();
int status = httpResponse.getStatus();
if (!httpResponse.isOk()) {
if (log.isWarnEnabled()) {
log.warn("执行回调URL:{}失败, \n 请求入参是:{}, \n 返回状态码是:{}, \n 返回体是:{} \n taskCode={}",
callbackUrl, requestJson, status, httpResponse.body(), taskCode);
}
this.addTask2Schedule(taskExecute, taskCode, callbackUrl, requestJson);
} else {
if (log.isInfoEnabled()) {
log.info("执行回调URL:{}成功, \n 请求入参是:{}, \n 返回状态码是:{}, \n 返回体是:{} \n taskCode={}",
callbackUrl, requestJson, status, httpResponse.body(), taskCode);
}
}
return httpResponse.isOk();
} catch (Exception e) {
log.error("执行回调URL出现异常, taskCode={}, 回调URL:{}, 请求入参是:{}", taskCode, callbackUrl, requestJson, e);
this.addTask2Schedule(taskExecute, taskCode, callbackUrl, requestJson);
}
return false;
}
private void addTask2Schedule(boolean taskExecute, String taskCode, String callbackUrl, String requestJson) {
if (!taskExecute) {
this.schedule(taskCode, callbackUrl, requestJson);
}
}
}
/**
* 通知任务代码
**/
public static class TaskCode {
/**
* 积分账户
**/
public final static String NOTIFY_POINTS_ACCOUNT_FLOW = "POINTS_ACCOUNT_FLOW";
}
6.2、NotifyTaskScheduler
import com.google.common.collect.Maps;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
import java.util.Date;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Description 通知任务调度
*
**/
@Component
public class NotifyTaskScheduler extends AbstractTaskScheduler<NotifyTasks> {
@Autowired
private NotifyTasksRepository notifyTasksRepository;
@Autowired
private NotifyTasksHistoryRepository notifyTasksHistoryRepository;
private Map<String, NotifyTaskHandler> taskHandlers = Maps.newConcurrentMap();
public NotifyTaskScheduler(PlatformTransactionManager platformTransactionManager) {
super(platformTransactionManager);
}
public void registerHandler(String type, NotifyTaskHandler handler) {
taskHandlers.put(type, handler);
}
public void schedule(String taskCode, String notifyUrl, String remark) {
NotifyTasks task = new NotifyTasks(taskCode, notifyUrl, remark);
notifyTasksRepository.save(task);
}
private NotifyTaskHandler getHandler(String type) {
return this.taskHandlers.get(type);
}
@Override
protected boolean runTask(NotifyTasks task) {
NotifyTaskHandler handler = getHandler(task.getTaskCode());
if (handler != null) {
return handler.run(task);
}
return false;
}
@Override
protected void onSuccess(NotifyTasks task) {
NotifyTaskHandler handler = getHandler(task.getTaskCode());
if (handler != null) {
handler.onSuccess(task);
}
}
@Override
protected Set<NotifyTasks> getTasks(Date date) {
Set<NotifyTasks> availableTasks = notifyTasksRepository.findTop10ByNextTimeBeforeOrderByNextTimeAsc(date);
return availableTasks;
}
@Override
protected void updateTasks(Set<NotifyTasks> tasks) {
notifyTasksRepository.saveAll(tasks);
}
@Override
protected void moveTaskToHistory(Set<NotifyTasks> tasks) {
notifyTasksRepository.deleteAll(tasks);
final Set<NotifyTasksHistory> historyOrderTasks = tasks.stream()
.map(t -> new NotifyTasksHistory(t))
.collect(Collectors.toSet());
notifyTasksHistoryRepository.saveAll(historyOrderTasks);
}
/**
* 任务处理
*/
public interface NotifyTaskHandler {
/**
* 运行任务
*
* @param task
* @return
*/
boolean run(NotifyTasks task);
/**
* 任务处理成功
* <p>
* 处于事务中,请勿进行复杂操作
*
* @param task
*/
default void onSuccess(NotifyTasks task) {
}
}
}
6.3、AbstractTaskScheduler
import cn.hutool.core.date.DateUtil;
import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.transaction.PlatformTransactionManager;
import java.util.Date;
import java.util.Set;
/**
* 任务调度
*
*/
public abstract class AbstractTaskScheduler<T extends AbstractTask> extends AbstractTransaction {
private static final Logger logger = LoggerFactory.getLogger(AbstractTaskScheduler.class);
public AbstractTaskScheduler(PlatformTransactionManager platformTransactionManager) {
super(platformTransactionManager);
}
public void runSchedule() {
// 从数据库获取任务
runInTransaction(() -> {
Set<T> availableTasks = this.getTasks(DateUtil.date());
availableTasks.forEach(task -> {
task.retry();
});
if (!availableTasks.isEmpty()) {
this.updateTasks(availableTasks);
}
return availableTasks;
}).forEach(task -> {
try {
this.run(task);
} catch (Exception ex) {
if (logger.isWarnEnabled()) {
logger.warn("执行任务失败: " + task, ex);
}
}
});
}
private void run(T task) {
if (logger.isInfoEnabled()) {
logger.info("正在执行任务: " + task);
}
boolean success = this.runTask(task);
if (success) {
if (logger.isInfoEnabled()) {
logger.info("执行任务成功,回调: " + task);
}
this.onSuccess(task);
// 移到历史任务表,这里使用到了自定义事务
runInTransaction(() -> {
if (logger.isInfoEnabled()) {
logger.info("执行任务成功,更新任务状态: " + task);
}
task.finish();
Set<T> tasks = Sets.newHashSet(task);
this.moveTaskToHistory(tasks);
return 0;
});
} else {
if (logger.isInfoEnabled()) {
logger.info("执行任务失败: " + task);
}
}
}
/**
* 获取需要执行任务
* <p>
* <pre>需要加分布式锁,或者数据库锁</pre>
* <pre>已处于事务中</pre>
*
* @param date
* @return
*/
protected abstract Set<T> getTasks(Date date);
/**
* 更新任务状态
* <p>
* <pre>已处于事务中</pre>
*
* @param tasks
*/
protected abstract void updateTasks(Set<T> tasks);
/**
* 移到历史任务表
*
* @param tasks
*/
protected abstract void moveTaskToHistory(Set<T> tasks);
/**
* 执行任务
*
* @param task
* @return
*/
protected abstract boolean runTask(T task);
/**
* 任务执行成功后处理
*
* @param task
*/
protected void onSuccess(T task) {
/* NOT-IMPL */
}
}
6.4、AbstractTask
import javax.persistence.Column;
import javax.persistence.MappedSuperclass;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import java.util.Date;
/**
* 定时处理任务
*
*/
@MappedSuperclass
public abstract class AbstractTask {
@Column(name = "next_time", columnDefinition = "DATETIME default NOW() COMMENT '下次尝试时间'")
private Date nextTime;
@Column(name = "retry_times", columnDefinition = "INT default 0 COMMENT '重试次数'")
private int retryTimes;
@Column(name = "last_time", columnDefinition = "DATETIME COMMENT '最后重试时间'")
private Date lastTime;
@Column(name = "is_finish", nullable = false, columnDefinition = "INT default 0 COMMENT '是否完成'")
private boolean isFinish;
@Column(name = "finish_time", columnDefinition = "DATETIME COMMENT '完成时间'")
private Date finishTime;
public AbstractTask() {
this(new Date(), 0, null, false, null);
}
public AbstractTask(AbstractTask task) {
this(task.nextTime, task.retryTimes, task.lastTime, task.isFinish, task.finishTime);
}
public AbstractTask(Date nextTime, int retryTimes, Date lastTime, boolean isFinish, Date finishTime) {
this.nextTime = nextTime;
this.retryTimes = retryTimes;
this.lastTime = lastTime;
this.isFinish = isFinish;
this.finishTime = finishTime;
}
/**
* 指数时间退避重试
*
* @return
*/
public void retry() {
this.retryTimes += 1;
this.nextTime = calcNextRetryTime();
this.lastTime = new Date();
}
// 计算下一次重试时间
private Date calcNextRetryTime() {
final ZoneId zone = ZoneId.systemDefault();
final LocalDateTime lastTime = LocalDateTime.ofInstant(Instant.now(), zone);
final LocalDateTime nextTime = lastTime.plus(exp(this.retryTimes), ChronoUnit.MILLIS);
final Instant instant = nextTime.atZone(zone).toInstant();
return Date.from(instant);
}
// 指数级重试
public static long exp(int retryCount) {
long waitTime = ((long) Math.pow(2, retryCount) * 1000L);
return waitTime;
}
public void finish() {
this.isFinish = true;
this.finishTime = new Date();
}
}
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import rx.functions.Func0;
/**
*
**/
public abstract class AbstractTransaction {
private PlatformTransactionManager platformTransactionManager;
public AbstractTransaction(PlatformTransactionManager platformTransactionManager) {
this.platformTransactionManager = platformTransactionManager;
}
public <R> R runInTransaction(Func0<R> func0) {
R ret = null;
TransactionDefinition definition = new DefaultTransactionDefinition(
TransactionDefinition.PROPAGATION_REQUIRED);
TransactionStatus status = platformTransactionManager.getTransaction(definition);
try {
ret = func0.call();
platformTransactionManager.commit(status);
} catch ( Exception e ) {
platformTransactionManager.rollback(status);
}
return ret;
}
}
6.5、xxl-job定时任务
image.png用于补偿回调失败的任务
private final NotifyTaskScheduler notifyTaskScheduler;
/**
* 回调通知
*
* @param param
* @return
* @throws Exception
*/
@XxlJob("notifyTasksHandler")
public ReturnT<String> notifyTasksHandler(String param) throws Exception {
notifyTaskScheduler.runSchedule();
return ReturnT.SUCCESS;
}
网友评论