美文网首页kotlin知识
Kotlin协程系列之基础设施

Kotlin协程系列之基础设施

作者: _Jun | 来源:发表于2022-09-22 10:54 被阅读0次

    前一篇文章介绍了kotlin协程的历史及现状,接下来就介绍一下kotlin协程吧。

    kotlin协程分为标准库提供的基础设施以及官方协程库两部分,标准库的基础设施依照协程的思想提供基本的挂起、恢复接口,官方协程库基于这些接口提供丰富实用的功能。

    今天先介绍一下kotlin协程的基础设施。为了让大家对kotlin协程有更为直观的认识,先展示一个demo:

    // 代码清单2-1
    val continuation = suspend {
        println("simpleTest: In Coroutine")
        5
    }.createCoroutine(object : Continuation<Int> {
        override fun resumeWith(result: Result<Int>) {
            println("resumeWith: Continuation End: $result")
        }
    
        override val context: CoroutineContext
        get() = EmptyCoroutineContext
    
    })
    continuation.resume(Unit)
    

    demo由三个部分组成:

    • 由suspend声明的挂起函数
    • 调用createCoroutine方法创建Continuation实例
    • 通过continuation.resume启动协程

    suspend函数作为协程体,输出一条日志,并且返回5作为执行结果。协程体执行完成后,回调Continuation的resumeWith方法,其内输出协程的执行结果。

    suspend函数

    suspend函数能在不阻塞线程执行的情况下将协程挂起,适合用于将异步代码转为同步形式。在suspend函数中可以调用普通函数和suspend函数,然而在普通函数中却不能调用suspend函数(由于编译时存在CPS转换,后续讲协程实现时详细说明)。

    当suspend函数被真正挂起的时候,对应的调用处被乘坐挂起点

    suspend函数内部通过调用suspendCoroutine方法实现挂起操作的,如果没有调用suspendCoroutine,那么suspend函数并不会真正的挂起,前面的demo在实际运行时就不会挂起。

    suspendCoroutine的函数签名如下:

    // 代码清单2-2
    suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T
    

    它是个内联函数,其中block是参数为Continuation的回调函数,在异步任务完成时,早block内部调用continuation.resumeWith()或者调用continuation.resume()continuation.resumeWithException()恢复被挂起的协程。

    下面看一个suspend函数的例子:

    // 代码清单2-3
    suspend fun suspendFun1() {
        println("suspendFun1")
    }
    
    suspend fun suspendFun2() = suspendCoroutine<Int> {
        thread {
            sleep(50)
            it.resume(65)
        }
    }
    
    fun sleep(time:Long) {
        try {
            Thread.sleep(time)
        } catch (e: InterruptedException) {
            e.printStackTrace()
        }
    }
    
    suspend {
        suspendFun1()
        val res2 = suspendFun2()
        println("suspendFun2 result $res2")
        res2 + 3
    }.startCoroutine(object : Continuation<Int> {
        override val context: CoroutineContext
        get() = EmptyCoroutineContext
    
        override fun resumeWith(result: Result<Int>) {
            println("continuationInterceptorTest resume $result")
        }
    
    })
    

    suspendFun1不会真正执行挂起逻辑,而suspendFun2通过suspendCoroutine创建一个真正的挂起函数,并在内部开启子线程模拟耗时任务执行。所以在执行到suspendFun2时,当前协程先被挂起,等待模拟耗时操作完成后,通过continuation的resume方法恢复被挂起的协程。

    构建协程

    在代码清单2-1中,我们使用createCoroutinecontinuation.resume创建并执行协程,当然我们还可以使用startCoroutine直接创建并启动协程:

    // 代码清单2-4
    suspend {
        println("simpleTest: In Coroutine with start")
        6
    }.startCoroutine(object : Continuation<Int> {
        override val context: CoroutineContext
        get() = EmptyCoroutineContext
    
        override fun resumeWith(result: Result<Int>) {
            println("resumeWith: Continuation with start End: $result")
            println()
        }
    
    })
    

    可以看出startCoroutine用法和createCoroutine类似,下面就看看它们的函数签名吧:

    // 代码清单2-5
    fun <T> (suspend () -> T).createCoroutine(
        completion: Continuation<T>
    ): Continuation<Unit>
    
    fun <T> (suspend () -> T).startCoroutine(
        completion: Continuation<T>
    )
    

    startCoroutinecreateCoroutine都是作为 (suspend () -> T)函数类型的扩展函数定义的,因此在代码清单2-1和2-4中可以直接在suspend函数后直接调用,并且它们都接收一个Continuation类型实例作为回调,当协程执行完成后,会调用completion的resumeWith函数,并将协程执行结果通过result参数传递。

    标准库还定义了带Receiver的startCoroutinecreateCoroutine,用于将协程体的作用域设置成Receiver:

    // 代码清单2-6
    fun <R, T> (suspend R.() -> T).createCoroutine(
        receiver: R,
        completion: Continuation<T>
    )
    
    fun <R, T> (suspend R.() -> T).startCoroutine(
        receiver: R,
        completion: Continuation<T>
    )
    

    Continuation

    startCoroutinecreateCoroutine创建协程时都用到Continuation对象,它用来记录被挂起的协程在挂起点的状态,其内保存着挂起点之后要执行的代码。考虑如下序列生成器:

    // 代码清单2-7
    sequence {
        for (i in 1..10) yield(i * i)
        println("over")
    }  
    

    该序列生成器在for循环内调用yield生成新的序列,并将当前协程挂起,所以在yield内部一定会有Continuation记录剩余要执行的代码,并且有十个Continuation对象:第一次执行i=2和循环并挂起,第二次执行i=3并挂起,以此逻辑依次执行。当协程被创建,但还没开始运行时,即调用了createCoroutine后未调用resume,此时存在一个初始的Continuation<Unit>表示所有的协程代码。

    下面我们看看Continuation的定义:

    // 代码清单2-8
    interface Continuation<in T> {
       val context: CoroutineContext
       fun resumeWith(result: Result<T>)
    }
    

    context表示当前协程的上下文,用户可以根据需求自定义上下文,稍后会详细介绍。在前面的demo中,我们都是直接使用EmptyCoroutineContext,这是一个自带的上下文,无特殊需求时可以用它。

    resumeWith函数是协程的完成回调,不论协程执行成功或失败,都通过此函数通知用户。为了方便使用,kotlin提供了两个扩展函数:

    // 代码清单2-9
    fun <T> Continuation<T>.resume(value: T)
    fun <T> Continuation<T>.resumeWithException(exception: Throwable)
    

    resume作为成功通知,resumeWithException作为失败通知。

    上下文

    协程上下文类似Set集合,用于保存于协程关联的自定义数据:可以包含协程的线程策略、日志信息、协程安全和事务相关信息、协程id及名字等等。可以将协程当做轻量级线程,那么协程上下文就类似线程的ThreadLocal变量,不过ThreadLocal是可变的,而协程上下文是不可变的。

    协程上下文在kotlin中用CoroutineContext表示,这是一个集合,下面是其在标准库中的定义:

    // 代码清单2-10
    interface CoroutineContext {
        operator fun <E : Element> get(key: Key<E>): E?
        fun <R> fold(initial: R, operation: (R, Element) -> R): R
        operator fun plus(context: CoroutineContext): CoroutineContext
        fun minusKey(key: Key<*>): CoroutineContext
    
        interface Element : CoroutineContext {
            val key: Key<*>
        }
    
        interface Key<E : Element>
    }
    

    从定义中可以看出,CoroutineContext由Element组成,Element仅包含字段key,明显key是作为Element的索引。

    Key接口通过泛型将Key与Element关联起来,可以看做Key即为Element本身,这与Set类似。然而CoroutineContext重载了操作符[],即代码清单2-10中的get函数,它接收Key类型索引,而返回Element类型的元素,从这方面看,CoroutineContext又喝Map相似。

    foldplusminusKey属于集合操作,这里就不详细介绍了,请自行查阅文档。

    总之,CoroutineContext是一种混合了Set和Map结构的新型集合。

    EmptyCoroutineContext

    EmptyCoroutineContext 是标准库提供的一个不包含任何数据的CoroutineContext实例,在没有特殊需求时可以使用此实例,其定义如下:

    // 代码清单2-11
    public object EmptyCoroutineContext : CoroutineContext, Serializable {
        private const val serialVersionUID: Long = 0
        private fun readResolve(): Any = EmptyCoroutineContext
    
        public override fun <E : Element> get(key: Key<E>): E? = null
        public override fun <R> fold(initial: R, operation: (R, Element) -> R): R = initial
        public override fun plus(context: CoroutineContext): CoroutineContext = context
        public override fun minusKey(key: Key<*>): CoroutineContext = this
        public override fun hashCode(): Int = 0
        public override fun toString(): String = "EmptyCoroutineContext"
    }
    

    实现

    在实现自定义上下文时,不能直接实现CoroutineContext接口,标准库提供了AbstractCoroutineContextElement,应该实现此类,其定义如下:

    // 代码清单2-12
    public abstract class AbstractCoroutineContextElement(public override val key: Key<*>) : Element
    

    AbstractCoroutineContextElement内部通过重写Element接口的key字段来指定具体的Element类型。

    下面看一个自定义上下文的demo:

    // 代码清单2-13
    class CoroutineName(val name: String) : AbstractCoroutineContextElement(Key) {
        companion object Key : CoroutineContext.Key<CoroutineName>
    }
    
    var coroutineContext: CoroutineContext = EmptyCoroutineContext
    coroutineContext += CoroutineName("c_test")
    
    suspend {
        println("coroutineContextTest: In Coroutine ${coroutineContext[CoroutineName]?.name} with start")
    }.startCoroutine(object : Continuation<Int> {
        override val context: CoroutineContext
        get() = coroutineContext
    
        override fun resumeWith(result: Result<Int>) {
            println("resumeWith: Continuation with start End: $result")
        }
    })
    

    CoroutineName中通过伴生对象志明Key类型,并且通过父类构造函数将其传递上去,以指定CoroutineName所对应的Element类型。

    接下去创建CoroutineName实例,并加到EmptyCoroutineContext中,最后再协程体内部通过coroutineContext以及对应的Key(即伴生对象CoroutineName),即可获的CoroutineName的实例。

    拦截器

    在android代码中,更新UI都要在主线程中执行,当发起网络请求或其他耗时操作都会切到子线程中执行,而suspend函数恢复执行依赖调用continuation.resumeWIth()所在的线程,这样就要手动切换线程,容易引发bug。

    ContinuationInterceptor提供了拦截并重新包装Continuation实例的能力,通过重新包装Continuation实例,就可以实现自定义的需求,比如每次都自动切换回主线程执行。

    ContinuationInterceptor接口定义如下:

    // 代码清单2-14
    interface ContinuationInterceptor : CoroutineContext.Element {
        companion object Key : CoroutineContext.Key<ContinuationInterceptor>
        fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
        fun releaseInterceptedContinuation(continuation: Continuation<*>)
    }
    

    通过实现interceptContinuation方法重新包装原始的continuation,以实现自定义的需求。下面我们看一个在协程恢复时添加日志的拦截器示例:

    // 代码清单2-15
    class LogInterceptor() : ContinuationInterceptor {
        override val key: CoroutineContext.Key<*> = ContinuationInterceptor
        override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
            LogContinuation(continuation)
    }
    
    class LogContinuation<T>(private val continuation: Continuation<T>) : Continuation<T> by continuation {
        override fun resumeWith(result: Result<T>) {
            println("before resumeWith: $result")
            continuation.resumeWith(result)
            println("after resumeWith")
        }
    }
    
    fun continuationInterceptorTest() {
        suspend {
            suspendFun1()
            val res2 = suspendFun2()
            println("suspendFun2 result $res2")
            res2 + 3
        }.startCoroutine(object : Continuation<Int> {
            override val context: CoroutineContext
                get() = LogInterceptor()
    
            override fun resumeWith(result: Result<Int>) {
                println("continuationInterceptorTest resume $result")
            }
    
        })
    }
    
    suspend fun suspendFun1() {
        println("suspendFun1")
    }
    
    suspend fun suspendFun2() = suspendCoroutine<Int> {
        thread {
            sleep(50)
            it.resume(65)
        }
    }
    

    demo中首先定义了LogInterceptorLogContinuation两个类。LogContinuation接受Continuation作为构造函数参数,通过委托方式实现Continuation接口,在resumeWith中添加日志以记录协程的执行记录。

    continuationInterceptorTest中有suspendFun1suspendFun2两个挂起函数,由于suspendFun1并没有真正挂起,所以在执行suspendFun1时没有LogContinuation的日志,suspendFun2则是在调用it.resume(65)时会打印相关的日志。

    参考链接

    作者:Longlongago
    链接:https://juejin.cn/post/7139893784355012615

    相关文章

      网友评论

        本文标题:Kotlin协程系列之基础设施

        本文链接:https://www.haomeiwen.com/subject/pgjwortx.html