美文网首页kotlinKotlin
kotlin coroutines 协程教程(二)关键类分析

kotlin coroutines 协程教程(二)关键类分析

作者: kotlon | 来源:发表于2019-05-09 20:27 被阅读24次

    原理篇(一)关键类的分析

    上面简单的介绍了一些用法,但是具体的原理和特点,好像还不是很清楚,那么下面就来介绍一下,一些关键的类,流程和原理。

    介绍的相关的原理基于这行代码:

        fun coroTest() {
            GlobalScope.launch {
                delay(1000L)//Delays coroutine for a given time without blocking a thread and resumes it after a specified time
                Log.i(CO_TAG, "launch ")
            }
            Log.i(CO_TAG, "----")
        }
    

    然后贴上 launch() 的源码:

    public fun CoroutineScope.launch(
        context: CoroutineContext = EmptyCoroutineContext,
        start: CoroutineStart = CoroutineStart.DEFAULT,
        block: suspend CoroutineScope.() -> Unit
    ): Job {
        val newContext = newCoroutineContext(context)
        val coroutine = if (start.isLazy)
            LazyStandaloneCoroutine(newContext, block) else
            StandaloneCoroutine(newContext, active = true)
        coroutine.start(start, coroutine, block)
        return coroutine
    }
    

    CoroutineScope

    源码分析

    首先,launch() 是 CoroutineScope 的一个扩展函数,

    CoroutineScope 简单来说,就是协程的作用范围,每一个 Coroutine Builder,例如 CoroutineScope.launch,都是 CoroutineScope 的扩展,并且继承了其 coroutineContext .

    Corotine 提供了全局的 CoroutineScope 也就是 GlobalScope,简单看下其代码:

    //CoroutineScope.kt 中
    public object GlobalScope : CoroutineScope {
        /**
         * Returns [EmptyCoroutineContext].
         */
        override val coroutineContext: CoroutineContext
            get() = EmptyCoroutineContext
    }
    
    //EmptyCorotineContext in CorotineContextImpl.kt 文件中
    public object EmptyCoroutineContext : CoroutineContext, Serializable {
        private const val serialVersionUID: Long = 0
        private fun readResolve(): Any = EmptyCoroutineContext
    
        public override fun <E : Element> get(key: Key<E>): E? = null
        public override fun <R> fold(initial: R, operation: (R, Element) -> R): R = initial
        public override fun plus(context: CoroutineContext): CoroutineContext = context
        public override fun minusKey(key: Key<*>): CoroutineContext = this
        public override fun hashCode(): Int = 0
        public override fun toString(): String = "EmptyCoroutineContext"
    }
    

    简单来说,GlobalScope 没有绑定任何 job,它用于构建最顶级的 coroutines,这些协程的生命周期跟随这个 Application,并且在 Application 生命周期结束之前,不会被 cancel。

    关键函数分析

    CoroutinScope 主要包含了以下扩展函数:

    actor fun <E> CoroutineScope.actor( context: CoroutineContext = EmptyCoroutineContext, capacity: Int = 0, start: CoroutineStart = CoroutineStart.DEFAULT, onCompletion: CompletionHandler? = null, block: suspend ActorScope<E>.() -> Unit): SendChannel<E>Launches new coroutine that is receiving messages from its mailbox channel and returns a reference to its mailbox channel as a SendChannel. The resulting object can be used to send messages to this coroutine.
    async fun <T> CoroutineScope.async( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> T): Deferred<T>Creates new coroutine and returns its future result as an implementation of Deferred. The running coroutine is cancelled when the resulting deferred is cancelled.
    broadcast fun <E> CoroutineScope.broadcast( context: CoroutineContext = EmptyCoroutineContext, capacity: Int = 1, start: CoroutineStart = CoroutineStart.LAZY, onCompletion: CompletionHandler? = null, block: suspend ProducerScope<E>.() -> Unit): BroadcastChannel<E>Launches new coroutine to produce a stream of values by sending them to a broadcast channel and returns a reference to the coroutine as a BroadcastChannel. The resulting object can be used to subscribe to elements produced by this coroutine.
    cancel fun CoroutineScope.cancel(): UnitCancels this scope, including its job and all its children. Throws IllegalStateExceptionif the scope does not have a job in it.
    launch fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit): JobLaunches new coroutine without blocking current thread and returns a reference to the coroutine as a Job. The coroutine is cancelled when the resulting job is cancelled.
    newCoroutineContext fun CoroutineScope.newCoroutineContext( context: CoroutineContext): CoroutineContextCreates context for the new coroutine. It installs Dispatchers.Default when no other dispatcher nor ContinuationInterceptor is specified, and adds optional support for debugging facilities (when turned on).
    plus operator fun CoroutineScope.plus( context: CoroutineContext): CoroutineScopeAdds the specified coroutine context to this scope, overriding existing elements in the current scope’s context with the corresponding keys.
    produce fun <E> CoroutineScope.produce( context: CoroutineContext = EmptyCoroutineContext, capacity: Int = 0, block: suspend ProducerScope<E>.() -> Unit): ReceiveChannel<E>Launches new coroutine to produce a stream of values by sending them to a channel and returns a reference to the coroutine as a ReceiveChannel. This resulting object can be used to receive elements produced by this coroutine.
    • actor{} 会启动一个能够接收 Message 的 Coroutine,也就是 SendChannel.你可以通过SendChannel.send() 方法给其它协程发送消息或者结果,或者通过 SendChannel.offer() 发送消息,两者区别在于 offer() 在队列满的时候,会返回失败,而 send() 则不会。对应接收消息的是 ReceiveChannel。
    • async{} 会启动一个新的协程,并且返回一个 Deferred(它是一个 future 的实现),你可以通过 Deferred.await() 异步获取结果。
    • broadcast{} 和 actor 类似,只不过,BroadcastChannel 是一种一对多的订阅关系。
    • launch{} 启动一个新的协程,并且返回一个 Job 对象,你可以通过这个 Job 取消协程
    • plus 也就是 + ,通过该操作,你可以给该 CoroutineScope 关联一个 CoroutineContext,并且如果存在同一个 key 会替换之前的 CoroutineContext。
    • newConroutinContext 会给当前 CoroutineScop 创建一个 ,如果不存在 Dispatcher 或者 ContinuationInterceptor 则会给这个 Context 关联默认的 Dispatcher.Default。
    • produce{} 启动一个新的协程,并且返回 receiveChannel。

    官方说明链接 CoroutineScope

    CoroutineContext

    协程是运行在 CoroutinesContext 的一些集合里面,根据官方文档的意思

    * Persistent context for the coroutine. It is an indexed set of [Element] instances.
    * An indexed set is a mix between a set and a map.
    * Every element in this set has a unique [Key]. Keys are compared _by reference_.
    

    也就是说,CoroutineContext 是 coroutine 的运行的 Context,它是 Element 实例的集合,这种集合介于 set 和map 之间,每一个 Element 都有一个和对象引用相关的 key,作为 Element 的唯一标志。

    简单看下其源码:

    public interface CoroutineContext {
        /**
         * Returns the element with the given [key] from this context or `null`.
         * Keys are compared _by reference_, that is to get an element from the context the reference to its actual key
         * object must be presented to this function.
         通过key 获得对应的CoroutineContext
         */
        public operator fun <E : Element> get(key: Key<E>): E?
    
        /**
         * Accumulates entries of this context starting with [initial] value and applying [operation]
         * from left to right to current accumulator value and each element of this context.
         使用 initial 作为初始值,operation 作为累加操作
         */
        public fun <R> fold(initial: R, operation: (R, Element) -> R): R
    
        /**
         * Returns a context containing elements from this context and elements from  other [context].
         * The elements from this context with the same key as in the other one are dropped.
         */
        public operator fun plus(context: CoroutineContext): CoroutineContext =
            if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
                context.fold(this) { acc, element ->
                    val removed = acc.minusKey(element.key)
                    if (removed === EmptyCoroutineContext) element else {
                        // make sure interceptor is always last in the context (and thus is fast to get when present)
                        val interceptor = removed[ContinuationInterceptor]
                        if (interceptor == null) CombinedContext(removed, element) else {
                            val left = removed.minusKey(ContinuationInterceptor)
                            if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
                                CombinedContext(CombinedContext(left, element), interceptor)
                        }
                    }
                }
    
        /**
         * Returns a context containing elements from this context, but without an element with
         * the specified [key]. Keys are compared _by reference_, that is to remove an element from the context
         * the reference to its actual key object must be presented to this function.
         */
        public fun minusKey(key: Key<*>): CoroutineContext
    
        /**
         * Key for the elements of [CoroutineContext]. [E] is a type of element with this key.
         * Keys in the context are compared _by reference_.
         */
        public interface Key<E : Element>
    
        /**
         * An element of the [CoroutineContext]. An element of the coroutine context is a singleton context by itself.
         */
        public interface Element : CoroutineContext {
            /**
             * A key of this coroutine context element.
             */
            public val key: Key<*>
    
            public override operator fun <E : Element> get(key: Key<E>): E? =
                @Suppress("UNCHECKED_CAST")
                if (this.key == key) this as E else null
    
            public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
                operation(initial, this)
    
            public override fun minusKey(key: Key<*>): CoroutineContext =
                if (this.key == key) EmptyCoroutineContext else this
        }
    }
    

    简单介绍下这里相关的类以及函数:

    相关类

    1. Element:CoroutineContext 的一个元素,任何一个 Coroutine context 都是自己唯一的单例。
    2. Key: 作为CoroutineContext 的key,这些 Key 都是通过对象引用比较的,也就是说,同一个对象的key 才是相同的。这里也就是提供一种泛型的能力,使子类的CoroutineContext 的key,可以是任何继承自 CoroutineContext 的类实例。
    3. CoroutineContext:如之前的介绍

    相关函数

    1. public operator fun <E : Element> get(key: Key<E>): E? 根据给定的 Key 获取特定的 CoroutineContext
    2. public fun <R> fold(initial: R, operation: (R, Element) -> R): R 把传入的 initial 作为初始变量,通过传入的 operation 操作,累次操作 Context 中的 Element。
    3. public operator fun plus(context: CoroutineContext): CoroutineContext 返回一个包含了当前 Context 和传入的 CoroutineContext 包含的所有的 Element 的一个 Context。如果是两者都存在的 Element 会被删除掉。
    4. public fun minusKey(key: Key<*>): CoroutineContext 根据传入的 key 返回该Context 存在的但是不包含传入的 Keys 的或集。

    具体怎样去理解这些内容呢?我们结合 CoroutineContextImpl.kt 文件里面提供的几个 Context 来理解一下:

    例如 EmptyCoroutineContext

    public object EmptyCoroutineContext : CoroutineContext, Serializable {
        private const val serialVersionUID: Long = 0
        private fun readResolve(): Any = EmptyCoroutineContext
        //EmptyCoroutineContext 不包含一个任何一个 Element
        public override fun <E : Element> get(key: Key<E>): E? = null
        public override fun <R> fold(initial: R, operation: (R, Element) -> R): R = initial
        public override fun plus(context: CoroutineContext): CoroutineContext = context
        public override fun minusKey(key: Key<*>): CoroutineContext = this
        public override fun hashCode(): Int = 0
        public override fun toString(): String = "EmptyCoroutineContext"
    }
    

    CoroutineDispatcher

    简单用法

    CoroutineDispatcher 每一个CoroutineContext 都包含一个 CoroutineDispatcher ,它设置了对应的协程使用一个或者多个线程,协程调度器可以将协程的执行局限在指定的线程中,调度它运行在线程池中或让它不受限的运行。

    先来看下具体怎么使用它吧,下面是一个简单的例子:

        fun dispatchTest() {
            runBlocking {
                launch {
                    doLog("in main thread")
                }
                launch(Dispatchers.Unconfined) {
                    doLog("in Unconfined thread 1")
                }
    
                launch(Dispatchers.Default) {
                    doLog(" in child thread 2")
                }
    
                launch(newSingleThreadContext("child thread3")) {
                    doLog("in new thread")
                }
    
            }
        }
    

    然后控制台输出如下:

    01-12 17:20:48.212 14843-14843/com.yy.yylite.kotlinshare I/Context: in Unconfined thread 1
    01-12 17:20:48.213 14843-14950/com.yy.yylite.kotlinshare I/Context:  in child thread 2
    01-12 17:20:48.215 14843-14843/com.yy.yylite.kotlinshare I/Context: in main thread
    01-12 17:20:48.215 14843-14953/com.yy.yylite.kotlinshare I/Context: in new thread
    

    所以我们可以指定一个协程在特定的子线程执行。

    如果直接调用 launch,不指定Dispatcher,那么它使用的是启动它的 CoroutineScope 的Context 以及 Dispatch。

    常见的 Dispatcher 如下:

    • Dispatchers.Default:默认的 Dispatcher,会在jvm 层级共享线程池,会创建等于 cpu 核数的线程数目,但是始终大于等于2,因为 cpu 是非常珍贵的资源所以使用 Default Dispatcher 是非常合理的。
    • Dispatchers.IO :用于IO 操作的Dispatcher,是按需创建的线程池。
    • Dispatcher.Main:主线程的 Dispatcher
    • Dispatcher.UnConfined:不确定的 Dispatcher,会在调用的地方创建 Coroutine,但是会在任意线程执行 Coroutine,取决于第一个挂起的协程的返回结果。

    关于 Dispatcher.UnConfined 我们怎么去理解呢?先看一个例子:

    /**
     * 非限定的 Dispatcher
     */
    fun testDispatcher() = runBlocking {
        launch(Dispatchers.Unconfined) {
            doLog("start coroutine")
            delay(500)
            doLog("after delay")
        }
    
        launch {
            doLog("in main start coroutine ")
            delay(1000)
            doLog("in main after delay")
        }
        
    }
    

    然后控制台输出结果如下:

    01-19 10:22:19.785 30584-30584/com.yy.yylite.kotlinshare I/Context: start coroutine
    01-19 10:22:19.788 30584-30584/com.yy.yylite.kotlinshare I/Context: in main start coroutine 
    01-19 10:22:20.286 30584-30723/com.yy.yylite.kotlinshare I/Context: after delay
    01-19 10:22:20.788 30584-30584/com.yy.yylite.kotlinshare I/Context: in main after delay
    

    也就是说,Unconfined 类型的 Dispatcher最终分发的线程,是不确定的。

    源码分析

    协程执行在特定的 CoroutineContext,而CoroutineContext 中总是有一个特定的 Dispatcher,负责分发协程,根据官网对 CoroutineDispatcher 的说明如下:

    Coroutine context includes a coroutine dispatcher (see [CoroutineDispatcher]) that determines what thread or threads
    the corresponding coroutine uses for its execution. Coroutine dispatcher can confine coroutine execution
    to a specific thread, dispatch it to a thread pool, or let it run unconfined.

    简单来说,CoroutineDispatcher 决定了当前 Coroutine 在哪个线程或者哪几个线程中执行,可能是某个特定的线程,也可能是分发到线程池或者是 unconfined。

    继承结构

    public abstract class CoroutineDispatcher :
        AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {}
    

    也就是说 CoroutineDispatcher 既是一种 Element 也是 ContinuationInterceptor。

    其存在多个子类,如下:

    image

    其中 ExecutorCoroutineDispatcher 是基于线程池 Executor 来分发任务的,并且每一个线程池,需要Dispatcher 自己本身去关闭。

    MainCoroutinCoroutineDispatcher 是在主线程分发的特殊 Dispatcher,你可以通过 Dispatcher.Mian 获得,它是一个 Object,可以被直接访问。

    关键函数分析

    /**
     * 是否需要将协程执行分发到其它线程,如果是 true 则表示需要,false 表示不需要;一般都是返回 true
     */
    public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
    
    /**
     * Dispatches execution of a runnable [block] onto another thread in the given [context].
       将协程的执行代码,也就是 Runnable 分发到指定 Context 的线程
     */
    public abstract fun dispatch(context: CoroutineContext, block: Runnable)
    
        /**
         * Returns continuation that wraps the original [continuation], thus intercepting all resumptions.
        返回一封装了原始 continuation 的 continuation,实现可以拦截所有 Coroutin 的 resumtions
         */
        public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
            DispatchedContinuation(this, continuation)
    

    从上面的介绍,我们可以大概的知道,dispatch() 类似java Executor 里面的 void execute(Runnable command);方法,帮助执行子线程的方法。

    ExecutorCoroutineDispatcher

    ExecutorCoroutineDispatcher 是 CoroutineDispatcher 的子类,主要用于在子线程分发 Coroutine 的场景,源码比较简单,直接看吧:

    public abstract class ExecutorCoroutineDispatcher: CoroutineDispatcher(), Closeable {
        /**
         * Closes this coroutine dispatcher and shuts down its executor.
         *
         * It may throw an exception if this dispatcher is global and cannot be closed.
         */
        public abstract override fun close()
    
        /**
         * Underlying executor of current [CoroutineDispatcher].
         */
        public abstract val executor: Executor
    }
    

    这里包含了一个 Executor 线程池类的变量,然后就是定义了一个 close() 方法,用于关闭线程池。需要注意的是,该类间接继承了 CoroutineContext 类,所以存在 cancel() 以及 cancelChildren() 方法,用于取消当前 Job 或者该 Context 的Children Job。

    该类的子类是 ExecutorCoroutineDispatcherBase,看下面分析。

    ExecutorCoroutineDispatcherBase

    继承结构

    internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispatcher(), Delay {}
    

    除了继承ExecutorCoroutineDispatcher 之外,还实现了Delay 接口,Delay 接口如下:

    public interface Delay {
        /**
           挂起协程,并且不会阻塞当前线程
         */
        suspend fun delay(time: Long) {
            if (time <= 0) return // don't delay
            return suspendCancellableCoroutine { scheduleResumeAfterDelay(time, it) }
        }
        /**
         */
        fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>)
    
        /**
         */
        fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle =
            DefaultDelay.invokeOnTimeout(timeMillis, block)
    }
    

    Delay 接口,使得Dispatcher 具备任务调度的能力,例如 delay(time: Long),可以挂起协程,并且不会阻塞当前线程。

    直接看ExecutorCoroutineDispatcherBase源码:

    //internal 修饰,表示包内可见
    internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispatcher(), Delay {
    
        private var removesFutureOnCancellation: Boolean = false
    
        internal fun initFutureCancellation() {
            removesFutureOnCancellation = removeFutureOnCancel(executor)
        }
    
        override fun dispatch(context: CoroutineContext, block: Runnable) {
            try {
                executor.execute(timeSource.wrapTask(block))
            } catch (e: RejectedExecutionException) {
                timeSource.unTrackTask()
                DefaultExecutor.enqueue(block)
            }
        }
    
        /*
         * removesFutureOnCancellation is required to avoid memory leak.
         * On Java 7+ we reflectively invoke ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true) and we're fine.
         * On Java 6 we're scheduling time-based coroutines to our own thread safe heap which supports cancellation.
         */
        override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
            val future = if (removesFutureOnCancellation) {
                scheduleBlock(ResumeUndispatchedRunnable(this, continuation), timeMillis, TimeUnit.MILLISECONDS)
            } else {
                null
            }
            // If everything went fine and the scheduling attempt was not rejected -- use it
            if (future != null) {
                continuation.cancelFutureOnCancellation(future)
                return
            }
            // Otherwise fallback to default executor
            DefaultExecutor.scheduleResumeAfterDelay(timeMillis, continuation)
        }
    
        override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
            val future = if (removesFutureOnCancellation) {
                scheduleBlock(block, timeMillis, TimeUnit.MILLISECONDS)
            } else {
                null
            }
    
            return if (future != null ) DisposableFutureHandle(future) else DefaultExecutor.invokeOnTimeout(timeMillis, block)
        }
    
        private fun scheduleBlock(block: Runnable, time: Long, unit: TimeUnit): ScheduledFuture<*>? {
            return try {
                (executor as? ScheduledExecutorService)?.schedule(block, time, unit)
            } catch (e: RejectedExecutionException) {
                null
            }
        }
    
        override fun close() {
            (executor as? ExecutorService)?.shutdown()
        }
    
        override fun toString(): String = executor.toString()
        override fun equals(other: Any?): Boolean = other is ExecutorCoroutineDispatcherBase && other.executor === executor
        override fun hashCode(): Int = System.identityHashCode(executor)
    }
    

    首先看下 dispatch() 方法:

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        try {
            executor.execute(timeSource.wrapTask(block))
        } catch (e: RejectedExecutionException) {
            timeSource.unTrackTask()
            DefaultExecutor.enqueue(block)
        }
    }
    

    直接调用了 executor.execute() 执行封装好的 Runnable,也就是之前通过 launch 或者 async 传入的代码块封装的,不过这之前会先用 timeSource.wrapTask(block),包裹一下这个 Runnable,那么在 wrapTask() 里面做了什么操作?其实,没做啥,相反,我们看到了其它一些代码:

    internal object DefaultTimeSource : TimeSource {
        override fun currentTimeMillis(): Long = System.currentTimeMillis()
        override fun nanoTime(): Long = System.nanoTime()
        override fun wrapTask(block: Runnable): Runnable = block
        override fun trackTask() {}
        override fun unTrackTask() {}
        override fun registerTimeLoopThread() {}
        override fun unregisterTimeLoopThread() {}
    
        override fun parkNanos(blocker: Any, nanos: Long) {
            LockSupport.parkNanos(blocker, nanos)
        }
    
        override fun unpark(thread: Thread) {
            LockSupport.unpark(thread)
        }
    }
    
    internal var timeSource: TimeSource = DefaultTimeSource
    

    也就是说,Coroutine 提供的阻塞功能是通过LockSupport 挂起的。

    CoroutineDispatcher 选择

    创建 Dispatcher 的时候会创建一个 CoroutineScheduler,代码如下:

    //第一次调用 Dispatcher.Default 会执行此方法
     public actual val Default: CoroutineDispatcher = createDefaultDispatcher()
    
    //接着会根据useCoroutinesScheduler 创建 DefaultScheduler 或者 CommonPool
    internal actual fun createDefaultDispatcher(): CoroutineDispatcher =
        if (useCoroutinesScheduler) DefaultScheduler else CommonPool
    
    //接着这里创建 CoroutineScheduler,既是一个 Dispatcher 也是一个 Executor
    private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
    

    这里存在两种 Dispatcher(也是线程池Executor),那么各自存在什么特点同时又有什么不同呢?

    CoroutineStart

    通过 launch{} 等创建一个协程的时候,你也可以传入一个 CoroutineStart 枚举值,这个枚举值参数定义了 CoroutineBuilder 的执行 Coroutine 的时机,具体的时机由以下几种:

    • DEFAULTE:会根据该 Coroutine 依赖的 Context 立刻执行该 Coroutine
    • LAZY:按需执行 Coroutine,仅仅在你调用了 Job.start() 或者 Job.await() 之后会执行
    • ATOMIC: 原子操作类型,也就是说会根据依赖的Context 执行 Coroutine,但是该 Coroutine 不可取消。对比 DEFAULT 类型,它不不可取消,不能通过 job.cancel() 去取消的。
    • UNDISPATCHED:会在当前线程第一个挂起点执行 Coroutine

    关于 Lazy 模式,简单例子如下:

    fun testLazy() {
        doLog("testLazy")
        val job = GlobalScope.launch(start = CoroutineStart.LAZY) {
            doLog("I'm begin")
            delay(1000)
            doLog("I'm finish")
        }
    
        runBlocking {
            delay(1000)
            doLog("Job start")
            job.start()
        }
    }
    

    设置该 launch 为 CoroutineStart 为 Lazy,则 Coroutine 会在 Job.start() 之后执行。

    输入结果如下:

    01-14 12:30:51.128 22558-22558/com.yy.yylite.kotlinshare I/Context: testLazy
    01-14 12:30:52.137 22558-22558/com.yy.yylite.kotlinshare I/Context: Job start
    01-14 12:30:52.139 22558-22607/com.yy.yylite.kotlinshare I/Context: I'm begin
    01-14 12:30:53.142 22558-22609/com.yy.yylite.kotlinshare I/Context: I'm finish
    

    简单看下源码:

    //launch{} 源码中 Coroutine.launch()
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    //Builder.common.kt 文件中
    private open class StandaloneCoroutine(
        parentContext: CoroutineContext,
        active: Boolean
    ) : AbstractCoroutine<Unit>(parentContext, active) {
        override val cancelsParent: Boolean get() = true
        override fun handleJobException(exception: Throwable) = handleExceptionViaHandler(parentContext, exception)
    }
    
    private class LazyStandaloneCoroutine(
        parentContext: CoroutineContext,
        block: suspend CoroutineScope.() -> Unit
    ) : StandaloneCoroutine(parentContext, active = false) {
        private var block: (suspend CoroutineScope.() -> Unit)? = block
    
        override fun onStart() {
            val block = checkNotNull(this.block) { "Already started" }
            this.block = null
            block.startCoroutineCancellable(this, this)
        }
    }
    

    也就是说,默认创建的是 StandaloneCoroutine(newContext,active = true),Lazy 模式创建的是 LazyStandaloneCoroutine(newContext, block) 其 active 是 false。

    CoroutineExceptionHandler

    /**
     * An optional element in the coroutine context to handle uncaught exceptions.
     *
     * Normally, uncaught exceptions can only result from coroutines created using [launch][CoroutineScope.launch] builder.
     * A coroutine that was created using [async][CoroutineScope.async] always catches all its exceptions and represents them
     * in the resulting [Deferred] object.
     *
     * By default, when no handler is installed, uncaught exception are handled in the following way:
     * * If exception is [CancellationException] then it is ignored
     *   (because that is the supposed mechanism to cancel the running coroutine)
     * * Otherwise:
     *     * if there is a [Job] in the context, then [Job.cancel] is invoked;
     *     * Otherwise, all instances of [CoroutineExceptionHandler] found via [ServiceLoader]
     *     * and current thread's [Thread.uncaughtExceptionHandler] are invoked.
     **/
    

    默认的情况下,使用 launch{} 会默认使用一个 CoroutineExceptionHandler,但是如果是 async 的话,你会通过 Deferred 获得失败的情况。

    默认情况下,CoroutineExceptionHandler 会处理一些异常情况。

    下一章节,我会介绍一下 launch 的原理和流程

    相关文章

      网友评论

        本文标题:kotlin coroutines 协程教程(二)关键类分析

        本文链接:https://www.haomeiwen.com/subject/myggoqtx.html