美文网首页一些收藏
Kafka时间轮算法

Kafka时间轮算法

作者: 愤怒的老照 | 来源:发表于2020-07-27 18:07 被阅读0次

    1 背景

    Kafka存在大量的延时操作,比如延时生产、延时消费或者延时删除,实现延时操作有很多办法,JDK的Timer和DelayQueue插入和删除操作的平均时间复杂度为O(nlog(n)),并不能满足Kafka的高性能要求,而基于时间轮可以将插入和删除操作的时间复杂度都降为O(1),所以kafka并没有像jdk的Timer或者DelayQueue那样来实现延时的功能,而是基于时间轮实现延时。

    2 时间轮

    2.1 什么是时间轮

    image.png

    对比上图的⌚️,秒针在0~59秒都会落一次,假如任务的延时时间是0——59,那秒针落在哪个时刻,哪个时刻对应的任务执行。
    时间轮的本质是一个环形数组,当指针每走一步,就获取到当前刻度挂载的任务进行执行


    image.png

    2.2 最简单的版本实现

    class Task {
    
    }
    @Data
    class Bucket {
        @Delegate
        private List<Task> taskList = new ArrayList<>();
    }
    class Timer {
        private Bucket[] bucketList = new Bucket[60];
    
        public Timer (){
            Arrays.fill(bucketList, new Bucket());
        }
        private int getBucketByTimestamp(long timestamp){
            return (int)(timestamp / 1000) % 60;
        }
    
        public boolean addTask(Task task, long timestamp) {
            int bucket = this.getBucketByTimestamp(timestamp);
    
            bucketList[bucket].add(task);
            return true;
        }
    
        public Bucket getBucket(long timestamp){
            int bucket = this.getBucketByTimestamp(timestamp);
            
            return bucketList[bucket];
        }
        
        public void run() throws InterruptedException {
            while (true) {
                Bucket bucket = this.getBucket(System.currentTimeMillis());
                List<Task> taskList = bucket.getTaskList();
                
                if (taskList.size() != 0) {
                    // 线程池do task
                }
                
                TimeUnit.SECONDS.sleep(1);
            }
        }
    }
    
    

    3 长时间的任务

    上面的实现表示的范围有限(只能表示60s之内的定时任务),超过的就没办法表示了,解决办法有两个

    3.1 记录圈数

    在记录任务的时候,同时记录下任务的圈数,当圈数为0时,代表需要执行任务,否则圈数-1。这种实现方式存在缺点,每一个刻度上的任务非常多,导致效率降低,和HashMap的hash冲突类似。

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    class Task {
        private int num = 0; // 圈数
    }
    @Data
    class Bucket {
        @Delegate
        private List<Task> taskList = new ArrayList<>();
    }
    class Timer {
        private Bucket[] bucketList = new Bucket[60];
    
        public Timer (){
            Arrays.fill(bucketList, new Bucket());
        }
        private int getBucketByTimestamp(long timestamp){
            return (int)(timestamp / 1000) % 60;
        }
    
        public boolean addTask(Task task, long timestamp) {
            int bucket = this.getBucketByTimestamp(timestamp);
    
            bucketList[bucket].add(task);
            return true;
        }
    
        public List<Task> getTaskList(long timestamp){
            int bucket = this.getBucketByTimestamp(timestamp);
            bucketList[bucket].getTaskList().stream().filter(it -> it.getNum() != 0).forEach(it -> it.setNum(it.getNum() - 1));
            
            return bucketList[bucket].getTaskList().stream().filter(it -> it.getNum() == 0).collect(Collectors.toList());
        }
    
        public void run() throws InterruptedException {
            while (true) {
                List<Task> taskList = this.getTaskList(System.currentTimeMillis());
    
                if (taskList.size() != 0) {
                    // 线程池do task
                }
    
                TimeUnit.SECONDS.sleep(1);
            }
        }
    }
    

    3.2 层级时间轮

    对于只有三个指针的表来说,最大能表示12个小时,超过了12小时这个范围,时间就会产生歧义。如果我们加多几个指针呢?比如说我们有秒针,分针,时针,上下午针,天针,月针,年针。。。而且,它并不需要占用很大的内存。

    对应到层级时间轮,当超过当前时间轮表示范围后,尝试添加到上层的时间轮(不设上限)。比如有一个600s后执行的任务,秒级时间轮不能处理,需要尝试添加到分钟级别;后续如果任务剩余时间小于60s,对任务进行降级,将任务添加到秒级时间轮,最后保证所有任务都能执行。


    image.png
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    class Task {
        private int num = 0; // 圈数
    }
    @Data
    class Bucket {
        @Delegate
        private List<Task> taskList = new ArrayList<>();
    }
    
    @Builder
    class Timer {
        // 一个bucket的大小
        private long tickMs;
    
        // 有多少bucket
        private int wheelSize;
    
        // 上一级时间轮
        private Timer upTimer;
    
        // 时间轮的时间跨度
        private long interval;
    
        private Bucket[] bucketList = new Bucket[60];
    
        public Timer (){
            Arrays.fill(bucketList, new Bucket());
        }
        private int getBucketByTimestamp(long timestamp){
            return (int)(timestamp / 1000) % 60;
        }
    
        private synchronized Timer getUpTimer(){
            if (Objects.isNull(this.upTimer)) {
                this.upTimer = Timer.builder()
                        .interval(this.interval * 60)
                        .tickMs(this.tickMs * 60)
                        .wheelSize(60)
                        .build();
            }
    
            return this.upTimer;
        }
    
        private void advance(){
            if (Objects.isNull(this.upTimer)) {
                return ;
            }
    
            // 推动upTimer状态,将可以降级的bucket进行降级处理。
        }
    
        public boolean addTask(Task task, long timestamp) {
            long delayMs = timestamp - System.currentTimeMillis();
            if (delayMs < tickMs) {
                // 过期了,可以抛给上层处理
                return false;
            } else {
                // 没过期,扔进当前时间轮的某个槽中
                if (delayMs < interval) {
                    int bucketIndex = (int) (((delayMs + System.currentTimeMillis()) / tickMs) % wheelSize);
    
                    Bucket bucket = bucketList[bucketIndex];
                    bucket.add(task);
                } else {
                    // 扔到上一级
                    Timer timeWheel = this.getUpTimer();
                    timeWheel.addTask(task, timestamp);
                }
            }
    
            return true;
        }
    
        public List<Task> getTaskList(long timestamp){
            int bucket = this.getBucketByTimestamp(timestamp);
            bucketList[bucket].getTaskList().stream().filter(it -> it.getNum() != 0).forEach(it -> it.setNum(it.getNum() - 1));
    
            return bucketList[bucket].getTaskList().stream().filter(it -> it.getNum() == 0).collect(Collectors.toList());
        }
    
        public void run() throws InterruptedException {
            while (true) {
                List<Task> taskList = this.getTaskList(System.currentTimeMillis());
    
                if (taskList.size() != 0) {
                    // 线程池do task
                }
    
                // 执行后需要推动一下状态,相当于表的电池
                this.advance();
    
                TimeUnit.SECONDS.sleep(1);
            }
        }
    }
    

    3.3 如何判断任务已过期

    这个过期有可能说的是不符合当前时间轮,需要重新分配。可以使用delayedQueue(之前说delayedQueue复杂度过高是对任务来说,我们可以针对bucket来说,不管任务有多少个,bucket不会变),而这个槽到期后,也就是被我们从delayQueue中poll出来后,我们只需要按照添加新任务的逻辑,将槽中的所有任务循环一次,重新加到新的槽中即可。

    相关文章

      网友评论

        本文标题:Kafka时间轮算法

        本文链接:https://www.haomeiwen.com/subject/ysizlktx.html