1.CoroutineScope
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
public fun <T> CoroutineScope.async(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T
): Deferred<T> {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyDeferredCoroutine(newContext, block) else
DeferredCoroutine<T>(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
fun coroutineScopeTest() {
val scope = CoroutineScope(Job())
scope.launch {
}
scope.async {
}
}
async
和 launch
的扩展接收者都是CoroutineScope
,这就意味着他们等价于CoroutineScope
的成员方法,如果要调用就必须先获取到CoroutineScope
的对象。
public interface CoroutineScope {
/**
* 此作用域的上下文
* Context被作用域封装,用于实现作为作用域扩展的协程构建器
* 不建议在普通代码中访问此属性,除非访问[Job]实例以获得高级用法
*/
public val coroutineContext: CoroutineContext
}
CoroutineScope
是一个接口,这个接口所做的也只是对CoroutineContext
做了一层封装而已。CoroutineScope
最大的作用就是可以方便的批量的控制协程,例如结构化并发。
2.CoroutineScope与结构化并发
fun coroutineScopeTest() {
val scope = CoroutineScope(Job())
scope.launch {
launch {
delay(1000000L)
logX("ChildLaunch 1")
}
logX("Hello 1")
delay(1000000L)
logX("Launch 1")
}
scope.launch {
launch {
delay(1000000L)
logX("ChildLaunch 2")
}
logX("Hello 2")
delay(1000000L)
logX("Launch 2")
}
Thread.sleep(1000L)
scope.cancel()
}
//输出结果:
//================================
//Hello 2
//Thread:DefaultDispatcher-worker-2
//================================
//================================
//Hello 1
//Thread:DefaultDispatcher-worker-1
//================================
上面的代码实现了结构化,只是创建了CoroutineScope(Job())
和利用launch
启动了几个协程就实现了结构化,结构如图所示,那么它的父子结构是如何建立的?
3.父子关系是如何建立的
这里要先说明一下为什么CoroutineScope
是一个接口,可是在创建的时候却可以以构造函数的方式使用。在Kotlin中的命名规则是以【驼峰法】为主的,在特殊情况下是可以打破这个规则的,CoroutineScope
就是一个特殊的情况,它是一个顶层函数但它发挥的作用却是构造函数,同样的还有Job()
,它也是顶层函数,在Kotlin中当顶层函数被用作构造函数的时候首字母都是大写的。
再来看一下CoroutineScope
作为构造函数使用时的源码:
/**
* 创建一个[CoroutineScope],包装给定的协程[context]。
*
* 如果给定的[context]不包含[Job]元素,则创建一个默认的' Job() '。
*
* 这样,任何子协程在这个范围或[取消][协程]失败。就像在[coroutineScope]块中一样,
* 作用域本身会取消作用域的所有子作用域。
*/
public fun CoroutineScope(context: CoroutineContext): CoroutineScope =
ContextScope(if (context[Job] != null) context else context + Job())
构造函数的CoroutineScope
传入一个参数,这个参数如果包含Job
元素则直接使用,如果不包含Job
则会创建一个新的Job
,这就说明每一个coroutineScope
对象中的 Context
中必定会存在一个Job
对象。而在创建一个CoroutineScope
对象时这个Job()
是一定要传入的,因为CoroutineScope
就是通过这个Job()
对象管理协程的。
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
上面的代码是launch
的源码,分析一下LazyStandaloneCoroutine
和StandaloneCoroutine
。
private open class StandaloneCoroutine(
parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine<Unit>(parentContext, initParentJob = true, active = active) {
override fun handleJobException(exception: Throwable): Boolean {
handleCoroutineException(context, exception)
return true
}
}
private class LazyStandaloneCoroutine(
parentContext: CoroutineContext,
block: suspend CoroutineScope.() -> Unit
) : StandaloneCoroutine(parentContext, active = false) {
private val continuation = block.createCoroutineUnintercepted(this, this)
override fun onStart() {
continuation.startCoroutineCancellable(this)
}
}
StandaloneCoroutine
是AbstractCoroutine
子类,AbstractCoroutine
是协程的抽象类, 里面的参数initParentJob = true
表示协程创建之后需要初始化协程的父子关系。LazyStandaloneCoroutine
是StandaloneCoroutine
的子类,active=false使命它是以懒加载的方式创建协程。
public abstract class AbstractCoroutine<in T>(
parentContext: CoroutineContext,
initParentJob: Boolean,
active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
init {
/**
* 在上下文中的父协程和当前协程之间建立父子关系
* 如果父协程已经被取消他可能导致当前协程也被取消
* 如果协程从onCancelled或者onCancelling内部操作其状态,
* 那么此时建立父子关系是危险的
*/
if (initParentJob) initParentJob(parentContext[Job])
}
}
AbstractCoroutine
是一个抽象类他继承了JobSupport
,而JobSupport
是Job
的具体实现。
在init
函数中根据initParentJob
判断是否建立父子关系,initParentJob
的默认值是true
因此if中的initParentJob()
函数是一定会执行的,这里的parentContext[Job]
取出的的Job
就是在Demo中传入的Job
。
initParentJob
是JobSupport
中的方法,因为AbstractCoroutine
继承自JobSupport
,所以进入JobSupport
分析这个方法。
public open class JobSupport constructor(active: Boolean) : Job, ChildJob, ParentJob, SelectClause0 {
final override val key: CoroutineContext.Key<*> get() = Job
/**
* 初始化父类的Job
* 在所有初始化之后最多调用一次
*/
protected fun initParentJob(parent: Job?) {
assert { parentHandle == null }
//①
if (parent == null) {
parentHandle = NonDisposableHandle
return
}
//②
parent.start() // 确保父协程已经启动
@Suppress("DEPRECATION")
//③
val handle = parent.attachChild(this)
parentHandle = handle
// 检查注册的状态
if (isCompleted) {
handle.dispose()
parentHandle = NonDisposableHandle
}
}
}
上面的源码initParentJob
中添加了三处注释,现在分别对这三处注释进行分析:
-
if (parent == null): 这里是对是否存在父
Job
的判断,如果不存在则不再进行后面的工作,也就谈不上建立父子关系了。因为在Demo中传递了Job()
因此这里的父Job
是存在的,所以代码可以继续执行。 - parent.start(): 这里确保parent对应的Job启动了;
-
parent.attachChild(this): 这里就是将子
Job
添加到父Job
中,使其成为parent的子Job
。这里其实就是建立了父子关系。
用一句话来概括这个关系就是:每一个协程都有一个Job,每一个Job又有一个父Job和多个子Job,可以看做是一个树状结构。这个关系可以用下面这张图表示:
4.结构化是如何取消的
结构化可以被创建的同时CoroutineScope还提供了可取消的函数,Demo中通过scope.cancel()
取消了协程,它的流程又是怎样的呢?先从scope.cancel
中的cancel
看起
/**
* 取消这个scope,包含当前Job和子Job
* 如果没有Job,可抛出异常IllegalStateException
*/
public fun CoroutineScope.cancel(cause: CancellationException? = null) {
val job = coroutineContext[Job] ?: error("Scope cannot be cancelled because it does not have a job: $this")
job.cancel(cause)
}
scope.cancel
又是通过job.cancel
取消的,这个cancel
具体实现是在JobSupport
中
public open class JobSupport constructor(active: Boolean) : Job, ChildJob, ParentJob, SelectClause0 {
...
public override fun cancel(cause: CancellationException?) {
cancelInternal(cause ?: defaultCancellationException())
}
public open fun cancelInternal(cause: Throwable) {
cancelImpl(cause)
}
/**
* 当cancelChild被调用的时候cause是Throwable或者ParentJob
* 如果异常已经被处理则返回true,否则返回false
*/
internal fun cancelImpl(cause: Any?): Boolean {
var finalState: Any? = COMPLETING_ALREADY
if (onCancelComplete) {
// 确保它正在完成,如果返回状态是 cancelMakeCompleting 说明它已经完成
finalState = cancelMakeCompleting(cause)
if (finalState === COMPLETING_WAITING_CHILDREN) return true
}
if (finalState === COMPLETING_ALREADY) {
//转换到取消状态,当完成时调用afterCompletion
finalState = makeCancelling(cause)
}
return when {
finalState === COMPLETING_ALREADY -> true
finalState === COMPLETING_WAITING_CHILDREN -> true
finalState === TOO_LATE_TO_CANCEL -> false
else -> {
afterCompletion(finalState)
true
}
}
}
/**
* 如果没有需要协程体完成的任务返回true并立即进入完成状态等待子类完成
* 这里代表的是当前Job是否有协程体需要执行
*/
internal open val onCancelComplete: Boolean get() = false
}
job.cancel
最终调用的是JobSupport
中的cancelImpl
。这里它分为两种情况,判断依据是onCancelComplete
,代表的就是当前Job
是否有协程体需要执行,如果没有则返回true。这里的Job
是自己创建的且没有需要执行的协程代码因此返回结果是true
,所以就执行cancelMakeCompleting
表达式。
private fun cancelMakeCompleting(cause: Any?): Any? {
loopOnState { state ->
...
val finalState = tryMakeCompleting(state, proposedUpdate)
if (finalState !== COMPLETING_RETRY) return finalState
}
}
private fun tryMakeCompleting(state: Any?, proposedUpdate: Any?): Any? {
...
return tryMakeCompletingSlowPath(state, proposedUpdate)
}
private fun tryMakeCompletingSlowPath(state: Incomplete, proposedUpdate: Any?): Any? {
//获取状态列表或提升为列表以正确操作子列表
val list = getOrPromoteCancellingList(state) ?: return COMPLETING_RETRY
...
notifyRootCause?.let { notifyCancelling(list, it) }
...
return finalizeFinishingState(finishing, proposedUpdate)
}
进入cancelMakeCompleting
后经过多次流转最终会调用tryMakeCompletingSlowPath
中的notifyCancelling
,在这个函数中才是执行子Job
和父Job
取消的最终流程
private fun notifyCancelling(list: NodeList, cause: Throwable) {
//首先取消子Job
onCancelling(cause)
//通知子Job
notifyHandlers<JobCancellingNode>(list, cause)
// 之后取消父Job
cancelParent(cause) // 试探性取消——如果没有parent也没关系
}
private inline fun <reified T: JobNode> notifyHandlers(list: NodeList, cause: Throwable?) {
var exception: Throwable? = null
list.forEach<T> { node ->
try {
node.invoke(cause)
} catch (ex: Throwable) {
exception?.apply { addSuppressedThrowable(ex) } ?: run {
exception = CompletionHandlerException("Exception in completion handler $node for $this", ex)
}
}
}
exception?.let { handleOnCompletionException(it) }
}
notifyHandlers
中的流程就是遍历当前Job
的子Job
,并将取消的cause
传递过去,这里的invoke()
最终会调用 ChildHandleNode
的 invoke()
方法
public open class JobSupport constructor(active: Boolean) : Job, ChildJob, ParentJob, SelectClause0 {
...
internal class ChildHandleNode(
@JvmField val childJob: ChildJob
) : JobCancellingNode(), ChildHandle {
override val parent: Job get() = job
override fun invoke(cause: Throwable?) = childJob.parentCancelled(job)
override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause)
}
public final override fun parentCancelled(parentJob: ParentJob) {
cancelImpl(parentJob)
}
}
childJob.parentCancelled(job)
的调用最终调用的是JobSupport
中的parentCanceled()
函数,然后又回到了cancelImpl()
中,也就是 Job 取消的入口函数。这实际上就相当于在做递归调用。
子Job
取消完成后接着就是取消父Job
了,进入到cancelParent()
函数中
/**
* 取消Job时调用的方法,以便可能将取消传播到父类。
* 如果父协程负责处理异常,则返回' true ',否则返回' false '。
*/
private fun cancelParent(cause: Throwable): Boolean {
// Is scoped coroutine -- don't propagate, will be rethrown
if (isScopedCoroutine) return true
/*
* CancellationException被认为是“正常的”,当子协程产生它时父协程通常不会被取消。
* 这允许父协程取消它的子协程(通常情况下),而本身不会被取消,
* 除非子协程在其完成期间崩溃并产生其他异常。
*/
val isCancellation = cause is CancellationException
val parent = parentHandle
if (parent === null || parent === NonDisposableHandle) {
return isCancellation
}
// 责任链模式
return parent.childCancelled(cause) || isCancellation
}
/**
* 在这个方法中,父类决定是否取消自己(例如在重大故障上)以及是否处理子类的异常。
* 如果异常被处理,则返回' true ',否则返回' false '(调用者负责处理异常)
*/
public open fun childCancelled(cause: Throwable): Boolean {
if (cause is CancellationException) return true
return cancelImpl(cause) && handlesException
}
cancelParent
的返回结果使用了责任链模式, 如果返回【true】表示父协程处理了异常,返回【false】则表示父协程没有处理异常。
当异常是CancellationException
时如果是子协程产生的父协程不会取消,或者说父协程会忽略子协程的取消异常,如果是其他异常父协程就会响应子协程的取消了。
5.总结
- 每次创建
CoroutineScope
时都会保证CoroutineContext
中一定存在Job
元素,而CoroutineScope
就是通过Job
来管理协程的; - 每次通过
launch
、async
启动(创建)协程时都会创建AbstractCoroutine
的子类,然后通过initParentJob()
函数建立协程的父子关系。每个协程都会对应一个Job
,每个Job
都会由一个父Job
以及多个子Job
,这是一个N叉树结构。 - 因为是一个树结构因此协程的取消以及异常的传播都是按照这个结构进行传递。当取消
Job
时都会通知自己的父Job
和子Job
,取消子Job
最终是以递归的方式传递给每一个Job
。协程在向上取消父Job
时通过责任链模式一步一步的传递到最顶层的协程,同时如果子Job
产生CancellationException
异常时父Job
会将其忽略,如果是其他异常父Job
则会响应这个异常。 - 对于
CancellationException
引起的取消只会向下传递取消子协程;对于其他异常引起的取消既向上传递也向下传递,最终会使所有的协程被取消。
作者:无糖可乐爱好者
链接:https://juejin.cn/post/7175334344049819705
网友评论