美文网首页Kafka
Broker是怎么延时处理请求的?

Broker是怎么延时处理请求的?

作者: 专职掏大粪 | 来源:发表于2021-01-11 15:45 被阅读0次

Timer 接口


trait Timer {
  // 将给定的定时任务插入到时间轮上,等待后续延迟执行
  def add(timerTask: TimerTask): Unit
  // 向前推进时钟,执行已达过期时间的延迟任务
  def advanceClock(timeoutMs: Long): Boolean
  // 获取时间轮上总的定时任务数
  def size: Int
  // 关闭定时器
  def shutdown(): Unit
}

SystemTimer 类


class SystemTimer(executorName: String,
                  tickMs: Long = 1,
                  wheelSize: Int = 20,
                  startMs: Long = Time.SYSTEM.hiResClockMs) extends Timer {
  // 单线程的线程池用于异步执行定时任务
  private[this] val taskExecutor = Executors.newFixedThreadPool(1,
    (runnable: Runnable) => KafkaThread.nonDaemon("executor-" + executorName, runnable))
  // 延迟队列保存所有Bucket,即所有TimerTaskList对象
  private[this] val delayQueue = new DelayQueue[TimerTaskList]()
  // 总定时任务数
  private[this] val taskCounter = new AtomicInteger(0)
  // 时间轮对象
  private[this] val timingWheel = new TimingWheel(
    tickMs = tickMs,
    wheelSize = wheelSize,
    startMs = startMs,
    taskCounter = taskCounter,
    delayQueue
  )
  // 维护线程安全的读写锁
  private[this] val readWriteLock = new ReentrantReadWriteLock()
  private[this] val readLock = readWriteLock.readLock()
  private[this] val writeLock = readWriteLock.writeLock()
  ......
}
  • delayQueue 字段。它保存了该定时器下管理的所有 Bucket 对象。因为是 DelayQueue,所以只有在 Bucket 过期后,才能从该队列中获取到。SystemTimer 类的 advanceClock 方法正是依靠了这个特性向前驱动时钟。关于这一点,一会儿我们详细说。
  • timingWheel。TimingWheel 是实现分层时间轮的类。SystemTimer 类依靠它来操作分层时间轮。
  • taskExecutor。它是单线程的线程池,用于异步执行提交的定时任务逻辑

add 方法的作用,是将给定的定时任务插入到时间轮中进行管理。代码如下:


def add(timerTask: TimerTask): Unit = {
  // 获取读锁。在没有线程持有写锁的前提下,
  // 多个线程能够同时向时间轮添加定时任务
  readLock.lock()
  try {
    // 调用addTimerTaskEntry执行插入逻辑
    addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + Time.SYSTEM.hiResClockMs))
  } finally {
    // 释放读锁
    readLock.unlock()
  }
}

private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {
  // 视timerTaskEntry状态决定执行什么逻辑:
  // 1. 未过期未取消:添加到时间轮
  // 2. 已取消:什么都不做
  // 3. 已过期:提交到线程池,等待执行
  if (!timingWheel.add(timerTaskEntry)) {
    // 定时任务未取消,说明定时任务已过期
    // 否则timingWheel.add方法应该返回True
    if (!timerTaskEntry.cancelled)
      taskExecutor.submit(timerTaskEntry.timerTask)
  }
}

advanceClock 方法。顾名思义,它的作用是驱动时钟向前推进



def advanceClock(timeoutMs: Long): Boolean = {
  // 获取delayQueue中下一个已过期的Bucket
  var bucket = delayQueue.poll(
    timeoutMs, TimeUnit.MILLISECONDS)
  if (bucket != null) {
    // 获取写锁
    // 一旦有线程持有写锁,其他任何线程执行add或advanceClock方法时会阻塞
    writeLock.lock()
    try {
      while (bucket != null) {
        // 推动时间轮向前"滚动"到Bucket的过期时间点
        timingWheel.advanceClock(bucket.getExpiration())
        // 将该Bucket下的所有定时任务重写回到时间轮
        bucket.flush(reinsert)
        // 读取下一个Bucket对象
        bucket = delayQueue.poll()
      }
    } finally {
      // 释放写锁
      writeLock.unlock()
    }
    true
  } else {
    false
  }
}
image.png

DelayedOperation 类


abstract class DelayedOperation(override val delayMs: Long,
                                lockOpt: Option[Lock] = None)
  extends TimerTask with Logging {
  // 标识该延迟操作是否已经完成
  private val completed = new AtomicBoolean(false)
  // 防止多个线程同时检查操作是否可完成时发生锁竞争导致操作最终超时
  private val tryCompletePending = new AtomicBoolean(false)
  private[server] val lock: Lock = lockOpt.getOrElse(new ReentrantLock)
  ......
}
  • forceComplete:强制完成延迟操作,不管它是否满足完成条件。每当操作满足完成条件或已经过期了,就需要调用该方法完成该操作。
  • isCompleted:检查延迟操作是否已经完成。源码使用这个方法来决定后续如何处理该操作。比如如果操作已经完成了,那么通常需要取消该操作。
  • onExpiration:强制完成之后执行的过期逻辑回调方法。只有真正完成操作的那个线程才有资格调用这个方法。
  • onComplete:完成延迟操作所需的处理逻辑。这个方法只会在 forceComplete 方法中被调用。
  • tryComplete:尝试完成延迟操作的顶层方法,内部会调用 forceComplete 方法。
  • maybeTryComplete:线程安全版本的 tryComplete 方法。这个方法其实是社区后来才加入的,不过已经慢慢地取代了 tryComplete,现在外部代码调用的都是这个方法了。
  • run:调用延迟操作超时后的过期逻辑,也就是组合调用 forceComplete + onExpiration。

相关文章

网友评论

    本文标题:Broker是怎么延时处理请求的?

    本文链接:https://www.haomeiwen.com/subject/hrldaktx.html