美文网首页
SpringBoot redis系列 -延时队列(1)

SpringBoot redis系列 -延时队列(1)

作者: smalljop | 来源:发表于2020-01-06 13:38 被阅读0次

SpringBoot redis系列 -延时队列(1)

背景

PS:笔者所在公司目前业务比较简单,相对项目架构也比较简单,暂时未有引入MQ等消息中间件,但是某天突然收到一个需求,需要在用户关注了我们的公众号之后,延迟几秒钟给用户在发送几条消息。最初考虑用要不DelayQueue或者定时线程池ScheduledThreadPoolExecutor走一波?感觉都不够优雅,刚好项目中有用到Redis,干脆就用Redis做个延时队列,也方便以后复用。当然,Redis实现的队列不是专业的MQ 对消息可靠性有高度要求的话,并不建议使用。比较简单的业务场景下还是可以用来异步延时解耦的。

9150e4e5gy1g9cxmqsitfj2073073wef.jpg

正文

延迟队列可以通过Zset(有序列表实现),Zset类似于java中SortedSet和HashMap的结合体,它是一个Set结构,保证了内部value值的唯一,同时他还可以给每个value设置一个score作为排序权重,Redis会根据score自动排序,我们每次拿到的就是最先需要被消费的消息,利用这个特性我们可以很好实现延迟队列。

spring:
  application:
    name: redis-example
  redis:
    host: localhost
    port: 6379
    # redis有16个库 默认选择第0个使用
    database: 0
    password:
# 端口给个0 代表随机选择一个未被使用端口
server:
  port: 0

封装一个统一的Message类,方便统一管理所有延迟消息格式

package com.smalljop.redis.example.queue;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.time.LocalDateTime;

/**
 * @description: 消息统一封装类
 * @author: smalljop
 * @create: 2020-01-03 10:20
 **/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Message {
    /**
     * 消息唯一标识
     */
    private String id;
    /**
     * 消息渠道 如 订单 支付 代表不同业务类型
     * 为消费时不同类去处理
     */
    private String channel;
    /**
     * 具体消息 json
     */
    private String body;

    /**
     * 延时时间 被消费时间  取当前时间戳+延迟时间
     */
    private Long delayTime;

    /**
     * 创建时间
     */
    private LocalDateTime createTime;
}

封装一个延时队列工具类 负责维护队列 提供常用操作

package com.smalljop.redis.example.queue;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.AllArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * @description: 延时队列功能类
 * @author: smalljop
 * @create: 2020-01-03 10:11
 **/
@Component
@Slf4j
@AllArgsConstructor
public class DelayingQueueService {

    private static ObjectMapper mapper = Jackson2ObjectMapperBuilder.json().build();

    private final StringRedisTemplate redisTemplate;

    /**
     * 可以不同业务用不同的key
     */
    public static final String QUEUE_NAME = "message:queue";


    /**
     * 插入消息
     *
     * @param message
     * @return
     */
    @SneakyThrows
    public Boolean push(Message message) {
        Boolean addFlag = redisTemplate.opsForZSet().add(QUEUE_NAME, mapper.writeValueAsString(message), message.getDelayTime());
        return addFlag;
    }

    /**
     * 移除消息
     *
     * @param message
     * @return
     */
    @SneakyThrows
    public Boolean remove(Message message) {
        Long remove = redisTemplate.opsForZSet().remove(QUEUE_NAME, mapper.writeValueAsString(message));
        return remove > 0 ? true : false;
    }


    /**
     * 拉取最新需要
     * 被消费的消息
     * rangeByScore 根据score范围获取 0-当前时间戳可以拉取当前时间及以前的需要被消费的消息
     *
     * @return
     */
    public List<Message> pull() {
        Set<String> strings = redisTemplate.opsForZSet().rangeByScore(QUEUE_NAME, 0, System.currentTimeMillis());
        if (strings == null) {
            return null;
        }
        List<Message> msgList = strings.stream().map(msg -> {
            Message message = null;
            try {
                message = mapper.readValue(msg, Message.class);
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }
            return message;
        }).collect(Collectors.toList());
        return msgList;
    }


}

接下来写一个消息提供者,创建消息

package com.smalljop.redis.example.queue;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.UUID;

/**
 * @description: 消息提供者
 * @author: smalljop
 * @create: 2020-01-03 10:44
 **/
@Component
@Slf4j
@AllArgsConstructor
public class MessageProvider {

    private final DelayingQueueService delayingQueueService;

    private static String USER_CHANNEL = "USER_CHANNEL";

    /**
     * 发送消息
     *
     * @param messageContent
     */
    public void sendMessage(String messageContent, long delay) {
        try {
            if (messageContent != null) {
                String seqId = UUID.randomUUID().toString();
                Message message = new Message();
                //时间戳默认为毫秒 延迟5s即为 5*1000
                long time = System.currentTimeMillis();
                LocalDateTime dateTime = Instant.ofEpochMilli(time).atZone(ZoneOffset.ofHours(8)).toLocalDateTime();
                message.setDelayTime(time + (delay * 1000));
                message.setCreateTime(dateTime);
                message.setBody(messageContent);
                message.setId(seqId);
                message.setChannel(USER_CHANNEL);
                delayingQueueService.push(message);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

再创建一个消息消费者,定时轮训去拉取队列中的消息。

package com.smalljop.redis.example.queue;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.AllArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.util.List;
import java.util.Set;

/**
 * @description: 延迟队列消费
 * @author: smalljop
 * @create: 2020-01-03 10:51
 **/
@Component
@Slf4j
@AllArgsConstructor
public class MessageConsumer {
    private static ObjectMapper mapper = Jackson2ObjectMapperBuilder.json().build();
    private final DelayingQueueService delayingQueueService;

    /**
     * 定时消费队列中的数据
     * zset会对score进行排序 让最早消费的数据位于最前
     * 拿最前的数据跟当前时间比较 时间到了则消费
     */
    @Scheduled(cron = "*/1 * * * * *")
    public void consumer() throws JsonProcessingException {
        List<Message> msgList = delayingQueueService.pull();
        if (null != msgList) {
            long current = System.currentTimeMillis();
            msgList.stream().forEach(msg -> {
                // 已超时的消息拿出来消费
                if (current >= msg.getDelayTime()) {
                    try {
                        log.info("消费消息:{}:消息创建时间:{},消费时间:{}", mapper.writeValueAsString(msg), msg.getCreateTime(), LocalDateTime.now());
                    } catch (JsonProcessingException e) {
                        e.printStackTrace();
                    }
                    //移除消息
                    delayingQueueService.remove(msg);
                }
            });
        }
    }


}

使用springboot的定时需要再启动类上加上开启定时注解

@SpringBootApplication
//打开定时
@EnableScheduling
public class RedisExampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(RedisExampleApplication.class, args);
    }

}

创建测试类,进行测试 延迟20s消费消息

@SpringBootTest
class MessageProviderTest {

    @Autowired
    private MessageProvider messageProvider;

    @Test
    void sendMessage() {
        messageProvider.sendMessage("同时发送消息1", 20);
        messageProvider.sendMessage("同时发送消息2", 20);
    }
}
image-20200103115839751.png
2020-01-03 11:58:18.003  INFO 102328 --- [   scheduling-1] c.s.redis.example.queue.MessageConsumer  : 消费消息:{"id":"cf2cd12e-a8f0-4e04-b92f-511b033ba1bb","channel":"USER_CHANNEL","body":"同时发送消息1","delayTime":1578023897393,"createTime":[2020,1,3,11,57,57,393000000]}:消息创建时间:2020-01-03T11:57:57.393,消费时间:2020-01-03T11:58:18.003
2020-01-03 11:58:19.002  INFO 102328 --- [   scheduling-1] c.s.redis.example.queue.MessageConsumer  : 消费消息:{"id":"7c220af3-9215-4cf0-82cb-a01063c2b5db","channel":"USER_CHANNEL","body":"同时发送消息2","delayTime":1578023898778,"createTime":[2020,1,3,11,57,58,778000000]}:消息创建时间:2020-01-03T11:57:58.778,消费时间:2020-01-03T11:58:19.002

小结

延时队列一个实现大约就是这样子了,redis毕竟不是专业的MQ,如果是比较严谨的场景建议还是不要使用redis实现队列。代码连接地址:https://github.com/smalljop/my-blog-java-project

稿定设计导出-20200103-120507.gif

相关文章

网友评论

      本文标题:SpringBoot redis系列 -延时队列(1)

      本文链接:https://www.haomeiwen.com/subject/maqkactx.html