美文网首页一些收藏
redis消息队列的四种实现方式之Sorted Set的延时队列

redis消息队列的四种实现方式之Sorted Set的延时队列

作者: 山巅自相见 | 来源:发表于2022-01-09 17:23 被阅读0次

目录

Zset里面,每一个成员都有一个所谓的分数:score。


Redis-Sort Set数据结构

把当前时间作为分数,因为Zset是有序的,时间越小的排名越靠前。所以使用Zset作为延时队列就充分利用了score。
代码详解:

/**
 * SortedSet生产数据
 * 当生产者发送消息的时候 实际生产中 相关参数 比如订单信息 过期时间等应该传入 可以考虑将订单信息json化存入redis
 */
@PostMapping(value = "producerSortedSet")
public void producerSortedSet() {
  try {
    for (int i = 0; i < 5; i++) {
      // 创建Sorted Set实例
      ZSetOperations zSetOperations = redisTemplate.opsForZSet();
      // 生成value和分数
      String value = UuidUtil.getUuid();
      double score = System.currentTimeMillis() + (i * 1000);
      // 添加数据
      zSetOperations.add("testSortedSetOne", value, score);
      System.out.println("testSortedSetOne在" + score + "添加进数据:" + value);
    }
  } catch (Exception exception) {
    exception.printStackTrace();
  }
}
/**
 * SortedSet取数据
 */
@PostMapping(value = "consumerSortedSet")
public void consumerSortedSet() {
  try {
    // 创建SortedSet实例
    ZSetOperations zSetOperations = redisTemplate.opsForZSet();
    while (true) {
      System.out.println("testSortedSetOne:" + zSetOperations.rangeByScore("testSortedSetOne", 0, -1));
      // 拿取数据
      Set<String> order = zSetOperations.rangeByScore("testSortedSetOne", 0, System.currentTimeMillis(), 0, 1);
        if (ObjectUtils.isEmpty(order)) {
        System.out.println("当前没有数据 当前线程睡眠3秒");
        TimeUnit.MICROSECONDS.sleep(3000);
        // 跳过本次循环 重新循环拿取数据
        continue;
      }
      // 利用迭代器拿取Set中的数据
      String str = order.iterator().next();
      // 分割线===================================
      if (zSetOperations.remove("testSortedSetOne", str) > 0) {
        /**
         * 业务处理
         */
        System.out.println(str);
      }
    }
  } catch (Exception exception) {
    exception.printStackTrace();
  }
}

生产者中,利用当前时间System.currentTimeMillis() + (i * 1000)作为分数,而消费者就有点麻烦了。Zset与List类型不同的是,Zset类型中并没有提供阻塞式拿取数据的方法,所以只能采用轮询式获取数据。
那么有一个问题需要思考一下:消费者的当前代码格式是否满足线程安全?


答案是:满足!
因为分割线以上只是,从redis中查询数据,并没有做更新操作,而业务处理的代码在分割线下,所以分割线以上没有线程安全不安全这一说。再来看分割线以下,从redis中通过remove方法拿取元素并作业务处理的,有且仅有一个客户端是能够处理成功的,remove方法中只有一个线程能从redis中拿到唯一的元素。其实这也是没有问题的。

相关文章

网友评论

    本文标题:redis消息队列的四种实现方式之Sorted Set的延时队列

    本文链接:https://www.haomeiwen.com/subject/rhjhcrtx.html