美文网首页一些收藏
redis消息队列的四种实现方式之Sorted Set的延时队列

redis消息队列的四种实现方式之Sorted Set的延时队列

作者: 山巅自相见 | 来源:发表于2022-01-09 17:23 被阅读0次

    目录

    Zset里面,每一个成员都有一个所谓的分数:score。


    Redis-Sort Set数据结构

    把当前时间作为分数,因为Zset是有序的,时间越小的排名越靠前。所以使用Zset作为延时队列就充分利用了score。
    代码详解:

    /**
     * SortedSet生产数据
     * 当生产者发送消息的时候 实际生产中 相关参数 比如订单信息 过期时间等应该传入 可以考虑将订单信息json化存入redis
     */
    @PostMapping(value = "producerSortedSet")
    public void producerSortedSet() {
      try {
        for (int i = 0; i < 5; i++) {
          // 创建Sorted Set实例
          ZSetOperations zSetOperations = redisTemplate.opsForZSet();
          // 生成value和分数
          String value = UuidUtil.getUuid();
          double score = System.currentTimeMillis() + (i * 1000);
          // 添加数据
          zSetOperations.add("testSortedSetOne", value, score);
          System.out.println("testSortedSetOne在" + score + "添加进数据:" + value);
        }
      } catch (Exception exception) {
        exception.printStackTrace();
      }
    }
    
    /**
     * SortedSet取数据
     */
    @PostMapping(value = "consumerSortedSet")
    public void consumerSortedSet() {
      try {
        // 创建SortedSet实例
        ZSetOperations zSetOperations = redisTemplate.opsForZSet();
        while (true) {
          System.out.println("testSortedSetOne:" + zSetOperations.rangeByScore("testSortedSetOne", 0, -1));
          // 拿取数据
          Set<String> order = zSetOperations.rangeByScore("testSortedSetOne", 0, System.currentTimeMillis(), 0, 1);
            if (ObjectUtils.isEmpty(order)) {
            System.out.println("当前没有数据 当前线程睡眠3秒");
            TimeUnit.MICROSECONDS.sleep(3000);
            // 跳过本次循环 重新循环拿取数据
            continue;
          }
          // 利用迭代器拿取Set中的数据
          String str = order.iterator().next();
          // 分割线===================================
          if (zSetOperations.remove("testSortedSetOne", str) > 0) {
            /**
             * 业务处理
             */
            System.out.println(str);
          }
        }
      } catch (Exception exception) {
        exception.printStackTrace();
      }
    }
    

    生产者中,利用当前时间System.currentTimeMillis() + (i * 1000)作为分数,而消费者就有点麻烦了。Zset与List类型不同的是,Zset类型中并没有提供阻塞式拿取数据的方法,所以只能采用轮询式获取数据。
    那么有一个问题需要思考一下:消费者的当前代码格式是否满足线程安全?


    答案是:满足!
    因为分割线以上只是,从redis中查询数据,并没有做更新操作,而业务处理的代码在分割线下,所以分割线以上没有线程安全不安全这一说。再来看分割线以下,从redis中通过remove方法拿取元素并作业务处理的,有且仅有一个客户端是能够处理成功的,remove方法中只有一个线程能从redis中拿到唯一的元素。其实这也是没有问题的。

    相关文章

      网友评论

        本文标题:redis消息队列的四种实现方式之Sorted Set的延时队列

        本文链接:https://www.haomeiwen.com/subject/rhjhcrtx.html