目录
- 《redis消息队列的四种实现方式之List的简单队列和延时队列》
- 《redis消息队列的四种实现方式之Sorted Set的延时队列》(本篇)
Zset里面,每一个成员都有一个所谓的分数:score。

把当前时间作为分数,因为Zset是有序的,时间越小的排名越靠前。所以使用Zset作为延时队列就充分利用了score。
代码详解:
/**
* SortedSet生产数据
* 当生产者发送消息的时候 实际生产中 相关参数 比如订单信息 过期时间等应该传入 可以考虑将订单信息json化存入redis
*/
@PostMapping(value = "producerSortedSet")
public void producerSortedSet() {
try {
for (int i = 0; i < 5; i++) {
// 创建Sorted Set实例
ZSetOperations zSetOperations = redisTemplate.opsForZSet();
// 生成value和分数
String value = UuidUtil.getUuid();
double score = System.currentTimeMillis() + (i * 1000);
// 添加数据
zSetOperations.add("testSortedSetOne", value, score);
System.out.println("testSortedSetOne在" + score + "添加进数据:" + value);
}
} catch (Exception exception) {
exception.printStackTrace();
}
}
/**
* SortedSet取数据
*/
@PostMapping(value = "consumerSortedSet")
public void consumerSortedSet() {
try {
// 创建SortedSet实例
ZSetOperations zSetOperations = redisTemplate.opsForZSet();
while (true) {
System.out.println("testSortedSetOne:" + zSetOperations.rangeByScore("testSortedSetOne", 0, -1));
// 拿取数据
Set<String> order = zSetOperations.rangeByScore("testSortedSetOne", 0, System.currentTimeMillis(), 0, 1);
if (ObjectUtils.isEmpty(order)) {
System.out.println("当前没有数据 当前线程睡眠3秒");
TimeUnit.MICROSECONDS.sleep(3000);
// 跳过本次循环 重新循环拿取数据
continue;
}
// 利用迭代器拿取Set中的数据
String str = order.iterator().next();
// 分割线===================================
if (zSetOperations.remove("testSortedSetOne", str) > 0) {
/**
* 业务处理
*/
System.out.println(str);
}
}
} catch (Exception exception) {
exception.printStackTrace();
}
}
生产者中,利用当前时间System.currentTimeMillis() + (i * 1000)
作为分数,而消费者就有点麻烦了。Zset与List类型不同的是,Zset类型中并没有提供阻塞式拿取数据的方法,所以只能采用轮询式获取数据。
那么有一个问题需要思考一下:消费者的当前代码格式是否满足线程安全?

答案是:满足!
因为分割线以上只是,从redis中查询数据,并没有做更新操作,而业务处理的代码在分割线下,所以分割线以上没有线程安全不安全这一说。再来看分割线以下,从redis中通过
remove
方法拿取元素并作业务处理的,有且仅有一个客户端是能够处理成功的,remove
方法中只有一个线程能从redis中拿到唯一的元素。其实这也是没有问题的。
网友评论