Kotlin 协程入门

作者: ddu_ | 来源:发表于2019-03-27 11:03 被阅读21次

    协程是什么

    从使用的角度来说,协程可以简单理解为轻量化的线程,比线程更低一级的代码执行单元。

    准备工作

    implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.1.1'
    //针对 android 项目
    implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.1.1"
    

    为什么需要协程

    首先看一下,客户端从服务器获取数据的一般流程:

    • 从本地缓存或服务器获取 token
    • 从服务器获取数据
    • 处理数据

    理想的操作是这样的:

    fun requestToken():Token {...}
    fun createPost(token: Token, item: Item): Post {...}
    fun processPost(post: Post)
    
    fun postItem(item: Item) {
        val token = requestToken()
        val post = createPost(token, item)
        processPost(post)
    }
    

    现实是残酷的,对于 Android 平台而言,获取 token 和数据的操作是耗时的,会阻塞主线程,导致页面无响应。

    刚开始我们使用新线程 + 回调 来解决这个问题:

    //启动新线程发起数据请求,当数据获取到后调用回调函数,必要的情况下,获取到数据后可以切换到主线程
    fun requestToken(cb: (Token) -> Unit) {...}
    fun createPost(token: Token, item: Item, cb: (Post) -> Unit) {...}
    fun processPost(post: Post) {...}
    
    fun postItem(item: Item) {
        requestToken { token ->
            createPost(token, item) { post ->
                processPost(post)
            }
        }
    }
    

    问题来了,过多的回调使得代码变得冗长而难以阅读,这里使用了 lamada 表达式使得情况稍微好点,不用 lamada,再来个四五层的回调,会疯的。

    很多人都受不了了,陆续出了三个方案来解决这个问题:

    • Future
    • Promise
    • Rx

    他们本质是一样的,通过数据的封装来减少回调:

     fun requestToken():Promise<Token> {...}
     fun createPost(token: Token, item: Item): Promise<Token> {...}
     fun processPost(post: Post)
    
     fun postItem(item: Item) {
         requestToken()
            .thenCompose { token -> createPost(token, item) }
            .thenAccept {post -> processPost(post)}
     }
    
    fun requestToken(): Token { ... }
    fun createPost(token: Token, item: Item): Post { ... }
    fun processPost(post: Post) { ... }
    fun postItem(item: Item) {
        Single.fromCallable { requestToken() }
                .map { token -> createPost(token, item) }
                .subscribe(
                        { post -> processPost(post) }, // onSuccess
                        { e -> e.printStackTrace() } // onError
                )
    }
    

    问题大为改观,巴特, 为了使用这些库你需要记忆诸如 thenCompose, thenAccept 这样的操作符,不同的库,操作符可能名字还不相同。新功能的增加往往伴随操作符的增加,给使用者带来的极大的学习负担。我大概 16 年开始使用 Rxjava,Rxjava 的操作符到现在我还没彻底搞清楚。可能是我太懒了吧。

    最后我们看下,协程在这个问题的解决上究竟有多香!

    suspend fun requestToken():Token {...}
    suspend fun createPost(token: Token, item: Item): Post {...}
    suspend fun processPost(post: Post)
    
    suspend fun postItem(item: Item) {
        val token = requestToken()
        val post = createPost(token, item)
        processPost(post)
    }
    

    无限接近,最开始提出的理想方案———顺序执行。

    协程是如何做到的如此 nb 的 !?

    如何启动协程

    线程 -> 协程

    我们的代码默认执行在线程环境下,runBlocking 像一个转换器,把线程环境转换为协程环境。

    import kotlinx.coroutines.*
    
    fun main() { 
        println("Hello,") 
        runBlocking { // 线程 -> 协程    
            delay(2000L) //main 线程阻塞两秒
        } 
    }
    

    runBlocking 还可以直接启动一个 main 协程:

    import kotlinx.coroutines.*
    
    fun main() = runBlocking {
        println("Hello") 
    }
    

    通过 CoroutineScope 接口的扩展方法 launch 可以启动一个协程。因为 CoroutineScope 是一个接口,不能直接调用其扩展方法,需要定义 CoroutineScope 接口的实现类才能使用。协程库提供了一个 CoroutineScope 的实现 GlobalScope。

    import kotlinx.coroutines.*
    
    fun main() {
        //启动协程,就像启动一个线程一样,在后台运行
        GlobalScope.launch { 
            delay(1000L) 
            println("World!") 
        }
        println("Hello,") 
        //主线程不 sleep,则不会打印 World,程序直接结束,不会等待协程执行完毕
        Thread.sleep(2000L) 
    }
    

    我们也可以自己写 CoroutineScope 的实现类。实际上,不需要这么麻烦,协程库给我们提供了 CoroutineScope 方法来快速实现一个 CoroutineScope 对象。

    val uiScope: CoroutineScope = CoroutineScope(Dispatchers.Main)
    val ioScope: CoroutineScope = CoroutineScope(Dispatchers.IO)
    val computeScope: CoroutineScope = CoroutineScope(Dispatchers.Default)
            
    uiScope.launch {
        delay(1000)
    }
            
    ioScope.launch { 
        delay(1000)
    }
            
    computeScope.launch { 
        delay(1000)
    }
    

    这里,Dispatchers.Main,Dispatchers.IO,Dispatchers.Default 用于指定协程运行的线程环境:

    • Dispatchers.Main: 主线程
    • Dispatchers.IO: IO 线程
    • Dispatchers.Default: 计算线程

    协程 -> 协程

    我们也可以在协程环境下启动一个新的协程

    通过 launch,async 在协程中启动协程:

    fun main() {
        runBlocking {
            val job:Job = launch {
                delay(1000)
                println("子协程1执行完毕")
            }
    
            val deferred:Deferred<Int> = async {
                delay(1200)
                println("子协程2执行完毕")
                3 //协程的返回值
            }
    
            delay(100)
            println("父线程执行完毕") 
            //父协程等待子协程结束
        }
    }
    

    launch 方法返回一个 Job 对象,async 返回一个 Deferred 对象,Deferred 是带返回值的,可以通过 deferred.await() 获取到返回值。

    另外需要注意的是,协程内部启动的协程称为子协程,父协程会等待子协程执行完后才会结束,而不会直接结束。这里可以和之前的 GlobalScope 对比一下。

    coroutineScope 也可以在协程环境下启动一个新的协程,通常用于完成并行任务。

    import kotlinx.coroutines.*
    
    fun main() = runBlocking { 
    
        coroutineScope { 
            launch {
                delay(500L) 
                println("Task from nested launch")
            }
        
            delay(100L)
            println("Task from coroutine scope") 
        }
        //coroutineScope 执行完了,才会执行后续代码
        println("Coroutine scope is over")
    }
    

    挂起 vs 阻塞 (suspend vs block)

    在线程环境下,如果一个操作是耗时的,该耗时操作执行完成后,才能执行同一线程后续代码,我们称该操作 阻塞(block) 了线程。

    在协程环境下,如果一个操作是耗时的,该耗时操作执行完成后,才能执行同一协程后续代码,我们称该操作 挂起(suspend) 了协程。

    就是多了个叫法,方便区分。

    kotlin 中有一个关键字 suspend,用于修饰方法。

    • 被 suspend 修饰的方法通常表示该方法是耗时的,会挂起协程。
    • suspend 方法执行完了,才会执行后续代码。
    • suspend 方法只能在 suspend 方法内部或协程内部调用
    import kotlinx.coroutines.*
    
    fun main() = runBlocking {
        launch { doWorld() }
        println("Hello,")
    }
    
    suspend fun doWorld() {
        delay(1000L)
        println("World!")
    }
    

    用于启动协程的 coroutineScope 就是一个 suspend 方法。所以,coroutineScope 执行完了,才会执行后续代码。

    原理

    以上就是协程最基础的部分。了解一下协程的工作原理:

    查看下源码:

    public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T
    
    public fun CoroutineScope.launch(
        context: CoroutineContext = EmptyCoroutineContext,
        start: CoroutineStart = CoroutineStart.DEFAULT,
        block: suspend CoroutineScope.() -> Unit
    ): Job
    
    
    public fun <T> CoroutineScope.async(
        context: CoroutineContext = EmptyCoroutineContext,
        start: CoroutineStart = CoroutineStart.DEFAULT,
        block: suspend CoroutineScope.() -> T
    ): Deferred<T>
    
    public suspend fun <R> coroutineScope(block: suspend CoroutineScope.() -> R): R
    

    可以看出,启动协程时,我们传入的 block 都是 CoroutineScope 的扩展函数:

    public interface CoroutineScope {
        public val coroutineContext: CoroutineContext
    }
    

    CoroutineScope 有一个成员变量 coroutineContext,在协程中我们都可以
    访问到这个成员变量

    fun main() {
        GlobalScope.launch {
            println(coroutineContext)
        }
        Thread.sleep(1000)
    }
    

    CoroutineContext 是一个接口,功能类似于 Map,用于保存 key-value 型数据。

    public interface CoroutineContext {
        //'Map' 中的 key,E 是对于 Value 的类型
        public interface Key<E : Element>
        //'Map' 中的 value
        public interface Element : CoroutineContext {
            
            public val key: Key<*>
    
            public override operator fun <E : Element> get(key: Key<E>): E? =
                @Suppress("UNCHECKED_CAST")
                if (this.key == key) this as E else null
    
            public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
                operation(initial, this)
    
            public override fun minusKey(key: Key<*>): CoroutineContext =
                if (this.key == key) EmptyCoroutineContext else this
        }
       
        //根据 key 获取 value,重载操作符
        public operator fun <E : Element> get(key: Key<E>): E?
        //遍历 ‘map’,进行累积操作
        public fun <R> fold(initial: R, operation: (R, Element) -> R): R
        //重载操作符 ‘+’ , 用于将两个 'map' 合并
        public operator fun plus(context: CoroutineContext): CoroutineContext =
            if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
                context.fold(this) { acc, element ->
                    val removed = acc.minusKey(element.key)
                    if (removed === EmptyCoroutineContext) element else {
                        // make sure interceptor is always last in the context (and thus is fast to get when present)
                        val interceptor = removed[ContinuationInterceptor]
                        if (interceptor == null) CombinedContext(removed, element) else {
                            val left = removed.minusKey(ContinuationInterceptor)
                            if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
                                CombinedContext(CombinedContext(left, element), interceptor)
                        }
                    }
                }
    
        //从 ‘map’ 中删除键值对
        public fun minusKey(key: Key<*>): CoroutineContext
    }
    

    CoroutineContext 表示协程工作的上下文。主要包含了下面几类对象:

    • ContinuationInterceptor:用于指定协程运行在哪个线程之上
    • Job:代表一个协程对象,就像 Thread 代表一个线程一样
    • CoroutineExceptionHandler:用于协程的异常处理
    • CoroutineName:协程的名字
    • CoroutineId:协程的 id

    通过重载操作符我们都可以访问到这些对象

    fun main() {
        GlobalScope.launch {
            val job = coroutineContext[Job]
            println(job)
            val  continnuation = coroutineContext[ContinuationInterceptor]
            println(continnuation)
            val exceptionHandler = coroutineContext[CoroutineExceptionHandler]
            println(exceptionHandler)
            val name = coroutineContext[CoroutineName]
            println(name)
        }
        Thread.sleep(1000)
    }
    

    小结一下:

    每个用于启动协程的 block 都是 CoroutineScope 接口的扩展方法,都继承了 coroutineContext 成员变量。CoroutineScope 用于定义协程的作用域(scope),
    coroutineContext 表示协程工作的上下文,类似一个map,保存协程作用域中的重要对象:

    • ContinuationInterceptor:用于指定协程运行在哪个线程之上
    • Job:代表一个协程对象,就像 Thread 代表一个线程一样
    • CoroutineExceptionHandler:用于协程的异常处理
    • CoroutineName:协程的名字
    • CoroutineId:协程的 id

    从源代码可以看出 CoroutineContext 相对于 Map 的几点优势:

    • 重载了 plus 操作符,可以方便的合并两个 CoroutineContext
    • 存储的数据 Element 也实现了 CoroutineContext 接口,内部含有 Key 成员变量,简化了 CoroutineContext 中数据的添加。
    • get 方法直接返回泛型类型,无需使用强制类型装换

    我从协库里找了点代码(格式上稍作修改),来看看 CoroutineContext 到底有多 nice:

    @ExperimentalCoroutinesApi
    public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
        //合并两个 CoroutineContext,后者权重更高,就是说,如果两个 context 内部有相同的 key,取加号右边的 value
        val combined = coroutineContext + context
        val debug = if (DEBUG) {
            //CoroutineContext 与 Element 合并,加号右侧权重更高
            combined + CoroutineId(COROUTINE_ID.incrementAndGet())
        }  else {
            combined
        }
        return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null) {
            //CoroutineContext 与 Element 合并,加号右侧权重更高
            debug + Dispatchers.Default
        } else {
            debug
        } 
    }
    

    最后,再来看看协程的启动的一些细节

    public fun CoroutineScope.launch(
        context: CoroutineContext = EmptyCoroutineContext,
        start: CoroutineStart = CoroutineStart.DEFAULT,
        block: suspend CoroutineScope.() -> Unit
    ): Job
    

    通过 launch 启动的协程,其上下文对象 CoroutineContext 由父协程的 CoroutineContext 对象和 launch 方法的第一个参数共同决定。具体规则如下图所示:

    协程原理.png

    新协程context = 父协程context + 参数 context + child Job

    启动过程中,会创建一个新的 Job 对象 child Job,父协程的 job 对象是它的 parent。内部是通过 attachChild 方法来确定 job 之间的关系的。

    线程切换

    在启动一个线程时我们可以指定一个线程:

    //在 main 线程之上运行协程
    launch(Dispatchers.Main) {
           
    }
    

    也可通过 withContext 方法切换线程

    launch(Dispatchers.Main) {
        withContext(Dispatchers.IO) {
    
        }
    }
    

    用于指定线程的参数包括:

    • Dispatchers.Main:主线程
    • Dispatchers.IO:不限制数量的线程池
    • Dispatchers.Default:线程数量等于 cpu 核心数的线程池。不指定的情况下都是这个
    • Dispatchers.Unconfined:不改变线程,执行完一个 suspend 方法后,改变为 suspend 中的线程
    fun main() {
    
        GlobalScope.launch {
            launch(Dispatchers.Unconfined) {
                println("1 I'm working in thread ${Thread.currentThread().name}")
                withContext(Dispatchers.IO) {
                    println("2 I'm working in thread ${Thread.currentThread().name}")
                }
                println("3 I'm working in thread ${Thread.currentThread().name}")
            }
        }
    
        Thread.sleep(1000)
    }
    

    输出:

    1 I'm working in thread DefaultDispatcher-worker-1
    2 I'm working in thread DefaultDispatcher-worker-2
    3 I'm working in thread DefaultDispatcher-worker-2
    

    协程方法

    join

    协程的 join 方法和线程的 join 方法类似,都是让出当前执行权,让其它协程先执行。

    import kotlinx.coroutines.*
    
    fun main() = runBlocking {
        //返回一个 job 对象
        val job = GlobalScope.launch { 
            delay(1000L)
            println("World!")
        }
        println("Hello,")
        job.join()  //外部也成让出执行权,执行 job 协程
        //job 协程执行完后,才会执行后续代码
        println("Hello,")
    }
    

    yiedld

    用于让出当前线程,让其它协程执行。如果协程是通过 Dispatchers.Unconfined 启动的,yiedld 方法什么都不做。

    fun main() {
        GlobalScope.launch {
            yield()
        }
    
        Thread.sleep(1000)
    }
    

    协程的退出

    可以通过协程的 cancel 方法来退出一个可退出的(cancellable)协程。

    那什么是可退出(cancellable)的协程呢?有两种情况:

    情况一:协程中耗时任务都来自协程库中的 suspend 方法。

    val job = launch {
        repeat(1000) { i ->
                println("I'm sleeping $i ...")
                //程库中的 suspend 方法
                delay(500L)
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    job.join() // waits for job's completion 
    println("main: Now I can quit.")
    

    协程库中的 suspend 方法在中断时都会抛出 CancellationException 异常,这个异常主要用于调试,通常无需单独处理。

    情况二:使用 isActive 决定协程是否继续执行

    val startTime = System.currentTimeMillis()
    val job = launch(Dispatchers.Default) {
        var nextPrintTime = startTime
        var i = 0
        while (isActive) { // cancellable computation loop
            // print a message twice a second
            if (System.currentTimeMillis() >= nextPrintTime) {
                println("I'm sleeping ${i++} ...")
                nextPrintTime += 500L
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancelAndJoin() // cancels the job and waits for its completion
    println("main: Now I can quit.")
    

    协程调用 cancle 后,isActive 会被置为 false。

    有的时候,我们希望在协程被中断后做一些清理工作,可以使用 try finally

    val job = launch {
        try {
            repeat(1000) { i ->
                    println("I'm sleeping $i ...")
                delay(500L)
            }
        } finally {
            println("I'm running finally")
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancelAndJoin() // cancels the job and waits for its completion
    println("main: Now I can quit.")
    

    finally 中是不能调用 suspend 方法,虽然在 finally 中调用 suspend 情况很少,但是还是可以通过 withContext(NonCancellable) {...} 来实现

    val job = launch {
        try {
            repeat(1000) { i ->
                    println("I'm sleeping $i ...")
                delay(500L)
            }
        } finally {
            withContext(NonCancellable) {
                println("I'm running finally")
                delay(1000L)
                println("And I've just delayed for 1 sec because I'm non-cancellable")
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancelAndJoin() // cancels the job and waits for its completion
    println("main: Now I can quit.")
    

    异常处理

    通过 launch 启动的协程使用 CoroutineExceptionHandler 处理异常

    val handler = CoroutineExceptionHandler { _, exception -> 
            println("Caught $exception") 
    }
    val job = GlobalScope.launch(handler) {
        throw AssertionError()
    }
    
    job.join()
    

    通过 async 启动的协程使用 try catch 处理异常

    val deferred = GlobalScope.async {
        println("Throwing exception from async")
        throw ArithmeticException() // Nothing is printed, relying on user to call await
    }
    
    try {
            deferred.await()
            println("Unreached")
        } catch (e: ArithmeticException) {
            println("Caught ArithmeticException")
        }
    

    当协程抛出异常时,协程的执行会结束,同时协程会将这个异常传递给父协程,父协程的其他子协程及父协程的执行也会结束。

    val handler = CoroutineExceptionHandler { _, exception -> 
            println("Caught $exception") 
    }
    val job = GlobalScope.launch(handler) {
        launch { // the first child
            try {
                delay(Long.MAX_VALUE)
            } finally {
                withContext(NonCancellable) {
                    println("Children are cancelled, but exception is not handled until all children terminate")
                    delay(100)
                    println("The first child finished its non cancellable block")
                }
            }
        }
        launch { // the second child
            delay(10)
            println("Second child throws an exception")
            throw ArithmeticException()
        }
    }
    job.join()
    

    但是,有一个特殊的异常 CancellationException,它只会导致抛出该异常的协程结束,不会传递给其他协程。

    有的时候,我们希望抛出异常时,可以单独结束一个协程。这时候可以使用 Supervision job

    import kotlinx.coroutines.*
    
    fun main() = runBlocking {
        val supervisor = SupervisorJob()
        with(CoroutineScope(coroutineContext + supervisor)) {
            // launch the first child -- its exception is ignored for this example (don't do this in practice!)
            val firstChild = launch(CoroutineExceptionHandler { _, _ ->  }) {
                println("First child is failing")
                throw AssertionError("First child is cancelled")
            }
            // launch the second child
            val secondChild = launch {
                firstChild.join()
                // Cancellation of the first child is not propagated to the second child
                println("First child is cancelled: ${firstChild.isCancelled}, but second one is still active")
                try {
                    delay(Long.MAX_VALUE)
                } finally {
                    // But cancellation of the supervisor is propagated
                    println("Second child is cancelled because supervisor is cancelled")
                }
            }
            // wait until the first child fails & completes
            firstChild.join()
            println("Cancelling supervisor")
            supervisor.cancel()
            secondChild.join()
        }
    }
    

    android 实用代码

    这里是协程库中注释给出的代码:

     class MyActivity : AppCompatActivity(), CoroutineScope {
          lateinit var job: Job
          override val coroutineContext: CoroutineContext
              get() = Dispatchers.Main + job
     
          override fun onCreate(savedInstanceState: Bundle?) {
              super.onCreate(savedInstanceState)
              job = Job()
          }
     
          override fun onDestroy() {
              super.onDestroy()
              job.cancel() // Cancel job on activity destroy. After destroy all children jobs will be cancelled automatically
          }
     
          /*
           * Note how coroutine builders are scoped: if activity is destroyed or any of the launched coroutines
           * in this method throws an exception, then all nested coroutines are cancelled.
           */
          fun loadDataFromUI() = launch { // <- extension on current activity, launched in the main thread
             val ioData = async(Dispatchers.IO) { // <- extension on launch scope, launched in IO dispatcher
                 // blocking I/O operation
             }
             // do something else concurrently with I/O
             val data = ioData.await() // wait for result of I/O
             draw(data) // can draw in the main thread
          }
      }
    

    我觉得可以稍加改进下:

    class MainScope : CoroutineScope, LifecycleObserver {
    
        private val job = SupervisorJob()
        override val coroutineContext: CoroutineContext
            get() = job + Dispatchers.Main
    
        @OnLifecycleEvent(Lifecycle.Event.ON_PAUSE)
        fun destroy() = coroutineContext.cancelChildren()
    }
    // usage
    class MainFragment : Fragment() {
        private val uiScope = MainScope()
    
        override fun onCreate(savedInstanceState: Bundle?) {
            super.onCreate(savedInstanceState)
            lifecycle.addObserver(mainScope)
        }
    
        private fun loadData() = uiScope.launch {
            val result = withContext(bgDispatcher) {
                // your blocking call
            }
        }
    }
    

    参考资料

    相关文章

      网友评论

        本文标题:Kotlin 协程入门

        本文链接:https://www.haomeiwen.com/subject/sggavqtx.html