美文网首页
redis入门第十一课:实战之延迟队列

redis入门第十一课:实战之延迟队列

作者: 阿狸404 | 来源:发表于2019-05-07 15:09 被阅读0次

    1.需求

    在淘宝购物时,如果过了一定时间没有评论,系统自动替用户给商家进行评论。像这种延迟动作的需求,随处可见。那么有什么解决方案呢?

    • 方案一:定时24h扫表,如果订单成功时间大于24小时为评论,则自动添加评论。否则,不处理。
    • 方案二:使用消息中间件,在订单成功生产订单时,生产一条消息发送到mq,定时任务消费。
      这里采用方案2,但不是采用第三方mq,而是使用redis实现一个简单的优先队列来处理。

    2. 实现思路

    主要利用redis的有序集合作为一个队列来实现;

      1. 将整个redis作为一个消息池,消息以key-value形式存储,key为消息id,格式为:"Message:Pool:"+messageId(此id为uuid随机生成的id)。value为消息体;对key-value设置超时时间,以便于释放redis空间。超时时间是24h+0.5h;
      1. 用zset作为优先队列,当前时间+延迟时间(24h)作为score来排序维持优先级;
      1. 生产者在生成消息的时候,一方面将消息添加到消息池中,一方面将消息id放入到优先队列中,以便消费者消费;
      1. 消费者定时任务没24h执行一次,消费者从队列中取到消息id,然后从消息池中取到完整消息,比较当前时间和score来判断该消息是否可以消费;
      1. 消费成功,从队列中与消息池删除该消息;
      1. 消费失败,放回到消息池。

    3. 具体实现

    消息体message.java

    public class Message implements Serializable {
    
        private static final long serialVersionUID = -8646024765148921221L;
    
        /**
         * 消息id
         */
        private String id;
        /**
         * 消息延迟/毫秒
         */
        private long delay;
    
        /**
         * 消息剩余时间
         */
        private int ttl;
        /**
         * 消息体
         */
        private String body;
        /**
         * 创建时间
         */
        private long createTime;
    
    }
    

    队列RedisMQ.java

    public class RedisMQ {
        
        @Autowired
        private RedisUtil redisUtil;
        
        /**
         * 消息池前缀,以此前缀加上传递的消息id作为key,以消息
         * 的消息体body作为值存储
         */
        public static final String MSG_POOL = "Message:Pool:";
        /**
         * zset队列 名称 queue
         */
        public static final String QUEUE_NAME = "Message:Queue:";
        
        
        private static final int SEMIH = 30 * 60;//半小时
    
        
        /**
         * 消息存入消息池,过期时间多设置半小时。以保证通道延时消息不会过期
         * @param message
         * @return
         */
        public boolean addMsgPool(Message message) {
     
            if (null != message) {
                redisUtil.setSingleObjectInCache(MSG_POOL + message.getId(), 
                                                message.getBody(), 
                                                Integer.valueOf(message.getTtl() + SEMIH), 
                                                TimeUnit.SECONDS);
                return true;
            }
            return false;
        }
     
        /**
         * 从消息池中删除消息
         * @param id
         * @return
         */
        public void deMsgPool(String id) {
            redisUtil.remove(MSG_POOL + id);
        }
        
        /**
         * 向队列中添加元素
         * @param key
         * @param score
         * @param val
         */
        public void pushMessage(String key, long score, String val){
            redisUtil.zSet(key, val, score);
        }
        /**
         * 从队列中删除元素
         * @param key
         * @param val
         */
        public void popMessage(String key,String val){
            redisUtil.remove(key, val);
        }
    
    }
    

    生产者

    public class MessageProvider {
        
        private static int delay = 60 * 60 * 24;//24h
        
        
        @Autowired
        private RedisMQ redisMQ;
        
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        
        /**
         * 发送消息
         * @param content
         * @param routingKey
         */
        public void sendMessage(String content){
            if (StringUtils.isEmpty(content)) {
                System.out.println("消息为空!");
            } else {
                String messageId = UUID.randomUUID().toString();
                // 将有效信息放入消息队列和消息池中,延迟24小时,而且过期时间也为24h
                Message message = new Message(  messageId, 
                                                delay, 
                                                delay, 
                                                content, 
                                                System.currentTimeMillis());
                redisMQ.addMsgPool(message);
                
                //当前时间加上延时的时间,作为score
                Long delayTime = message.getCreateTime() + message.getDelay();
                String d = sdf.format(message.getCreateTime());
                System.out.println("当前时间:" + d+",消费的时间:" + sdf.format(delayTime));
        
                //入队列
                redisMQ.pushMessage(RedisMQ.QUEUE_NAME, delayTime, message.getId());
            }
        }
    
    }
    

    消费者

    public class MessageConsumer {
    
        @Autowired
        private RedisUtil redisUtil;
    
        @Autowired
        private RedisMQ redisMQ;
    
        @Autowired
        private MessageProvider provider;
    
        /**
         * cron表达式: second(秒), minute(分), hour(时),day of month(日),month(月),day of
         * week(周几)
         *
         */
        @Scheduled(cron = "* * 24 * * *")
        public void consumerMessage() {
            // 取出队列中截止当前时间所有的集合成员。
            Set<String> values = redisUtil.rangeByScore(RedisMQ.QUEUE_NAME, 0,System.currentTimeMillis());
            if (values != null) {
                Long current = System.currentTimeMillis();
                for (String messageId : values) {
                    String body = "";
                    body = redisUtil.getByKeyFromCache(RedisMQ.MSG_POOL + messageId, String.class);
                    Long score = redisUtil.getScore(RedisMQ.MSG_POOL + messageId,body).longValue();
                    if (current >= score) {
                        // 消息已过期,消费消息
                        try {
                            // 开始消费消息了
                            System.out.println("开始消费消息了哦!我要开始检验是否评论了,如果未评论,默认好评哦!" + body);
                        } catch (Exception e) {
                            // 消费失败,重新放回队列
                            provider.sendMessage(body);
                        } finally {
                            redisMQ.popMessage(RedisMQ.QUEUE_NAME, messageId);
                            redisMQ.deMsgPool(messageId);
                        }
                    }
    
                }
            }
    
        }
    
    }
    

    至于RedisUtil,仅提供几个用到的方法。

    /**
         * 添加元素到有序集合
         * 
         * @param key
         * @param obj
         * @param score
         */
        public void zSet(String key, Object obj, double score) {
            if (!isEnableRedisCache) {
                return;
            }
    
            zsetOps.add(key, FastJosnUtils.toJson(obj),score);
        }
    
        /**
         * 获取有序集合中指定key-value的分数
         * @param key
         * @param obj
         * @return
         */
        public Double  getScore(String key,Object obj) {
            return zsetOps.score(key, obj);
        }
    
        /**
         * 从有序集合中删除元素
         * @param key
         * @param values
         * @return
         */
        public Long remove(String key, Object... values){
            return zsetOps.remove(key, values);
        }
        /**
         * 返回有序集合中指定分数区间内的成员,分数由低到高排序
         * @param key
         * @param min
         * @param max
         * @return
         */
        public Set<String> rangeByScore(String key, double min, double max){
            return zsetOps.rangeByScore(key, min, max);
        }
        /**
         * 移除有序集合中指定分数区间内的成员
         * @param key
         * @param min
         * @param max
         * @return
         */
        public Long removeRangeByScore(String key, double min, double max){
            return zsetOps.removeRangeByScore(key, min, max);
        }
    

    相关文章

      网友评论

          本文标题:redis入门第十一课:实战之延迟队列

          本文链接:https://www.haomeiwen.com/subject/ftgeoqtx.html