美文网首页Kafka
时间轮TimingWheel

时间轮TimingWheel

作者: 专职掏大粪 | 来源:发表于2021-01-11 14:13 被阅读0次

TimerTask 类


trait TimerTask extends Runnable {
 val delayMs: Long // 通常是request.timeout.ms参数值,表示这个定时任务的超时时间
 // 每个TimerTask实例关联一个TimerTaskEntry
 // 就是说每个定时任务需要知道它在哪个Bucket链表下的哪个链表元素上
 private[this] var timerTaskEntry: TimerTaskEntry = null
 // 取消定时任务,原理就是将关联的timerTaskEntry置空
 def cancel(): Unit = {
   synchronized {
     if (timerTaskEntry != null) timerTaskEntry.remove()
     timerTaskEntry = null
   }
 }
 // 关联timerTaskEntry,原理是给timerTaskEntry字段赋值
 private[timer] def setTimerTaskEntry(entry: TimerTaskEntry)
   : Unit = {
   synchronized {
     if (timerTaskEntry != null && timerTaskEntry != entry)
       timerTaskEntry.remove()
     timerTaskEntry = entry
   }
 }
 // 获取关联的timerTaskEntry实例
 private[timer] def getTimerTaskEntry(): TimerTaskEntry = {
   timerTaskEntry
 }
}

TimerTaskEntry 类

private[timer] class TimerTaskEntry(val timerTask: TimerTask, val expirationMs: Long) extends Ordered[TimerTaskEntry] {
  @volatile
  var list: TimerTaskList = null   // 绑定的Bucket链表实例 
  var next: TimerTaskEntry = null  // next指针
  var prev: TimerTaskEntry = null  // prev指针
  // 关联给定的定时任务
  if (timerTask != null) timerTask.setTimerTaskEntry(this)
  // 关联定时任务是否已经被取消了
  def cancelled: Boolean = {
    timerTask.getTimerTaskEntry != this
  }
  // 从Bucket链表中移除自己
  def remove(): Unit = {
    var currentList = list
    while (currentList != null) {
      currentList.remove(this)
      currentList = list
    }
  }
  ......
}

TimerTaskList 类

private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed {
  private[this] val root = new TimerTaskEntry(null, -1)
  root.next = root
  root.prev = root
  private[this] val expiration = new AtomicLong(-1L)
  ......
}
  • taskCounter,用于标识当前这个链表中的总定时任务数;
  • expiration,表示这个链表所在 Bucket 的过期时间戳。

expiration 的 Getter 和 Setter 方法。

 // Set the bucket's expiration time
  // Returns true if the expiration time is changed
  def setExpiration(expirationMs: Long): Boolean = {
    expiration.getAndSet(expirationMs) != expirationMs
  }

  // Get the bucket's expiration time
  def getExpiration(): Long = {
    expiration.get()
  }

add方法

  // Add a timer task entry to this list
  def add(timerTaskEntry: TimerTaskEntry): Unit = {
    var done = false
    while (!done) {
 // 在添加之前尝试移除该定时任务,保证该任务没有在其他链表中
      // Remove the timer task entry if it is already in any other list
      // We do this outside of the sync block below to avoid deadlocking.
      // We may retry until timerTaskEntry.list becomes null.
      timerTaskEntry.remove()

      synchronized {
        timerTaskEntry.synchronized {
          if (timerTaskEntry.list == null) {
            // put the timer task entry to the end of the list. (root.prev points to the tail entry)
            val tail = root.prev
            timerTaskEntry.next = root
            timerTaskEntry.prev = tail
            timerTaskEntry.list = this
 // 把timerTaskEntry添加到链表末尾
            tail.next = timerTaskEntry
            root.prev = timerTaskEntry
            taskCounter.incrementAndGet()
            done = true
          }
        }
      }
    }
  }

remove方法

 // Remove the specified timer task entry from this list
  def remove(timerTaskEntry: TimerTaskEntry): Unit = {
    synchronized {
      timerTaskEntry.synchronized {
        if (timerTaskEntry.list eq this) {
          timerTaskEntry.next.prev = timerTaskEntry.prev
          timerTaskEntry.prev.next = timerTaskEntry.next
          timerTaskEntry.next = null
          timerTaskEntry.prev = null
          timerTaskEntry.list = null
          taskCounter.decrementAndGet()
        }
      }
    }
  }

flush 方法: 将高层次时间轮 Bucket 上的定时任务重新插入回低层次的 Bucket 中

  // Remove all task entries and apply the supplied function to each of them
  def flush(f: (TimerTaskEntry)=>Unit): Unit = {
    synchronized {
      var head = root.next
      while (head ne root) {
        remove(head)
        f(head)
        head = root.next
      }
      expiration.set(-1L)
    }
  }

TimingWheel 类

#定义
private[timer] class TimingWheel(
  tickMs: Long, wheelSize: Int, 
  startMs: Long, taskCounter: AtomicInteger, 
  queue: DelayQueue[TimerTaskList]) {
  private[this] val interval = tickMs * wheelSize
  private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => new TimerTaskList(taskCounter) }
  private[this] var currentTime = startMs - (startMs % tickMs)
  @volatile private[this] var overflowWheel: TimingWheel = null
  ......
}

  • tickMs:滴答一次的时长,类似于手表的例子中向前推进一格的时间。对于秒针而言,tickMs 就是 1 秒。同理,分针是 1 分,时针是 1 小时。在 Kafka 中,第 1 层时间轮的 tickMs 被固定为 1 毫秒,也就是说,向前推进一格 Bucket 的时长是 1 毫秒。
  • wheelSize:每一层时间轮上的 Bucket 数量。第 1 层的 Bucket 数量是 20。
  • startMs:时间轮对象被创建时的起始时间戳。
  • taskCounter:这一层时间轮上的总定时任务数。
  • queue:将所有 Bucket 按照过期时间排序的延迟队列。随着时间不断向前推进,Kafka 需要依靠这个队列获取那些已过期的 Bucket,并清除它们。
  • interval:这层时间轮总时长,等于滴答时长乘以 wheelSize。以第 1 层为例,interval 就是 20 毫秒。由于下一层时间轮的滴答时长就是上一层的总时长,因此,第 2 层的滴答时长就是 20 毫秒,总时长是 400 毫秒,以此类推。
  • buckets:时间轮下的所有 Bucket 对象,也就是所有 TimerTaskList 对象。
  • currentTime:当前时间戳,只是源码对它进行了一些微调整,将它设置成小于当前时间的最大滴答时长的整数倍。举个例子,假设滴答时长是 20 毫秒,当前时间戳是 123 毫秒,那么,currentTime 会被调整为 120 毫秒。
  • overflowWheel:Kafka 是按需创建上层时间轮的。这也就是说,当有新的定时任务到达时,会尝试将其放入第 1 层时间轮。如果第 1 层的 interval 无法容纳定时任务的超时时间,就现场创建并配置好第 2 层时间轮,并再次尝试放入,如果依然无法容纳,那么,就再创建和配置第 3 层时间轮,以此类推,直到找到适合容纳该定时任务的第 N 层时间轮。

addOverflowWheel


private[this] def addOverflowWheel(): Unit = {
  synchronized {
    // 只有之前没有创建上层时间轮方法才会继续
    if (overflowWheel == null) {
      // 创建新的TimingWheel实例
      // 滴答时长tickMs等于下层时间轮总时长
      // 每层的轮子数都是相同的
      overflowWheel = new TimingWheel(
        tickMs = interval,
        wheelSize = wheelSize,
        startMs = currentTime,
        taskCounter = taskCounter,
        queue
      )
    }
  }
}

add


def add(timerTaskEntry: TimerTaskEntry): Boolean = {
  // 获取定时任务的过期时间戳
  val expiration = timerTaskEntry.expirationMs
  // 如果该任务已然被取消了,则无需添加,直接返回
  if (timerTaskEntry.cancelled) {
    false
  // 如果该任务超时时间已过期
  } else if (expiration < currentTime + tickMs) {
    false
  // 如果该任务超时时间在本层时间轮覆盖时间范围内
  } else if (expiration < currentTime + interval) {
    val virtualId = expiration / tickMs
    // 计算要被放入到哪个Bucket中
    val bucket = buckets((virtualId % wheelSize.toLong).toInt)
    // 添加到Bucket中
    bucket.add(timerTaskEntry)
    // 设置Bucket过期时间
    // 如果该时间变更过,说明Bucket是新建或被重用,将其加回到DelayQueue
    if (bucket.setExpiration(virtualId * tickMs)) {
      queue.offer(bucket)
    }
    true
  // 本层时间轮无法容纳该任务,交由上层时间轮处理
  } else {
    // 按需创建上层时间轮
    if (overflowWheel == null) addOverflowWheel()
    // 加入到上层时间轮中
    overflowWheel.add(timerTaskEntry)
  }
}

image.png

advanceClock: 向前驱动时钟的方法


def advanceClock(timeMs: Long): Unit = {
  // 向前驱动到的时点要超过Bucket的时间范围,才是有意义的推进,否则什么都不做
  // 更新当前时间currentTime到下一个Bucket的起始时点
  if (timeMs >= currentTime + tickMs) {
    currentTime = timeMs - (timeMs % tickMs)
    // 同时尝试为上一层时间轮做向前推进动作
    if (overflowWheel != null) overflowWheel.advanceClock(currentTime)
  }
}

相关文章

  • 时间轮TimingWheel

    TimerTask 类 TimerTaskEntry 类 TimerTaskList 类 taskCounter,...

  • TimingWheel 时间轮详解

    在kafka中,有许多请求并不是立即返回,而且处理完一些异步操作或者等待某些条件达成后才返回,这些请求一般都会带有...

  • kafka TimingWheel(时间轮)

    先吐个槽,不喜勿喷,最近非常想换工作,在目前这家公司待的还不满一年,为什么想离职呢?年前加了半年的班几乎每天都是九...

  • 时间轮

    参考代码 hzlulu/TimingWheel TimerTask 时间轮中任务类的抽象类。实现Runnable...

  • 算法

    1.timingWheel2.lru3.mapreduce4.quorum5.consistent hash

  • TimingWheel.scala

  • 一张图理解Kafka时间轮(TimingWheel),看不懂算我

    本文是【字节可视化系列】Kafka专栏文章。通过本文你将了解到时间轮算法思想,层级时间轮,时间轮的升级和降级。 时...

  • 时间轮

    https://zhuanlan.zhihu.com/p/102476356[https://zhuanlan.z...

  • 时间轮

    前言 时间轮是一个非常高效且低成本的计时算法,论文地址《Hashed and Hierarchical Timin...

  • 浅论时间轮

    基于时间轮的定时器 定时器的实现多采用最小堆,其创建和删除复杂度为O(logN),tick的复杂度为O(1);在极...

网友评论

    本文标题:时间轮TimingWheel

    本文链接:https://www.haomeiwen.com/subject/xbgraktx.html