前段时间在项目中引入了 Kotlin Coroutine,那么也来谈谈对它的理解。所谓窥一斑而知全豹,首先尝试透过一个🌰来窥探协程的启动流程,直接通过 GlobalScope.launch 启动了一个顶级协程并指定在主线程中执行:
fun test() {
GlobalScope.launch(Dispatchers.Main) {
Log.d("GlobalScopeTest", "Log in GlobalScope")
}
Log.d("GlobalScopeTest", "Log after GlobalScope")
}
上面的代码输出结果为:
Log after GlobalScope
Log in GlobalScope
为什么会出现这样的执行顺序?协程体内的代码逻辑是透过什么机制分发到主线程执行?要解答这些问题,还得从 GlobalScope.launch 寻找突破口。
1、GlobalScope.launch:协程入口
launch 是个扩展函数、启动协程并且返回一个 job 对象。这里先大概了解返回的 job 可以方便地对协程进行追踪,取消等操作。
launch 源码:
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
// 构建 AbstractCoroutine 对象,并由此开启协程任务
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
看了 launch 方法,会有一个疑惑:经过编译后,协程把我们的代码逻辑封装成了一个 SuspendLambda 类,那么必然需要该类的相关信息才可能进行回调执行逻辑,但是在入口处并没有看到相关参数,难道还有什么黑科技能够获取到类相关信息?
事实上反编译 class 文件,发现 CoroutineScope.launch 方法签名和源码实现略有不同,其中多了一个参数:Function2<? super CoroutineScope, ? super Continuation<? super T>, ? extend Object>。 而生成的 SuspendLambda 类就是 Function2 的实现类。
没错,在调用 CoroutineScope.launch 的时候,传入的 Function2 的参数值就是生成的 SuspendLambda 对象!所以在这里并没有什么黑科技,只是由于编译器的参与,隐藏了其中一些细节。
反编译得到的 launch 源码:
@NotNull
public static final Job launch(@NotNull CoroutineScope $this$launch, @NotNull CoroutineContext context, @NotNull CoroutineStart start, @NotNull Function2<? super CoroutineScope, ? super Continuation<? super Unit>, ? extends Object> block) {
StandaloneCoroutine coroutine;
Intrinsics.checkParameterIsNotNull($this$launch, "$this$launch");
Intrinsics.checkParameterIsNotNull(context, "context");
Intrinsics.checkParameterIsNotNull(start, "start");
Intrinsics.checkParameterIsNotNull(block, "block");
CoroutineContext newContext = CoroutineContextKt.newCoroutineContext($this$launch, context);
if (start.isLazy()) {
coroutine = new LazyStandaloneCoroutine(newContext, block);
} else {
coroutine = new StandaloneCoroutine(newContext, true);
}
coroutine.start(start, coroutine, block);
return coroutine;
}
回到 launch ,方法最后调用 start 启动了协程。层层追踪最后调用了 startCoroutineCancellable:
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
runSafely(completion) {
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellable(Unit)
}
观察方法名大概能得出一个判断:
- createCoroutineUnintercepted:首先创建了一个非拦截的协程 Continuation。
- intercepted:添加拦截器 Continuation。
- resume:正式启动协程。
看来要知道更多细节,还要从这几个方法入手。
2、createCoroutineUnintercepted:构建任务层对象
createCoroutineUnintercepted 在 jvm 的实现在 IntrinsicsJvm 类中。在这里,launch 处传入的 Function2 参数起了作用,通过这个对象调用 create() 来构建 编译器生成的 SuspendLambda 对象。
@SinceKotlin("1.3")
public fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
receiver: R,
completion: Continuation<T>
): Continuation<Unit> {
val probeCompletion = probeCoroutineCreated(completion)
return if (this is BaseContinuationImpl)
// 创建 SuspendLambda 对象
create(receiver, probeCompletion)
else {
createCoroutineFromSuspendFunction(probeCompletion) {
(this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
}
}
}
反编译得到的 create 方法 :
@NotNull
public final Continuation<Unit> create(@Nullable Object value, @NotNull Continuation<?> completion) {
Intrinsics.checkParameterIsNotNull(completion, "completion");
// 将Continuation传入作为completion参数值(静态代理)
Continuation coroutineTest$test$1 = new CoroutineTest$test$1(this.$context, completion);
CoroutineScope coroutineScope = (CoroutineScope) value;
coroutineTest$test$1.p$ = (CoroutineScope) value;
return coroutineTest$test$1;
}
所以 launch 传入的 Function2 参数作用是构建出编译器为我们生成的 SuspendLambda 对象,后续才能回调挂起的逻辑。那么 SuspendLambda.intercepted() 又做了什么?
3、Continuation.intercepted:构建任务调度层对象
追踪 intercepted 调用链,最终调用了 ContinuationImpl.intercepted :
public fun intercepted(): Continuation<Any?> =
intercepted ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
上面代码关键点就在于 (context[ContinuationInterceptor]?.interceptContinuation(this),context[ContinuationInterceptor] 得到的是 HandlerContext 对象(why?)而 handlerContext 是 CoroutineDispatcher 的子类(CoroutineDispatcher 可以理解为线程级别的任务调度类,决定了任务在哪个线程中执行),CoroutineDispatcher.interceptContinuation() 返回了 DispatchedContinuation 对象。
why HandlerContext handlerContext = context[ContinuationInterceptor] ?
栗子中我们调用 launch 传入了 Dispatchers.Main 作为 context,CoroutineContext 可以理解为一个数据结构,其中键为 ContinuationInterceptor,值为 Dispatchers.Main。所以context[ContinuationInterceptor] 得到的是 Dispatchers.Main,即为 HandlerContext。
intercepted 就是通过静态代理得到了 DispatchedContinuation 对象。接下来只需要通过 resumeCancellable 开启协程。
4、DispatchedContinuation.resumeCancellable:开启协程任务
DispatchedContinuation 为任务调度类,将具体任务分配给 Dispatcher(也就是上面通过 context[ContinuationInterceptor] 得到的对象) 执行。resumeCancellable 内部调用 dispatcher.dispatch 将任务交给了 Dispatcher,方法第二个参数 this 指代 runnable 对象。
resumeCancellable 的实现:
@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
inline fun resumeCancellable(value: T) {
// 指定需要线程调度,默认为true
if (dispatcher.isDispatchNeeded(context)) {
_state = value
resumeMode = MODE_CANCELLABLE
dispatcher.dispatch(context, this)
} else {
// 不需要线程调度
executeUnconfined(value, MODE_CANCELLABLE) {
if (!resumeCancelled()) {
resumeUndispatched(value)
}
}
}
}
HandlerContext 的 dispatch 方法实现如下,通过 handler 把任务抛出去:
override fun dispatch(context: CoroutineContext, block: Runnable) {
handler.post(block)
}
DispatchedContinuation 内 runnable.run 方法实现如下:
public final override fun run() {
val taskContext = this.taskContext
var exception: Throwable? = null
try {
val delegate = delegate as DispatchedContinuation<T>
val continuation = delegate.continuation
val context = continuation.context
val job = if (resumeMode.isCancellableMode) context[Job] else null
val state = takeState() // NOTE: Must take state in any case, even if cancelled
withCoroutineContext(context, delegate.countOrElement) {
if (job != null && !job.isActive) {
val cause = job.getCancellationException()
cancelResult(state, cause)
continuation.resumeWithStackTrace(cause)
} else {
val exception = getExceptionalResult(state)
if (exception != null)
continuation.resumeWithStackTrace(exception)
else
// 回调suspendLambda.resumeWith()
continuation.resume(getSuccessfulResult(state))
}
}
} catch (e: Throwable) {
// This instead of runCatching to have nicer stacktrace and debug experience
exception = e
} finally {
val result = runCatching { taskContext.afterTask() }
handleFatalException(exception, result.exceptionOrNull())
}
}
run 方法内部通过调用 continuation.resume 回调整条链路的任务逻辑。
continuation.resume 的主要下游调用链路:continuation.resume() -> continuation.resumeWith() -> suspendLambda.invokeuspend() -> AbstractContinue.resumeWith()。
其中 suspendLambda.invokeuspend 内部封装的就是我们栗子中的逻辑: Log.d("GlobalScopeTest", "Log in GlobalScope")
从 launch 开启协程到如何回调到我们的逻辑大致流程已经理清。由于这个栗子指定了主线程作为运行环境。还有更多关于协程如何进行线程调度、多线程下函数挂起、恢复等流程在这里未涉及到。不过我们大概可以推测对于指定IO线程执行的协程体,协程必然维护了自己的线程池作为协程的运行环境。
小结
- launch 方法还有一个隐式参数:Function2,也就是编译器生成 SuspendLambda 的子类对象。为后续构建真实可用的 SuspendLambda 对象做铺垫。
- 通过静态代理构建了三层 Continuation 分别负责不同的职责:AbstractCoroutine(协程状态处理层) -> SuspendLambda(逻辑执行层) -> CoroutineDispatcher(任务调度层)
- 任务最终交由 CoroutineDispatcher 执行。最终脱离不开线程级别的任务调度。
网友评论