使用Kotlin协程撸一个简易异步调用库

作者: 027f63d16800 | 来源:发表于2017-11-10 21:43 被阅读733次

    由于android限制了只能在UI线程更新视图,而在UI线程中做耗时任务又会导致ANR,因此在平时的开发中,需要将耗时的数据请求工作放到子线程中执行,而视图更新工作放到UI线程中,使用传统的handler或者asyncTask,需要将逻辑分到多个函数内

    使用kotlin的协程机制,可以用同步的方式实现异步
    kotlin的协程机制是基于状态机模型和C-P-S风格实现的。
    一个协程通过resume启动,当协程内部调用supended函数时,协程会被暂停,通过调用 resume可以再次启动协程。每次暂停都会修改协程的状态,再次启动协程时,会从新的状态处开始执行。

    现在通过kotlin的基础api实现一个简单的异步调用接口,最后的效果如下:

    btn.setOnClickListener {
               runOnUI {  
                   //执行在主线程,可以做一些初始化操作                         
                   Log.e("log", Thread.currentThread().name)
                   var used = async {               //从工作线程直接返回数据到主线程
                      //切换到工作线程执行,而且lambda可以直接访问外部变量,构成闭包
                       Log.e("log", Thread.currentThread().name)
                       var start = System.currentTimeMillis()
                       Thread.sleep(3000)
                       System.currentTimeMillis() - start
                   }
                   //继续执行在主线程
                   Log.e("log", Thread.currentThread().name)
                   Toast.makeText(this@MainActivity, "后台线程用时${used}ms", Toast.LENGTH_SHORT).show()
               }
           }
    

    在后续的内容中,我将在实现的过程中逐步分析kotlin协程机制的基本原理

    首先声明一个创建协程的函数:

    //该函数接收一个 suspend类型的lambda
    inline fun runOnUI(noinline block: suspend () -> Unit) {
        var continuation = object : Continuation<Unit> {
          //ThreadSwitcher是ContinuationInterceptor的子类,用于在协程resume时切换到主线程执行
            override val context: CoroutineContext
                get() = ThreadSwitcher()  
    
            override inline fun resume(value: Unit) = Unit
    
            override inline fun resumeWithException(exception: Throwable) = Unit
        }
            //使用suspend类型的lambda创建一个协程并启动
            block.createCoroutine(continuation).resume(Unit)
    }
    

    createCoroutine是官方提供的一个基础api,该函数如下:

    public fun <T> (suspend () -> T).createCoroutine(
            completion: Continuation<T>
    ): Continuation<Unit> = SafeContinuation(createCoroutineUnchecked(completion), COROUTINE_SUSPENDED)
    

    可以看到调用了createCoroutineUnchecked创建一个Coroutine,继续查看该方法:

    @SinceKotlin("1.1")
    @kotlin.jvm.JvmVersion
    public fun <T> (suspend () -> T).createCoroutineUnchecked(
            completion: Continuation<T>
    ): Continuation<Unit> =
    //这里的this是执行createCoroutine函数的block
            if (this !is kotlin.coroutines.experimental.jvm.internal.CoroutineImpl)
                buildContinuationByInvokeCall(completion) {
                    @Suppress("UNCHECKED_CAST")
                    (this as Function1<Continuation<T>, Any?>).invoke(completion)
                }
            else
    //编译时,block会被编译成一个CoroutineImpl的子类,所以走这个分支
                (this.create(completion) as kotlin.coroutines.experimental.jvm.internal.CoroutineImpl).facade
    

    查看编译之后生成的block

    //查看在Activity#onCreate调用runOnUI处传入的lambda的编译类
    final class ymc/demo/com/asyncframe/MainActivity$onCreate$1$1 
              extends kotlin/coroutines/experimental/jvm/internal/CoroutineImpl   
              implements kotlin/jvm/functions/Function1  {      //lambda编译类都实现FunctionN函数
      ...
    }
    

    可以看到传入runOnUIlambda确实被编译成了一个CoroutineImpl,这是因为编译器推断出了这个lambdasuspend类型的。

    继续上面的分析,创建协程所涉及到的两个方法中都出现了 Continuation这个类,那么这个类是干嘛的呢?
    首先,先看看completion,这个是我们调用createCoroutine手动传入的,当协程结束时,他的resume会被调用,当协程异常结束时,他的resumeWithException会被调用。
    再看看createCoroutineUnchecked,这个函数也返回了一个Continuation,那么这个又是什么呢?

     (this.create(completion) as kotlin.coroutines.experimental.jvm.internal.CoroutineImpl).facade
    

    可以看到,返回的是CoroutineImplfacade,那这个又是什么呢?
    我们进入CoroutineImpl,可以看到

    abstract class CoroutineImpl(
            arity: Int,
            @JvmField
            protected var completion: Continuation<Any?>?
    ) : Lambda(arity), Continuation<Any?> {     //Coroutine本身是一个Continuation
    
      override val context: CoroutineContext
              get() = _context!!
    
      private var _facade: Continuation<Any?>? = null
     
      val facade: Continuation<Any?> get() {
              if (_facade == null) _facade = interceptContinuationIfNeeded(_context!!, this)
              return _facade!!
          }
      ...
    }
    

    原来这是一个代理属性,接着查看interceptContinuationIfNeeded

    internal fun <T> interceptContinuationIfNeeded(
            context: CoroutineContext,
            continuation: Continuation<T>
    ) = context[ContinuationInterceptor]?.interceptContinuation(continuation) ?: continuation
    

    这个函数从Coroutine的上下文中查找ContinuationInterceptor,如果有就调用他的interceptContinuation对传入的continuation进行包装,否则直接返回传入的continuation

    Continuation是一个可继续执行体的抽象,每个Coroutine都是一个可继续执行体,Continuation是一个协程对外的接口,启动/恢复协程的resume就是在该接口中定义的。
    协程可以是链式连接的,一个协程可以有子协程,子协程持有父协程的引用,当子协程执行时,父协程暂停,子协程结束时,内部通过调用父协程的resume返回父协程。

    还记得我们前面用到的ThreadSwitcher吗,他就是一个ContinuationInterceptor
    我们来看看来看ThreadSwitcher的实现:

    /**
    Interceptor用于用于拦截并包装Continuation,让我们有机会在协程resume前做一些额外的操作,比如线程切换
    **/
    class ThreadSwitcher : ContinuationInterceptor, AbstractCoroutineContextElement(ContinuationInterceptor.Key) {
    
        override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
                = object : Continuation<T> by continuation {
    
            override fun resume(value: T) {
              //如果在主线程,直接执行
                if (Looper.getMainLooper() === Looper.myLooper()) {
                    continuation.resume(value)
                } else {
                //否则,使用handler机制post到主线程执行
                    postman.post {
                        resume(value)
                    }
                }
            }
    
            override fun resumeWithException(exception: Throwable) {
                if (Looper.getMainLooper() === Looper.myLooper()) {
                    continuation.resumeWithException(exception)
                } else {
                    postman.post {
                        resumeWithException(exception)
                    }
                }
            }
        }
    }
    

    从上面的分析中,我们可以想象,我们创建的协程会被ThreadSwitcher包装,

    block.createCoroutine(continuation).resume(Unit)
    

    createCoroutine返回的实际是ThreadSwitcher返回的Continuation,所以当我们执行resume启动协程时,会先切换到主线程执行。

    紧接着,我们来实现async

    suspend inline fun <T> async(crossinline block: () -> T): T
            = suspendCoroutine {
    //dispatcher是一个对线程池的封装,将任务分发到子线程中
        dispatcher.dispatch {
            it.resume(block())
        }
    }
    

    使用suspend修饰的方法只可以在协程内部调用,而suspendCoroutine方法是kotlin提供的一个基础api,用于实现暂停协程。
    我们接着来分析suspendCoroutine,查看他的实现:

    public inline suspend fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T =
            suspendCoroutineOrReturn { c: Continuation<T> ->
                val safe = SafeContinuation(c)
                block(safe)
                safe.getResult()
            }
    

    可以看到这个方法接收的block是带Continuation参数的
    真正实现功能的是suspendCoroutineOrReturn,当我们继续跟进时,发现:

    public inline suspend fun <T> suspendCoroutineOrReturn(crossinline block: (Continuation<T>) -> Any?): T =
            throw NotImplementedError("Implementation is intrinsic")
    

    what!直接抛出异常了???
    这是因为这是一个特殊的函数,需要编译器特殊处理,他需要将当前协程内的_facade属性,包装成SafeContinuation,再作为我们传入的block的参数,而且这个_facade是经过ContinuationInterceptor处理过的,也就是说当我们调用resume恢复线程时,会先切换到主线程。
    为了验证上面的分析,我们查看async编译之后的字节码:

    //可以看到编译之后,我们的async多了一个Continuation类型的参数
     private final static async(Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/experimental/Continuation;)Ljava/lang/Object;
       L0
        LINENUMBER 70 L0
        NOP
       L1
        LINENUMBER 77 L1
        ICONST_0
        INVOKESTATIC kotlin/jvm/internal/InlineMarker.mark (I)V
        ALOAD 1  //将第二个参数,也就是Continuation入栈
    //调用CoroutineIntrinsics.normalizeContinuation 
        INVOKESTATIC kotlin/coroutines/experimental/jvm/internal/CoroutineIntrinsics.normalizeContinuation (Lkotlin/coroutines/experimental/Continuation;)Lkotlin/coroutines/experimental/Continuation;  
    //将返回值存到slot3
        ASTORE 3
       L2
        LINENUMBER 78 L2
    //new 一个SafeContinuation
        NEW kotlin/coroutines/experimental/SafeContinuation
        DUP  
      //将刚刚normalizeContinuation返回的continuation传入SafeContinuation的构造函数
        ALOAD 3
        INVOKESPECIAL kotlin/coroutines/experimental/SafeContinuation.<init> (Lkotlin/coroutines/experimental/Continuation;)V
        ASTORE 4
       L3
      ...
    

    我们可以看到,编译之后的字节码已经没有了suspendCoroutinesuspendCoroutineOrReturn的身影,因为这两个函数都是inline函数。
    我们接着来看CoroutineIntrinsics.normalizeContinuation的实现:

    fun <T> normalizeContinuation(continuation: Continuation<T>): Continuation<T> =
            (continuation as? CoroutineImpl)?.facade ?: continuation
    

    还记得我们刚刚分析过facade这个属性吗?他是对_facade的代理,这个函数返回的是经过拦截器处理过的Continuation
    根据刚刚的字节码,我们可以发现suspend类型的函数,都会隐式额外接受一个当前协程的引用,但是又不能在函数中直接访问。

    最后,还有两个上文出现过的线程切换处理类,postmandispatcher,使用的是单例模式:

    object postman : Handler(Looper.getMainLooper()) {
        override fun handleMessage(msg: Message?) {
            msg?.callback?.run()
        }
    }
    
    object dispatcher {
        val mCachedThreads = Executors.newCachedThreadPool()
        inline fun dispatch(noinline block: () -> Unit) {
            mCachedThreads.execute(block)
        }
    }
    

    到此,我们实现了一个简易的异步调用库!

    相关文章

      网友评论

      • 编走编想:suspendCoroutineOrReturn 是在哪里实现的
        027f63d16800:@编走编想 有些函数是编译器直接内联进去的,源码里面看不到
      • 编走编想:写的很不错,我也在研究 Kotlin 协程,多交流
        027f63d16800:@编走编想 嗯嗯

      本文标题:使用Kotlin协程撸一个简易异步调用库

      本文链接:https://www.haomeiwen.com/subject/ckkjpxtx.html