美文网首页
延迟队列的几种实现方式

延迟队列的几种实现方式

作者: 蒋仁勇 | 来源:发表于2020-02-14 20:40 被阅读0次

    哇,今天2019年最后一天上班,实在没心情撸代码,感觉隔了好久没写博客了,乘机写点吧。
    回想2019实在是充实的一年,买房、迁户口、领结婚证、换工作啥都来了,说到换工作,实在想吐槽一下前东家,干了两年毛都没有捞到,技术成长几乎没有,加班到后半夜写业务,钱没赚到多,扣得一逼,入职时说期权,结果毛都没看一根,两年才涨500块,还他妈是入职时压下去的500,老板人前一套人后一套,空降的领导(外号做事情的人,后面都用这个外号替代)恶心一逼,甩锅抢功劳一流,欺下媚上,总算10月底逃离苦海了,虽然咱刚走就裁员三分之一,损失了几万补偿金也心甘情愿。
    当初刚入职的时候想着创业公司50人,咱也算核心员工了,说是有期权,还蛮期待的,但是现在200-300人了,老员工走光了,项目被做事情的人架构得一塌糊涂。其实职场都差不多吧,但这么极品的人还真是第一次碰到,做事情的人客观来讲技术实力是有点,大概也就阿里P6-7的水平吧,但架子比P9都高的感觉,大概之前没当过领导,总有点小人得志的感觉。哎,算了吐槽这种人纯粹在浪费时间,这里建议各位同行还是能在大厂就尽量在大厂呆,福利待遇起码有保障,以后资历也有亮点,小厂真的就和赌博一样,可惜咱是赌输了,话说咱也从来没赌赢过。吐槽就这么多,下面上干活:

    延迟队列是什么?
    相信写过几年代码的应该都知道,不知道的自己去谷歌,我的理解:它其实是为了解决特点场景的一种技术方案。比如如下场景:

    你下单购物,但订单20分钟未支付就要失效,你会怎么做?

    可能刚入行的第一反应就是job定时扫啊,不错job定时扫是能解决问题,可是不够优雅,而且job的时间怎么设?如何保证准时性?(如果赶时间或者不要求准时性这些的话用xxl-job或者@Scheduled(cron = "")注解通过job定时扫也可以实现就不说了。)

    这时延迟队列的特点就可以完美应用于该场景,当下单时往延迟队列放一条,20分钟后执行订单失效操作。据我所知道延迟队列有以下几种实现方式:
    1、基于MQ实现:
    参考下面博客:
    https://www.cnblogs.com/hzmark/p/mq-delay-msg.html
    https://blog.csdn.net/qq_21033663/article/details/101222502

    阿里云的RocketMQ ,具体可以参考官方文档:https://help.aliyun.com/document_detail/43349.html

    大概思路是下图:


    MQ实现延迟队列.png

    2、基于Redis实现:
    这个就多了,首先目前最火的Redis框架Redisson就有这种解决方案,事实上这个框架基本上把Redis都玩出花了,分布式锁、分布式对象、分布式集合、分布式服务等都包含了。这里不多介绍,自己去GitHub上看:https://github.com/redisson/redisson

    下面主要介绍新公司的大佬基于lua脚本和Guava-EventBus的简单实现,来,一起膜拜一波:


    目录结构.png

    DelayQProducer:

    package com.company.project.service.util.delayqueue.producer;
    
    import com.company.project.service.util.delayqueue.DelayJob;
    import java.time.Duration;
    
    public interface DelayQProducer {
    
        /**
         * 提交任务,若存在则更新
         */
        void submit(String queue, DelayJob<?> job, Duration delayed);
    
        /**
         * 更新任务,不存在返回false
         */
        boolean update(String queue, DelayJob<?> job, Duration delayed);
    
        /**
         * 取消任务
         */
        void cancel(String queue, DelayJob<?> job);
    
        /**
         * 查询任务队列长度
         */
        default int getQueueSize(String queue) {
            return 0;
        }
    
        String getNamespace();
    }
    

    DelayQProducer:

    package com.company.project.service.util.delayqueue.producer;
    
    import com.alibaba.fastjson.JSON;
    import com.company.project.service.util.PooledRedisClient;
    import com.company.project.service.util.delayqueue.DelayJob;
    import lombok.Getter;
    import lombok.Setter;
    import java.time.Duration;
    import java.time.Instant;
    import static com.company.project.service.util.delayqueue.util.DQUtil.buildKey;
    
    public class DelayQProducerImpl implements DelayQProducer {
    
        @Setter
        @Getter
        private String namespace;
    
        @Setter
        private PooledRedisClient pooledRedisClient;
    
        @Override
        public void submit(String queue, DelayJob<?> job, Duration delayed) {
            double score = Instant.now().toEpochMilli() + delayed.toMillis();
            pooledRedisClient.runWithRetry(jedis -> jedis.zadd(buildKey(namespace, queue), score, JSON.toJSONString(job)));
        }
    
        @Override
        public boolean update(String queue, DelayJob<?> job, Duration delayed) {
            String jobString = JSON.toJSONString(job);
            Double oldScore = pooledRedisClient.executeWithRetry(jedis -> jedis.zscore(buildKey(namespace, queue), jobString));
            if (oldScore == null) {
                return false;
            } else {
                double newScore = Instant.now().toEpochMilli() + delayed.toMillis();
                pooledRedisClient.runWithRetry(jedis -> jedis.zadd(buildKey(namespace, queue), newScore, JSON.toJSONString(job)));
                return true;
            }
        }
    
        @Override
        public void cancel(String queue, DelayJob<?> job) {
            pooledRedisClient.runWithRetry(jedis -> jedis.zrem(buildKey(namespace, queue), JSON.toJSONString(job)));
        }
    
        @Override
        public int getQueueSize(String queue) {
            return pooledRedisClient.executeWithRetry(jedis -> jedis.zcard(buildKey(namespace, queue))).intValue();
        }
    }
    
    

    依赖PooledRedisClient:

    package com.company.project.service.util;
    
    import lombok.extern.slf4j.Slf4j;
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPool;
    
    import java.util.function.Consumer;
    import java.util.function.Function;
    
    
    @Slf4j
    public class PooledRedisClient {
    
        private static final int DEFAULT_RETRY_TIME = 3;
    
        private static final int MIN_RETRY_TIME = 0;
    
        private static final int MAX_RETRY_TIME = 5;
    
        private static final long REDIS_EXECUTE_WARN_TIME = 50;
    
        private final JedisPool pool;
    
        public PooledRedisClient(JedisPool pool) {
            this.pool = pool;
        }
    
        public void run(Consumer<Jedis> command) {
    
            execute(jedis -> {
                command.accept(jedis);
                return null;
            });
        }
    
        public void runWithRetry(Consumer<Jedis> command) {
            runWithRetry(command, DEFAULT_RETRY_TIME);
        }
    
        public void runWithRetry(Consumer<Jedis> command, int retries) {
            executeWithRetry(jedis -> {
                command.accept(jedis);
                return null;
            }, retries);
        }
    
        public <T> T execute(Function<Jedis, T> command) {
            long startTime = System.currentTimeMillis();
            Jedis jedis = null;
            try {
                jedis = this.pool.getResource();
                return command.apply(jedis);
            } finally {
                if (jedis != null) {
                    jedis.close();
                }
                long endTime = System.currentTimeMillis();
                long useTime = endTime - startTime;
                if (useTime > REDIS_EXECUTE_WARN_TIME) {
                    log.warn("pooled redis client execute time warn. useTime:[{}],command:[{}]", useTime, command);
                }
            }
        }
    
        public <T> T executeWithRetry(Function<Jedis, T> command) {
            return executeWithRetry(command, DEFAULT_RETRY_TIME);
        }
    
        public <T> T executeWithRetry(Function<Jedis, T> command, int retryTime) {
            if (retryTime <= MIN_RETRY_TIME || retryTime > MAX_RETRY_TIME) {
                retryTime = DEFAULT_RETRY_TIME;
            }
            int failTime = 0;
    
            T result;
            while (true) {
                long startTime = System.currentTimeMillis();
                try (Jedis jedis = this.pool.getResource()) {
                    log.debug("pooled redis client execute start.command:[{}]", command);
                    result = command.apply(jedis);
                    break;
                } catch (RuntimeException e) {
                    failTime++;
                    if (failTime >= retryTime) {
                        throw e;
                    }
                } finally {
                    long useTime = System.currentTimeMillis() - startTime;
                    if (useTime > REDIS_EXECUTE_WARN_TIME) {
                        log.warn("pooled redis client execute time warn. useTime:[{}],command:[{}]", useTime, command);
                    }
                }
            }
            return result;
        }
    
        public JedisPool getPool() {
            return this.pool;
        }
    
    }
    

    BoundedDelayQProducer:

    package com.company.project.service.util.delayqueue.producer;
    
    import com.company.project.service.util.delayqueue.DelayJob;
    import lombok.Setter;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.time.Duration;
    
    public class BoundedDelayQProducer implements DelayQProducer {
    
        private final Logger logger = LoggerFactory.getLogger(getClass());
    
        @Setter
        private DelayQProducer delegate;
    
        /**
         * warning: 按每个job 512字节计算,200,000个job将占用redis约100MB内存
         * 后续需要考虑对queue分片
         */
        @Setter
        private int maxQueueSize = 200000;
    
        @Setter
        private int warningSize = 100000;
    
        @Override
        public void submit(String queue, DelayJob<?> job, Duration delayed) {
            int queueSize = delegate.getQueueSize(queue);
            if (queueSize > maxQueueSize) {
                logger.error("task queue:{} exceed limit:{}, task discard", queue, maxQueueSize);
            } else if (queueSize > warningSize) {
                logger.warn("task queue:{} size warning, current size:{}", queue, queueSize);
                delegate.submit(queue, job, delayed);
            } else {
                logger.debug("task queue:{}, size:{}", queue, queueSize);
                delegate.submit(queue, job, delayed);
            }
        }
    
        @Override
        public boolean update(String queue, DelayJob<?> job, Duration delayed) {
            return delegate.update(queue, job, delayed);
        }
    
        @Override
        public void cancel(String queue, DelayJob<?> job) {
            delegate.cancel(queue, job);
        }
    
        @Override
        public String getNamespace() {
            return delegate.getNamespace();
        }
    }
    
    

    DelayQConsumer:

    package com.company.project.service.util.delayqueue.consumer;
    
    public interface DelayQConsumer {
    
        void subscribe(String queue);
    
    }
    
    

    DelayQConsumerImpl:

    package com.company.project.service.util.delayqueue.consumer;
    
    import com.alibaba.fastjson.JSON;
    import com.company.project.service.util.PooledRedisClient;
    import com.company.project.service.util.delayqueue.DelayJob;
    import com.company.project.service.util.delayqueue.DelayJobProcessor;
    import com.company.project.service.util.delayqueue.util.DQUtil;
    import com.google.common.eventbus.AsyncEventBus;
    import lombok.Setter;
    import org.apache.commons.collections.CollectionUtils;
    import org.apache.commons.lang3.StringUtils;
    import org.apache.commons.lang3.concurrent.BasicThreadFactory;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.BeansException;
    import org.springframework.beans.factory.DisposableBean;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.ApplicationContextAware;
    
    import java.time.Instant;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    @SuppressWarnings("UnstableApiUsage")
    public class DelayQConsumerImpl implements DelayQConsumer, ApplicationContextAware, DisposableBean, InitializingBean {
    
        private final Logger logger = LoggerFactory.getLogger(getClass());
    
        private static final String POP_SCRIPT = "local rlt = redis.call('ZRANGEBYSCORE', KEYS[1], '-inf', ARGV[1], 'LIMIT', '0', ARGV[2])"
                + " if next(rlt) ~= nil then redis.call('ZREM', KEYS[1], unpack(rlt)) end"
                + " return rlt";
    
        private static final long SLEEP_FOR_EMPTY_QUEUE = 1000L;
    
        private static final long SLEEP_FOR_ERROR = 5000L;
    
        private static final int MAX_IN_FLIGHT_JOB = 2000;
    
        private static final int EVENT_BUS_THREAD_POOL_SIZE = 8;
    
        @Setter
        private String namespace;
    
        @Setter
        private String initWithQueue;
    
        @Setter
        private String batchSize = "10";
    
        @Setter
        private PooledRedisClient pooledRedisClient;
    
        private volatile State state = State.NEW;
    
        private AsyncEventBus asyncEventBus;
    
        private ExecutorService eventBusPool;
    
        @Override
        public void subscribe(String queue) {
            if (state == State.TERMINATED) {
                return;
            }
            logger.info("subscribe " + queue + " on namespace " + namespace);
            state = State.RUNNING;
            new Thread(new Consumer(queue)).start();
        }
    
        @Override
        public void afterPropertiesSet() throws Exception {
            if (StringUtils.isNotBlank(initWithQueue)) {
                this.subscribe(initWithQueue);
            }
        }
    
        @Override
        public void destroy() throws Exception {
            state = State.TERMINATED;
            eventBusPool.shutdown();
            if (!eventBusPool.awaitTermination(30, TimeUnit.SECONDS)) {
                logger.warn("force shutdown delay queue consumer eventBusPool");
                eventBusPool.shutdownNow();
            }
        }
    
        @SuppressWarnings("rawtypes")
        class Consumer implements Runnable {
    
            private String queue;
    
            private Map<String, Class> classCache = new HashMap<>();
    
            public Consumer(String queue) {
                this.queue = queue;
            }
    
            @Override
            public void run() {
                Thread.currentThread().setName("delay-queue-consumer-thread-" + namespace + "-" + queue);
                String key = DQUtil.buildKey(namespace, queue);
                while (state == State.RUNNING) {
                    try {
                        String now = String.valueOf(Instant.now().toEpochMilli());
                        List jobs = null;
                        try {
                            jobs = (List) pooledRedisClient.executeWithRetry(jedis -> jedis.eval(POP_SCRIPT, 1, key, now, batchSize));
                        } catch (Exception e) {
                            logger.error("redis error, namespace:{}, queue:{}", namespace, queue, e);
                        }
                        if (CollectionUtils.isEmpty(jobs)) {
                            Thread.sleep(SLEEP_FOR_EMPTY_QUEUE);
                            continue;
                        }
                        for (Object o : jobs) {
                            DelayJob job = JSON.parseObject(o.toString(), DelayJob.class);
                            Object jobEntity = JSON.toJavaObject((JSON) job.getEntity(), getClass(job.getClassName()));
                            logger.info("receive delayed job:{}", jobEntity);
                            asyncEventBus.post(jobEntity);
                        }
                    } catch (Throwable e) {
                        logger.error("unexpected consume error, namespace:{}, queue:{}", namespace, queue, e);
                        try {
                            Thread.sleep(SLEEP_FOR_ERROR);
                        } catch (InterruptedException ignore) {
                        }
                    }
                }
                logger.info("consumer terminated, namespace:{}, queue:{}", namespace, queue);
            }
    
            private Class getClass(String className) throws ClassNotFoundException {
                Class clazz = classCache.get(className);
                if (clazz == null) {
                    clazz = Class.forName(className);
                    classCache.put(className, clazz);
                }
                return clazz;
            }
        }
    
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            // AsyncEventBus内部使用无界队列存储事件,无法在post阶段限流,只能在消费时丢弃,目前没有溢出风险,后续考虑更换为mbassador
            eventBusPool = new ThreadPoolExecutor(EVENT_BUS_THREAD_POOL_SIZE, EVENT_BUS_THREAD_POOL_SIZE, 0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<>(MAX_IN_FLIGHT_JOB),
                    new BasicThreadFactory.Builder().namingPattern("event-bus-delay-queue-consumer-%d").build(),
                    (r, executor) -> logger.error("delay task discarded due to overflow"));
            asyncEventBus = new AsyncEventBus(eventBusPool, (exception, context) -> logger.error("async event process failed with subscriber: {} method: {} event: {}",
                    context.getSubscriber().getClass().getSimpleName(), context.getSubscriberMethod().getName(), context.getEvent(), exception));
    
            applicationContext.getBeansWithAnnotation(DelayJobProcessor.class).values().forEach(bean -> {
                logger.info("register delay queue subscriber: {}", bean.getClass().getSimpleName());
                asyncEventBus.register(bean);
            });
        }
    
        public enum State {
            NEW, RUNNING, TERMINATED
        }
    }
    
    

    DQUtil:

    package com.company.project.service.util.delayqueue.util;
    
    import com.google.common.base.Preconditions;
    import org.apache.commons.lang3.StringUtils;
    
    
    public class DQUtil {
    
        private static final String KEY_PREFIX = "card_im_delay_queue";
    
        private static final String DELIMITER = ":";
    
        public static String buildKey(String namespace, String key) {
            Preconditions.checkArgument(StringUtils.isNotBlank(namespace));
            Preconditions.checkArgument(StringUtils.isNotBlank(key));
            return KEY_PREFIX + DELIMITER + namespace + DELIMITER + key;
        }
    
        public static String buildKey(String key) {
            return KEY_PREFIX + DELIMITER + key;
        }
    }
    
    

    DelayJob:

    package com.company.project.service.util.delayqueue;
    
    import lombok.Data;
    
    import java.io.Serializable;
    import java.util.HashMap;
    import java.util.Map;
    
    
    @Data
    public class DelayJob<T> implements Serializable {
        private static final long serialVersionUID = -2392357322522404381L;
    
        private String className;
    
        private T entity;
    
        private Map<String, Object> context;
    
        public DelayJob() {
        }
    
        public DelayJob(T entity) {
            this.entity = entity;
            this.className = entity.getClass().getName();
            this.context = new HashMap<>();
        }
    
        public DelayJob(T entity, Map<String, Object> context) {
            this.entity = entity;
            this.className = entity.getClass().getName();
            this.context = context;
        }
    }
    
    

    DelayJobProcessor:

    package com.company.project.service.util.delayqueue;
    
    import org.springframework.stereotype.Component;
    
    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;
    
    /**
     * for auto register EventBus subscriber
     */
    @Target({ElementType.TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    @Component
    public @interface DelayJobProcessor {
    }
    
    

    DelayQConfig:

    package com.company.project.service.util.delayqueue;
    
    
    import com.company.project.service.util.PooledRedisClient;
    import com.company.project.service.util.delayqueue.consumer.DelayQConsumer;
    import com.company.project.service.util.delayqueue.consumer.DelayQConsumerImpl;
    import com.company.project.service.util.delayqueue.producer.BoundedDelayQProducer;
    import com.company.project.service.util.delayqueue.producer.DelayQProducer;
    import com.company.project.service.util.delayqueue.producer.DelayQProducerImpl;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import javax.annotation.Resource;
    
    @Configuration
    public class DelayQConfig {
    
     /**
         * 交互事件超时结束任务延迟队列
         */
        public static final String DELAY_QUEUE_INTERACT_EVENT_AUTO_CLOSE = "interact_event_auto_close";
    
    
        @Resource
        private PooledRedisClient pooledRedisClient;
    
        @Value(value = "${business-card.delayqueue.namespace:business-card}")
        private String namespace;
    
        @Bean
        public DelayQProducer delayQProducer() {
            DelayQProducerImpl delayQProducer = new DelayQProducerImpl();
            delayQProducer.setNamespace(namespace);
            delayQProducer.setPooledRedisClient(pooledRedisClient);
    
            BoundedDelayQProducer boundedProducer = new BoundedDelayQProducer();
            boundedProducer.setDelegate(delayQProducer);
            return boundedProducer;
        }
    
        @Bean
        public DelayQConsumer delayQConsumer() {
            DelayQConsumerImpl consumer = new DelayQConsumerImpl();
            consumer.setInitWithQueue(DELAY_QUEUE_INTERACT_EVENT_AUTO_CLOSE);
            consumer.setNamespace(namespace);
            consumer.setPooledRedisClient(pooledRedisClient);
            return consumer;
        }
    
    }
    
    

    好了,怎么用呢?
    如下:


    提交事件(生产).png 到期执行(消费).png

    哎,原本想着趁过年放假在家好好进修看书看视频,然而果然还是没那自制力,这次新型冠状病毒疫情让我从1月24日到今天2月14日快在家呆一个月,人都快发霉了,接下来要立flag:
    新的一年要看完这个链接里全部的视频:https://www.bilibili.com/video/av79788004
    书的话要把现在有的基本都看一遍至少:
    《第一本docker书》
    《高性能MYSQL》
    《领域驱动设计》
    《Java虚拟机精讲》
    《设计模式》
    《Spring源码精析》
    《Spring Cloud与微服务构建》
    学习了解ServiceMesh与K8S,我相信这将是这两年的必备技术。
    然后愿接下来的十年一次的经济危机温柔点!

    相关文章

      网友评论

          本文标题:延迟队列的几种实现方式

          本文链接:https://www.haomeiwen.com/subject/wzcjzctx.html