掘金迁移地址:【译文】扒一扒Kotlin Coroutines幕后实现
原文地址: Kotlin Coroutines 幕後那一兩件事
前言
如果你能看完本文并把所有内容都弄懂,你对协程的理解也已经超过大部分人了。
Coroutines是近几年在Kotlin上Google主推的异步问题解决方案,至少在Android R Asynctask被放弃后,打开Android Document看到最显目的提示项目就是导引你至Coroutine的页面教导你怎么使用Coroutine。
Emm….那如果把所有问题简单化,其实大多数碰上异步问题,解决的办法基本上都是callback。
fun longTimeProcess(parameter1 : String, callBack:CallBack<String>){
val result = ""
//Do something long time
callBack.onResponse(result)
}
其实我们一般执行回调,本质上都是callback
,所以很多异步解决方案,追根溯源,会发现他的实现方式仍旧是callback。
不过callback的使用情境、context还有许许多多的用法情况都不同,整体概念也会有出入,所以我们会需要更多的名词来代表这样的情况,因此延伸出更多样的词汇,不过这段就题外话了。
话说回来,上面那段简易的callback,换作是Coroutine会变成是这样:
suspend fun longTimeProcess(parameter1:String):String{
val result =“”
//Do something long time
return result
}
这样写的好处是可以不用自己控制Thread的使用,上面的代码如果直接在主线程执行,可能会造成主线程卡顿,超过5秒喷Exception直接让Process out,所以还会需要额外自己开thread + handler
或是使用Rxjava之类第三方套件去处理。换作是Coroutine,使用起来就简单很多了,被suspend
修饰的函数longTimeProcess,有自己的作用域(Scope),用scope launch
里头执行该function,利用这个function回传的数据做该在main thread上解决的事情,问题解决,就是如此的简单。
那问题来了。
Coroutine到底是怎么运作的?究竟是甚么神奇的魔法让他可以这么的方便可以不用写那么多东西呢?
记得某次面试里有提到这个问题,但我只知道他是个有限状态机,然后就…
恩,我那时的表情应该跟King Crimson有那么几分神似就是了。
Coroutine概念
维基百科上其实有解释了Coroutine的实作概念:
var q := new queue
coroutine produce
loop
while q is not full
create some new items
add the items to q
yield to consume
coroutine consume
loop
while q is not empty
remove some items from q
use the items
yield to produce
概念是,这有个queue是空的,那是先跑coroutine product还是coroutine consume其实无所谓,总之随便跑一个,先从coroutine product开始好了。
coroutine produce在queue没满时,会产生一些items,然后加入queue里头,直到queue满为止,接着把程序让给coroutine consume。
coroutine consume在queue不是空的时候,会移除(消费)一些items,直到queue空为止,接着把程序让给coroutine produce,如此反复,这个世界的经济得以维持。
那这边可以看出,当coroutine produce
碰到queue是满的时候会直接把程序让给coroutine consume
;相对的,若coroutine consume
在碰到queue是空的时候,会直接把程序让给coroutine produce
。
那么,以Kotlin Coroutine来说,queue的是空是满的条件会变成是method的状态是否suspend,那因为上面这个程序很明显会是无限循环,多数我们在开发时会不需要无限的循环,那怎么样才能让这种来回传接球的形式有个终点呢?
答案就是有限状态机,接下来这篇文章会慢慢地解释。
有这么个东西叫做 Continuation
很多时候,原本很麻烦的事情突然变得简单了,其实不是什么都不用做,而是事情有人帮你做了,Coroutine也是,它帮你把写一堆callback
的麻烦事给做掉了。
等等,Compiler把写一堆的callback的麻烦事给做掉了,那意思是…
没错,Coroutine本质上还是callback
,只是编译器帮你写了。
我本来是想说从CoroutineScope.Launch
下去追的,追到IntrinsicsJvm
,这东西叫Intrinsic这东西有很大的机率是给编译器用的,追到这里,大概就可以知道,suspend fun
会在编译的过程转成Continuation
.
但后来换个方向去想,其实也不用这么麻烦,因为Kotlin是可以给Java呼叫的,那Java比较少这种语法糖转译的东西,也就是说,透过Java呼叫suspend fun,就可以知道suspend fun真正的模样。
这边先随便写一个suspend fun。
suspend fun getUserDescription(name:String,id:Int):String{
return ""
}
在 Java 中调用的時候是如下这样:
instance.getUserDescription("name", 0, new Continuation<String>() {
@NotNull
@Override
public CoroutineContext getContext() {
return null;
}
@Override
public void resumeWith(@NotNull Object o) {
}
});
return 0;
我们可以看到,其实suspend fun
就是一般的function后头加上一个Continuation
。
总之得到一个线索,这个线索就是Continuation,它是个什么玩意呢?
它是一个 interface
public interface Continuation<in T> {
public val context: CoroutineContext
public fun resumeWith(result: Result<T>)
}
它代表的是Coroutine
的runBlock
在suspend
状态中,要被唤醒的callback
。
那注意这边提到状态了,大伙都知道Coroutine会是个状态机,那具体是怎么个状态呢?这个稍后提。
那如果硬要在java file里头使用GlobalScope.launch,那会长成这样:
BuildersKt.launch(GlobalScope.INSTANCE,
Dispatchers.getMain(),//context to be ran on
CoroutineStart.DEFAULT,
new Function2<CoroutineScope,Continuation<? super Unit>,String>() {
@Override
public String invoke(CoroutineScope coroutineScope, Continuation<? super Unit> continuation) {
return "";
}
}
);
这样就行了吗?这样好像没啥效果最后会回一个空字串就是了,但这里就会发现,如果用lanuch会需要用到一个Function去传递一个continuation。这样看还是蒙,没关系,咱们继续看下去。
Continuation到底怎么运行?
那这边简单用一个suspend:
fun main() {
GlobalScope.launch {
val text = suspendFunction("text")
println(text) // print after delay
}
}
suspend fun suspendFunction(text:String) = withContext(Dispatchers.IO){
val result = doSomethingLongTimeProcess(text)
result
}
用 Kotlin Bytecode
去 decompile
會得到這個:
public static final void main() {
BuildersKt.launch$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
private CoroutineScope p$;
Object L$0;
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
Object var5 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
Object var10000;
CoroutineScope $this$launch;
switch(this.label) {
case 0:
ResultKt.throwOnFailure($result);
$this$launch = this.p$;
this.L$0 = $this$launch;
this.label = 1;
var10000 = CoroutineTestKt.suspendFunction("text", this);
if (var10000 == var5) {
return var5;
}
break;
case 1:
$this$launch = (CoroutineScope)this.L$0;
ResultKt.throwOnFailure($result);
var10000 = $result;
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
String text = (String)var10000;
boolean var4 = false;
System.out.println(text);
return Unit.INSTANCE;
}
@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
Intrinsics.checkParameterIsNotNull(completion, "completion");
Function2 var3 = new <anonymous constructor>(completion);
var3.p$ = (CoroutineScope)value;
return var3;
}
public final Object invoke(Object var1, Object var2) {
return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
}
}), 3, (Object)null);
}
另外一个是 suspendFunction 的 decompile code
:
public static final Object suspendFunction(@NotNull final String text, @NotNull Continuation $completion) {
return BuildersKt.withContext((CoroutineContext)Dispatchers.getIO(), (Function2)(new Function2((Continuation)null) {
private CoroutineScope p$;
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch(this.label) {
case 0:
ResultKt.throwOnFailure($result);
CoroutineScope $this$withContext = this.p$;
String result = CoroutineTestKt.doSomethingLongTimeProcess(text);
return result;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
}
@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
Intrinsics.checkParameterIsNotNull(completion, "completion");
Function2 var3 = new <anonymous constructor>(completion);
var3.p$ = (CoroutineScope)value;
return var3;
}
public final Object invoke(Object var1, Object var2) {
return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
}
}), $completion);
}
字节码反编译成 Java 这种事,我们干过很多次了。跟往常不同的是,这次我不会直接贴反编译后的代码,因为如果我直接贴出反编译后的 Java 代码,估计会吓退一大波人。协程反编译后的代码,逻辑实在是太绕了,可读性实在太差了。没关系,我们直接梳理解释一下流程。
反编译代码中我们看到一个 switch(this.label) , 这就是大名鼎鼎的 Coroutine
的状态机
了,Kotlin编译器会在编译时产生一个label
,这个label
就是runBlock
里边执行到第几段的状态了。
那具体会有几个状态呢?其实在runBlock
里边有几个suspend
就会对应有几个状态机,举个例子:
GlobalScope.launch {
test()
test()
test()
test()
}
fun test(){}
如上代码会有几个呢?
答案是一个,因為這 test() 不是挂起函数(suspend function)
,它不需要挂起操作(suspended)。
如果换成是这样?
GlobalScope.launch {
test()
test()
test()
test()
}
suspend fun test(){}
答案是五个。
GlobalScope.launch {
// case 0
test() // case 1 receive result
test() // case 2 receive result
test() // case 3 receive result
test() // case 4 receive result
}
因為四个 test()
都有可能获得 suspended
的状态,所以需要五个执行状态的,case 0
用于初始化,case 1– 4
用于结果获取。
那状态何时会改变呢?
答案是:invokeSuspend
执行时。
label34: {
label33: {
var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch(this.label) {
case 0:
ResultKt.throwOnFailure($result);
$this$launch = this.p$;
this.L$0 = $this$launch;
this.label = 1;
if (CoroutineTestKt.test(this) == var3) {
return var3;
}
break;
case 1://...ignore
break;
case 2://...ignore
break label33;
case 3://...ignore
break label34;
case 4://...ignore
return Unit.INSTANCE;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
this.L$0 = $this$launch;
this.label = 2;
if (CoroutineTestKt.test(this) == var3) {
return var3;
}
}
this.L$0 = $this$launch;
this.label = 3;
if (CoroutineTestKt.test(this) == var3) {
return var3;
}
}
this.L$0 = $this$launch;
this.label = 4;
if (CoroutineTestKt.test(this) == var3) {
return var3;
} else {
return Unit.INSTANCE;
}
这部分比较有意思的地方是,这些状态还有 call method
的都不在 switch case
里面,这其实跟 Bytecode
有关,主要是因为这个结果是 反编译
出來的东西,所以会是这样的叠加方式。
我们可以看到,在状态机改变时:
Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
//...ignore
this.label = 1;
if (CoroutineTestKt.test(this) == var3) {
return var3;
}
根据上述代码可以看出, 编译器内部有一个函数IntrinsicsKt.getCOROUTINE_SUSPENDED()
该函数代表当前的状态是否挂起。如果它回传的是 getCOROUTINE_SUSPENDED
,代表这个 function
处在 挂起(suspended)
的状态,意味着它可能当前正在进行耗时操作。这时候直接返回 挂起
状态,等待下一次被 调用(invoke)
。
那什么时候会再一次被 调用(invoke) 呢?
这时候就要看传入到该挂起函数的的 Continuation
,這裡可以觀察一下 BaseContinuationImpl
的 resumeWith
的操作:
internal abstract class BaseContinuationImpl(
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
public final override fun resumeWith(result: Result<Any?>) {
var current = this
var param = result
while (true) {
probeCoroutineResumed(current)
with(current) {
val completion = completion!!
val outcome: Result<Any?> =
try {
val outcome = invokeSuspend(param)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted()
if (completion is BaseContinuationImpl) {
current = completion
param = outcome
} else {
completion.resumeWith(outcome)
return
}
}
}
}
//...ignore
}
原则上 resumeWith
在一开始 Coroutine
被创建时就会执行(所以需要 case 0 做初始化
),可以看到 invokeSuspend
会被执行到。(probeCoroutineResumed 那個看起來是 debug 用的請無視
),通过执行 invokeSuspend
开始执行状态机,如果该 continuation
的状态是挂起,就会执行return
,重新执行 invokeSuspend
,等下一次被唤醒,再次被唤醒后,继续执行,直到得到结果,并且将结果通过 continuation((name in completion)
的 resumeWith
返回结果,结束此次执行,接着继续执行挂起函数的的 invokeSuspend
,如此反复直至最终结束。
到這裡,我們知道了, 被
suspend标记的函数内部是通过状态机才实现的挂起恢复的,并且利用状态机来记录Coroutine执行的状态
。
执行挂起函数时可以得到它的状态为:getCOROUTINE_SUSPENDED
。
不过又有问题来了,当挂起函数判断条件为:getCOROUTINE_SUSPENDED
时执行了 return
,代表它已经结束了,那它怎么能继续执行呢?而且还有办法在执行完后通知协程。
这里我们拿一段代码来看看:
suspend fun suspendFunction(text:String) = withContext(Dispatchers.IO){
val result = doSomethingLongTimeProcess(text)
result //result 是個 String
}
它 decompile 後:
public static final Object suspendFunction(@NotNull final String text, @NotNull Continuation $completion) {
return BuildersKt.withContext(
(CoroutineContext)Dispatchers.getIO(), (Function2)(new Function2((Continuation)null) {
//...ignore
}), $completion);
}
会发现,该函数 return
的不是 String
而是一个Object
,那这个Object
是什么呢?其实就是COROUTINE_SUSPENDED
。
要证明这点其实很简单,如下代码,调用该 suspendFunction
就可以了
Object text = instance.suspendFunction("", new Continuation<String>() {
@NotNull
@Override
public CoroutineContext getContext() {
return Dispatchers.getMain();
}
@Override
public void resumeWith(@NotNull Object o) {
}
});
System.out.println(text);
結果:
COROUTINE_SUSPENDED
Process finished with exit code 0
PS:如果该函数时一个普通函数,没有标记
suspend
则会直接返回结果。
根据上边我们这么多的分析,我们可以解释那段代码了。
fun main() {
GlobalScope.launch {
val text = suspendFunction("text")
println(text) // print after delay
}
}
suspend fun suspendFunction(text:String) = withContext(Dispatchers.IO){
val result = doSomethingLongTimeProcess(text)
result
}
首先,Kotlin编译器会把 main()
里面的代码反编译生成一个Continuation,而 launch block
的部分生成一個有限的状态机,并包装进 Continuation
里面那个叫 invokeSuspend(result)
的方法里头,并做为初次 resumeWith
。
Continuation { // GlobalScope.Lanuch()
var label = 0
fun invokeSuspend(result:Any):Any{
when(label){
0->{
val functionResult = suspendFunction("text",this)
lable = 1
if(functionResult == COROUTINE_SUSPENDED){
return functionResult
}
}
1->{
throwOnFailure(result)
break
}
}
val text = result as String
print(text)
}
}
invokeSuspend(result)
会在该 Continuation
的 resumeWith
执行的时候执行。
Continuation { // GlobalScope.Lanuch()
var label = 0
fun invokeSuspend(result:Any):Any{
when(label){
0->{
val functionResult = suspendFunction("text",this)
lable = 1
if(functionResult == COROUTINE_SUSPENDED){
return functionResult
}
}
1->{
throwOnFailure(result)
break
}
}
val text = result as String
print(text)
}
}
第一次执行 invokeSuspend(result)
的时候,会执行到 suspendFunction(String)
,并传入包裝好的 Continuation
。
Continuation { // suspendFunction(text)
fun invokeSuspend(result:Any):Any{
when(label){
0->{
val text = doSomethingLongTimeProcess(context)
return 後執行 continuation.resultWith(text)
}
}
}
}
suspendFunction
自己本身也是一個挂起函数,所以它也会包裝成一个 Continuation
(但这边就单纯很多,虽然也会生成状态机,但其实就是直接跑doSomethingLongTimeProcess()
)。
Continuation { // GlobalScope.Lanuch()
var label = 0
fun invokeSuspend(result:Any):Any{
when(label){
0->{
val functionResult = suspendFunction("text",this)
lable = 1
if(functionResult == COROUTINE_SUSPENDED){
return functionResult
}
}
1->{
throwOnFailure(result)
break
}
}
val text = result as String
print(text)
}
}
因为会进行耗时操作,所以直接回传COROUTINE_SUSPENDED
,让原先执行该挂起函数的Thread
先 return
并执行其他东西,而 suspendFunction
则在另一条 Thread
上把耗时任务完成。
Continuation { // GlobalScope.Lanuch()
var label = 0
fun invokeSuspend(result:Any):Any{
when(label){
0->{
val functionResult = suspendFunction("text",this)
lable = 1
if(functionResult == COROUTINE_SUSPENDED){
return functionResult
}
}
1->{
throwOnFailure(result)
break
}
}
val text = result as String
print(text)
}
}
等待 suspendFunction
的耗时任务完成后,利用传入的 Continuation
的 resumeWith
把结果传入,这个动作会执行到挂起函数的invokeSuspend(result)
,并传入结果,该动作就能让挂起函数得到suspendFunction(String)
的结果。
PS:上面那段代码实际上是伪代码,实际业务会比这复杂的多
所以事实上,挂起函数就是我把我的 callback
給你,等你结束后再用我之前给你的 callback
回调给我,你把你的 callback
給我,等我结束后我用之前你给我的 callback
通知你。
挂起函数时如何自行切换线程的?
原则上,挂起函数在执行时,就会决定好要用哪个 Dispatcher
,然后就会建立挂起点,一般情况下,会走到 startCoroutineCancellable
,然后执行createCoroutineUnintercepted
,也就是上面提到的:resumeWith
和invokeSuspend
。
我们进入到startCoroutineCancellable
内部再看看:
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
runSafely(completion) {
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit))
}
createCoroutineUnintercepted
最后会产出一个 Continuation
,而resumeCancellableWith
其实就是我们前面说到的初始化操作, 這行会去执行状态机 case 0
。
至于 intercepted()
,到底要拦截啥,其实就是把生成的 Continuation
拦截给指定的 ContinuationInterceptor
(这东西包裝在 CoroutineContext
里面,原则上在指定 Dispatcher
的时候就已经建立好了)
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
这里可以注意到 interceptContinuation(Continuation)
,可以用他追下去,发现他是 ContinuationInterceptor
的方法 ,再追下去可以发现CoroutineDispatcher
继承了他:
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
可以发现该动作产生了一个 DispatchedContinuation
,看看 DispatchedContinuation
,可以注意到刚才有提到的 resumeCancellableWith
inline fun resumeCancellableWith(result: Result<T>) {
val state = result.toState()
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_CANCELLABLE
dispatcher.dispatch(context, this)
} else {
executeUnconfined(state, MODE_CANCELLABLE) {
if (!resumeCancelled()) {
resumeUndispatchedWith(result)
}
}
}
}
原则上就是利用 dispatcher
来決定需不需要 dispatch
,沒有就直接执行了resumeUndispatchedWith
@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
inline fun resumeUndispatchedWith(result: Result<T>) {
withCoroutineContext(context, countOrElement) {
continuation.resumeWith(result)
}
}
其实就是直接跑 continuation
的 resumeWith
。
那回头看一下,其实就可以发现是 CoroutineDispatcher
决定要用什么 Thread
了。
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
public abstract fun dispatch(context: CoroutineContext, block: Runnable)
@InternalCoroutinesApi
public open fun dispatchYield(context: CoroutineContext, block: Runnable) = dispatch(context, block)
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
@InternalCoroutinesApi
public override fun releaseInterceptedContinuation(continuation: Continuation<*>) {
(continuation as DispatchedContinuation<*>).reusableCancellableContinuation?.detachChild()
}
}
其实知道这个东西后,就可以向下去找它的 Child ,就能找到 HandlerDispatcher
了。
isDispatchNeeded
就是说是否需要切换线程
。
dispatch
则是切换线程
的操作。
可以看到这两个方法在 HandlerDispatcher
的执行:
override fun isDispatchNeeded(context: CoroutineContext): Boolean {
return !invokeImmediately || Looper.myLooper() != handler.looper
}
override fun dispatch(context: CoroutineContext, block: Runnable) {
handler.post(block)
}
可以看到CoroutineContext
根本没有用到。
为什么呢?其实原因主要是: 挂起函数是
设计给 Kotlin
用的,并不是专门设计给 Android
用的,所以 Android
要用的话,还是需要实现 CoroutineDispatcher
的部分,这实际上是两个体系的东西。那 CoroutineDispatcher
的 dispatch
有提供 CoroutineContext
,但不见的 Android
这边会用到,所以就有这个情況了。
其他诸如 Dispatcher.Default
,他用到了 线程池(Executor)
,Dispatcher.IO
则是用到了一个叫 工作队列(WorkQueue)
的东西。
所以每一个 Dispatcher
都有自己的一套实现,目前有提供四种 Dispatcher
。
网友评论