美文网首页
微服务中,实现自定义的webhook

微服务中,实现自定义的webhook

作者: 天草二十六_简村人 | 来源:发表于2023-01-08 14:38 被阅读0次

    一、背景

    成熟的系统都会支持webhook回调,在本业务实体发生变更的时候,异步/同步触发回调订阅方。

    具体的实现方案,首选http api接口,第二种方案是采用mq的方式,发布一个mq消息,由需要订阅该事件变更的服务去订阅消息。

    本文主要是讲述前种方案。至于后者,本人不是很建议,它存在一定的技术依赖和网络要求。

    之前我写过关于延时任务通知的文章,需要注意的是,本文是实时任务通知,并不实现延时的功能。

    二、参考UI

    这里我省去画UI的功夫,参考了禅道的界面,希望能帮助理解webhook的设计。

    image.png image.png image.png

    三、目标

    • 1、回调支持重试,指数级的重试策略
    • 2、支持数据归档,减少当前回调任务的数据量

    四、数模设计

    image.png
    CREATE TABLE  `webhook_config` (
      `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键ID',
      `task_code` varchar(64) NOT NULL COMMENT '任务编号',
      `callback_url` varchar(255) NOT NULL COMMENT '回调地址',
      `created_date` datetime DEFAULT NULL COMMENT '创建时间',
      `modified_date` datetime DEFAULT NULL COMMENT '更新时间',
      `created_by` varchar(64) DEFAULT NULL COMMENT '创建人员',
      `modified_by` varchar(64) DEFAULT NULL COMMENT '更新人员',
      `remark` varchar(128) DEFAULT NULL COMMENT '备注',
      PRIMARY KEY (`id`) USING BTREE
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='业务回调配置表';
     
     
    CREATE TABLE `webhook_task` (
      `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键ID',
      `task_code` varchar(64) NOT NULL COMMENT '任务编号',
      `notify_params` varchar(4000) DEFAULT NULL COMMENT '通知内容Json',
      `notify_url` varchar(255) NOT NULL COMMENT '回调地址',
        
      `finish_time` datetime DEFAULT NULL COMMENT '完成时间',
      `is_finish` int(11) NOT NULL DEFAULT 0 COMMENT '是否完成',
      `last_time` datetime DEFAULT NULL COMMENT '最后重试时间',
      `next_time` datetime DEFAULT NULL COMMENT '下次尝试时间',
      `retry_times` int(11) DEFAULT 0 COMMENT '重试次数',
       
      `created_date` datetime DEFAULT NULL COMMENT '创建时间',
       
      PRIMARY KEY (`id`) USING BTREE,
      KEY `IDX_REF_TASK_nextTime` (`next_time`) USING BTREE
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='通知补偿任务表';
     
     -- 表webhook_tasks_history和表webhook_tasks的设计一致
    CREATE TABLE `webhook_tasks_history` (
      `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键ID',
      `task_code` varchar(64) NOT NULL COMMENT '任务编号',
      `notify_params` varchar(4000) DEFAULT NULL COMMENT '通知内容Json',
      `notify_url` varchar(255) NOT NULL COMMENT '回调地址',
        
      `finish_time` datetime DEFAULT NULL COMMENT '完成时间',
      `is_finish` int(11) NOT NULL DEFAULT 0 COMMENT '是否完成',
      `last_time` datetime DEFAULT NULL COMMENT '最后重试时间',
      `next_time` datetime DEFAULT NULL COMMENT '下次尝试时间',
      `retry_times` int(11) DEFAULT 0 COMMENT '重试次数',
       
      `created_date` datetime DEFAULT NULL COMMENT '创建时间',
       
      PRIMARY KEY (`id`) USING BTREE
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='通知补偿任务归档表';
    

    五、其他设计

    还有一种的设计思路是,首次回调如果是成功,就不写入到webhoook_task表,只有当回调失败时,才持久化到任务表进行重试处理。

    六、源码实现

    image.png

    6.1、核心类

    
    import cn.hutool.http.ContentType;
    import cn.hutool.http.HttpResponse;
    import cn.hutool.http.HttpUtil;
    
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.lang3.StringUtils;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    import org.springframework.util.CollectionUtils;
    
    import java.nio.charset.Charset;
    import java.util.List;
    
    /**
     * 通知任务
     *
     * @author xxx
     **/
    @Slf4j
    @Service
    public class NotifyTasksService {
    
        private NotifyTaskScheduler notifyTaskScheduler;
    
        @Autowired
        private NotifyConfigRepository notifyConfigRepository;
        /**
         * 连接超时
         */
        private static final int connectionTimeout = 10000;
    
        /**
         * 读取超时
         */
        private static final int readTimeout = 5000;
    
        public void schedule(String taskCode, String notifyUrl, String remark) {
            notifyTaskScheduler.schedule(taskCode, notifyUrl, remark);
        }
    
        public NotifyTasksService(NotifyTaskScheduler notifyTaskScheduler) {
            this.notifyTaskScheduler = notifyTaskScheduler;
            notifyTaskScheduler.registerHandler(Constants.TaskCode.NOTIFY_POINTS_ACCOUNT_FLOW, new NotifyTaskScheduler.NotifyTaskHandler() {
                @Override
                public boolean run(NotifyTasks task) {
                    boolean success = execute(Constants.TaskCode.NOTIFY_POINTS_ACCOUNT_FLOW, task.getNotifyUrl(), task.getNotifyParams(), true);
                    return success;
                }
            });
    
            //TODO 注册其他任务
        }
    
    // 暂时没用上,适用于在接口中指定了回调url。
        public void notify(String taskCode, String notifyUrl, String requestJson) {
            if (StringUtils.isEmpty(taskCode)) {
                return;
            }
            if (StringUtils.isNotEmpty(notifyUrl)) {
                execute(taskCode, notifyUrl, requestJson, false);
            }
    
            this.notify(taskCode, requestJson);
        }
    
        public void notify(String taskCode, String requestJson) {
            List<NotifyConfig> bizNotifyConfigs = notifyConfigRepository.findByTaskCode(taskCode);
    
            if (CollectionUtils.isEmpty(bizNotifyConfigs)) {
                return;
            }
    
            bizNotifyConfigs.forEach(bizNotifyConfig -> {
                // 通知
                String callbackUrl = bizNotifyConfig.getCallbackUrl();
    
                execute(taskCode, callbackUrl, requestJson, false);
            });
        }
    
    // 具体指定回调,这里使用的是hutool框架,会打印调用日志。
        private boolean execute(String taskCode, String callbackUrl, String requestJson, boolean taskExecute) {
            try {
                HttpResponse httpResponse = HttpUtil.createPost(callbackUrl).body(requestJson, ContentType.build(ContentType.JSON.getValue(), Charset.defaultCharset()))
                        .setConnectionTimeout(connectionTimeout).setReadTimeout(readTimeout).execute();
    
                int status = httpResponse.getStatus();
    
                if (!httpResponse.isOk()) {
                    if (log.isWarnEnabled()) {
                        log.warn("执行回调URL:{}失败, \n 请求入参是:{}, \n 返回状态码是:{}, \n 返回体是:{} \n taskCode={}",
                                callbackUrl, requestJson, status, httpResponse.body(), taskCode);
                    }
    
                    this.addTask2Schedule(taskExecute, taskCode, callbackUrl, requestJson);
                } else {
                    if (log.isInfoEnabled()) {
                        log.info("执行回调URL:{}成功, \n 请求入参是:{}, \n 返回状态码是:{}, \n 返回体是:{} \n taskCode={}",
                                callbackUrl, requestJson, status, httpResponse.body(), taskCode);
                    }
                }
                return httpResponse.isOk();
            } catch (Exception e) {
                log.error("执行回调URL出现异常, taskCode={}, 回调URL:{}, 请求入参是:{}", taskCode, callbackUrl, requestJson, e);
    
                this.addTask2Schedule(taskExecute, taskCode, callbackUrl, requestJson);
            }
            return false;
        }
    
    
        private void addTask2Schedule(boolean taskExecute, String taskCode, String callbackUrl, String requestJson) {
            if (!taskExecute) {
                this.schedule(taskCode, callbackUrl, requestJson);
            }
        }
    
    
    }
    
    
    /**
         * 通知任务代码
         **/
        public static class TaskCode {
            /**
             * 积分账户
             **/
            public final static String NOTIFY_POINTS_ACCOUNT_FLOW = "POINTS_ACCOUNT_FLOW";
        }
    

    6.2、NotifyTaskScheduler

    
    import com.google.common.collect.Maps;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    import org.springframework.transaction.PlatformTransactionManager;
    
    import java.util.Date;
    import java.util.Map;
    import java.util.Set;
    import java.util.stream.Collectors;
    
    /**
     * Description 通知任务调度
     *
     **/
    @Component
    public class NotifyTaskScheduler extends AbstractTaskScheduler<NotifyTasks> {
    
        @Autowired
        private NotifyTasksRepository notifyTasksRepository;
    
        @Autowired
        private NotifyTasksHistoryRepository notifyTasksHistoryRepository;
    
        private Map<String, NotifyTaskHandler> taskHandlers = Maps.newConcurrentMap();
    
        public NotifyTaskScheduler(PlatformTransactionManager platformTransactionManager) {
            super(platformTransactionManager);
        }
    
        public void registerHandler(String type, NotifyTaskHandler handler) {
            taskHandlers.put(type, handler);
        }
    
        public void schedule(String taskCode, String notifyUrl, String remark) {
            NotifyTasks task = new NotifyTasks(taskCode, notifyUrl, remark);
            notifyTasksRepository.save(task);
        }
    
        private NotifyTaskHandler getHandler(String type) {
            return this.taskHandlers.get(type);
        }
    
        @Override
        protected boolean runTask(NotifyTasks task) {
            NotifyTaskHandler handler = getHandler(task.getTaskCode());
            if (handler != null) {
                return handler.run(task);
            }
            return false;
        }
    
        @Override
        protected void onSuccess(NotifyTasks task) {
            NotifyTaskHandler handler = getHandler(task.getTaskCode());
            if (handler != null) {
                handler.onSuccess(task);
            }
        }
    
        @Override
        protected Set<NotifyTasks> getTasks(Date date) {
            Set<NotifyTasks> availableTasks = notifyTasksRepository.findTop10ByNextTimeBeforeOrderByNextTimeAsc(date);
            return availableTasks;
        }
    
        @Override
        protected void updateTasks(Set<NotifyTasks> tasks) {
            notifyTasksRepository.saveAll(tasks);
        }
    
        @Override
        protected void moveTaskToHistory(Set<NotifyTasks> tasks) {
            notifyTasksRepository.deleteAll(tasks);
            final Set<NotifyTasksHistory> historyOrderTasks = tasks.stream()
                    .map(t -> new NotifyTasksHistory(t))
                    .collect(Collectors.toSet());
            notifyTasksHistoryRepository.saveAll(historyOrderTasks);
        }
    
        /**
         * 任务处理
         */
        public interface NotifyTaskHandler {
    
            /**
             * 运行任务
             *
             * @param task
             * @return
             */
            boolean run(NotifyTasks task);
    
            /**
             * 任务处理成功
             * <p>
             * 处于事务中,请勿进行复杂操作
             *
             * @param task
             */
            default void onSuccess(NotifyTasks task) {
            }
        }
    
    
    }
    
    

    6.3、AbstractTaskScheduler

    
    import cn.hutool.core.date.DateUtil;
    import com.google.common.collect.Sets;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.transaction.PlatformTransactionManager;
    
    import java.util.Date;
    import java.util.Set;
    
    /**
     * 任务调度
     *
     */
    public abstract class AbstractTaskScheduler<T extends AbstractTask> extends AbstractTransaction {
    
        private static final Logger logger = LoggerFactory.getLogger(AbstractTaskScheduler.class);
    
        public AbstractTaskScheduler(PlatformTransactionManager platformTransactionManager) {
            super(platformTransactionManager);
        }
    
        public void runSchedule() {
            // 从数据库获取任务
            runInTransaction(() -> {
                Set<T> availableTasks = this.getTasks(DateUtil.date());
                availableTasks.forEach(task -> {
                    task.retry();
                });
                if (!availableTasks.isEmpty()) {
                    this.updateTasks(availableTasks);
                }
                return availableTasks;
            }).forEach(task -> {
                try {
                    this.run(task);
                } catch (Exception ex) {
                    if (logger.isWarnEnabled()) {
                        logger.warn("执行任务失败: " + task, ex);
                    }
                }
            });
        }
    
        private void run(T task) {
            if (logger.isInfoEnabled()) {
                logger.info("正在执行任务: " + task);
            }
            boolean success = this.runTask(task);
            if (success) {
                if (logger.isInfoEnabled()) {
                    logger.info("执行任务成功,回调: " + task);
                }
                this.onSuccess(task);
    
                // 移到历史任务表,这里使用到了自定义事务
                runInTransaction(() -> {
                    if (logger.isInfoEnabled()) {
                        logger.info("执行任务成功,更新任务状态: " + task);
                    }
                    task.finish();
    
                    Set<T> tasks = Sets.newHashSet(task);
                    this.moveTaskToHistory(tasks);
    
                    return 0;
                });
            } else {
                if (logger.isInfoEnabled()) {
                    logger.info("执行任务失败: " + task);
                }
            }
        }
    
    
        /**
         * 获取需要执行任务
         * <p>
         * <pre>需要加分布式锁,或者数据库锁</pre>
         * <pre>已处于事务中</pre>
         *
         * @param date
         * @return
         */
        protected abstract Set<T> getTasks(Date date);
    
        /**
         * 更新任务状态
         * <p>
         * <pre>已处于事务中</pre>
         *
         * @param tasks
         */
        protected abstract void updateTasks(Set<T> tasks);
    
        /**
         * 移到历史任务表
         *
         * @param tasks
         */
        protected abstract void moveTaskToHistory(Set<T> tasks);
    
        /**
         * 执行任务
         *
         * @param task
         * @return
         */
        protected abstract boolean runTask(T task);
    
        /**
         * 任务执行成功后处理
         *
         * @param task
         */
        protected void onSuccess(T task) {
            /* NOT-IMPL */
        }
    }
    
    

    6.4、AbstractTask

    import javax.persistence.Column;
    import javax.persistence.MappedSuperclass;
    import java.time.Instant;
    import java.time.LocalDateTime;
    import java.time.ZoneId;
    import java.time.temporal.ChronoUnit;
    import java.util.Date;
    
    /**
     * 定时处理任务
     *
     */
    @MappedSuperclass
    public abstract class AbstractTask {
    
        @Column(name = "next_time", columnDefinition = "DATETIME default NOW() COMMENT '下次尝试时间'")
        private Date nextTime;
    
        @Column(name = "retry_times", columnDefinition = "INT default 0 COMMENT '重试次数'")
        private int retryTimes;
    
        @Column(name = "last_time", columnDefinition = "DATETIME COMMENT '最后重试时间'")
        private Date lastTime;
    
        @Column(name = "is_finish", nullable = false, columnDefinition = "INT default 0 COMMENT '是否完成'")
        private boolean isFinish;
    
        @Column(name = "finish_time", columnDefinition = "DATETIME COMMENT '完成时间'")
        private Date finishTime;
    
        public AbstractTask() {
            this(new Date(), 0, null, false, null);
        }
    
        public AbstractTask(AbstractTask task) {
            this(task.nextTime, task.retryTimes, task.lastTime, task.isFinish, task.finishTime);
        }
    
        public AbstractTask(Date nextTime, int retryTimes, Date lastTime, boolean isFinish, Date finishTime) {
            this.nextTime = nextTime;
            this.retryTimes = retryTimes;
            this.lastTime = lastTime;
            this.isFinish = isFinish;
            this.finishTime = finishTime;
        }
    
        /**
         * 指数时间退避重试
         *
         * @return
         */
        public void retry() {
            this.retryTimes += 1;
            this.nextTime = calcNextRetryTime();
            this.lastTime = new Date();
        }
    
    // 计算下一次重试时间
        private Date calcNextRetryTime() {
            final ZoneId zone = ZoneId.systemDefault();
            final LocalDateTime lastTime = LocalDateTime.ofInstant(Instant.now(), zone);
            final LocalDateTime nextTime = lastTime.plus(exp(this.retryTimes), ChronoUnit.MILLIS);
    
            final Instant instant = nextTime.atZone(zone).toInstant();
            return Date.from(instant);
        }
    
    // 指数级重试
        public static long exp(int retryCount) {
            long waitTime = ((long) Math.pow(2, retryCount) * 1000L);
            return waitTime;
        }
    
        public void finish() {
            this.isFinish = true;
            this.finishTime = new Date();
        }
    
    }
    
    
    
    import org.springframework.transaction.PlatformTransactionManager;
    import org.springframework.transaction.TransactionDefinition;
    import org.springframework.transaction.TransactionStatus;
    import org.springframework.transaction.support.DefaultTransactionDefinition;
    import rx.functions.Func0;
    
    /**
     *
     **/
    public abstract class AbstractTransaction {
    
        private PlatformTransactionManager platformTransactionManager;
    
        public AbstractTransaction(PlatformTransactionManager platformTransactionManager) {
            this.platformTransactionManager = platformTransactionManager;
        }
    
        public <R> R runInTransaction(Func0<R> func0) {
            R ret = null;
            TransactionDefinition definition = new DefaultTransactionDefinition(
                    TransactionDefinition.PROPAGATION_REQUIRED);
            TransactionStatus status = platformTransactionManager.getTransaction(definition);
            try {
                ret = func0.call();
    
                platformTransactionManager.commit(status);
            } catch ( Exception e ) {
                platformTransactionManager.rollback(status);
            }
            return ret;
        }
    }
    
    

    6.5、xxl-job定时任务

    用于补偿回调失败的任务

    image.png
        private final NotifyTaskScheduler notifyTaskScheduler;
    
    /**
         * 回调通知
         *
         * @param param
         * @return
         * @throws Exception
         */
        @XxlJob("notifyTasksHandler")
        public ReturnT<String> notifyTasksHandler(String param) throws Exception {
            notifyTaskScheduler.runSchedule();
            return ReturnT.SUCCESS;
        }
    

    相关文章

      网友评论

          本文标题:微服务中,实现自定义的webhook

          本文链接:https://www.haomeiwen.com/subject/ixsucdtx.html