协程这个概念已经出来很长时间了,网上对它的定义是非阻塞式的线程框架,讨论最多的也是协程的挂起、恢复以及线程切换,那到底挂起是个什么样的概念,怎么就挂起了,怎么就又恢复了?
带着这些问题,我走上了不归路......
在开始探索协程挂起、恢复之前,需要先了解一下几个重要的名词和概念。
1. Continuation
Continuation在协程中其实只是一个接口,其作用有点类似RxJava中Observer,当请求成功时,触发onNext继续更新UI或者下一步的操作。只不过在协程中,Continuation包装了协程在挂起之后应该继续执行的代码,在编译的过程中,一个完整的协程被分割切块成一个又一个续体。在 await 函数的挂起结束以后,它会调用 continuation 参数的 resumeWith 函数,来恢复执行 await 函数后面的代码。
2. invokeSuspend
invokeSuspend中包含的便是我们协程体中的代码,内部管理了一个状态值,循环触发invokeSuspend的调用,而后根据不同的状态,做协程的挂起和恢复操作。
这两个名词虽然比较抽象,但是对于后面的分析还是比较重要的,还是先以一个简单的例子开始,慢慢理解。
使用launch
开启一个协程,在协程体中,加入两个挂起函数,loadDataA
和loadDataB
,并且在函数前后,log打印出函数的运行轨迹。如下:
Log.d(TAG, "onCreate: start")
lifecycleScope.launch {
val num = loadDataA()
loadDataB(num)
}
Log.d(TAG, "onCreate: end")
private suspend fun loadDataA():Int {
delay(3000)
Log.d(TAG, "loadDataA: ")
return 1
}
private suspend fun loadDataB(num:Int) {
delay(1000)
Log.d(TAG, "loadDataB: $num")
}
那launch开启协程,内部做了些什么?又是如何处理loadDataA和loadDataB这些挂起函数的?
不妨先在launch处打上断点,Debug看看它的执行路径。
image.png如上图所示,从onCreate开始,一个lifecycleScope.launch 的执行顺序是
launch->start->invoke->startCoroutineCancellable->resumeWith再到最后invokeSuspend方法,
至于invokeSuspend
的作用是什么?这个后面会详细说明。
既然给了大致的执行方向,我们只需要一步一步的跟进,查看内部详细的代码处理。
# Builders.common.kt
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
从launch
开始,在不阻塞当前线程的情况下启动一个新的协程,并将对协程的引用作为Job 返回,CoroutineScope.launch
中传入了三个参数,第一个CoroutineContext
为协程的上下文,第二个CoroutineStart,协程启动选项。 默认值为CoroutineStart.DEFAULT
,第三个block即协程体中的代码,也就是上面例子中的loadDataA
和loadDataB
。
接着coroutine.start(start, coroutine, block)
使用给定的代码块和启动策略启动此协程。
# AbstractCoroutine.kt
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
initParentJob()
start(block, receiver, this)
}
在initParentJob()
方法之后,调用了start
,这个时候你会发现,已经无法再继续跟进去了。
但是在文章一开始Debug的路径显示,在start后,执行到的是CoroutineStart#invoke()
# CoroutineStart.kt
@InternalCoroutinesApi
public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit =
when (this) {
DEFAULT -> block.startCoroutineCancellable(completion)
ATOMIC -> block.startCoroutine(completion)
UNDISPATCHED -> block.startCoroutineUndispatched(completion)
LAZY -> Unit // will start lazily
}
在invoke中使用此协程的启动策略将相应的块作为协程启动,这里以DEFAULT为例,startCoroutineCancellable()
,而在startCoroutineCancellable
中,创建了Continuation
,且调用resumeWith
来传递请求结果。
# DispatchedContinuation.kt
public fun <T> Continuation<T>.resumeCancellableWith(
result: Result<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
else -> resumeWith(result)
}
到这里,你可能还是云里雾里,跟进这么多方法,到底有什么用途?不急
我们接着往下看核心部分Continuation
,
public interface Continuation<in T> {
public val context: CoroutineContext
public fun resumeWith(result: Result<T>)
}
Continuation
是一个接口,接口内有一个resumeWith
方法,在上面launch启动协程中startCoroutineCancellable
调用了resumeWith
。既然是接口且被调用,那必然是有地方实现了该接口,并且在resumeWith中做了一些事情。
那就来看看实现Continuation接口的地方做了些啥?
Continuation
的具体实现是在ContinuationImpl
类中,
internal abstract class ContinuationImpl(
completion: Continuation<Any?>?,
private val _context: CoroutineContext?
) : BaseContinuationImpl(completion)
而ContinuationImpl
继承自BaseContinuationImpl
,在BaseContinuationImpl
中就可以看到resumeWith
的具体实现。
internal abstract class BaseContinuationImpl(
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
public final override fun resumeWith(result: Result<Any?>) {
var current = this
var param = result
while (true) {
probeCoroutineResumed(current)
with(current) {
val completion = completion!!
val outcome: Result<Any?> =
try {
val outcome = invokeSuspend(param)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
// unrolling recursion via loop
current = completion
param = outcome
} else {
completion.resumeWith(outcome)
return
}
}
}
}
在resumeWith实现中,最核心的部分是在while循环中,调用invokeSuspend并且对返回的标志判断。
val outcome = invokeSuspend(param)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
invokeSuspend
这个方法是不是有点熟悉?
其实在文章开头Debug launch时,invokeSuspend
是lifecycleScope.launch
开启协程内部执行的最后一个方法,invokeSuspend
之后即开始执行协程体中的内容。
既然我们已经从上面的分析了解到invokeSuspend
是由Continuation
的resumeWith
而触发,那接下就看看
协程体中的内容是如何执行的?即协程是如何挂起和恢复的?
我们直接将文章开头的案例反编译。
#启动协程
#挂起函数loadDataA
image.png#挂起函数loadDataB
从启动协程launch
开始,到执行resumeWith
,触发invokeSuspend
方法,可以看到在invokeSuspend
中有存储了一个变量label,初始值为0。
1. label=0,则进入case 0分支,
case 0:
ResultKt.throwOnFailure($result);
var4 = MainActivity.this;
this.label = 1;
var10000 = var4.loadDataA(this);
if (var10000 == var3) {
return var3;
}
break;
执行var4.loadDataA
,在loadDataA中同样存在一个label,初始值同样为0,进入loadDataA中的case 0
分支,执行delay()
方法,同时将label置为1,并返回COROUTINE_SUSPENDED
标志。
# loadDataA
case 0:
ResultKt.throwOnFailure($result);
((<undefinedtype>)$continuation).label = 1;
if (DelayKt.delay(3000L, (Continuation)$continuation) == var4) {
return var4;
}
break;
标识返回成功后,即var10000=COROUTINE_SUSPENDED
,同时var3=COROUTINE_SUSPENDED
,if (var10000 == var3)
则直接return,跳出了launch操作,执行协程外的Log.d(TAG, "onCreate: end")
,同时也将label置为1,这就是挂起的概念。
2. 挂起后,loadDataA中 Delay完3s将开始恢复协程,触发了loadDataA
中的invokeSuspend
方法。
$continuation = new ContinuationImpl(var1) {
// $FF: synthetic field
Object result;
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
this.result = $result;
this.label |= Integer.MIN_VALUE;
return MainActivity.this.loadDataA(this);
}
};
invokeSuspend
中依然是执行MainActivity.this.loadDataA
,只不过此时loadDataA中所保存的label
的值=1,进入case 1
分支,
case 1:
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
Log.d("MainActivity", "loadDataA: ");
return Boxing.boxInt(1);
case1分支中只做了结果失败时异常的抛出,随后便执行了Log.d("MainActivity", "loadDataA: ");
并返回值Int值。
3. 在触发loadDataA
中的invokeSuspend
时,也触发了launch
协程中的invokeSuspend
,此时label=1
,进入case1
分支,
case 1:
ResultKt.throwOnFailure($result);
var10000 = $result;
break;
...
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
int num = ((Number)var10000).intValue();
var4 = MainActivity.this;
this.label = 2;
if (var4.loadDataB(num, this) == var3) {
return var3;
} else {
return Unit.INSTANCE;
}
同样,对结果做了失败的判断,同时将loadDataA
所返回的Int
值,赋值给了变量num
,开始loadDataB
的执行。
4. loadDataB
中的处理模式与loadDataA
中一致,因为loadDataB之后无挂起函数,则在触发invokeSuspend时,返回的是Unit.INSTANCE
,结束协程运行。
总结一下,
什么是挂起?
简单来说就是判断标识为COROUTINE_SUSPENDED
时,使用Continuation暂存当前协程的状态,而后直接return出当前协程体。
什么是非阻塞?
挂起是在接口的实现invokeSuspend方法中return出去的,而invokeSuspend之外的函数当然还是会继续执行呀。比如说在一个activity的onCreate的方法中,设置一个Button的onClickListener事件,紧跟其后初始化了一个viewModel,这时在onClick里return,那初始化viewModel难道因为return而不执行吗?当然没有影响。这就是挂起为什么是非阻塞式的。
那恢复又是什么?
协程挂起时,使用了Continuation
暂存当前协程的状态,而挂起函数恢复时,会调用Continuation的resumeWith
方法,继而触发invokeSuspend
,根据Continuation所保存的label值,进入不同的分支,恢复之前挂起协程的状态,并且执行下一个状态。
推荐阅读
网友评论