美文网首页Kotlin 实战kotlin
Kotlin Coroutines 笔记 (二)

Kotlin Coroutines 笔记 (二)

作者: fengzhizi715 | 来源:发表于2018-07-31 01:43 被阅读245次
    躲雨的MM.jpg

    协程虽然是微线程,但是并不会和某一个特定的线程绑定,它可以在A线程中执行,并经过某一个时刻的挂起(suspend),等下次调度到恢复执行的时候,很可能会在B线程中执行。

    一. withContext

    与 launch、async、runBlocking 类似 withContext 也属于 Coroutine builders。不过与他们不同的是,其他几个都是创建一个新的协程,而 withContext 不会创建新的协程。withContext 允许更改协程的执行线程,withContext 在使用时需要传递一个 CoroutineContext 。

        launch {
    
            val result1 = withContext(CommonPool) {
    
                delay(2000)
                1
            }
    
            val  result2 = withContext(CommonPool) {
    
                delay(1000)
                2
            }
    
            val  result = result1 + result2
            println(result)
        }
    
        Thread.sleep(5000)
    

    执行结果:

    3
    

    withContext 可以有返回值,这一点类似 async。async 创建的协程通过 await() 方法将值返回。而 withContext 可以直接返回。

        launch {
    
            val result1 = async {
    
                delay(2000)
                1
            }
    
            val  result2 = async {
    
                delay(1000)
                2
            }
    
            val  result = result1.await() + result2.await()
            println(result)
        }
    
        Thread.sleep(5000)
    

    执行结果:

    3
    

    二. 共享线程池

    在上述的例子中,withContext 使用了 CommonPool。CommonPool 继承了 CoroutineDispatcher,表示使用线程池来执行协程的任务。

    CommonPool.png

    CommonPool 有点类似于 RxJava 的 Schedulers.computation(),主要是用于CPU密集型的计算任务。

    CommonPool 使用 pool 来执行 block。

        override fun dispatch(context: CoroutineContext, block: Runnable) =
            try { (pool ?: getOrCreatePoolSync()).execute(timeSource.trackTask(block)) }
            catch (e: RejectedExecutionException) {
                timeSource.unTrackTask()
                DefaultExecutor.execute(block)
            }
    

    如果 pool 为空,则调用 getOrCreatePoolSync() 方法来创建 pool。

        @Synchronized
        private fun getOrCreatePoolSync(): Executor =
            pool ?: createPool().also { pool = it }
    

    此时,createPool() 方法是正在创建 pool 的方法。

    首先,安全管理器不为空的话,使用 createPlainPool() 来创建 pool。
    否则,尝试创建一个 ForkJoinPool,不行的话还是使用 createPlainPool() 来创建 pool。

        private fun createPool(): ExecutorService {
            if (System.getSecurityManager() != null) return createPlainPool()
            val fjpClass = Try { Class.forName("java.util.concurrent.ForkJoinPool") }
                ?: return createPlainPool()
            if (!usePrivatePool) {
                Try { fjpClass.getMethod("commonPool")?.invoke(null) as? ExecutorService }
                    ?.let { return it }
            }
            Try { fjpClass.getConstructor(Int::class.java).newInstance(parallelism) as? ExecutorService }
                ?. let { return it }
            return createPlainPool()
        }
    

    createPlainPool() 会使用 Executors.newFixedThreadPool() 来创建线程池。

        private fun createPlainPool(): ExecutorService {
            val threadId = AtomicInteger()
            return Executors.newFixedThreadPool(parallelism) {
                Thread(it, "CommonPool-worker-${threadId.incrementAndGet()}").apply { isDaemon = true }
            }
        }
    

    CommonPool 的创建原理大致了解之后,通过源码发现 CoroutineContext 默认的 CoroutineDispatcher 就是 CommonPool。

    /**
     * This is the default [CoroutineDispatcher] that is used by all standard builders like
     * [launch], [async], etc if no dispatcher nor any other [ContinuationInterceptor] is specified in their context.
     *
     * It is currently equal to [CommonPool], but the value is subject to change in the future.
     */
    @Suppress("PropertyName")
    public actual val DefaultDispatcher: CoroutineDispatcher = CommonPool
    

    常见的 CoroutineDispatcher 还可以通过 ThreadPoolDispatcher 的 newSingleThreadContext()、newFixedThreadPoolContext()来创建,以及Executor 的扩展函数 asCoroutineDispatcher() 来创建。

    在 Android 中,还可以使用UI。它顾名思义,在 Android 主线程上调度执行。

    三. 可取消的协程

    Job、Deferred 对象都可以取消任务。

    3.1 cancel()

    使用 cancel() 方法:

        val job = launch {
            delay(1000)
            println("Hello World!")
        }
        job.cancel()
        println(job.isCancelled)
        Thread.sleep(2000)
    

    执行结果:

    true
    

    true表示job已经被取消了,并没有打印"Hello World!"

    3.2 cancelAndJoin()

    使用 cancelAndJoin() 方法:

        runBlocking<Unit> {
    
            val job = launch {
    
                repeat(100) { i ->
                    println("count time: $i")
                    delay(500)
                }
            }
            delay(2100)
            job.cancelAndJoin()
        }
    

    执行结果:

    count time: 0
    count time: 1
    count time: 2
    count time: 3
    count time: 4
    

    cancelAndJoin() 等价于使用了 cancel() 和 join()。

    join() 方法用于等待已启动协程的完成,并且它不会传播其异常。 但是,崩溃的子协程也会取消其父协程,并带有相应的异常。

    3.3 检查协程的取消标记

    如果一个协程一直在执行计算,没有去检查取消标记,它就无法取消。即使调用了cancel() 或者 cancelAndJoin()。

        runBlocking<Unit> {
    
            val startTime = System.currentTimeMillis()
            val job = launch {
                var tempTime = startTime
                var i = 0
                while (i < 100) {
    
                    if (System.currentTimeMillis() >= tempTime) {
                        println("count time: ${i++}")
                        tempTime += 500L
                    }
                }
            }
            delay(2100)
            job.cancelAndJoin()
        }
    

    上述代码仍然会打印100次。

    如果使用isActive检查取消标记,则Job 或 Deferred 的任务可以被取消:

        runBlocking<Unit> {
    
            val startTime = System.currentTimeMillis()
            val job = launch {
                var tempTime = startTime
                var i = 0
                while (isActive) {
    
                    if (System.currentTimeMillis() >= tempTime) {
                        println("count time: ${i++}")
                        tempTime += 500L
                    }
                }
            }
            delay(2100)
            job.cancelAndJoin()
        }
    

    执行结果:

    count time: 0
    count time: 1
    count time: 2
    count time: 3
    count time: 4
    

    isActive 是 CoroutineScope 的属性:

    package kotlinx.coroutines.experimental
    
    import kotlin.coroutines.experimental.*
    import kotlin.internal.*
    
    /**
     * Receiver interface for generic coroutine builders, so that the code inside coroutine has a convenient
     * and fast access to its own cancellation status via [isActive].
     */
    public interface CoroutineScope {
        /**
         * Returns `true` when this coroutine is still active (has not completed and was not cancelled yet).
         *
         * Check this property in long-running computation loops to support cancellation:
         * ```
         * while (isActive) {
         *     // do some computation
         * }
         * ```
         *
         * This property is a shortcut for `coroutineContext.isActive` in the scope when
         * [CoroutineScope] is available.
         * See [coroutineContext][kotlin.coroutines.experimental.coroutineContext],
         * [isActive][kotlinx.coroutines.experimental.isActive] and [Job.isActive].
         */
        public val isActive: Boolean
    
        /**
         * Returns the context of this coroutine.
         *
         * @suppress: **Deprecated**: Replaced with top-level [kotlin.coroutines.experimental.coroutineContext].
         */
        @Deprecated("Replace with top-level coroutineContext",
            replaceWith = ReplaceWith("coroutineContext",
                imports = ["kotlin.coroutines.experimental.coroutineContext"]))
        @LowPriorityInOverloadResolution
        @Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
        public val coroutineContext: CoroutineContext
    }
    

    总结:

    本文介绍了三个部分:withContext的使用,CommonPool的创建以及如何取消协程。其中,还捎带介绍了 async 和 await 的使用。

    该系列的相关文章:
    Kotlin Coroutines 笔记 (一)

    相关文章

      网友评论

        本文标题:Kotlin Coroutines 笔记 (二)

        本文链接:https://www.haomeiwen.com/subject/kbdbmftx.html