美文网首页
延迟性任务实现解析

延迟性任务实现解析

作者: Briseis | 来源:发表于2018-09-16 17:59 被阅读39次

    背景

    很多人在面试的时候可能都碰到过这样的一个面试题:设计一个秒杀系统,30分钟没付款就自动关闭交易,这里我们主要来看下在实际的项目中如何结合业务需求来实现类似"xxx分钟后自动完成xxx"这种属于延迟任务的功能。

    业务场景

    下面来看看具体的业务场景,我们在系统中有很多的需求如活动报名,活动签到,活动取消,活动审核等等都需要发送短信或者消息,其中有些比较特殊的如活动开始或者结束前xxx小时发送消息、活动到期后自动发送通知等等都属于延迟性触发的任务,那么针对于类似这样的任务,一般我们都是怎么处理的呢?

    设计思路

    由于系统是目前仍属于单机应用,所以在实现上暂时不考虑分布式,为了简单快速采用了JDK自带的本地延迟队列DelayQueue结合redis作为数据灾备的方案.DelayQueue是JUC框架中提供的一个具备延迟机制的队列.
    DelayQueue具有有如下特点:

    • 队列中存储的元素必须实现Delayed接口,且元素具有时效性.
    • Delayed接口提供了getDelay方法来返回对象的延迟时间.
    • Delayed接口提供了compareTo方法用于队列内部元素的比较排序.
    • 内部使用了优先级队列PriorityQueue来实现每次从队首中取出来的都是最先要过期的元素.
    • 实现了BlockingQueue接口,是一个无界阻塞队列,且元素不允许为null.
    • 提供了如阻塞方法take()返回队首元素,put()方法添加元素,remove()方法元素出队等等...

    那么大致实现的思路如下:
    在创建活动时会将活动id与计算好的时间差值存储到redis缓存中,服务器后台开启守护线程实时监控本地队列中到期的任务,并触发相应的推送操作,同时为了防止服务器意外重启等情况,在系统初始化时会将缓存数据load到本地队列中,这样可以避免由于数据丢失导致消息与短信数据没有推送到,下面来看看实现步骤:
    1.先创建任务对象dto,内部定义了任务标识ID与延迟时间戳并实现Delayed接口和Runnable接口,我们来看看其中一个dto的实现,其他的类似,代码如下:

    public class NotifyDto implements Delayed, Runnable, Serializable {
        private static final long serialVersionUID = 1L;
        private final static Logger logger = LoggerFactor.getLogger(NotifyDto.class);
    
        private String redisMsg;//缓存数据即任务标识ID
        private long expireTime;//延迟时间
    
            //带参构造函数
        public NotifyDto(String redisMsg, long delayTime) {
            this.redisMsg = redisMsg;
            this.expireTime = TimeUnit.NANOSECONDS.convert(delayTime,
                    TimeUnit.MILLISECONDS) + System.nanoTime();
        }
            
        public String getRedisMsg() {
            return redisMsg;
        }
    
        public void setRedisMsg(String redisMsg) {
            this.redisMsg = redisMsg;
        }
    
        public long getExpireTime() {
            return expireTime;
        }
    
        public void setExpireTime(long expireTime) {
            this.expireTime = expireTime;
        }
        
        /**
         * 用于延迟队列内部比较排序   当前对象的延迟时间 - 比较对象的延迟时间
         * @see java.lang.Comparable#compareTo(java.lang.Object)
         **/
        @Override
        public int compareTo(Delayed o) {
            NotifyDto task = (NotifyDto) o;
            long result = this.getDelay(TimeUnit.NANOSECONDS)
                    - task.getDelay(TimeUnit.NANOSECONDS);
            if (result < 0) {
                return -1;
            } else if (result > 0) {
                return 1;
            } else {
                return 0;
            }
        }
    
        /**
         * 返回对象延迟时间
         * @see java.util.concurrent.Delayed#getDelay(java.util.concurrent.TimeUnit)
         **/
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(this.expireTime - System.nanoTime(),
                    TimeUnit.NANOSECONDS);
        }
    
        /**
         * 任务回调处理
         * @see java.lang.Runnable#run()
         **/
        @Override
        public void run() {
            logger.debug("当前任务队列:msgQueue,延迟时间:{},活动ID:{}", this.expireTime
                    + "=========================================", this.redisMsg);
            this.msgPus h(this.redisMsg);
        }
    
        private void msgPush(String redisMsg) {
            Map<String, Object> msgMap = new HashMap<>();
            Integer activeId = NumberHelpUtils.toInt(redisMsg);
            //省略发送消息动作
                    //清空缓存记录
            JedisUtils.zRemove("active:review:notify", activeId + "");
        }
    
        @Override
        public String toString() {
            return "NotifyDto [redisMsg=" + redisMsg + ", expireTime=" + expireTime
                    + "]";
        }
    
        @Override
        public int hashCode() {
            final int prime = 31;
            int result = 1;
            result = prime * result
                    + ((redisMsg == null) ? 0 : redisMsg.hashCode());
            return result;
        }
    
        @Override
        public boolean equals(Object obj) {
            if (this == obj)
                return true;
            if (obj == null)
                return false;
            if (getClass() != obj.getClass())
                return false;
            NotifyDto other = (NotifyDto) obj;
            if (redisMsg == null) {
                if (other.redisMsg != null)
                    return false;
            } else if (!redisMsg.equals(other.redisMsg))
                return false;
            return true;
        }
    }
    

    2.创建一个任务调度服务service监控所有队列中的过期任务对象,其内部包含各种任务队列初始化,出队与入队等方法,大致代码如下.

    public class MsgQueueService {
    
        private final static Logger logger = LoggerFactory
                .getLogger(MsgQueueService.class);
    
        private MsgQueueService() {
    
        }
    
        private static class MsgHolder {
            private static MsgQueueService msgQueueService = new MsgQueueService();
        }
    
        public static MsgQueueService getInstance() {
            return MsgHolder.msgQueueService;
        }
    
        /**
         * 执行任务线程池
         */
        private final static Executor es = Executors.newFixedThreadPool(5);
    
        /**
         * 创建3个守护线程
         */
        private Thread expireActiveThread;
    
        private Thread startByActiveThread;
    
        private Thread beforeEndActiveThread;
    
        /**
         * 创建延迟任务队列
         */
        private DelayQueue<NotifyDto> msgQueue = new DelayQueue<>();
    
        private DelayQueue<StartByNotifyDto> msgQueue2 = new DelayQueue<>();
    
        private DelayQueue<BeforEndNotifyDto> msgQueue3 = new DelayQueue<>();
    
        /**
         * 
         * 系统启动时初始化
         */
        public void init() {
            //初始化数据
            initRedisMsg();
            //监听活动结束后任务
            expireActiveThread = new Thread(() -> execute());
            expireActiveThread.setDaemon(true);
            expireActiveThread.setName("ExpireActive Daemon Thread");
            expireActiveThread.start();
            //监听活动开始前2小时任务
            startByActiveThread = new Thread(() -> execute2());
            startByActiveThread.setDaemon(true);
            startByActiveThread.setName("StartByActive Daemon Thread");
            startByActiveThread.start();
            //监听活动结束前2小时任务
            beforeEndActiveThread = new Thread(() -> execute3());
            beforeEndActiveThread.setDaemon(true);
            beforeEndActiveThread.setName("BeforeEndActive Daemon Thread");
            beforeEndActiveThread.start();
        }
    
        /**
         * 
         * 从Redis中初始化任务到队列中
         */
        public void initRedisMsg() {
            Set<String> keySet = JedisUtils.zRange("active:review:notify", 0, -1);
            if (CollectionHelpUtils.isNotEmpty(keySet)) {
                keySet.stream().forEach(
                        o -> {
                            long expireTime = JedisUtils.zScore(
                                    "active:review:notify", o).longValue();
                            NotifyDto redisTask = new NotifyDto(o, expireTime
                                    - System.currentTimeMillis());
                            this.push(redisTask);
                        });
            }
            Set<String> applyStartSet = JedisUtils.zRange(
                    "active:applyStart:notify", 0, -1);
            if (CollectionHelpUtils.isNotEmpty(applyStartSet)) {
                applyStartSet.stream().forEach(
                        o -> {
                            long expireTime = JedisUtils.zScore(
                                    "active:applyStart:notify", o).longValue();
                            StartByNotifyDto redisTask = new StartByNotifyDto(o,
                                    expireTime - System.currentTimeMillis());
                            this.push(redisTask);
                        });
            }
            Set<String> applyEndSet = JedisUtils.zRange("active:applyEnd:notify",
                    0, -1);
            if (CollectionHelpUtils.isNotEmpty(applyEndSet)) {
                applyEndSet.stream().forEach(
                        o -> {
                            long expireTime = JedisUtils.zScore(
                                    "active:applyEnd:notify", o).longValue();
                            BeforEndNotifyDto redisTask = new BeforEndNotifyDto(o,
                                    expireTime - System.currentTimeMillis());
                            this.push(redisTask);
                        });
            }
        }
    
        /**
         * 监听队列,如果没有过期对象则阻塞
         * @param es
         */
        private void execute() {
            while (true) {
                try {
                    NotifyDto task = msgQueue.take();
                    if (task != null) {
                        logger.debug("当前任务队列:{},执行时间:{}", "msgQueue",
                                DateTimeUtils.getTime());
                                            //此处真正执行了任务对象中的run方法,触发了业务推送动作
                        es.execute(task);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    break;
                }
            }
        }
    
        private void execute2() {
            while (true) {
                try {
                    StartByNotifyDto task2 = msgQueue2.take();
                    if (task2 != null) {
                        logger.debug("当前任务队列:{},执行时间:{}", "msgQueue2",
                                DateTimeUtils.getTime());
                                            //此处真正执行了任务对象中的run方法,触发了业务推送动作
                        es.execute(task2);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    break;
                }
            }
        }
    
        private void execute3() {
            while (true) {
                try {
                    BeforEndNotifyDto task3 = msgQueue3.take();
                    if (task3 != null) {
                        logger.debug("当前任务队列:{},执行时间:{}", "msgQueue3",
                                DateTimeUtils.getTime());
                                            //此处真正执行了任务对象中的run方法,触发了业务推送动作
                        es.execute(task3);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    break;
                }
            }
        }
    
        /**
         * 
         * 任务入队
         * @param time
         * @param task
         */
        public void push(NotifyDto task) {
            msgQueue.put(task);
        }
    
        public void push(StartByNotifyDto task) {
            msgQueue2.put(task);
        }
    
        public void push(BeforEndNotifyDto task) {
            msgQueue3.put(task);
        }
    
        /**
         * 
         * 任务出队
         * @param task
         */
        public void remove(NotifyDto task) {
            msgQueue.remove(task);
        }
    
        public void remove(StartByNotifyDto task) {
            msgQueue2.remove(task);
        }
    
        public void remove(BeforEndNotifyDto task) {
            msgQueue3.remove(task);
        }
    }
    

    注意: MsgQueueService的实现使用了单例模式,并且其中的init方法在系统初始化时被调度执行.
    3.创建一个系统初始化服务类,默认实现Spring框架提供的接口InitializingBean,重写其接口方法afterPropertiesSet以便在系统启动后自动执行初始化逻辑.

        @Override
        public void afterPropertiesSet() throws Exception {
            logger.debug("系统初始化...");
            MsgQueueService.getInstance().init();
      }
    

    4.最后一步就是具体的业务逻辑处理了,我们在业务service中的调用方法代码如下:

        /**
         * 
         * 把活动任务推送到延迟队列中
         * @param activeId
         */
        private void Msg2Queue(Active active, boolean isAdd) {
            long delayTime = 0;
            if (isAdd) {
                //计算出当前任务延迟时间
                delayTime = active.getActiveEndTime().getTime()
                        - active.getCreateTime().getTime();
                //推送到redis中
                JedisUtils.zAdd("active:review:notify", active.getActiveEndTime()
                        .getTime(), active.getId() + "");
                //活动到期未点评的任务入队
                NotifyDto task = new NotifyDto(active.getId() + "", delayTime);
                MsgQueueService.getInstance().push(task);
                //判断活动报名截止状态
                if (active.getApplyAudit() == 1) {
                    if (active.getApplyAbort() == 1) {
                        //活动开始截止报名
                        delayTime = active.getActiveStartTime().getTime() - 2 * 60
                                * 60 * 1000 - System.currentTimeMillis();
                        //推送到redis中
                        JedisUtils.zAdd("active:applyStart:notify", active
                                .getActiveStartTime().getTime()
                                - 2
                                * 60
                                * 60
                                * 1000, active.getId() + "");
                        StartByNotifyDto task2 = new StartByNotifyDto(
                                active.getId() + "", delayTime);
                        MsgQueueService.getInstance().push(task2);
                    } else {
                        //活动结束前可报名
                        delayTime = active.getActiveEndTime().getTime() - 2 * 60
                                * 60 * 1000 - System.currentTimeMillis();
                        //推送到redis中
                        JedisUtils.zAdd("active:applyEnd:notify", active
                                .getActiveEndTime().getTime() - 2 * 60 * 60 * 1000,
                                active.getId() + "");
                        BeforEndNotifyDto task3 = new BeforEndNotifyDto(
                                active.getId() + "", delayTime);
                        MsgQueueService.getInstance().push(task3);
                    }
                }
            } else {
                //动态取消任务
                Double score = JedisUtils.zScore("active:review:notify",
                        active.getId() + "");
                if (score != null) {
                    long expireTime = score.longValue();
                    MsgQueueService.getInstance().remove(
                            new NotifyDto(active.getId() + "", expireTime));
                    //清空redis中记录
                    JedisUtils.zRemove("active:review:notify", active.getId() + "");
                    //重新计算出当前任务延迟时间
                    delayTime = active.getActiveEndTime().getTime()
                            - System.currentTimeMillis();
                    //推送到redis中
                    JedisUtils.zAdd("active:review:notify", active
                            .getActiveEndTime().getTime(), active.getId() + "");
                    //延迟任务入队
                    NotifyDto task = new NotifyDto(active.getId() + "", delayTime);
                    MsgQueueService.getInstance().push(task);
                }
            }
        }
    

    注意:由于实际业务中任务可以被修改或取消,所以定义任务dto时需重写其hashcode方法与equals方法,来防止队列中的对象出现冲突,由于dto中的redisMsg字段对应了mysql中的表自增主键,所以我们使用了这个字段来重写这两个方法.

    总结

    此方案的优点:

    • 代码实现相对比较简单,利用JDK自带的容器类来解决延迟处理问题,无需自己造轮子.
    • 效率高,任务触发时间延迟低.
    • 由于线程安全还可以实现多生产者与消费者.
    • 结合redis做数据灾备,避免由于服务重启或其他异常退出导致的数据丢失问题.

    此方案的缺点:

    • DelayQueue属于单机队列,若在分布式集群环境下,要自己做横向扩展以实现高可用,难度较高.
    • 服务器一旦宕机,数据将丢失,需结合其他底层存储做持久化,增加了编码的复杂性.
    • 数据存储在单机内存中,受物理条件限制,数据量大时容易OOM.

    其他的解决方案:

    • JDK自带的线程池ScheduledExecutorService.
    • 简单的定时任务轮询,扫描数据表,数据量大时会有性能瓶颈.
    • 可以定时任务结合redis,任务和到期时间都保存在redis中,启动定时任务扫描redis,到期的key删除,并且异步更新数据库.
    • google guava的缓存也可实现类似的功能.
    • 消息队列如ActiveMQ或者RobbitMQ都提供了死信队列可实现延迟功能,设定任务的到期时间,到期之后自动进入死信队列,后台开启守护线程监控死信队列.
    • 时间轮算法,Netty提供了一个HashedWheelTimer来实现.
    • 利用redis的zset可实现延迟队列或者redis的Keyspace Notifications(键空间机制)实现key失效后提供回调函数.
      ......

    相关文章

      网友评论

          本文标题:延迟性任务实现解析

          本文链接:https://www.haomeiwen.com/subject/dbopsftx.html