今天我们来聊聊Kotlin
的协程Coroutine
。
如果你还没有接触过协程,推荐你先阅读这篇入门级文章What? 你还不知道Kotlin Coroutine?
如果你已经接触过协程,但对协程的原理存在疑惑,那么在阅读本篇文章之前推荐你先阅读下面的文章,这样能让你更全面更顺畅的理解这篇文章。
Kotlin协程实现原理:Suspend&CoroutineContext
Kotlin协程实现原理:CoroutineScope&Job
如果你已经接触过协程,相信你都有过以下几个疑问:
- 协程到底是个什么东西?
- 协程的
suspend
有什么作用,工作原理是怎样的? - 协程中的一些关键名称(例如:
Job
、Coroutine
、Dispatcher
、CoroutineContext
与CoroutineScope
)它们之间到底是怎么样的关系? - 协程的所谓非阻塞式挂起与恢复又是什么?
- 协程的内部实现原理是怎么样的?
- ...
接下来的一些文章试着来分析一下这些疑问,也欢迎大家一起加入来讨论。
ContinuationInterceptor
看到Interceptor
相信第一印象应该就是拦截器,例如在Okhttp
中被广泛应用。自然在协程中ContinuationInterceptor
的作用也是用来做拦截协程的。
下面来看下它的实现。
public interface ContinuationInterceptor : CoroutineContext.Element {
/**
* The key that defines *the* context interceptor.
*/
companion object Key : CoroutineContext.Key<ContinuationInterceptor>
/**
* Returns continuation that wraps the original [continuation], thus intercepting all resumptions.
* This function is invoked by coroutines framework when needed and the resulting continuations are
* cached internally per each instance of the original [continuation].
*
* This function may simply return original [continuation] if it does not want to intercept this particular continuation.
*
* When the original [continuation] completes, coroutine framework invokes [releaseInterceptedContinuation]
* with the resulting continuation if it was intercepted, that is if `interceptContinuation` had previously
* returned a different continuation instance.
*/
public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
...
}
只给出了关键部分,ContinuationInterceptor
继承于CoroutineContext.Element
,所以它也是CoroutineContext
,同时提供了interceptContinuation
方法,先记住这个方法后续会用到。
大家是否还记得在Kotlin协程实现原理系列的第一篇文章中,我们分析了CoroutineContext
的内部结构,当时提到了它的plus
方法,就是下面这段代码
public operator fun plus(context: CoroutineContext): CoroutineContext =
if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
context.fold(this) { acc, element ->
val removed = acc.minusKey(element.key)
if (removed === EmptyCoroutineContext) element else {
// make sure interceptor is always last in the context (and thus is fast to get when present)
val interceptor = removed[ContinuationInterceptor]
if (interceptor == null) CombinedContext(removed, element) else {
val left = removed.minusKey(ContinuationInterceptor)
if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
CombinedContext(CombinedContext(left, element), interceptor)
}
}
}
在这里第一次看到了ContinuationInterceptor
的身影,当时核心是为了分析CoroutineContext
,所以只是提了plus
方法每次都会将ContinuationInterceptor
添加到拼接链的尾部。
不知道有没有老铁想过这个问题,为什么要每次新加入一个CoroutineContext
都要调整ContinuationInterceptor
的位置,并将它添加到尾部?
这里其实涉及到两点。
其中一点是由于CombinedContext
的结构决定的。它有两个元素分别是left
与element
。而left
类似于前驱节点,它是一个前驱集合,而element
只是一个纯碎的CoroutineContext
,而它的get
方法每次都是从element
开始进行查找对应Key
的CoroutineContext
对象;没有匹配到才会去left
集合中进行递归查找。
所以为了加快查找ContinuationInterceptor
类型的实例,才将它加入到拼接链的尾部,对应的就是element
。
另一个原因是ContinuationInterceptor
使用的很频繁,因为每次创建协程都会去尝试查找当前协程的CoroutineContext
中是否存在ContinuationInterceptor
。例如我们通过launch
来看协程的启动。
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
如果你使用launch
的默认参数,那么此时的Coroutine
就是StandaloneCoroutine
,然后调用start
方法启动协程。
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
initParentJob()
start(block, receiver, this)
}
在start
中进入了CoroutineStart
,对应的就是下面这段代码
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>) =
when (this) {
CoroutineStart.DEFAULT -> block.startCoroutineCancellable(receiver, completion)
CoroutineStart.ATOMIC -> block.startCoroutine(receiver, completion)
CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
CoroutineStart.LAZY -> Unit // will start lazily
}
因为我们使用的是默认参数,所以这里对应的就是CoroutineStart.DEFAULT
,最终来到block.startCoroutineCancellable
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
runSafely(completion) {
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellable(Unit)
}
在这里我们终于看到了intercepted
。
首先通过createCoroutineUnintercepted
来创建一个协程(内部具体如何创建的这篇文章先不说,后续文章会单独分析),然后再调用了intercepted
方法进行拦截操作,最后再resumeCancellable
,这个方法最终调用的就是Continuation
的resumeWith
方法,即启动协程。
所以每次启动协程都会自动回调一次resumeWith
方法。
今天的主题是ContinuationInterceptor
所以我们直接看intercepted
。
public expect fun <T> Continuation<T>.intercepted(): Continuation<T>
发现它是一个expect
方法,它会根据不同平台实现不同的逻辑。因为我们是Android
所以直接看Android
上的actual
的实现
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
(this as? ContinuationImpl)?.intercepted() ?: this
最终来到ContinuationImpl
的intercepted
方法
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
在这里看到了熟悉的context
,获取到ContinuationInterceptor
实例,并且调用它的interceptContinuation
方法返回一个处理过的Continuation
。
多次调用
intercepted
,对应的interceptContinuation
只会调用一次。
所以ContinuationInterceptor
的拦截是通过interceptContinuation
方法进行的。既然已经明白了它的拦截方式,我们自己来手动写一个拦截器来验证一下。
val interceptor = object : ContinuationInterceptor {
override val key: CoroutineContext.Key<*> = ContinuationInterceptor
override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {
println("intercept todo something. change run to thread")
return object : Continuation<T> by continuation {
override fun resumeWith(result: Result<T>) {
println("create new thread")
thread {
continuation.resumeWith(result)
}
}
}
}
}
println(Thread.currentThread().name)
lifecycleScope.launch(interceptor) {
println("launch start. current thread: ${Thread.currentThread().name}")
withContext(Dispatchers.Main) {
println("new continuation todo something in the main thread. current thread: ${Thread.currentThread().name}")
}
launch {
println("new continuation todo something. current thread: ${Thread.currentThread().name}")
}
println("launch end. current thread: ${Thread.currentThread().name}")
}
这里简单实现了一个ContinuationInterceptor
,如果拦截成功就会输出interceptContinuation
中对应的语句。下面是程序运行后的输出日志。
main
// 第一次launch
intercept todo something. change run to thread
create new thread
launch start. current thread: Thread-2
new continuation todo something in the main thread. current thread: main
create new thread
// 第二次launch
intercept todo something. change run to thread
create new thread
launch end. current thread: Thread-7
new continuation todo something. current thread: Thread-8
分析一下上面的日志,首先程序运行在main
线程,通过lifecycleScope.launch
启动协程并将我们自定义的intercetpor
加入到CoroutineContext
中;然后在启动的过程中发现我们自定义的interceptor
拦截成功了,同时将原本在main
线程运行的程序切换到了新的thread
线程。同时第二次launch
的时候也拦截成功。
到这里就已经可以证明我们上面对ContinuationInterceptor
理解是正确的,它可以在协程启动的时候进行拦截操作。
下面我们继续看日志,发现withContext
并没有拦截成功,这是为什么呢?注意看Dispatchers.Main
。这也是接下来需要分析的内容。
另外还有一点,如果细心的老铁就会发现,launch start
与launch end
所处的线程不一样,这是因为在withContext
结束之后,它内部还会进行一次线程恢复,将自身所处的main
线程切换到之前的线程,但为什么又与之前launch start
的线程不同呢?
大家不要忘了,协程每一个挂起后的恢复都是通过回调resumeWith
进行的,然而外部launch
协程我们进行了拦截,在它返回的Continuation
的resumeWith
回调中总是会创建新的thread
。所以发生这种情况也就不奇怪了,这是我们拦截的效果。
整体再来看这个例子,它是不是像一个简易版的协程的线程切换呢?
CoroutineDispatcher
现在我们来看Dispatchers.Main
,为什么它会导致我们拦截失败呢?要探究原因没有直接看源码更加直接有效的。
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
主要看它的类型,它返回的是MainCoroutineDispatcher
,然后再看它是什么
public abstract class MainCoroutineDispatcher : CoroutineDispatcher() {}
发现MainCoroutineDispatcher
继承于CoroutineDispatcher
,主角登场了,但还不够我们继续看CoroutineDispatcher
是什么
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
public abstract fun dispatch(context: CoroutineContext, block: Runnable)
public open fun dispatchYield(context: CoroutineContext, block: Runnable) = dispatch(context, block)
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
}
真想已经浮出水面了,原来CoroutineDispatcher
实现了ContinuationInterceptor
,说明CoroutineDispatcher
也具有拦截器的功能。然后再结合CoroutineContext
的性质,就很好解释为什么我们自定义的拦截器没有生效。
原因就是它与我们自定义的拦截器一样都实现了ContinuationInterceptor
接口,一旦使用Dispatchers.Main
就会替换掉我们自定义的拦截器。
因果关系弄明白了现在就好办了。我们已经知道它具有拦截功能,再来看CoroutineDispatcher
提供的另外几个方法isDispatchNeeded
与dispatch
。
我们可以大胆猜测,isDispatchNeeded
就是判断是否需要分发,然后dispatch
就是如何进行分发,接下来我们来验证一下。
ContinuationInterceptor
重要的方法就是interceptContinuation
,在CoroutineDispatcher
中直接返回了DispatchedContinuation
对象,它是一个Continuation
类型。那么自然重点就是它的resumeWith
方法。
override fun resumeWith(result: Result<T>) {
val context = continuation.context
val state = result.toState()
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_ATOMIC_DEFAULT
dispatcher.dispatch(context, this)
} else {
executeUnconfined(state, MODE_ATOMIC_DEFAULT) {
withCoroutineContext(this.context, countOrElement) {
continuation.resumeWith(result)
}
}
}
}
这里我们看到了isDispatchNeeded
与dispatch
方法,如果不需要分发自然是直接调用原始的continuation
对象的resumeWith
方法,也就没有什么类似于线程的切换。
那什么时候isDispatcheNeeded
为true
呢?这就要看它的dispatcer
是什么。
由于现在我们是拿Dispatchers.Main
作分析。所以这里我直接告诉你们它的dispatcher
是HandlerContext
override fun createDispatcher(allFactories: List<MainDispatcherFactory>) =
HandlerContext(Looper.getMainLooper().asHandler(async = true), "Main")
internal class HandlerContext private constructor(
private val handler: Handler,
private val name: String?,
private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
/**
* Creates [CoroutineDispatcher] for the given Android [handler].
*
* @param handler a handler.
* @param name an optional name for debugging.
*/
public constructor(
handler: Handler,
name: String? = null
) : this(handler, name, false)
@Volatile
private var _immediate: HandlerContext? = if (invokeImmediately) this else null
override val immediate: HandlerContext = _immediate ?:
HandlerContext(handler, name, true).also { _immediate = it }
override fun isDispatchNeeded(context: CoroutineContext): Boolean {
return !invokeImmediately || Looper.myLooper() != handler.looper
}
override fun dispatch(context: CoroutineContext, block: Runnable) {
handler.post(block)
}
...
}
它继承于HandlerDispatcher
,而HandlerDispatcher
继承于MainCoroutineDispatcher
。
条件都符合,我们直接看isDispatchNeeded
方法返回true
的逻辑。
首先通过invokeImmediately
判断,它代表当前线程是否与自身的线程相同,如何你外部使用者能够保证这一点,就可以直接使用Dispatcher.Main.immediate
来避免进行线程的切换逻辑。当然为了保证外部的判断失败,最后也会通过Looper.myLooper() != handler.looper
来进行校正。对于Dispatchers.Main
这个的handle.looper
自然是主线程的looper
。
如果不能保证则invokeImmediately
为false
,直接进行线程切换。然后进入dispatch
方法,下面是Dispatchers.Main
中dispatch
的处理逻辑。
override fun dispatch(context: CoroutineContext, block: Runnable) {
handler.post(block)
}
这个再熟悉不过了,因为这个时候的handler.post
就是代表向主线程推送消息,此时的block
将会在主线程进行调用。
这样线程的切换就完成。
所以综上来看,CoroutineDispatcher
为协程提供了一个线程切换的统一判断与执行标准。
首先在协程进行启动的时候通过拦截器的方式进行拦截,对应的方法是interceptContinuation
,然后返回一个具有切换线程功能的Continuation
,在每次进行resumeWith
的时候,内部再通过isDispatchNeeded
进行判断当前协程的运行是否需要切换线程。如果需要则调用dispatch
进行线程的切换,保证协程的正确运行。
如果我要自定义协程线程的切换逻辑,就可以通过继承于CoroutineDispatcher
来实现,将它的核心方法进行自定义即可。
当然,如果你是在Android
中使用协程,那基本上是不需要自定义线程的切换逻辑。因为kotlin
已经为我们提供了日常所需的Dispatchers
。主要有四种分别为:
-
Dispatchers.Default
: 适合在主线程之外执行占用大量CPU
资源的工作 -
Dispatchers.Main
:Android
主线程 -
Dispatchers.Unconfined
: 它不会切换线程,只是启动一个协程进行挂起,至于恢复之后所在的线程完全由调用它恢复的协程控制。 -
Dispatchers.IO
: 适合在主线程之外执行磁盘或网络I/O
最后我们再来简单提一下withContext
。
withContext
CoroutineDispatcher
虽然能够提供线程的切换,但这只是单方向的,因为它没有提供线程的恢复。
试想一下,我们有个网络请求,我们通过CoroutineDispatcher
将线程切换到Dispatchers.IO
,当拿到请求成功的数据之后,所在的线程还是IO
线程,这样并不能有利于我们UI
操作。所以为了解决这个问题kotlin
提供了withContext
,它不仅能够接受CoroutineDispatcher
来帮助我们切换线程,同时在执行完毕之后还会帮助我们将之前切换掉的线程进恢复,保证协程运行的连贯性。这也是为什么官方推荐使用withContext
进行协程线程的切换的原因。
而withContext
的线程恢复原理是它内部生成了一个DispatchedCoroutine
,保存切换线程时的CoroutineContext
与切换之前的Continuation
,最后在onCompletionInternal
进行恢复。
internal override fun onCompletionInternal(state: Any?, mode: Int, suppressed: Boolean) {
if (state is CompletedExceptionally) {
val exception = if (mode == MODE_IGNORE) state.cause else recoverStackTrace(state.cause, uCont)
uCont.resumeUninterceptedWithExceptionMode(exception, mode)
} else {
uCont.resumeUninterceptedMode(state as T, mode)
}
}
这个uCont
就是切换线程之前的Continuation
。具体实现就不在这分析了,感兴趣的老铁可以自己翻一翻源码。
本篇文章主要介绍了ContinuationInterceptor
作用与如何拦截协程的,同时也分析了CoroutineDispatcher
内部结构,进一步剖析了协程线程切换的原理。希望对学习协程的伙伴们能够有所帮助,敬请期待后续的协程分析。
项目
android_startup: 提供一种在应用启动时能够更加简单、高效的方式来初始化组件,优化启动速度。不仅支持Jetpack App Startup
的全部功能,还提供额外的同步与异步等待、线程控制与多进程支持等功能。
AwesomeGithub: 基于Github
客户端,纯练习项目,支持组件化开发,支持账户密码与认证登陆。使用Kotlin
语言进行开发,项目架构是基于Jetpack&DataBinding
的MVVM
;项目中使用了Arouter
、Retrofit
、Coroutine
、Glide
、Dagger
与Hilt
等流行开源技术。
flutter_github: 基于Flutter
的跨平台版本Github
客户端,与AwesomeGithub
相对应。
android-api-analysis: 结合详细的Demo
来全面解析Android
相关的知识点, 帮助读者能够更快的掌握与理解所阐述的要点。
daily_algorithm: 每日一算法,由浅入深,欢迎加入一起共勉。
网友评论