美文网首页
模拟kafka时间轮

模拟kafka时间轮

作者: 快点给我想个名 | 来源:发表于2020-12-28 17:42 被阅读0次

    延迟功能调度器

    public class SystemTimer {
        // 执行任务线程池
        private ExecutorService taskExecutor = Executors.newFixedThreadPool(1, runnable -> {
            Thread thread = new Thread(runnable, "executor-pool");
            thread.setDaemon(true);
            thread.setUncaughtExceptionHandler((t, e) -> System.err.println("Uncaught exception in thread '" + t.getName() + "':" + e));
            return thread;
        });
        // 延迟队列
        private DelayQueue<TimerTaskList> delayQueue = new DelayQueue();
        // 时间轮
        private TimingWheel timingWheel = new TimingWheel(1000L, 5, System.currentTimeMillis(), delayQueue);
    
        private static SystemTimer INSTANCE;
    
        public static SystemTimer getInstance() {
            if (INSTANCE == null) {
                synchronized (SystemTimer.class) {
                    if (INSTANCE == null) {
                        INSTANCE = new SystemTimer();
                    }
                }
            }
            return INSTANCE;
        }
    
        private SystemTimer() {
            new Thread(() -> {
                while (true){
                    advanceClock(1000L);
                }
            }).start();
        }
    
        /**
         * 推动时间轮转动
         * @param timeoutMs
         */
        private void advanceClock(Long timeoutMs) {
            try {
                TimerTaskList bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
                if (bucket != null){
                    timingWheel.advanceClock(bucket.getExpiration());
                    bucket.flush(this::addTask);
                }
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    
        /**
         * 添加任务
         * @param timedTask
         */
        public void addTask(TimerTaskEntry timedTask) {
            if (!timingWheel.addTask(timedTask)) {// 到期或取消
                if (!timedTask.isCancle()) {// 到期立即执行
                    taskExecutor.submit(timedTask.getTask());
                }
            }
        }
    }
    

    时间轮

    public class TimingWheel {
        // 一个槽表示的时间范围
        private Long tickMs;
        // 轮大小
        private Integer wheelSize;
        // 一个时间轮表示的时间范围
        private Long interval;
        // 时间轮指针
        private volatile long currentTime;
        private TimerTaskList[] buckets;
        private DelayQueue<TimerTaskList> delayQueue;
        // 上层时间轮
        private volatile TimingWheel overflowWheel;
    
        public TimingWheel(Long tickMs, Integer wheelSize, Long currentTime, DelayQueue<TimerTaskList> delayQueue) {
            this.tickMs = tickMs;
            this.wheelSize = wheelSize;
            this.interval = tickMs * wheelSize;
            this.buckets = new TimerTaskList[wheelSize];
            this.currentTime = currentTime - (currentTime % tickMs);
            this.delayQueue = delayQueue;
            for (int i = 0; i < buckets.length; i++) {
                buckets[i] = new TimerTaskList(UUID.randomUUID().toString());
            }
        }
    
        /**
         * 添加任务到对应的槽
         * @param timedTask
         * @return
         */
        public boolean addTask(TimerTaskEntry timedTask) {
            Long expirationMs = timedTask.getExpirationMs();
            if(timedTask.isCancle()){// 任务取消
                return false;
            }
            long delayMs = expirationMs - currentTime;
            if(delayMs < tickMs){// 任务到期
                return false;
            }else if(delayMs < interval){// 添加到对应的槽
                long virtualId = expirationMs / tickMs;
                int bucketIndex = (int) (virtualId % wheelSize);
                TimerTaskList bucket = buckets[bucketIndex];
                bucket.addTask(timedTask);
                // 设置槽过期时间并将任务入队
                if(bucket.setExpiration(expirationMs - (expirationMs % tickMs))){
                    delayQueue.offer(bucket);
                }
                return true;
            }else{// 添加到上层时间轮
                if(overflowWheel == null){
                    addOverflowWheel();
                }
                overflowWheel.addTask(timedTask);
                return true;
            }
        }
    
        /**
         * 尝试推荐时间轮
         * @param expiration
         */
        public void advanceClock(Long expiration) {
            if(expiration >= currentTime + tickMs){
                currentTime = expiration - (expiration % tickMs);
                if(overflowWheel != null){
                    overflowWheel.advanceClock(expiration);
                }
            }
        }
    
        private TimingWheel addOverflowWheel() {
            if (overflowWheel == null) {
                synchronized (this) {
                    if (overflowWheel == null) {
                        // 注意这里第一个参数为interval
                        overflowWheel = new TimingWheel(interval, wheelSize, currentTime, delayQueue);
                    }
                }
            }
            return overflowWheel;
        }
    }
    

    槽(任务列表)

    public class TimerTaskList implements Delayed {
        // 唯一标识
        private String id;
        // 槽过期时间
        private AtomicLong expiration = new AtomicLong(-1L);
    
        private TimerTaskEntry root = new TimerTaskEntry(-1L,null);
    
        {
            root.next = root;
            root.prev = root;
        }
    
        public TimerTaskList(String id) {
            this.id = id;
        }
    
        /**
         * 添加任务
         * @param timedTask
         */
        public void addTask(TimerTaskEntry timedTask) {
            synchronized (this) {
                timedTask.bucket = this;
                TimerTaskEntry tail = root.prev;
    
                timedTask.next = root;
                timedTask.prev = tail;
    
                tail.next = timedTask;
                root.prev = timedTask;
            }
        }
    
        /**
         * 删除任务
         * @param timedTask
         */
        public void removeTask(TimerTaskEntry timedTask) {
            synchronized (this) {
                if (timedTask.bucket.id.equals(this.id)) {
                    timedTask.next.prev = timedTask.prev;
                    timedTask.prev.next = timedTask.next;
                    timedTask.bucket = null;
                    timedTask.next = null;
                    timedTask.prev = null;
                }
            }
        }
    
        /**
         * 重新分配槽
         * 执行当前槽任务时会调用该方法
         * @param timedTaskFlush
         */
        public void flush(Consumer<TimerTaskEntry> timedTaskFlush) {
            synchronized (this){
                TimerTaskEntry timedTask = root.next;
                while (timedTask != null && !timedTask.equals(root)){
                    removeTask(timedTask);
                    timedTaskFlush.accept(timedTask);
                    timedTask = root.next;
                }
                expiration.set(-1);
            }
        }
    
        @Override
        public long getDelay(TimeUnit unit) {
            return Math.max(0, unit.convert(expiration.get() - System.currentTimeMillis(), TimeUnit.MILLISECONDS));
        }
    
        @Override
        public int compareTo(Delayed o) {
            if (o instanceof TimerTaskList) {
                return Long.compare(expiration.get(), ((TimerTaskList) o).expiration.get());
            }
            return 0;
        }
    
        public Boolean setExpiration(Long expiration) {
            return this.expiration.getAndSet(expiration) != expiration;
        }
    
        public Long getExpiration() {
            return expiration.get();
        }
    }
    

    任务

    public class TimerTaskEntry  {
    
        // 延迟时间
        private Long delayMs;
        // 任务
        private Runnable task;
        // 过期时间戳
        private Long expirationMs;
        private AtomicBoolean cancel;
        protected TimerTaskEntry next;
        protected TimerTaskEntry prev;
        protected TimerTaskList bucket;
    
        public TimerTaskEntry(Long delayMs, Runnable task) {
            this.delayMs = delayMs;
            this.task = task;
            this.expirationMs = System.currentTimeMillis() + delayMs;
            this.cancel = new AtomicBoolean(false);
            this.next = this.prev = null;
            this.bucket = null;
        }
    
        public boolean isCancle() {
            return cancel.get();
        }
    
        public boolean cancel(){
            return cancel.compareAndSet(false,true);
        }
    
        public Runnable getTask() {
            return task;
        }
    
        public Long getExpirationMs() {
            return expirationMs;
        }
    
    }
    

    测试

    public class Test {
    
        public static void main(String[] args) throws InterruptedException {
            SystemTimer systemTimer = SystemTimer.getInstance();
    
            TimerTaskEntry taskEntry1 = new TimerTaskEntry(1000L, () -> System.out.println("任务执行1->" + System.currentTimeMillis()));
            TimerTaskEntry taskEntry2 = new TimerTaskEntry(2000L, () -> System.out.println("任务执行2->" + System.currentTimeMillis()));
            TimerTaskEntry taskEntry3 = new TimerTaskEntry(3000L, () -> System.out.println("任务执行3->" + System.currentTimeMillis()));
            TimerTaskEntry taskEntry4 = new TimerTaskEntry(4000L, () -> System.out.println("任务执行4->" + System.currentTimeMillis()));
            TimerTaskEntry taskEntry5 = new TimerTaskEntry(5000L, () -> System.out.println("任务执行5->" + System.currentTimeMillis()));
            TimerTaskEntry taskEntry6 = new TimerTaskEntry(6000L, () -> System.out.println("任务执行6->" + System.currentTimeMillis()));
            TimerTaskEntry taskEntry7 = new TimerTaskEntry(10000L, () -> System.out.println("任务执行7->" + System.currentTimeMillis()));
            TimerTaskEntry taskEntry8 = new TimerTaskEntry(20000L, () -> System.out.println("任务执行8->" + System.currentTimeMillis()));
    
    
            systemTimer.addTask(taskEntry1);
            systemTimer.addTask(taskEntry2);
            systemTimer.addTask(taskEntry3);
            systemTimer.addTask(taskEntry4);
            systemTimer.addTask(taskEntry5);
            systemTimer.addTask(taskEntry6);
            systemTimer.addTask(taskEntry7);
            systemTimer.addTask(taskEntry8);
    
            Thread.sleep(20000);
    
            new Thread(() -> {
                TimerTaskEntry taskEntry9 = new TimerTaskEntry(1500L, () -> System.out.println("任务执行9->" + System.currentTimeMillis()));
                systemTimer.addTask(taskEntry9);
            }).start();
        }
    }
    

    相关文章

      网友评论

          本文标题:模拟kafka时间轮

          本文链接:https://www.haomeiwen.com/subject/xestoktx.html