前言
在开发项目期间 Kotlin 协程是经常使用的异步&并发编程框架。在协程使用过程中,时常会用到挂起函数,而 delay 就是一个挂起函数,在很多业务场景中会使用到,本文通过源码分析了解其背后的实现原理
分析
举个🌰
通常我们的业务场景有这种情况,需要延时执行某些任务
private suspend fun test() {
findViewLifecycleOwner()?.lifecycleScope?.launch() {
print("before delay")
//延时3s
delay(3000)
print("after delay")
}
}
delay 函数则会挂起当前协程,并且会在3s后进行恢复
再来看看 delay 函数的实现
/**
* Delays coroutine for a given time without blocking a thread and resumes it after a specified time.
*
*/
public suspend fun delay(timeMillis: Long) {
if (timeMillis <= 0) return // don't delay
return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
// if timeMillis == Long.MAX_VALUE then just wait forever like awaitCancellation, don't schedule.
if (timeMillis < Long.MAX_VALUE) {
cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
}
}
}
suspendCancellableCoroutine
我们先来看看 suspendCancellableCoroutine 函数,它是 Kotlin 协程库中的一个函数,它是一个挂起函数,用于创建一个可以被取消的挂起点。
这个函数也是协程中经常使用,它可以将异步的回调用同步的方式表达出来,减少回调嵌套
public suspend inline fun <T> suspendCancellableCoroutine(
crossinline block: (CancellableContinuation<T>) -> Unit
): T =
suspendCoroutineUninterceptedOrReturn { uCont ->
val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE)
/*
* For non-atomic cancellation we setup parent-child relationship immediately
* in case when `block` blocks the current thread (e.g. Rx2 with trampoline scheduler), but
* properly supports cancellation.
*/
cancellable.initCancellability()
block(cancellable)
cancellable.getResult()
}
然而 suspendCancellableCoroutine 函数内部是使用了suspendCoroutineUninterceptedOrReturn 函数实现的,而且也是一个挂起函数
public suspend inline fun <T> suspendCoroutineUninterceptedOrReturn(crossinline block: (Continuation<T>) -> Any?): T {
contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
throw NotImplementedError("Implementation of suspendCoroutineUninterceptedOrReturn is intrinsic")
}
suspendCoroutineUninterceptedOrReturn 函数没有实际内容,是因为它是编译器内建函数,它是由 Kotlin 编译器来实现的。主要作用是传递协程上下文,以及判断是否挂起的,或者直接返回结果
再举个🌰
写个👇方法,然后进行反编译看看
private suspend fun test2(){
suspendCancellableCoroutine<String> {
print("suspendCancellableCoroutine test")
}
}
反编译后
// 传递了 Continuation 上下文
private final Object test2(Continuation $completion) {
int $i$f$suspendCancellableCoroutine = false;
int var4 = false;
//封装实体
CancellableContinuationImpl cancellable$iv = new CancellableContinuationImpl(IntrinsicsKt.intercepted($completion), 1);
cancellable$iv.initCancellability();
CancellableContinuation it = (CancellableContinuation)cancellable$iv;
int var7 = false;
//执行block代码
String var8 = "suspendCancellableCoroutine test";
System.out.print(var8);
//获取协程体返回值
Object var10000 = cancellable$iv.getResult();
if (var10000 == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
DebugProbesKt.probeCoroutineSuspended($completion);
}
//判断是否是挂起,是的话返回挂起状态
return var10000 == IntrinsicsKt.getCOROUTINE_SUSPENDED() ? var10000 : Unit.INSTANCE;
}
正如上面所说的,suspendCoroutineUninterceptedOrReturn 函数是新增了传递协程上下文,以及判断是否挂起的,或者直接返回结果的逻辑
知道了 suspendCancellableCoroutine 函数的作用后,再回过头看 delay 函数
/**
* Delays coroutine for a given time without blocking a thread and resumes it after a specified time.
*
*/
public suspend fun delay(timeMillis: Long) {
if (timeMillis <= 0) return // don't delay
return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
// if timeMillis == Long.MAX_VALUE then just wait forever like awaitCancellation, don't schedule.
if (timeMillis < Long.MAX_VALUE) {
cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
}
}
}
suspendCancellableCoroutine 函数传递了封装后的协程对象 cont,以及判断 cont 是否会执行挂起
DefaultExecutor
看看 cont.context.delay是啥
//获取delay
internal val CoroutineContext.delay: Delay get() = get(ContinuationInterceptor) as? Delay ?: DefaultDelay
//默认实现
internal actual val DefaultDelay: Delay = DefaultExecutor
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
internal actual object DefaultExecutor : EventLoopImplBase(), Runnable {
init {
incrementUseCount() // this event loop is never completed
}
... 省略很多代码 ...
}
可以看见 cont.context.delay 最终的实现是 DefaultExecutor ,它继承了EventLoopImplBase 和 Runnable
DefaultExecutor 是个单例,里边开启了线程,并且检测队列里任务的情况来决定是否需要挂起线程进行等待
而 scheduleResumeAfterDelay 函数是 EventLoopImplBase 里实现的
public override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val timeNanos = delayToNanos(timeMillis)
if (timeNanos < MAX_DELAY_NS) {
val now = nanoTime()
DelayedResumeTask(now + timeNanos, continuation).also { task ->
continuation.disposeOnCancellation(task)
schedule(now, task)
}
}
}
如果满足时间条件,则创建一个延迟的task,DelayedResumeTask
入队列
看 schedule 函数
public fun schedule(now: Long, delayedTask: DelayedTask) {
when (scheduleImpl(now, delayedTask)) {
//unpark() 会启动线程
SCHEDULE_OK -> if (shouldUnpark(delayedTask)) unpark()
SCHEDULE_COMPLETED -> reschedule(now, delayedTask)
SCHEDULE_DISPOSED -> {} // do nothing -- task was already disposed
else -> error("unexpected result")
}
}
private fun scheduleImpl(now: Long, delayedTask: DelayedTask): Int {
if (isCompleted) return SCHEDULE_COMPLETED
val delayedQueue = _delayed.value ?: run {
_delayed.compareAndSet(null, DelayedTaskQueue(now))
_delayed.value!!
}
//入队列
return delayedTask.scheduleTask(now, delayedQueue, this)
}
实际就是把 DelayedResumeTask 放进 _delayed 队列里面,并且启动 DefaultExecutor 里的线程(如果没有开启的话)
_delayed 是存储延迟执行 task 的队列
还有一个 _queue 是是存储正常任务的队列
internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
// null | CLOSED_EMPTY | task | Queue<Runnable>
private val _queue = atomic<Any?>(null)
// Allocated only only once
private val _delayed = atomic<DelayedTaskQueue?>(null)
...
}
既然上面有入队列,就有出队列
出队列
回到刚刚说的启动了 DefaultExecutor 里的线程,看看它的 run 方法
override fun run() {
ThreadLocalEventLoop.setEventLoop(this)
registerTimeLoopThread()
try {
var shutdownNanos = Long.MAX_VALUE
if (!notifyStartup()) return
while (true) {
Thread.interrupted() // just reset interruption flag
//取队列核心代码
var parkNanos = processNextEvent()
if (parkNanos == Long.MAX_VALUE) {
// nothing to do, initialize shutdown timeout
val now = nanoTime()
if (shutdownNanos == Long.MAX_VALUE) shutdownNanos = now + KEEP_ALIVE_NANOS
val tillShutdown = shutdownNanos - now
if (tillShutdown <= 0) return // shut thread down
parkNanos = parkNanos.coerceAtMost(tillShutdown)
} else
shutdownNanos = Long.MAX_VALUE
//如果返回的时间大于0
if (parkNanos > 0) {
// check if shutdown was requested and bail out in this case
if (isShutdownRequested) return
//挂起线程一段时间
parkNanos(this, parkNanos)
}
}
} finally {
_thread = null // this thread is dead
acknowledgeShutdownIfNeeded()
unregisterTimeLoopThread()
// recheck if queues are empty after _thread reference was set to null (!!!)
if (!isEmpty) thread // recreate thread if it is needed
}
}
取出队列里的 task 逻辑是在 processNextEvent() 函数中,看看它的实现
override fun processNextEvent(): Long {
// unconfined events take priority
if (processUnconfinedEvent()) return 0
// queue all delayed tasks that are due to be executed
val delayed = _delayed.value
if (delayed != null && !delayed.isEmpty) {
val now = nanoTime()
while (true) {
// make sure that moving from delayed to queue removes from delayed only after it is added to queue
// to make sure that 'isEmpty' and `nextTime` that check both of them
// do not transiently report that both delayed and queue are empty during move
delayed.removeFirstIf {
if (it.timeToExecute(now)) {
// 加入正常任务队列
enqueueImpl(it)
} else
false
} ?: break // quit loop when nothing more to remove or enqueueImpl returns false on "isComplete"
}
}
// then process one event from queue
val task = dequeue()
if (task != null) {
task.run()
return 0
}
//返回线程需要挂起的时间
return nextTime
}
protected override val nextTime: Long
get() {
if (super.nextTime == 0L) return 0L
val queue = _queue.value
when {
queue === null -> {} // empty queue -- proceed
queue is Queue<*> -> if (!queue.isEmpty) return 0 // non-empty queue
queue === CLOSED_EMPTY -> return Long.MAX_VALUE // no more events -- closed
else -> return 0 // non-empty queue
}
val nextDelayedTask = _delayed.value?.peek() ?: return Long.MAX_VALUE
return (nextDelayedTask.nanoTime - nanoTime()).coerceAtLeast(0)
其中流程是:
- 有个死循环一直在从 _delayed 队列里取延迟 task,如果判断延迟时间已经到了才会加入正常任务队列里且移除
- 直到取不出延迟 task 了才跳出循环
- 然后从正常队列里取出任务进行执行
- 执行任务就是在执行 DelayedResumeTask 类里的 run 方法
private inner class DelayedResumeTask(
nanoTime: Long,
private val cont: CancellableContinuation<Unit>
) : DelayedTask(nanoTime) {
override fun run() { with(cont) { resumeUndispatched(Unit) } }
override fun toString(): String = super.toString() + cont.toString()
}
- 可以是看见调用了 resumeUndispatched() 函数,使用协程能经常看见或者使用 resumexxx 函数,它就是协程中的恢复,对应于挂起,它们理应是成对出现的
- resumeUndispatched() 函数最终的实现就是一开始封装 Continuation 协程为 CancellableContinuationImpl 的实现,即是 cont 对象,最终恢复了协程的运行
//恢复协程
override fun CoroutineDispatcher.resumeUndispatched(value: T) {
val dc = delegate as? DispatchedContinuation
resumeImpl(value, if (dc?.dispatcher === this) MODE_UNDISPATCHED else resumeMode)
}
processNextEvent() 函数的返回值
- 最后返回下一个 nextDelayedTask 的延迟时间,即是线程挂起时间
- 如果返回的时间大于0,则最终会调用 LockSupport.parkNanos(),将线程挂起一段时间,直到延迟时间结束。
源码看到这里就能知道 delay 函数的背后运行原理了,最终是由 DefaultExecutor 线程整体控制挂起和恢复,而且不会阻塞当前调用方的线程。
总结
delay 函数的总体流程是
- 创建一个延迟任务(DelayedResumeTask),它包含需要延迟执行的逻辑和执行时间信息。
- 将延迟任务添加到延迟队列(_delayed)中。
- 在等待延迟任务执行之前,协程会被暂停(挂起)。
- 有一个单独的 DefaultExecutor 线程定期检查延迟队列中的任务,如果任务的延迟时间到了,就会将任务从延迟队列中移除,并将其放入执行队列中。
- 执行队列中取出任务,并调用其执行逻辑。
- 执行任务的过程也就是协程的恢复过程,一旦任务开始执行,协程会被恢复,继续执行其后续的逻辑。
网友评论