TimerTask 类
trait TimerTask extends Runnable {
val delayMs: Long // 通常是request.timeout.ms参数值,表示这个定时任务的超时时间
// 每个TimerTask实例关联一个TimerTaskEntry
// 就是说每个定时任务需要知道它在哪个Bucket链表下的哪个链表元素上
private[this] var timerTaskEntry: TimerTaskEntry = null
// 取消定时任务,原理就是将关联的timerTaskEntry置空
def cancel(): Unit = {
synchronized {
if (timerTaskEntry != null) timerTaskEntry.remove()
timerTaskEntry = null
}
}
// 关联timerTaskEntry,原理是给timerTaskEntry字段赋值
private[timer] def setTimerTaskEntry(entry: TimerTaskEntry)
: Unit = {
synchronized {
if (timerTaskEntry != null && timerTaskEntry != entry)
timerTaskEntry.remove()
timerTaskEntry = entry
}
}
// 获取关联的timerTaskEntry实例
private[timer] def getTimerTaskEntry(): TimerTaskEntry = {
timerTaskEntry
}
}
TimerTaskEntry 类
private[timer] class TimerTaskEntry(val timerTask: TimerTask, val expirationMs: Long) extends Ordered[TimerTaskEntry] {
@volatile
var list: TimerTaskList = null // 绑定的Bucket链表实例
var next: TimerTaskEntry = null // next指针
var prev: TimerTaskEntry = null // prev指针
// 关联给定的定时任务
if (timerTask != null) timerTask.setTimerTaskEntry(this)
// 关联定时任务是否已经被取消了
def cancelled: Boolean = {
timerTask.getTimerTaskEntry != this
}
// 从Bucket链表中移除自己
def remove(): Unit = {
var currentList = list
while (currentList != null) {
currentList.remove(this)
currentList = list
}
}
......
}
TimerTaskList 类
private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed {
private[this] val root = new TimerTaskEntry(null, -1)
root.next = root
root.prev = root
private[this] val expiration = new AtomicLong(-1L)
......
}
- taskCounter,用于标识当前这个链表中的总定时任务数;
- expiration,表示这个链表所在 Bucket 的过期时间戳。
expiration 的 Getter 和 Setter 方法。
// Set the bucket's expiration time
// Returns true if the expiration time is changed
def setExpiration(expirationMs: Long): Boolean = {
expiration.getAndSet(expirationMs) != expirationMs
}
// Get the bucket's expiration time
def getExpiration(): Long = {
expiration.get()
}
add方法
// Add a timer task entry to this list
def add(timerTaskEntry: TimerTaskEntry): Unit = {
var done = false
while (!done) {
// 在添加之前尝试移除该定时任务,保证该任务没有在其他链表中
// Remove the timer task entry if it is already in any other list
// We do this outside of the sync block below to avoid deadlocking.
// We may retry until timerTaskEntry.list becomes null.
timerTaskEntry.remove()
synchronized {
timerTaskEntry.synchronized {
if (timerTaskEntry.list == null) {
// put the timer task entry to the end of the list. (root.prev points to the tail entry)
val tail = root.prev
timerTaskEntry.next = root
timerTaskEntry.prev = tail
timerTaskEntry.list = this
// 把timerTaskEntry添加到链表末尾
tail.next = timerTaskEntry
root.prev = timerTaskEntry
taskCounter.incrementAndGet()
done = true
}
}
}
}
}
remove方法
// Remove the specified timer task entry from this list
def remove(timerTaskEntry: TimerTaskEntry): Unit = {
synchronized {
timerTaskEntry.synchronized {
if (timerTaskEntry.list eq this) {
timerTaskEntry.next.prev = timerTaskEntry.prev
timerTaskEntry.prev.next = timerTaskEntry.next
timerTaskEntry.next = null
timerTaskEntry.prev = null
timerTaskEntry.list = null
taskCounter.decrementAndGet()
}
}
}
}
flush 方法: 将高层次时间轮 Bucket 上的定时任务重新插入回低层次的 Bucket 中
// Remove all task entries and apply the supplied function to each of them
def flush(f: (TimerTaskEntry)=>Unit): Unit = {
synchronized {
var head = root.next
while (head ne root) {
remove(head)
f(head)
head = root.next
}
expiration.set(-1L)
}
}
TimingWheel 类
#定义
private[timer] class TimingWheel(
tickMs: Long, wheelSize: Int,
startMs: Long, taskCounter: AtomicInteger,
queue: DelayQueue[TimerTaskList]) {
private[this] val interval = tickMs * wheelSize
private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => new TimerTaskList(taskCounter) }
private[this] var currentTime = startMs - (startMs % tickMs)
@volatile private[this] var overflowWheel: TimingWheel = null
......
}
- tickMs:滴答一次的时长,类似于手表的例子中向前推进一格的时间。对于秒针而言,tickMs 就是 1 秒。同理,分针是 1 分,时针是 1 小时。在 Kafka 中,第 1 层时间轮的 tickMs 被固定为 1 毫秒,也就是说,向前推进一格 Bucket 的时长是 1 毫秒。
- wheelSize:每一层时间轮上的 Bucket 数量。第 1 层的 Bucket 数量是 20。
- startMs:时间轮对象被创建时的起始时间戳。
- taskCounter:这一层时间轮上的总定时任务数。
- queue:将所有 Bucket 按照过期时间排序的延迟队列。随着时间不断向前推进,Kafka 需要依靠这个队列获取那些已过期的 Bucket,并清除它们。
- interval:这层时间轮总时长,等于滴答时长乘以 wheelSize。以第 1 层为例,interval 就是 20 毫秒。由于下一层时间轮的滴答时长就是上一层的总时长,因此,第 2 层的滴答时长就是 20 毫秒,总时长是 400 毫秒,以此类推。
- buckets:时间轮下的所有 Bucket 对象,也就是所有 TimerTaskList 对象。
- currentTime:当前时间戳,只是源码对它进行了一些微调整,将它设置成小于当前时间的最大滴答时长的整数倍。举个例子,假设滴答时长是 20 毫秒,当前时间戳是 123 毫秒,那么,currentTime 会被调整为 120 毫秒。
- overflowWheel:Kafka 是按需创建上层时间轮的。这也就是说,当有新的定时任务到达时,会尝试将其放入第 1 层时间轮。如果第 1 层的 interval 无法容纳定时任务的超时时间,就现场创建并配置好第 2 层时间轮,并再次尝试放入,如果依然无法容纳,那么,就再创建和配置第 3 层时间轮,以此类推,直到找到适合容纳该定时任务的第 N 层时间轮。
addOverflowWheel
private[this] def addOverflowWheel(): Unit = {
synchronized {
// 只有之前没有创建上层时间轮方法才会继续
if (overflowWheel == null) {
// 创建新的TimingWheel实例
// 滴答时长tickMs等于下层时间轮总时长
// 每层的轮子数都是相同的
overflowWheel = new TimingWheel(
tickMs = interval,
wheelSize = wheelSize,
startMs = currentTime,
taskCounter = taskCounter,
queue
)
}
}
}
add
def add(timerTaskEntry: TimerTaskEntry): Boolean = {
// 获取定时任务的过期时间戳
val expiration = timerTaskEntry.expirationMs
// 如果该任务已然被取消了,则无需添加,直接返回
if (timerTaskEntry.cancelled) {
false
// 如果该任务超时时间已过期
} else if (expiration < currentTime + tickMs) {
false
// 如果该任务超时时间在本层时间轮覆盖时间范围内
} else if (expiration < currentTime + interval) {
val virtualId = expiration / tickMs
// 计算要被放入到哪个Bucket中
val bucket = buckets((virtualId % wheelSize.toLong).toInt)
// 添加到Bucket中
bucket.add(timerTaskEntry)
// 设置Bucket过期时间
// 如果该时间变更过,说明Bucket是新建或被重用,将其加回到DelayQueue
if (bucket.setExpiration(virtualId * tickMs)) {
queue.offer(bucket)
}
true
// 本层时间轮无法容纳该任务,交由上层时间轮处理
} else {
// 按需创建上层时间轮
if (overflowWheel == null) addOverflowWheel()
// 加入到上层时间轮中
overflowWheel.add(timerTaskEntry)
}
}

advanceClock: 向前驱动时钟的方法
def advanceClock(timeMs: Long): Unit = {
// 向前驱动到的时点要超过Bucket的时间范围,才是有意义的推进,否则什么都不做
// 更新当前时间currentTime到下一个Bucket的起始时点
if (timeMs >= currentTime + tickMs) {
currentTime = timeMs - (timeMs % tickMs)
// 同时尝试为上一层时间轮做向前推进动作
if (overflowWheel != null) overflowWheel.advanceClock(currentTime)
}
}
网友评论