Kotlin协程笔记

作者: dengyin2000 | 来源:发表于2018-04-19 23:48 被阅读189次

    Kotlin语言基础笔记

    Kotlin流程控制语句笔记

    Kotlin操作符重载与中缀表示法笔记

    Kotlin扩展函数和扩展属性笔记

    Kotlin空指针安全(null-safety)笔记

    Kotlin类型系统笔记

    Kotlin面向对象编程笔记

    Kotlin委托(Delegation)笔记

    Kotlin泛型型笔记

    Kotlin函数式编程笔记

    Kotlin与Java互操作笔记

    Kotlin协程笔记

    很多小伙伴可能会觉得Java有了线程、线程池了,我们还要协程(Coroutines)干嘛。这里还是有些区别的。区别有:

    • 线程是为了提高CPU的利用率,调度是由操作系统决定的,而协程是为了解决多个任务更好的协作,调度是由我们代码控制的。
    • 协程并不是为了取代线程,协程对线程进行抽象,你可以看成协程是一个异步调用的框架,解决了之前线程间协作代码繁琐的问题。

    我们先来看一段代码,如下:

    data class Product(var id: String, var title: String)
    data class Stock(var pid: String, var stock: Int)
    data class Pms(var pid: String, var pmsTips: String)
    
    suspend fun getProductsByIds(pids: List<String>): List<Product> {
        delay(1000)
        return listOf(Product("1", "a"), Product("2", "b"))
    }
    
    suspend fun getProductStocksByIds(pids: List<String>): List<Stock> {
        delay(2000)
        return listOf(Stock("1", 2), Stock("2", 4))
    }
    
    suspend fun getProductPMSByIds(pids: List<String>): List<Pms> {
        delay(3000)
        return listOf(Pms("1", "100减99"), Pms("2", "100减99"))
    }
    
    fun combine(products: List<Product>?, productStocks: List<Stock>?, productPMS: List<Pms>?) {
        println(products)
        println(productStocks)
        println(productPMS)
    }
    
    fun main(args: Array<String>) = runBlocking<Unit> {
        val pids = listOf<String>("1", "2")
        val products = async {
            withTimeoutOrNull(1500) {
                getProductsByIds(pids)
            }
        }
        val productStocks = async {
            withTimeoutOrNull(2500) {
                getProductStocksByIds(pids)
            }
        }
        val productPMS = async {
            withTimeoutOrNull(2500) {
                getProductPMSByIds(pids)
            }
        }
    
        val measureTimeMillis = measureTimeMillis {
            combine(products.await(), productStocks.await(), productPMS.await())
        }
        println(measureTimeMillis)
    }
    

    这段代码看起来就像是伪代码,不过还是非常容易理解,就是通过一批商品id,分别调用三个接口拿到商品的信息,商品的库存,商品的优惠信息,然后再合并数据,这个场景无论在后端还是前端都会经常遇到,比如APP调用的一个接口,需要从不同的底层系统获取到不同部分的数据,然后聚合好一次性返回给APP。想想如果是用Java来实现的会有多复杂。用Kotlin的协程实现就像是写顺序执行的代码,但实际上你做的是异步调用。

    1.第一个协程代码

    fun main(args: Array<String>) {
        launch { // launch new coroutine in background and continue
            delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
            println("World!") // print after delay
        }
        println("Hello,") // main thread continues while coroutine is delayed
        Thread.sleep(2000L) // block main thread for 2 seconds to keep JVM alive
    }
    

    我们使用lauch来启动一个协程,其中要注意的是delay这个函数,看起来它跟Thread.sleep是一样的作用,但是他们有本质的区别,Thread.sleep会阻塞当前线程(线程就傻傻的在等待),而delay是暂停当前的协程,不会阻塞当前线程,这个线程可以去做其他事情。delay是一个suspending function,它只能运行在协程里面,如果不在协程中运行,会报以下异常。

    Error: Kotlin: Suspend functions are only allowed to be called from a coroutine or another suspend function
    

    2. runBlocking

    runBlocking函数会阻塞当前线程,一直等到协程运行完。上面的例子可以改成下面的:

    fun main(args: Array<String>) = runBlocking<Unit> { // start main coroutine
        launch { // launch new coroutine in background and continue
            delay(1000L)
            println("World!")
        }
        println("Hello,") // main coroutine continues here immediately
        delay(2000L)      // delaying for 2 seconds to keep JVM alive
    }
    

    3.等待协程完成

    延时一段时间来等待协程完成通常不是很高效,我们可以通过join来实现一旦协程完成就退出main函数。

    fun main(args: Array<String>) = runBlocking<Unit> {
        val job = launch { // launch new coroutine and keep a reference to its Job
            delay(1000L)
            println("World!")
        }
        println("Hello,")
        job.join() // wait until child coroutine completes
    }
    

    4. suspending function 暂停函数

    我们也可以使用suspending function重构下。

    fun main(args: Array<String>) = runBlocking<Unit> {
        val job = launch { doWorld() }
        println("Hello,")
        job.join()
    }
    
    // this is your first suspending function
    suspend fun doWorld() {
        delay(1000L)
        println("World!")
    }
    

    注意:delay也是一个suspending function,所以depay只能放在suspending function或者协程代码(lanuch)里面。

    5. 协程是非常轻量级的

    fun main(args: Array<String>) = runBlocking<Unit> {
        val jobs = List(100_000) { // launch a lot of coroutines and list their jobs
            launch {
                delay(1000L)
                print(".")
            }
        }
        jobs.forEach { it.join() } // wait for all jobs to complete
    }
    

    启动了10万个协程,最后代码能够成功的执行完成。同样,大家可以试试换成起10万个线程试试,应该会得出OOM的结果。

    6. 协程像守护线程

    请看下面这段代码:

    fun main(args: Array<String>) = runBlocking<Unit> {
        launch {
            repeat(1000) { i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
        }
        delay(1300L) // just quit after delay
    }
    

    输出如下:

    I'm sleeping 0 ...
    I'm sleeping 1 ...
    I'm sleeping 2 ...
    

    可以知道,等待1.3秒后,main退出了。不会等待launch的协程运行完。

    7. 协程取消

    launch返回一个Job对象,它可以被取消:

    fun main(args: Array<String>) = runBlocking<Unit> {
        val job = launch {
            repeat(1000) { i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
        }
        delay(1300L) // delay a bit
        println("main: I'm tired of waiting!")
        job.cancel() // cancels the job
        job.join() // waits for job's completion 
        println("main: Now I can quit.")
    }
    

    输出如下:

    I'm sleeping 0 ...
    I'm sleeping 1 ...
    I'm sleeping 2 ...
    main: I'm tired of waiting!
    main: Now I can quit.
    

    可以看到,一旦调用了job.cancel(),就退出了main函数。Job还有一个cancelAndJoin方法,合并了cancel和join操作。

    8. 协程的取消可能需要协作完成

    协程的取消可能需要协作完成,所有在kotlinx.coroutines包下面的suspending functions都可以被取消,但是如果一个协程处在计算中,他是不能被取消的,比如这个例子:

    fun main(args: Array<String>) = runBlocking<Unit> {
        val startTime = System.currentTimeMillis()
        val job = launch {
            var nextPrintTime = startTime
            var i = 0
            while (i < 5) { // computation loop, just wastes CPU
                // print a message twice a second
                if (System.currentTimeMillis() >= nextPrintTime) {
                    println("I'm sleeping ${i++} ...")
                    nextPrintTime += 500L
                }
            }
        }
        delay(1300L) // delay a bit
        println("main: I'm tired of waiting!")
        job.cancelAndJoin() // cancels the job and waits for its completion
        println("main: Now I can quit.")
    }
    

    你可以看到调用取消后,还在打印。

    9. 让处于计算中的协程可取消

    有两种方式可以做到:

    • 最简单的在while循环最后面调用下yield函数。这样就在每次循环后让协程有了被取消的机会,yield是kotlinx.coroutines包下的suspending functions。
    • 检查协程取消的状态,如果发现被取消,则退出循环。
      下面我们以第二种方式演示下:
    fun main(args: Array<String>) = runBlocking<Unit> {
        val startTime = System.currentTimeMillis()
        val job = launch {
            var nextPrintTime = startTime
            var i = 0
            while (isActive) { // cancellable computation loop
                // print a message twice a second
                if (System.currentTimeMillis() >= nextPrintTime) {
                    println("I'm sleeping ${i++} ...")
                    nextPrintTime += 500L
                }
            }
        }
        delay(1300L) // delay a bit
        println("main: I'm tired of waiting!")
        job.cancelAndJoin() // cancels the job and waits for its completion
        println("main: Now I can quit.")
    

    isActive是协程的CoroutineScope的一个属性。

    10. 协程中try catch finally

    当协程被取消时,catch和finally可以被执行。

    fun main(args: Array<String>) = runBlocking<Unit> {
        val job = launch {
            try {
                repeat(1000) { i ->
                    println("I'm sleeping $i ...")
                    delay(500L)
                }
            }catch (e:Throwable){
                println("I'm running catch")
            } finally {
                println("I'm running finally")
            }
        }
        delay(1300L) // delay a bit
        println("main: I'm tired of waiting!")
        job.cancelAndJoin() // cancels the job and waits for its completion
        println("main: Now I can quit.")
    }
    

    输出:

    I'm sleeping 0 ...
    I'm sleeping 1 ...
    I'm sleeping 2 ...
    main: I'm tired of waiting!
    I'm running catch
    I'm running finally
    main: Now I can quit.
    

    11. withContext函数

    在上个例子中,如果我们在finally块中调用suspending functions的话,会抛出CancellationException,因为协程已经被取消了。不过一般来说没什么太大问题,只要不调用suspending functions。如果你一定要在调用的话,你可以使用withContext(NonCancellable) {...}。如下:

    fun main(args: Array<String>) = runBlocking<Unit> {
        val job = launch {
            try {
                repeat(1000) { i ->
                    println("I'm sleeping $i ...")
                    delay(500L)
                }
            } finally {
                withContext(NonCancellable) {
                    println("I'm running finally")
                    delay(1000L)
                    println("And I've just delayed for 1 sec because I'm non-cancellable")
                }
            }
        }
        delay(1300L) // delay a bit
        println("main: I'm tired of waiting!")
        job.cancelAndJoin() // cancels the job and waits for its completion
        println("main: Now I can quit.")
    }
    

    12. Timeout超时

    如果要设定协程调用超时时间,我们可以使用withTimeout函数,如下:

    fun main(args: Array<String>) = runBlocking<Unit> {
        withTimeout(1300L) {
            repeat(1000) { i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
        }
    }
    

    输出如下:

    I'm sleeping 0 ...
    I'm sleeping 1 ...
    I'm sleeping 2 ...
    Exception in thread "main" kotlinx.coroutines.experimental.TimeoutCancellationException: Timed out waiting for 1300 MILLISECONDS
    

    如果超时的时候你不想抛出异常,你可以使用withTimeoutOrNull函数,超时的时候它会返回null。

    fun main(args: Array<String>) = runBlocking<Unit> {
        val result = withTimeoutOrNull(1300L) {
            repeat(1000) { i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
            "Done" // will get cancelled before it produces this result
        }
        println("Result is $result")
    }
    

    输出如下:

    I'm sleeping 0 ...
    I'm sleeping 1 ...
    I'm sleeping 2 ...
    Result is null
    

    13. 使用async并发调用

    asynclaunch类似,它也是启动一个协程,只不过lauch返回的是Job(没有返回值),而async返回的是Deferred(带返回值),你可以使用.await()来获取Deferred的值。Deferred是Job的子类,所以Deferred也可以被取消。看看下面这段代码:

    suspend fun doSomethingUsefulOne(): Int {
        delay(1000L) // pretend we are doing something useful here
        return 13
    }
    
    suspend fun doSomethingUsefulTwo(): Int {
        delay(1000L) // pretend we are doing something useful here, too
        return 29
    }
    
    fun main(args: Array<String>) = runBlocking<Unit> {
        val time = measureTimeMillis {
            val one = async { doSomethingUsefulOne() }
            val two = async { doSomethingUsefulTwo() }
            println("The answer is ${one.await() + two.await()}")
        }
        println("Completed in $time ms")
    }
    

    输出如下:

    The answer is 42
    Completed in 1016 ms
    

    因为是并行调用,所以时间差不多是1秒。

    14. async延时调用

    fun main(args: Array<String>) = runBlocking<Unit> {
        val time = measureTimeMillis {
            val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() }
            val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() }
            println("The answer is ${one.await() + two.await()}")
        }
        println("Completed in $time ms")
    }
    

    如果async带上了start = CoroutineStart.LAZY参数,协程不会立即执行,会等到调用await的时候才开始执行。上面代码输出如下:

    The answer is 42
    Completed in 2017 ms
    

    执行结果看起来变成了顺序执行,那是因为one.await执行完成之后,才会开始调用two.await()执行。所以变成了顺序执行。

    15. Async-style functions

    // The result type of somethingUsefulOneAsync is Deferred<Int>
    fun somethingUsefulOneAsync() = async {
        doSomethingUsefulOne()
    }
    
    // The result type of somethingUsefulTwoAsync is Deferred<Int>
    fun somethingUsefulTwoAsync() = async {
        doSomethingUsefulTwo()
    }
    

    上面两个方法xxxAsync并不是suspending functions,所以他们可以在任何地方调用。

    // note, that we don't have `runBlocking` to the right of `main` in this example
    fun main(args: Array<String>) {
        val time = measureTimeMillis {
            // we can initiate async actions outside of a coroutine
            val one = somethingUsefulOneAsync()
            val two = somethingUsefulTwoAsync()
            // but waiting for a result must involve either suspending or blocking.
            // here we use `runBlocking { ... }` to block the main thread while waiting for the result
            runBlocking {
                println("The answer is ${one.await() + two.await()}")
            }
        }
        println("Completed in $time ms")
    }
    

    16. Dispatchers and threads

    launchasync都接收一个可选的CoroutineContext参数可以用来指定CoroutineDispatcher。如下:

    fun main(args: Array<String>) = runBlocking<Unit> {
        val jobs = arrayListOf<Job>()
        jobs += launch(Unconfined) { // not confined -- will work with main thread
            println("      'Unconfined': I'm working in thread ${Thread.currentThread().name}")
        }
        jobs += launch(coroutineContext) { // context of the parent, runBlocking coroutine
            println("'coroutineContext': I'm working in thread ${Thread.currentThread().name}")
        }
        jobs += launch(CommonPool) { // will get dispatched to ForkJoinPool.commonPool (or equivalent)
            println("      'CommonPool': I'm working in thread ${Thread.currentThread().name}")
        }
        jobs += launch(newSingleThreadContext("MyOwnThread")) { // will get its own new thread
            println("          'newSTC': I'm working in thread ${Thread.currentThread().name}")
        }
        jobs.forEach { it.join() }
    }
    

    输出如下:

          'Unconfined': I'm working in thread main
          'CommonPool': I'm working in thread ForkJoinPool.commonPool-worker-1
              'newSTC': I'm working in thread MyOwnThread
    'coroutineContext': I'm working in thread main
    

    默认的dispatcher是DefaultDispatcher当前的实现是CommonPool

    17. Unconfined vs confined dispatcher

    Unconfined dispatcher会在当前线程开始执行协程,但是仅仅是在第一个暂停点,之后它恢复后的dispatcher取决于那个线程执行suspending function。

    coroutineContextCoroutineScope的一个属性,它的dispatcher会继承它parent线程的dispatcher。 代码如下:

    fun main(args: Array<String>) = runBlocking<Unit> {
        val jobs = arrayListOf<Job>()
        jobs += launch(Unconfined) { // not confined -- will work with main thread
            println("      'Unconfined': I'm working in thread ${Thread.currentThread().name}")
            delay(500)
            println("      'Unconfined': After delay in thread ${Thread.currentThread().name}")
        }
        jobs += launch(coroutineContext) { // context of the parent, runBlocking coroutine
            println("'coroutineContext': I'm working in thread ${Thread.currentThread().name}")
            delay(1000)
            println("'coroutineContext': After delay in thread ${Thread.currentThread().name}")
        }
        jobs.forEach { it.join() }
    }
    

    输出如下:

          'Unconfined': I'm working in thread main
    'coroutineContext': I'm working in thread main
          'Unconfined': After delay in thread kotlinx.coroutines.DefaultExecutor
    'coroutineContext': After delay in thread main
    

    所以,coroutineContext继承了runBlocking的main线程,而unconfined恢复后变成了default executor线程。

    18. 线程切换

    加上-Dkotlinx.coroutines.debugJVM参数运行下面的代码:

    fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
    
    fun main(args: Array<String>) {
        newSingleThreadContext("Ctx1").use { ctx1 ->
            newSingleThreadContext("Ctx2").use { ctx2 ->
                runBlocking(ctx1) {
                    log("Started in ctx1")
                    withContext(ctx2) {
                        log("Working in ctx2")
                    }
                    log("Back to ctx1")
                }
            }
        }
    }
    

    这里展示了几个用法:一个是使用runBlocking指明一个特殊的Context,另外一个是使用withContext来切换Context,输出如下:

    [Ctx1 @coroutine#1] Started in ctx1
    [Ctx2 @coroutine#1] Working in ctx2
    [Ctx1 @coroutine#1] Back to ctx1
    

    还有就是run来释放线程。

    19. 通过Context来获取Job

    协程的Job是Context的一个属性,如下:

    fun main(args: Array<String>) = runBlocking<Unit> {
        println("My job is ${coroutineContext[Job]}")
    }
    

    19. 子协程

    在协程中使用coroutineContext来启动另一个协程,新协程的Job变成了父协程的子Job,当父协程取消时,子协程也会被取消。

    fun main(args: Array<String>) = runBlocking<Unit> {
        // launch a coroutine to process some kind of incoming request
        val request = launch {
            // it spawns two other jobs, one with its separate context
            val job1 = launch {
                println("job1: I have my own context and execute independently!")
                delay(1000)
                println("job1: I am not affected by cancellation of the request")
            }
            // and the other inherits the parent context
            val job2 = launch(coroutineContext) {
                delay(100)
                println("job2: I am a child of the request coroutine")
                delay(1000)
                println("job2: I will not execute this line if my parent request is cancelled")
            }
            // request completes when both its sub-jobs complete:
            job1.join()
            job2.join()
        }
        delay(500)
        request.cancel() // cancel processing of the request
        delay(1000) // delay a second to see what happens
        println("main: Who has survived request cancellation?")
    }
    

    输出结果如下:

    job1: I have my own context and execute independently!
    job2: I am a child of the request coroutine
    job1: I am not affected by cancellation of the request
    main: Who has survived request cancellation?
    

    20. Context联合

    协程Context可以使用+联合,如下:

    fun main(args: Array<String>) = runBlocking<Unit> {
        // start a coroutine to process some kind of incoming request
        val request = launch(coroutineContext) { // use the context of `runBlocking`
            // spawns CPU-intensive child job in CommonPool !!! 
            val job = launch(coroutineContext + CommonPool) {
                println("job: I am a child of the request coroutine, but with a different dispatcher")
                delay(1000)
                println("job: I will not execute this line if my parent request is cancelled")
            }
            job.join() // request completes when its sub-job completes
        }
        delay(500)
        request.cancel() // cancel processing of the request
        delay(1000) // delay a second to see what happens
        println("main: Who has survived request cancellation?")
    }
    

    job是request的子协程,但是是在CommonPool的线程中执行操作。所以取消request,job也会取消。

    21. 父协程会等待子协程完成

    父协程会等待子协程完成,不需要使用join来等待他们完成。

    fun main(args: Array<String>) = runBlocking<Unit> {
        // launch a coroutine to process some kind of incoming request
        val request = launch {
            repeat(3) { i -> // launch a few children jobs
                launch(coroutineContext)  {
                    delay((i + 1) * 200L) // variable delay 200ms, 400ms, 600ms
                    println("Coroutine $i is done")
                }
            }
            println("request: I'm done and I don't explicitly join my children that are still active")
        }
        request.join() // wait for completion of the request, including all its children
        println("Now processing of the request is complete")
    }
    

    输出如下:

    request: I'm done and I don't explicitly join my children that are still active
    Coroutine 0 is done
    Coroutine 1 is done
    Coroutine 2 is done
    Now processing of the request is complete
    

    22. Tricks

    假如我们现在在写一个anroid app,在activity中启动了很多协程异步调用接口获取数据,当这个activity被destory后,所有的协程需要被取消,要不然就可能会发生内存泄漏。
    我们可以创建一个Job实例,然后使用launch(coroutineContext, parent = job)来明确指定parent job。
    这样的话,我们可以调用Job.cancel来取消所有的子协程,而Job.join可以等待所有的子协程完成。如下:

    fun main(args: Array<String>) = runBlocking<Unit> {
        val job = Job() // create a job object to manage our lifecycle
        // now launch ten coroutines for a demo, each working for a different time
        val coroutines = List(10) { i ->
            // they are all children of our job object
            launch(coroutineContext, parent = job) { // we use the context of main runBlocking thread, but with our parent job
                delay((i + 1) * 200L) // variable delay 200ms, 400ms, ... etc
                println("Coroutine $i is done")
            }
        }
        println("Launched ${coroutines.size} coroutines")
        delay(500L) // delay for half a second
        println("Cancelling the job!")
        job.cancelAndJoin() // cancel all our coroutines and wait for all of them to complete
    }
    

    输出如下:

    Launched 10 coroutines
    Coroutine 0 is done
    Coroutine 1 is done
    Cancelling the job!
    

    23. channel, select, actor

    请看:https://github.com/Kotlin/kotlinx.coroutines/blob/master/coroutines-guide.md

    相关文章

      网友评论

        本文标题:Kotlin协程笔记

        本文链接:https://www.haomeiwen.com/subject/akpwkftx.html