美文网首页
TimingWheel.scala

TimingWheel.scala

作者: 上海马超23 | 来源:发表于2018-11-19 17:16 被阅读0次
// tickMs 当前时间轮中一个时间格表示的时间跨度
// wheelSize 当前时间轮的格数
// startMs 时间轮的创建时间
// taskCounter 所有时间轮中任务的总数
// queue 所有时间轮共用的一个任务队列,元素类型是TimerTaskList
private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, taskCounter: AtomicInteger, queue: DelayQueue[TimerTaskList]) {
  // 当前时间轮的时间跨度
  // 当前时间轮只能处理时间范围在 currentTime~currentTime+interval之间的定时任务,
  // 超过范围要把任务添加到上层时间轮中
  private[this] val interval = tickMs * wheelSize
  // 每个成员对应时间轮里的一个时间格,保存TimerTaskList的数组
  private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => new TimerTaskList(taskCounter) }
  // currentTime是时间轮的指针,tickMs的整数倍(即只可能按时间格步长前进),将整个时间轮分为到期部分和未到期部分
  // currentTime当前指向的时间格也属于到期部分
  // 初始化时近似等同于创建时间
  private[this] var currentTime = startMs - (startMs % tickMs)
  // 上层时间轮的引用
  @volatile private[this] var overflowWheel: TimingWheel = null

  def add(timerTaskEntry: TimerTaskEntry): Boolean = {
    val expiration = timerTaskEntry.expirationMs

    if (timerTaskEntry.cancelled) {
      false
    } else if (expiration < currentTime + tickMs) {
      // 任务已经过期了,即使是在当前指针指向的时间格也算过期
      false
    } else if (expiration < currentTime + interval) { // 在这个时间轮跨度内.添加到这个时间轮里
      // 根据任务的失效时间分配时间格
      val virtualId = expiration / tickMs
      val bucket = buckets((virtualId % wheelSize.toLong).toInt)
      bucket.add(timerTaskEntry)

      // 随着currentTime后移,当前时间轮能处理的时间段也在不断后移,
      // 新来的TimerTaskEntity会添加到复用原来清理过的时间格
      // 所以每次重置bucket的到期时间,保证最新
      if (bucket.setExpiration(virtualId * tickMs)) {
        queue.offer(bucket)
      }
      true
    } else {
      // 超过时间轮跨度,添加到上层
      if (overflowWheel == null) addOverflowWheel()
      overflowWheel.add(timerTaskEntry)
    }
  }

  // TimingWheel:尝试推进当前和上层时间轮的指针
  def advanceClock(timeMs: Long): Unit = {
    if (timeMs >= currentTime + tickMs) {
      // 修整currentTime是tickMs的整数倍, 即减去整除后多余的余数
      // 指针的前进并不是想象中的固定步长,而是直接跳到对应任务的超时时间
      currentTime = timeMs - (timeMs % tickMs)

      if (overflowWheel != null) overflowWheel.advanceClock(currentTime)
    }
  }
}

private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed {
  private[this] val expiration = new AtomicLong(-1L)

  def setExpiration(expirationMs: Long): Boolean = {
    // 这里判断新添加任务的expiration和原来的是否一致,保证幂等
    expiration.getAndSet(expirationMs) != expirationMs
  }

}

// 执行到期任务、阻塞等待最近到期任务
@threadsafe
class SystemTimer(executorName: String, tickMs: Long = 1, wheelSize: Int = 20,
                  startMs: Long = System.currentTimeMillis) extends Timer {
  // 固定线程池,执行到期任务
  private[this] val taskExecutor = Executors.newFixedThreadPool(1, new ThreadFactory() {
    def newThread(runnable: Runnable): Thread =
      Utils.newThread("executor-"+executorName, runnable, false)
  })

  // 所有时间轮共用队列
  private[this] val delayQueue = new DelayQueue[TimerTaskList]()
  private[this] val taskCounter = new AtomicInteger(0)
  // 最底层的时间轮
  private[this] val timingWheel = new TimingWheel(
    tickMs = tickMs, wheelSize = wheelSize, startMs = startMs, taskCounter = taskCounter, delayQueue
  )
  // 同步时间轮currentTime修改的读写锁
  private[this] val readWriteLock = new ReentrantReadWriteLock()

  // DelayedOperationPurgatory.tryCompleteElseWatch里如果未到时间的operation会触发add
  // DelayedOperationPurgatory是TimerTask的子类
  // 实质就是加锁版的addTimerTaskEntry
  def add(timerTask: TimerTask): Unit = {
    readLock.lock()
    try {
      addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + System.currentTimeMillis()))
    } finally {
      readLock.unlock()
    }
  }

  // 添加定时任务,未过期就加入时间轮,否则就执行
  private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {
    if (!timingWheel.add(timerTaskEntry)) {
      // 时间轮添加任务返回false说明已经过期,直接执行该任务
      if (!timerTaskEntry.cancelled)
        taskExecutor.submit(timerTaskEntry.timerTask)
    }
  }

  // SystenTimer.advanClock
  def advanceClock(timeoutMs: Long): Boolean = {
    // 取出的是TimerTaskList类型成员
    // 当TimerTaskList因为超时被轮询出来并不一定代表里面所有的TimerTaskEntry一定就超时,
    // 所以对于没有超时的TimerTaskEntry需要重新加入到TimingWheel新的TimerTaskList中,对于超时的TimerTaskEntry则立即执行任务。
    var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
    if (bucket != null) {
      // 定时队列能取出任务说明任务已到期
      writeLock.lock()
      try {
        while (bucket != null) {
          // 从最底层的时间轮开始推进指针
          timingWheel.advanceClock(bucket.getExpiration())
          // 从队列里取出的是TimerTaskList,然后遍历List,每条Entity过期就执行,未过期就重新从底层时间轮开始插入
          // 不就重复插入了吗?在哪里清空时间格的??
          bucket.flush(reinsert)
          // 此处poll不会阻塞
          bucket = delayQueue.poll()
        }
      } finally {
        writeLock.unlock()
      }
      true
    } else {
      false
    }
  }
}

trait TimerTask extends Runnable {
  // 延迟操作的延迟时长
  val delayMs: Long
}

abstract class DelayedOperation(override val delayMs: Long) extends TimerTask with Logging {
  // 此Operation是否完成
  private val completed = new AtomicBoolean(false)

  override def run(): Unit = {
    if (forceComplete())
      // 执行延迟操作到期执行的相关代码
      onExpiration()
  }

  def forceComplete(): Boolean = {
    // 如果Operation没有完成
    // 这个CAS保证线程安全
    if (completed.compareAndSet(false, true)) {
      // 从TimerTaskList里删除
      cancel()
      // 调用真正逻辑
      onComplete()
      true
    } else {
      false
    }
  }

  // 具体子类的业务逻辑实现
  def onComplete(): Unit
}

private class Watchers(val key: Any) {

  // DelayedOperation队列
  private[this] val operations = new LinkedList[T]()

  // 添加DelayOperation到队列
  def watch(t: T) {
    operations synchronized operations.add(t)
  }

  def tryCompleteWatched(): Int = {

    var completed = 0
    operations synchronized {
      // 遍历operations队列
      val iter = operations.iterator()
      while (iter.hasNext) {
        val curr = iter.next()
        if (curr.isCompleted) {
          // 其他线程完成了这个operation,这里就移除已完成的operation
          iter.remove()
        } else if (curr synchronized curr.tryComplete()) {
          // 尝试执行未完成的operation,如果返回isCompleted=true,表示立刻就能完成就删除
          completed += 1
          iter.remove()
        }
      }
    }

    // operations集合全部完成,从watchersForKey里删除这个键值对
    if (operations.size == 0)
      removeKeyIfEmpty(key, this)

    completed
  }

}

class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String,
         timeoutTimer: Timer, // SystemTimer对象
         brokerId: Int = 0, purgeInterval: Int = 1000, reaperEnabled: Boolean = true)
        extends Logging with KafkaMetricsGroup {

  // 管理watchers
  // values是Watchers类型的对象,表示一个DelayedOperation集合,底层是LinkedList
  // key是Watchers里DelayedOperation集合关心的对象(貌似关联的key就GroupCoordinator和ReplicaManager
  private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers(key)))
  // 对watchersForKey同步的读写锁
  private val removeWatchersLock = new ReentrantReadWriteLock()
  // delayedOperation的个数
  private[this] val estimatedTotalOperations = new AtomicInteger(0)

  // 主要作用:推进时间轮指针,定期清理watchersForKey中已完成的DelayedOperation
  private val expirationReaper = new ExpiredOperationReaper()

  if (reaperEnabled)
    // 初始化时就启动expirationReaper线程
    expirationReaper.start()

  private class ExpiredOperationReaper extends ShutdownableThread(
    "ExpirationReaper-%d".format(brokerId), false) {
    // 轮询检查推进时间轮指针和清理完成的operation
    override def doWork() {
      // 时间轮和SystemTimer的指针都是从这个线程驱动推进的
      advanceClock(200L)
    }

    def advanceClock(timeoutMs: Long) {
      timeoutTimer.advanceClock(timeoutMs)
      // 当DelayedOperationPurgatory与SystemTimer中的DelayOperation数量相差到一个阈值时,执行清理工作
      if (estimatedTotalOperations.get - delayed > purgeInterval) {
        estimatedTotalOperations.getAndSet(delayed)
        val purged = allWatchers.map(_.purgeCompleted()).sum
      }
    }
  }

  // 检测指定单个DelayedOperation是否已经完成,若未完成则添加到watchesForKeys和SystemTimer中
  def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = {
    var isCompletedByMe = operation synchronized operation.tryComplete()
    if (isCompletedByMe)
      return true

    // 将DelayedOperation添加到所有key对应的Watchers中
    // 一个DelayedOperation可能有多个watchKeys
    var watchCreated = false
    for(key <- watchKeys) {
      if (operation.isCompleted())
        // 若过程中被其他线程完成,则放弃后续添加过程
        // ExpiredOperationReaper线程会定期清理watchersForKey,所以不需要清理之前添加的key
        return false
      // 添加DelayedOperation到watchersForKey中对应key的watchers
      watchForOperation(key, operation)

      if (!watchCreated) {
        watchCreated = true
        estimatedTotalOperations.incrementAndGet()
      }
    }

    isCompletedByMe = operation synchronized operation.tryComplete()
    if (isCompletedByMe)
      return true

    // 将operation添加到SystemTimer里
    // 同时SystemTimer也会把任务添加到时间轮里
    if (! operation.isCompleted()) {
      timeoutTimer.add(operation)
      if (operation.isCompleted()) {
        // 如果完成从SystemTimer里删除
        operation.cancel()
      }
    }

    false
  }

  // 根据传入的key,尝试执行对应的Watchers中的operation
  def checkAndComplete(key: Any): Int = {
    val watchers = inReadLock(removeWatchersLock) { watchersForKey.get(key) }
      watchers.tryCompleteWatched()
  }
}

相关文章

网友评论

      本文标题:TimingWheel.scala

      本文链接:https://www.haomeiwen.com/subject/vruafqtx.html