Kotlin协程实际上是所谓的stackless协程,即没有在线程之上实现类似线程栈的结构,可以认为是一种kotlin语言层面支持的 线程调度框架,使用这个框架,我们可以省去手动书写callback,使代码看上去是同步的
使用了协程以后,原本需要回调实现的非阻塞功能可以用阻塞的方式去写,比如:
launch {
// 异步读时挂起
val bytesRead = inChannel.aRead(buf)
// 读完成后才执行这一行
...
...
process(buf, bytesRead)
// 异步写时挂起
outChannel.aWrite(buf)
// 写完成后才执行这一行
...
...
outFile.close()
}
// 协程挂起后,线程继续执行
println("thread continue")
执行上述代码段时,在协程运行到val bytesRead = inChannel.aRead(buf)
这一句时挂起,线程继续执行协程外的代码,输出thread continue
那么,协程是如何做到这样的“黑科技”呢?网上搜一圈,基本上都会搜到CPS(Continuation-Passing-Style, 续体传递风格),听起来挺玄乎的,什么是CPS呢?
说的简单点,其实就是函数通过回调传递结果,让我们看看这个例子
class Test {
public static long plus(int i1, int i2) {
return i1 + i2;
}
public static void main(String[] args) {
System.out.println(plus(1, 2));
}
}
这个例子是常规的写法,函数plus的结果通过函数返回值的形式返回并进行后续处理(这里仅仅打印),如果把例子改写成CPS风格,则是
class Test {
interface Continuation {
void next(int result);
}
public static void plus(int i1, int i2, Continuation continuation) {
continuation.next(i1 + i2);
}
public static void main(String[] args) {
plus(1, 2, result -> System.out.println(result));
}
}
很简单吧?这就是CPS风格,函数的结果通过回调来传递
协程里通过在CPS的Continuation回调里结合状态机流转,来实现协程挂起-恢复的功能,来看下面的例子
假设我们有一个扩展的挂起函数:
suspend fun <T> CompletableFuture<T>.await(): T
在编译过后,其函数签名将变成
fun <T> CompletableFuture<T>.await(continuation: Continuation<T>): Any?
可以看到,入口参数多了一个Continuation类型,这个就是CPS续体,其实也就是我们上面说的回调
现在我们再假设有这么段代码
val a = a()
val y = foo(a).await() // 挂起点 #1
b()
val z = bar(a, y).await() // 挂起点 #2
c(z)
这段代码必须在协程里面执行(launch{}
或者async{}
等的lambda里),且调用了两个await()
挂起函数,其编译后将变成这样的伪代码(因为是编译器直接生成字节码)
class <anonymous_for_state_machine> extends SuspendLambda<...> {
// 状态机当前状态
int label = 0
// 协程的局部变量
A a = null
Y y = null
void resumeWith(Object result) {
if (label == 0) goto L0
if (label == 1) goto L1
if (label == 2) goto L2
else throw IllegalStateException()
L0:
// 这次调用,result 应该为空
a = a()
label = 1
result = foo(a).await(this) // 'this' 作为续体传递
if (result == COROUTINE_SUSPENDED) return // 如果 await 挂起了执行则返回
L1:
// 外部代码传入 .await() 的结果恢复协程
y = (Y) result
b()
label = 2
result = bar(a, y).await(this) // 'this' 作为续体传递
if (result == COROUTINE_SUSPENDED) return // 如果 await 挂起了执行则返回
L2:
// 外部代码传入 .await() 的结果恢复协程
Z z = (Z) result
c(z)
label = -1 // 没有其他步骤了
return
}
}
可以看到,以两个挂起函数的调用点为分界,生成了一个具有3个状态的状态机类
现在,当协程开始时,我们调用了它的 resumeWith() —— label 是 0,然后我们跳去 L0,接着我们做一些工作,将 label 设为下一个状态—— 1,调用 .await(),如果协程执行挂起就返回。当我们想继续执行时,我们再次调用 resumeWith(),现在它继续执行到了 L1,做一些工作,将状态设为 2,调用 .await(),同样在挂起时返回。下一次它从 L3 继续,将状态设为 -1,这意味着"结束了,没有更多工作要做了"。
其实编译后的代码就是利用这个生成的类作为续体(Continuation)传递了挂起点前后的中间结果,并且通过状态机,来记忆协程恢复后应该执行哪段代码
续体状态机流转
好了,这就是协程挂起和恢复的实现方式了,可以看到,我们并不能把Kotlin协程当作是所谓的“轻量级线程”来解释,它更像是一个以同步方式去写异步方法,并帮助开发者生成回调方法(CPS风格)的线程调度框架,当然本文只分析了协程的挂起和恢复,还有协程的取消操作,上下文(可用于限定协程在某个线程工作)等等功能,原理解析推荐阅读:
Kotlin中文官网的协程设计文档(中文)
Bennyhuo的破解Kotlin协程系列(中文)
协程的使用推荐阅读:
扔物线的Kotlin教学视频及文章(协程部分)
google codelabs kotlin coroutines guide(手把手教你写协程)
网友评论