SpringBoot redis系列 -延时队列(1)
背景
PS:笔者所在公司目前业务比较简单,相对项目架构也比较简单,暂时未有引入MQ等消息中间件,但是某天突然收到一个需求,需要在用户关注了我们的公众号之后,延迟几秒钟给用户在发送几条消息。最初考虑用要不DelayQueue或者定时线程池ScheduledThreadPoolExecutor走一波?感觉都不够优雅,刚好项目中有用到Redis,干脆就用Redis做个延时队列,也方便以后复用。当然,Redis实现的队列不是专业的MQ 对消息可靠性有高度要求的话,并不建议使用。比较简单的业务场景下还是可以用来异步延时解耦的。
9150e4e5gy1g9cxmqsitfj2073073wef.jpg正文
延迟队列可以通过Zset(有序列表实现),Zset类似于java中SortedSet和HashMap的结合体,它是一个Set结构,保证了内部value值的唯一,同时他还可以给每个value设置一个score作为排序权重,Redis会根据score自动排序,我们每次拿到的就是最先需要被消费的消息,利用这个特性我们可以很好实现延迟队列。
spring:
application:
name: redis-example
redis:
host: localhost
port: 6379
# redis有16个库 默认选择第0个使用
database: 0
password:
# 端口给个0 代表随机选择一个未被使用端口
server:
port: 0
封装一个统一的Message类,方便统一管理所有延迟消息格式
package com.smalljop.redis.example.queue;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
/**
* @description: 消息统一封装类
* @author: smalljop
* @create: 2020-01-03 10:20
**/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Message {
/**
* 消息唯一标识
*/
private String id;
/**
* 消息渠道 如 订单 支付 代表不同业务类型
* 为消费时不同类去处理
*/
private String channel;
/**
* 具体消息 json
*/
private String body;
/**
* 延时时间 被消费时间 取当前时间戳+延迟时间
*/
private Long delayTime;
/**
* 创建时间
*/
private LocalDateTime createTime;
}
封装一个延时队列工具类 负责维护队列 提供常用操作
package com.smalljop.redis.example.queue;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.AllArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
* @description: 延时队列功能类
* @author: smalljop
* @create: 2020-01-03 10:11
**/
@Component
@Slf4j
@AllArgsConstructor
public class DelayingQueueService {
private static ObjectMapper mapper = Jackson2ObjectMapperBuilder.json().build();
private final StringRedisTemplate redisTemplate;
/**
* 可以不同业务用不同的key
*/
public static final String QUEUE_NAME = "message:queue";
/**
* 插入消息
*
* @param message
* @return
*/
@SneakyThrows
public Boolean push(Message message) {
Boolean addFlag = redisTemplate.opsForZSet().add(QUEUE_NAME, mapper.writeValueAsString(message), message.getDelayTime());
return addFlag;
}
/**
* 移除消息
*
* @param message
* @return
*/
@SneakyThrows
public Boolean remove(Message message) {
Long remove = redisTemplate.opsForZSet().remove(QUEUE_NAME, mapper.writeValueAsString(message));
return remove > 0 ? true : false;
}
/**
* 拉取最新需要
* 被消费的消息
* rangeByScore 根据score范围获取 0-当前时间戳可以拉取当前时间及以前的需要被消费的消息
*
* @return
*/
public List<Message> pull() {
Set<String> strings = redisTemplate.opsForZSet().rangeByScore(QUEUE_NAME, 0, System.currentTimeMillis());
if (strings == null) {
return null;
}
List<Message> msgList = strings.stream().map(msg -> {
Message message = null;
try {
message = mapper.readValue(msg, Message.class);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return message;
}).collect(Collectors.toList());
return msgList;
}
}
接下来写一个消息提供者,创建消息
package com.smalljop.redis.example.queue;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.UUID;
/**
* @description: 消息提供者
* @author: smalljop
* @create: 2020-01-03 10:44
**/
@Component
@Slf4j
@AllArgsConstructor
public class MessageProvider {
private final DelayingQueueService delayingQueueService;
private static String USER_CHANNEL = "USER_CHANNEL";
/**
* 发送消息
*
* @param messageContent
*/
public void sendMessage(String messageContent, long delay) {
try {
if (messageContent != null) {
String seqId = UUID.randomUUID().toString();
Message message = new Message();
//时间戳默认为毫秒 延迟5s即为 5*1000
long time = System.currentTimeMillis();
LocalDateTime dateTime = Instant.ofEpochMilli(time).atZone(ZoneOffset.ofHours(8)).toLocalDateTime();
message.setDelayTime(time + (delay * 1000));
message.setCreateTime(dateTime);
message.setBody(messageContent);
message.setId(seqId);
message.setChannel(USER_CHANNEL);
delayingQueueService.push(message);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
再创建一个消息消费者,定时轮训去拉取队列中的消息。
package com.smalljop.redis.example.queue;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.AllArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Set;
/**
* @description: 延迟队列消费
* @author: smalljop
* @create: 2020-01-03 10:51
**/
@Component
@Slf4j
@AllArgsConstructor
public class MessageConsumer {
private static ObjectMapper mapper = Jackson2ObjectMapperBuilder.json().build();
private final DelayingQueueService delayingQueueService;
/**
* 定时消费队列中的数据
* zset会对score进行排序 让最早消费的数据位于最前
* 拿最前的数据跟当前时间比较 时间到了则消费
*/
@Scheduled(cron = "*/1 * * * * *")
public void consumer() throws JsonProcessingException {
List<Message> msgList = delayingQueueService.pull();
if (null != msgList) {
long current = System.currentTimeMillis();
msgList.stream().forEach(msg -> {
// 已超时的消息拿出来消费
if (current >= msg.getDelayTime()) {
try {
log.info("消费消息:{}:消息创建时间:{},消费时间:{}", mapper.writeValueAsString(msg), msg.getCreateTime(), LocalDateTime.now());
} catch (JsonProcessingException e) {
e.printStackTrace();
}
//移除消息
delayingQueueService.remove(msg);
}
});
}
}
}
使用springboot的定时需要再启动类上加上开启定时注解
@SpringBootApplication
//打开定时
@EnableScheduling
public class RedisExampleApplication {
public static void main(String[] args) {
SpringApplication.run(RedisExampleApplication.class, args);
}
}
创建测试类,进行测试 延迟20s消费消息
@SpringBootTest
class MessageProviderTest {
@Autowired
private MessageProvider messageProvider;
@Test
void sendMessage() {
messageProvider.sendMessage("同时发送消息1", 20);
messageProvider.sendMessage("同时发送消息2", 20);
}
}
image-20200103115839751.png
2020-01-03 11:58:18.003 INFO 102328 --- [ scheduling-1] c.s.redis.example.queue.MessageConsumer : 消费消息:{"id":"cf2cd12e-a8f0-4e04-b92f-511b033ba1bb","channel":"USER_CHANNEL","body":"同时发送消息1","delayTime":1578023897393,"createTime":[2020,1,3,11,57,57,393000000]}:消息创建时间:2020-01-03T11:57:57.393,消费时间:2020-01-03T11:58:18.003
2020-01-03 11:58:19.002 INFO 102328 --- [ scheduling-1] c.s.redis.example.queue.MessageConsumer : 消费消息:{"id":"7c220af3-9215-4cf0-82cb-a01063c2b5db","channel":"USER_CHANNEL","body":"同时发送消息2","delayTime":1578023898778,"createTime":[2020,1,3,11,57,58,778000000]}:消息创建时间:2020-01-03T11:57:58.778,消费时间:2020-01-03T11:58:19.002
小结
延时队列一个实现大约就是这样子了,redis毕竟不是专业的MQ,如果是比较严谨的场景建议还是不要使用redis实现队列。代码连接地址:https://github.com/smalljop/my-blog-java-project
稿定设计导出-20200103-120507.gif
网友评论