美文网首页
RedissonDelayedQueue源码解读

RedissonDelayedQueue源码解读

作者: 圣瓦伦 | 来源:发表于2021-01-21 22:14 被阅读0次

    maven

                <dependency>
                    <groupId>org.redisson</groupId>
                    <artifactId>redisson</artifactId>
                    <version>3.11.4</version>
                </dependency>
    

    使用demo

    public static void main(String[] args) {
            Config config = new Config();
            config.useClusterServers()
                    .addNodeAddress("redis://10.23.3.24:7000")
                    .addNodeAddress("redis://10.23.3.24:7001")
                    .addNodeAddress("redis://10.23.3.24:7002")
            RedissonClient redisson = Redisson.create(config);
            RBlockingQueue<String> blockingQueue = redisson.getBlockingQueue("dest_queue2");
            RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(blockingQueue);
            new Thread() {
                public void run() {
                    while(true) {
                        try {
                            //阻塞队列有数据就返回,否则wait
                            String take = blockingQueue.take();
                            System.out.println("take:"+take);
                            long time = Long.valueOf(take.split("---")[0]);
                            //消费可能产生的延迟
                            System.out.println("delay:"+(System.currentTimeMillis()-time-2*60*1000));
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                };
            }.start();
    
            for(int i=1;i<=100000;i++) {
                // 向阻塞队列放入数据
                long time = System.currentTimeMillis();
                delayedQueue.offer(time+"---"+i, 2, TimeUnit.MINUTES);
            }
            delayedQueue.destroy();
        }
    

    这里使用了两个queue,对delayedQueue的offer操作是直接进入delayedQueue,元素到期之后进入blockingQueue,通过while循环取出到期的元素。

    demo测试了延迟队列到期之后的延迟,结果比较震惊,达到了160s。分析发现,其实delayedQueue进入blockingQueue是准时的,但是同一时间到期的元素过多导致blockingQueue堆积,且只有一个线程消费blockingQueue,所以导致延迟越来越高;搞成两个线程消费延迟就正常到毫秒级了。

    源码

    RedissonDelayedQueue初始化

    protected RedissonDelayedQueue(QueueTransferService queueTransferService, Codec codec, final CommandAsyncExecutor commandExecutor, String name) {
            super(codec, commandExecutor, name);
            channelName = prefixName("redisson_delay_queue_channel", getName());
            queueName = prefixName("redisson_delay_queue", getName());
            timeoutSetName = prefixName("redisson_delay_queue_timeout", getName());
            //新建一个调度任务
            QueueTransferTask task = new QueueTransferTask(commandExecutor.getConnectionManager()) {
                //pushTaskAsync:异步将到期元素转移到阻塞队列
                @Override
                protected RFuture<Long> pushTaskAsync() {
                    //这里使用一段lua脚本,KEYS[1]为getName(),KEYS[2]为timeoutSetName,KEYS[3]为queueName;ARGV[1]为当前时间戳,ARGV[2]为100
                    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
                            //这里调用zrangebyscore,对timeoutSetName的zset使用timeout参数进行排序,取得分介于0和当前时间戳的元素(即到期的元素),取前100条
                            "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "
                          + "if #expiredValues > 0 then "
                              + "for i, v in ipairs(expiredValues) do "
                                  + "local randomId, value = struct.unpack('dLc0', v);"
                                   //调用rpush移交到阻塞队列
                                  + "redis.call('rpush', KEYS[1], value);"
                                  //调用lrem从元素队列移除
                                  + "redis.call('lrem', KEYS[3], 1, v);"
                              + "end; "
                              //从timeoutSetName的zset中删除掉已经处理的这些元素
                              + "redis.call('zrem', KEYS[2], unpack(expiredValues));"
                          + "end; "
                            // get startTime from scheduler queue head task
                            //取timeoutSetName的zset的第一个元素的得分返回,如果没有返回nil,后面有用
                          + "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "
                          + "if v[1] ~= nil then "
                             + "return v[2]; "
                          + "end "
                          + "return nil;",
                          Arrays.<Object>asList(getName(), timeoutSetName, queueName), 
                          System.currentTimeMillis(), 100);
                }
                
                @Override
                protected RTopic getTopic() {
                    return new RedissonTopic(LongCodec.INSTANCE, commandExecutor, channelName);
                }
            };
            //开启任务调度
            queueTransferService.schedule(queueName, task);
            
            this.queueTransferService = queueTransferService;
        }
    

    QueueTransferService.schedule

    public class QueueTransferService {
    
        private final ConcurrentMap<String, QueueTransferTask> tasks = PlatformDependent.newConcurrentHashMap();
        
        public synchronized void schedule(String name, QueueTransferTask task) {
            QueueTransferTask oldTask = tasks.putIfAbsent(name, task);
            if (oldTask == null) {
                //旧调度不存在,直接开启
                task.start();
            } else {
                //调度已存在,则数量+1
                oldTask.incUsage();
            }
        }
        
        public synchronized void remove(String name) {
            QueueTransferTask task = tasks.get(name);
            if (task != null) {
                if (task.decUsage() == 0) {
                    tasks.remove(name, task);
                    task.stop();
                }
            }
        }
    }
    

    QueueTransferTask.start

    public void start() {
            RTopic<Long> schedulerTopic = getTopic();
            //订阅时候触发pushTask
            statusListenerId = schedulerTopic.addListener(new BaseStatusListener() {
                @Override
                public void onSubscribe(String channel) {
                    pushTask();
                }
            });
            //监听消息,收到消息执行scheduleTask
            messageListenerId = schedulerTopic.addListener(new MessageListener<Long>() {
                @Override
                public void onMessage(CharSequence channel, Long startTime) {
                    scheduleTask(startTime);
                }
            });
        }
    
        private void scheduleTask(final Long startTime) {
            TimeoutTask oldTimeout = lastTimeout.get();
            if (startTime == null) {
                return;
            }
            
            if (oldTimeout != null) {
                oldTimeout.getTask().cancel();
            }
            //delay:即还有多久到期
            long delay = startTime - System.currentTimeMillis();
            if (delay > 10) {
                //delay大于10ms,则新建一个定时器,到期之后再执行pushTask
                //这里底层通过HashedWheelTimer实现
                Timeout timeout = connectionManager.newTimeout(new TimerTask() {                    
                    @Override
                    public void run(Timeout timeout) throws Exception {
                        pushTask();
                        
                        TimeoutTask currentTimeout = lastTimeout.get();
                        if (currentTimeout.getTask() == timeout) {
                            lastTimeout.compareAndSet(currentTimeout, null);
                        }
                    }
                }, delay, TimeUnit.MILLISECONDS);
                if (!lastTimeout.compareAndSet(oldTimeout, new TimeoutTask(startTime, timeout))) {
                    timeout.cancel();
                }
            } else {
                //delay小于10ms,立即执行pushTask
                pushTask();
            }
        }
    
    
    private void pushTask() {
            //这里就是执行初始化时候构建的QueueTransferTask的pushTaskAsync方法
            RFuture<Long> startTimeFuture = pushTaskAsync();
            startTimeFuture.onComplete((res, e) -> {
                if (e != null) {
                    if (e instanceof RedissonShutdownException) {
                        return;
                    }
                    log.error(e.getMessage(), e);
                    //如果执行异常,则5s之后继续调度
                    scheduleTask(System.currentTimeMillis() + 5 * 1000L);
                    return;
                }
                //res即pushTaskAsync方法最后取timeoutSetName的zset的第一个元素的得分返回
                if (res != null) {
                    //如果不为空,继续执行调度
                    scheduleTask(res);
                }
            });
        }
    

    RedissonDelayedQueue.offer

        @Override
        public void offer(V e, long delay, TimeUnit timeUnit) {
            get(offerAsync(e, delay, timeUnit));
        }
        
        @Override
        public RFuture<Void> offerAsync(V e, long delay, TimeUnit timeUnit) {
            if (delay < 0) {
                throw new IllegalArgumentException("Delay can't be negative");
            }
            
            long delayInMs = timeUnit.toMillis(delay);
            //到期时间:当前时间+延迟时间
            long timeout = System.currentTimeMillis() + delayInMs;
         
            long randomId = ThreadLocalRandom.current().nextLong();
            //这里使用的是一段lua脚本,其中keys参数数组有四个值,KEYS[1]为getName(), KEYS[2]为timeoutSetName, KEYS[3]为queueName, KEYS[4]为channelName
            //变量有三个,ARGV[1]为timeout,ARGV[2]为randomId,ARGV[3]为encode(e)
            return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID,
                    "local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);" 
                  //对timeoutSetName的zset添加一个结构体,其score为timeout值
                  + "redis.call('zadd', KEYS[2], ARGV[1], value);"
                  //对queueName的list的表尾添加结构体
                  + "redis.call('rpush', KEYS[3], value);"
                  // if new object added to queue head when publish its startTime 
                  // to all scheduler workers 
                  //判断timeoutSetName的zset的第一个元素是否是当前的结构体,如果是则对channel发布timeout消息
                  //这个作用是判断第一个添加的元素,触发定时器(初始化时候订阅了channel->onMessage->scheduleTask)
                  + "local v = redis.call('zrange', KEYS[2], 0, 0); "
                  + "if v[1] == value then "
                     + "redis.call('publish', KEYS[4], ARGV[1]); "
                  + "end;",
                  Arrays.<Object>asList(getName(), timeoutSetName, queueName, channelName), 
                  timeout, randomId, encode(e));
        }
    

    总结

    • Redisson延迟队列使用三个结构来存储,一个是queueName的list,值是添加的元素;一个是timeoutSetName的zset,值是添加的元素,score为timeout值;还有一个是getName()的blockingQueue,值是到期的元素。
    • 主要方法是逻辑是:将元素及延时信息入队,之后定时任务将到期的元素转移到阻塞队列。
    • 使用HashedWheelTimer做定时,定时到期之后从zset中取头部100个到期元素,所以定时和转移到阻塞队列是解耦的,无论是哪个task触发的pushTask,最终都是先取zset的头部先到期的元素。
    • 元素数据都是存在redis服务端的,客户端只是执行HashedWheelTimer任务,所以单个客户端挂了不影响服务端数据,做到分布式的高可用。

    相关文章

      网友评论

          本文标题:RedissonDelayedQueue源码解读

          本文链接:https://www.haomeiwen.com/subject/nzkdzktx.html