美文网首页
模拟kafka时间轮

模拟kafka时间轮

作者: 快点给我想个名 | 来源:发表于2020-12-28 17:42 被阅读0次

延迟功能调度器

public class SystemTimer {
    // 执行任务线程池
    private ExecutorService taskExecutor = Executors.newFixedThreadPool(1, runnable -> {
        Thread thread = new Thread(runnable, "executor-pool");
        thread.setDaemon(true);
        thread.setUncaughtExceptionHandler((t, e) -> System.err.println("Uncaught exception in thread '" + t.getName() + "':" + e));
        return thread;
    });
    // 延迟队列
    private DelayQueue<TimerTaskList> delayQueue = new DelayQueue();
    // 时间轮
    private TimingWheel timingWheel = new TimingWheel(1000L, 5, System.currentTimeMillis(), delayQueue);

    private static SystemTimer INSTANCE;

    public static SystemTimer getInstance() {
        if (INSTANCE == null) {
            synchronized (SystemTimer.class) {
                if (INSTANCE == null) {
                    INSTANCE = new SystemTimer();
                }
            }
        }
        return INSTANCE;
    }

    private SystemTimer() {
        new Thread(() -> {
            while (true){
                advanceClock(1000L);
            }
        }).start();
    }

    /**
     * 推动时间轮转动
     * @param timeoutMs
     */
    private void advanceClock(Long timeoutMs) {
        try {
            TimerTaskList bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
            if (bucket != null){
                timingWheel.advanceClock(bucket.getExpiration());
                bucket.flush(this::addTask);
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    /**
     * 添加任务
     * @param timedTask
     */
    public void addTask(TimerTaskEntry timedTask) {
        if (!timingWheel.addTask(timedTask)) {// 到期或取消
            if (!timedTask.isCancle()) {// 到期立即执行
                taskExecutor.submit(timedTask.getTask());
            }
        }
    }
}

时间轮

public class TimingWheel {
    // 一个槽表示的时间范围
    private Long tickMs;
    // 轮大小
    private Integer wheelSize;
    // 一个时间轮表示的时间范围
    private Long interval;
    // 时间轮指针
    private volatile long currentTime;
    private TimerTaskList[] buckets;
    private DelayQueue<TimerTaskList> delayQueue;
    // 上层时间轮
    private volatile TimingWheel overflowWheel;

    public TimingWheel(Long tickMs, Integer wheelSize, Long currentTime, DelayQueue<TimerTaskList> delayQueue) {
        this.tickMs = tickMs;
        this.wheelSize = wheelSize;
        this.interval = tickMs * wheelSize;
        this.buckets = new TimerTaskList[wheelSize];
        this.currentTime = currentTime - (currentTime % tickMs);
        this.delayQueue = delayQueue;
        for (int i = 0; i < buckets.length; i++) {
            buckets[i] = new TimerTaskList(UUID.randomUUID().toString());
        }
    }

    /**
     * 添加任务到对应的槽
     * @param timedTask
     * @return
     */
    public boolean addTask(TimerTaskEntry timedTask) {
        Long expirationMs = timedTask.getExpirationMs();
        if(timedTask.isCancle()){// 任务取消
            return false;
        }
        long delayMs = expirationMs - currentTime;
        if(delayMs < tickMs){// 任务到期
            return false;
        }else if(delayMs < interval){// 添加到对应的槽
            long virtualId = expirationMs / tickMs;
            int bucketIndex = (int) (virtualId % wheelSize);
            TimerTaskList bucket = buckets[bucketIndex];
            bucket.addTask(timedTask);
            // 设置槽过期时间并将任务入队
            if(bucket.setExpiration(expirationMs - (expirationMs % tickMs))){
                delayQueue.offer(bucket);
            }
            return true;
        }else{// 添加到上层时间轮
            if(overflowWheel == null){
                addOverflowWheel();
            }
            overflowWheel.addTask(timedTask);
            return true;
        }
    }

    /**
     * 尝试推荐时间轮
     * @param expiration
     */
    public void advanceClock(Long expiration) {
        if(expiration >= currentTime + tickMs){
            currentTime = expiration - (expiration % tickMs);
            if(overflowWheel != null){
                overflowWheel.advanceClock(expiration);
            }
        }
    }

    private TimingWheel addOverflowWheel() {
        if (overflowWheel == null) {
            synchronized (this) {
                if (overflowWheel == null) {
                    // 注意这里第一个参数为interval
                    overflowWheel = new TimingWheel(interval, wheelSize, currentTime, delayQueue);
                }
            }
        }
        return overflowWheel;
    }
}

槽(任务列表)

public class TimerTaskList implements Delayed {
    // 唯一标识
    private String id;
    // 槽过期时间
    private AtomicLong expiration = new AtomicLong(-1L);

    private TimerTaskEntry root = new TimerTaskEntry(-1L,null);

    {
        root.next = root;
        root.prev = root;
    }

    public TimerTaskList(String id) {
        this.id = id;
    }

    /**
     * 添加任务
     * @param timedTask
     */
    public void addTask(TimerTaskEntry timedTask) {
        synchronized (this) {
            timedTask.bucket = this;
            TimerTaskEntry tail = root.prev;

            timedTask.next = root;
            timedTask.prev = tail;

            tail.next = timedTask;
            root.prev = timedTask;
        }
    }

    /**
     * 删除任务
     * @param timedTask
     */
    public void removeTask(TimerTaskEntry timedTask) {
        synchronized (this) {
            if (timedTask.bucket.id.equals(this.id)) {
                timedTask.next.prev = timedTask.prev;
                timedTask.prev.next = timedTask.next;
                timedTask.bucket = null;
                timedTask.next = null;
                timedTask.prev = null;
            }
        }
    }

    /**
     * 重新分配槽
     * 执行当前槽任务时会调用该方法
     * @param timedTaskFlush
     */
    public void flush(Consumer<TimerTaskEntry> timedTaskFlush) {
        synchronized (this){
            TimerTaskEntry timedTask = root.next;
            while (timedTask != null && !timedTask.equals(root)){
                removeTask(timedTask);
                timedTaskFlush.accept(timedTask);
                timedTask = root.next;
            }
            expiration.set(-1);
        }
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return Math.max(0, unit.convert(expiration.get() - System.currentTimeMillis(), TimeUnit.MILLISECONDS));
    }

    @Override
    public int compareTo(Delayed o) {
        if (o instanceof TimerTaskList) {
            return Long.compare(expiration.get(), ((TimerTaskList) o).expiration.get());
        }
        return 0;
    }

    public Boolean setExpiration(Long expiration) {
        return this.expiration.getAndSet(expiration) != expiration;
    }

    public Long getExpiration() {
        return expiration.get();
    }
}

任务

public class TimerTaskEntry  {

    // 延迟时间
    private Long delayMs;
    // 任务
    private Runnable task;
    // 过期时间戳
    private Long expirationMs;
    private AtomicBoolean cancel;
    protected TimerTaskEntry next;
    protected TimerTaskEntry prev;
    protected TimerTaskList bucket;

    public TimerTaskEntry(Long delayMs, Runnable task) {
        this.delayMs = delayMs;
        this.task = task;
        this.expirationMs = System.currentTimeMillis() + delayMs;
        this.cancel = new AtomicBoolean(false);
        this.next = this.prev = null;
        this.bucket = null;
    }

    public boolean isCancle() {
        return cancel.get();
    }

    public boolean cancel(){
        return cancel.compareAndSet(false,true);
    }

    public Runnable getTask() {
        return task;
    }

    public Long getExpirationMs() {
        return expirationMs;
    }

}

测试

public class Test {

    public static void main(String[] args) throws InterruptedException {
        SystemTimer systemTimer = SystemTimer.getInstance();

        TimerTaskEntry taskEntry1 = new TimerTaskEntry(1000L, () -> System.out.println("任务执行1->" + System.currentTimeMillis()));
        TimerTaskEntry taskEntry2 = new TimerTaskEntry(2000L, () -> System.out.println("任务执行2->" + System.currentTimeMillis()));
        TimerTaskEntry taskEntry3 = new TimerTaskEntry(3000L, () -> System.out.println("任务执行3->" + System.currentTimeMillis()));
        TimerTaskEntry taskEntry4 = new TimerTaskEntry(4000L, () -> System.out.println("任务执行4->" + System.currentTimeMillis()));
        TimerTaskEntry taskEntry5 = new TimerTaskEntry(5000L, () -> System.out.println("任务执行5->" + System.currentTimeMillis()));
        TimerTaskEntry taskEntry6 = new TimerTaskEntry(6000L, () -> System.out.println("任务执行6->" + System.currentTimeMillis()));
        TimerTaskEntry taskEntry7 = new TimerTaskEntry(10000L, () -> System.out.println("任务执行7->" + System.currentTimeMillis()));
        TimerTaskEntry taskEntry8 = new TimerTaskEntry(20000L, () -> System.out.println("任务执行8->" + System.currentTimeMillis()));


        systemTimer.addTask(taskEntry1);
        systemTimer.addTask(taskEntry2);
        systemTimer.addTask(taskEntry3);
        systemTimer.addTask(taskEntry4);
        systemTimer.addTask(taskEntry5);
        systemTimer.addTask(taskEntry6);
        systemTimer.addTask(taskEntry7);
        systemTimer.addTask(taskEntry8);

        Thread.sleep(20000);

        new Thread(() -> {
            TimerTaskEntry taskEntry9 = new TimerTaskEntry(1500L, () -> System.out.println("任务执行9->" + System.currentTimeMillis()));
            systemTimer.addTask(taskEntry9);
        }).start();
    }
}

相关文章

  • 模拟kafka时间轮

    延迟功能调度器 时间轮 槽(任务列表) 任务 测试

  • 解惑“高深”的 Kafka 时间轮原理,原来也就这么回事!

    【摘要】Kafka时间轮是Kafka实现高效的延时任务的基础,它模拟了现实生活中的钟表对时间的表示方式,同时,时间...

  • “高深莫测”的Kafka时间轮原理,原来也就这么回事

    【摘要】 Kafka时间轮是Kafka实现高效的延时任务的基础,它模拟了现实生活中的钟表对时间的表示方式,同时,时...

  • Kafka中的服务端

    阅读以下内容你将了解到:1.Kafka的协议2.Kafka的时间轮实现(作用、原理、多级时间轮)3.Kafka中的...

  • kafka时间轮

    举个例子:第一层时间轮格数是10,每格表示1ms。第二层时间轮格数是20,每格表示上一层时间轮的总和:10*1ms...

  • 无镜--kafka之服务端--时间轮

    时间轮 Kafka中存在大量的延迟操作,比如延迟生产,延迟拉取,延迟加入,延迟心跳等。kafka使用时间轮(Tim...

  • kafka时间轮解析

    概述 这篇博文的起源在于阿里的公众号里面有一篇文章讲菜鸟的同学在造一个关于时间轮定时器的文章,然后在网上搜索资...

  • Kafka时间轮算法

    1 背景 Kafka存在大量的延时操作,比如延时生产、延时消费或者延时删除,实现延时操作有很多办法,JDK的Tim...

  • kafka TimingWheel(时间轮)

    先吐个槽,不喜勿喷,最近非常想换工作,在目前这家公司待的还不满一年,为什么想离职呢?年前加了半年的班几乎每天都是九...

  • 一张图理解Kafka时间轮(TimingWheel),看不懂算我

    本文是【字节可视化系列】Kafka专栏文章。通过本文你将了解到时间轮算法思想,层级时间轮,时间轮的升级和降级。 时...

网友评论

      本文标题:模拟kafka时间轮

      本文链接:https://www.haomeiwen.com/subject/xestoktx.html