美文网首页
SpringBoot redis系列 -延时队列(1)

SpringBoot redis系列 -延时队列(1)

作者: smalljop | 来源:发表于2020-01-06 13:38 被阅读0次

    SpringBoot redis系列 -延时队列(1)

    背景

    PS:笔者所在公司目前业务比较简单,相对项目架构也比较简单,暂时未有引入MQ等消息中间件,但是某天突然收到一个需求,需要在用户关注了我们的公众号之后,延迟几秒钟给用户在发送几条消息。最初考虑用要不DelayQueue或者定时线程池ScheduledThreadPoolExecutor走一波?感觉都不够优雅,刚好项目中有用到Redis,干脆就用Redis做个延时队列,也方便以后复用。当然,Redis实现的队列不是专业的MQ 对消息可靠性有高度要求的话,并不建议使用。比较简单的业务场景下还是可以用来异步延时解耦的。

    9150e4e5gy1g9cxmqsitfj2073073wef.jpg

    正文

    延迟队列可以通过Zset(有序列表实现),Zset类似于java中SortedSet和HashMap的结合体,它是一个Set结构,保证了内部value值的唯一,同时他还可以给每个value设置一个score作为排序权重,Redis会根据score自动排序,我们每次拿到的就是最先需要被消费的消息,利用这个特性我们可以很好实现延迟队列。

    spring:
      application:
        name: redis-example
      redis:
        host: localhost
        port: 6379
        # redis有16个库 默认选择第0个使用
        database: 0
        password:
    # 端口给个0 代表随机选择一个未被使用端口
    server:
      port: 0
    
    

    封装一个统一的Message类,方便统一管理所有延迟消息格式

    package com.smalljop.redis.example.queue;
    
    import lombok.AllArgsConstructor;
    import lombok.Builder;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    
    import java.time.LocalDateTime;
    
    /**
     * @description: 消息统一封装类
     * @author: smalljop
     * @create: 2020-01-03 10:20
     **/
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public class Message {
        /**
         * 消息唯一标识
         */
        private String id;
        /**
         * 消息渠道 如 订单 支付 代表不同业务类型
         * 为消费时不同类去处理
         */
        private String channel;
        /**
         * 具体消息 json
         */
        private String body;
    
        /**
         * 延时时间 被消费时间  取当前时间戳+延迟时间
         */
        private Long delayTime;
    
        /**
         * 创建时间
         */
        private LocalDateTime createTime;
    }
    
    

    封装一个延时队列工具类 负责维护队列 提供常用操作

    package com.smalljop.redis.example.queue;
    
    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import lombok.AllArgsConstructor;
    import lombok.SneakyThrows;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
    import org.springframework.stereotype.Component;
    
    import java.util.List;
    import java.util.Set;
    import java.util.stream.Collectors;
    
    /**
     * @description: 延时队列功能类
     * @author: smalljop
     * @create: 2020-01-03 10:11
     **/
    @Component
    @Slf4j
    @AllArgsConstructor
    public class DelayingQueueService {
    
        private static ObjectMapper mapper = Jackson2ObjectMapperBuilder.json().build();
    
        private final StringRedisTemplate redisTemplate;
    
        /**
         * 可以不同业务用不同的key
         */
        public static final String QUEUE_NAME = "message:queue";
    
    
        /**
         * 插入消息
         *
         * @param message
         * @return
         */
        @SneakyThrows
        public Boolean push(Message message) {
            Boolean addFlag = redisTemplate.opsForZSet().add(QUEUE_NAME, mapper.writeValueAsString(message), message.getDelayTime());
            return addFlag;
        }
    
        /**
         * 移除消息
         *
         * @param message
         * @return
         */
        @SneakyThrows
        public Boolean remove(Message message) {
            Long remove = redisTemplate.opsForZSet().remove(QUEUE_NAME, mapper.writeValueAsString(message));
            return remove > 0 ? true : false;
        }
    
    
        /**
         * 拉取最新需要
         * 被消费的消息
         * rangeByScore 根据score范围获取 0-当前时间戳可以拉取当前时间及以前的需要被消费的消息
         *
         * @return
         */
        public List<Message> pull() {
            Set<String> strings = redisTemplate.opsForZSet().rangeByScore(QUEUE_NAME, 0, System.currentTimeMillis());
            if (strings == null) {
                return null;
            }
            List<Message> msgList = strings.stream().map(msg -> {
                Message message = null;
                try {
                    message = mapper.readValue(msg, Message.class);
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }
                return message;
            }).collect(Collectors.toList());
            return msgList;
        }
    
    
    }
    
    

    接下来写一个消息提供者,创建消息

    package com.smalljop.redis.example.queue;
    
    import lombok.AllArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Component;
    
    import java.time.Instant;
    import java.time.LocalDateTime;
    import java.time.ZoneOffset;
    import java.util.UUID;
    
    /**
     * @description: 消息提供者
     * @author: smalljop
     * @create: 2020-01-03 10:44
     **/
    @Component
    @Slf4j
    @AllArgsConstructor
    public class MessageProvider {
    
        private final DelayingQueueService delayingQueueService;
    
        private static String USER_CHANNEL = "USER_CHANNEL";
    
        /**
         * 发送消息
         *
         * @param messageContent
         */
        public void sendMessage(String messageContent, long delay) {
            try {
                if (messageContent != null) {
                    String seqId = UUID.randomUUID().toString();
                    Message message = new Message();
                    //时间戳默认为毫秒 延迟5s即为 5*1000
                    long time = System.currentTimeMillis();
                    LocalDateTime dateTime = Instant.ofEpochMilli(time).atZone(ZoneOffset.ofHours(8)).toLocalDateTime();
                    message.setDelayTime(time + (delay * 1000));
                    message.setCreateTime(dateTime);
                    message.setBody(messageContent);
                    message.setId(seqId);
                    message.setChannel(USER_CHANNEL);
                    delayingQueueService.push(message);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
    }
    

    再创建一个消息消费者,定时轮训去拉取队列中的消息。

    package com.smalljop.redis.example.queue;
    
    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import lombok.AllArgsConstructor;
    import lombok.SneakyThrows;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;
    
    import java.time.LocalDateTime;
    import java.util.List;
    import java.util.Set;
    
    /**
     * @description: 延迟队列消费
     * @author: smalljop
     * @create: 2020-01-03 10:51
     **/
    @Component
    @Slf4j
    @AllArgsConstructor
    public class MessageConsumer {
        private static ObjectMapper mapper = Jackson2ObjectMapperBuilder.json().build();
        private final DelayingQueueService delayingQueueService;
    
        /**
         * 定时消费队列中的数据
         * zset会对score进行排序 让最早消费的数据位于最前
         * 拿最前的数据跟当前时间比较 时间到了则消费
         */
        @Scheduled(cron = "*/1 * * * * *")
        public void consumer() throws JsonProcessingException {
            List<Message> msgList = delayingQueueService.pull();
            if (null != msgList) {
                long current = System.currentTimeMillis();
                msgList.stream().forEach(msg -> {
                    // 已超时的消息拿出来消费
                    if (current >= msg.getDelayTime()) {
                        try {
                            log.info("消费消息:{}:消息创建时间:{},消费时间:{}", mapper.writeValueAsString(msg), msg.getCreateTime(), LocalDateTime.now());
                        } catch (JsonProcessingException e) {
                            e.printStackTrace();
                        }
                        //移除消息
                        delayingQueueService.remove(msg);
                    }
                });
            }
        }
    
    
    }
    

    使用springboot的定时需要再启动类上加上开启定时注解

    @SpringBootApplication
    //打开定时
    @EnableScheduling
    public class RedisExampleApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(RedisExampleApplication.class, args);
        }
    
    }
    

    创建测试类,进行测试 延迟20s消费消息

    @SpringBootTest
    class MessageProviderTest {
    
        @Autowired
        private MessageProvider messageProvider;
    
        @Test
        void sendMessage() {
            messageProvider.sendMessage("同时发送消息1", 20);
            messageProvider.sendMessage("同时发送消息2", 20);
        }
    }
    
    image-20200103115839751.png
    2020-01-03 11:58:18.003  INFO 102328 --- [   scheduling-1] c.s.redis.example.queue.MessageConsumer  : 消费消息:{"id":"cf2cd12e-a8f0-4e04-b92f-511b033ba1bb","channel":"USER_CHANNEL","body":"同时发送消息1","delayTime":1578023897393,"createTime":[2020,1,3,11,57,57,393000000]}:消息创建时间:2020-01-03T11:57:57.393,消费时间:2020-01-03T11:58:18.003
    2020-01-03 11:58:19.002  INFO 102328 --- [   scheduling-1] c.s.redis.example.queue.MessageConsumer  : 消费消息:{"id":"7c220af3-9215-4cf0-82cb-a01063c2b5db","channel":"USER_CHANNEL","body":"同时发送消息2","delayTime":1578023898778,"createTime":[2020,1,3,11,57,58,778000000]}:消息创建时间:2020-01-03T11:57:58.778,消费时间:2020-01-03T11:58:19.002
    
    

    小结

    延时队列一个实现大约就是这样子了,redis毕竟不是专业的MQ,如果是比较严谨的场景建议还是不要使用redis实现队列。代码连接地址:https://github.com/smalljop/my-blog-java-project

    稿定设计导出-20200103-120507.gif

    相关文章

      网友评论

          本文标题:SpringBoot redis系列 -延时队列(1)

          本文链接:https://www.haomeiwen.com/subject/maqkactx.html