Kotlin Coroutine Source Code Analysis (Part 1)

Author: LSteven | Published: 2019-02-04 17:45

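    References

    Breaking with habit, I'll start by recommending a few excellent articles. Without them this post is hard to follow, since I won't be starting from the basics.

    Kotlin Coroutine 原理解析 (How Kotlin Coroutines Work)

    Kotlin Coroutines(协程) 完全解析(二),深入理解协程的挂起、恢复与调度 (Kotlin Coroutines Fully Explained, Part 2: Suspension, Resumption, and Dispatch in Depth)

    深入理解 Kotlin Coroutine (一) (Understanding Kotlin Coroutine in Depth, Part 1)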

    CoroutineContext

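    The coroutine context. Use context[key], where key is a CoroutineContext.Key, to fetch the element the context associates with that key. The interceptor we will see later is tied to the context in the same way, through interface ContinuationInterceptor : CoroutineContext.Element.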
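A minimal sketch of the keyed lookup described above, using only the Kotlin standard library. RequestId and lookupId are hypothetical names made up for this illustration; they are not part of the original article or of any library.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

// Hypothetical element: the companion object doubles as the lookup key.
class RequestId(val id: String) : AbstractCoroutineContextElement(RequestId) {
    companion object Key : CoroutineContext.Key<RequestId>
}

// Returns the id stored in the context, or null when no RequestId is present.
fun lookupId(context: CoroutineContext): String? = context[RequestId]?.id

fun main() {
    println(lookupId(RequestId("42")))        // the element is its own single-element context
    println(lookupId(EmptyCoroutineContext))  // nothing stored under this key
}
```

Because the companion object is named Key and implements CoroutineContext.Key, the class name itself can be used inside the brackets.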

    plus

    With contexts, we often see:

    Dispatchers.IO + job

    @NotNull
      public final CoroutineContext invoke(@NotNull CoroutineContext paramCoroutineContext, @NotNull CoroutineContext.Element paramElement)
      {
        Intrinsics.checkParameterIsNotNull(paramCoroutineContext, "acc");
        Intrinsics.checkParameterIsNotNull(paramElement, "element");
        CoroutineContext localCoroutineContext1 = paramCoroutineContext.minusKey(paramElement.getKey());
        if (localCoroutineContext1 == EmptyCoroutineContext.INSTANCE) {
          return (CoroutineContext)paramElement;
        }
        ContinuationInterceptor localContinuationInterceptor = (ContinuationInterceptor)localCoroutineContext1.get((CoroutineContext.Key)ContinuationInterceptor.Key);
        CombinedContext localCombinedContext;
        if (localContinuationInterceptor == null)
        {
          localCombinedContext = new CombinedContext(localCoroutineContext1, paramElement);
        }
        else
        {
          CoroutineContext localCoroutineContext2 = localCoroutineContext1.minusKey((CoroutineContext.Key)ContinuationInterceptor.Key);
          if (localCoroutineContext2 == EmptyCoroutineContext.INSTANCE) {
            localCombinedContext = new CombinedContext((CoroutineContext)paramElement, (CoroutineContext.Element)localContinuationInterceptor);
          } else {
            localCombinedContext = new CombinedContext((CoroutineContext)new CombinedContext(localCoroutineContext2, paramElement), (CoroutineContext.Element)localContinuationInterceptor);
          }
        }
        return (CoroutineContext)localCombinedContext;
      }
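The decompiled lambda above does two things: it drops any existing element with the same key before adding the new one, and it keeps the ContinuationInterceptor as the last element. The sketch below makes that visible with the standard library only. Name, Tag, NoopInterceptor and elementOrder are hypothetical names introduced for the illustration.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext

class Name(val value: String) : AbstractCoroutineContextElement(Name) {
    companion object Key : CoroutineContext.Key<Name>
}

class Tag(val value: String) : AbstractCoroutineContextElement(Tag) {
    companion object Key : CoroutineContext.Key<Tag>
}

// Hypothetical interceptor that intercepts nothing; only its key matters here.
object NoopInterceptor : ContinuationInterceptor {
    override val key: CoroutineContext.Key<*> get() = ContinuationInterceptor
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> = continuation
}

// Lists the element class names in the order fold visits them (left to right).
fun elementOrder(context: CoroutineContext): List<String> =
    context.fold(emptyList()) { acc, element -> acc + element.javaClass.simpleName }

fun main() {
    val context = NoopInterceptor + Name("a") + Tag("t") + Name("b")
    println(elementOrder(context))   // interceptor ends up last, only one Name remains
    println(context[Name]?.value)    // the later Name replaced the earlier one
}
```

Keeping the interceptor at the end presumably makes context[ContinuationInterceptor] the cheapest lookup, since CombinedContext checks its last element first.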
    
    

    Continuation

    BaseContinuationImpl

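    It takes a completion continuation, which defines what happens after the coroutine finishes.

    [Figure: BaseContinuationImpl.png]

    resumeWith: the core part

    First it calls its own invokeSuspend(paramObject).

    If that returns IntrinsicsKt.getCOROUTINE_SUSPENDED(), the coroutine needs to suspend and resumeWith returns immediately. Otherwise the return value (or the caught exception) is wrapped in a Result and handed to completion. As long as completion is itself a BaseContinuationImpl, the loop continues with it instead of recursing.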

    public final void resumeWith(@NotNull Object paramObject)
      {
        DebugProbesKt.probeCoroutineResumed((Continuation)this);
        BaseContinuationImpl localBaseContinuationImpl1 = (BaseContinuationImpl)this;
        Continuation localContinuation;
        Object localObject2;
        for (Object localObject1 = paramObject;; localObject1 = localObject2)
        {
          BaseContinuationImpl localBaseContinuationImpl2 = localBaseContinuationImpl1;
          localContinuation = localBaseContinuationImpl2.completion;
          if (localContinuation == null) {
            Intrinsics.throwNpe();
          }
          try
          {
            Object localObject3 = localBaseContinuationImpl2.invokeSuspend(localObject1);
            if (localObject3 == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
              return;
            }
            localObject2 = Result.constructor-impl(localObject3);
          }
          catch (Throwable localThrowable)
          {
            localObject2 = Result.constructor-impl(ResultKt.createFailure(localThrowable));
          }
          localBaseContinuationImpl2.releaseIntercepted();
          if (!(localContinuation instanceof BaseContinuationImpl)) {
            break;
          }
          localBaseContinuationImpl1 = (BaseContinuationImpl)localContinuation;
        }
        localContinuation.resumeWith(localObject2);
      }
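The loop is easier to read in Kotlin. The following is a simplified model of it, not the real class: MiniContinuation, AddOne and runChain are hypothetical names, and debug probes and releaseIntercepted() are left out.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED

// Hypothetical, simplified stand-in for BaseContinuationImpl.
abstract class MiniContinuation(val completion: Continuation<Any?>) : Continuation<Any?> {
    override val context: CoroutineContext get() = completion.context

    // One step of the state machine: returns a value or COROUTINE_SUSPENDED.
    abstract fun invokeSuspend(result: Result<Any?>): Any?

    final override fun resumeWith(result: Result<Any?>) {
        var current: MiniContinuation = this
        var param: Result<Any?> = result
        while (true) {
            val completion = current.completion
            val outcome: Result<Any?> = try {
                val value = current.invokeSuspend(param)
                if (value === COROUTINE_SUSPENDED) return  // suspended: someone resumes us later
                Result.success(value)
            } catch (e: Throwable) {
                Result.failure(e)
            }
            if (completion is MiniContinuation) {
                // Unroll the recursion: continue the loop with the parent frame.
                current = completion
                param = outcome
            } else {
                // Reached the top-level completion: deliver the final result.
                completion.resumeWith(outcome)
                return
            }
        }
    }
}

// Hypothetical frame that adds one to whatever it is resumed with.
class AddOne(completion: Continuation<Any?>) : MiniContinuation(completion) {
    override fun invokeSuspend(result: Result<Any?>): Any? = (result.getOrThrow() as Int) + 1
}

fun runChain(start: Int): Any? {
    var captured: Any? = null
    val root = object : Continuation<Any?> {
        override val context: CoroutineContext = EmptyCoroutineContext
        override fun resumeWith(result: Result<Any?>) { captured = result.getOrThrow() }
    }
    AddOne(AddOne(root)).resumeWith(Result.success(start))
    return captured
}

fun main() {
    println(runChain(1))
}
```

Both AddOne frames run inside a single resumeWith call; only the final hand-off to root is a separate call.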
    
    

    ContinuationImpl

    Builds on BaseContinuationImpl and adds intercepted().

    SafeContinuation

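    When a coroutine suspends, its continuation is wrapped in a SafeContinuation, which decides whether execution simply carries on or has to be resumed later.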

    public SafeContinuation(@NotNull Continuation<? super T> paramContinuation)
      {
        this(paramContinuation, CoroutineSingletons.UNDECIDED);
      }
    
    
    resumeWith
     public void resumeWith(@NotNull Object paramObject)
      {
        for (;;)
        {
          Object localObject = this.result;
          if (localObject == CoroutineSingletons.UNDECIDED)
          {
            if (!RESULT.compareAndSet(this, CoroutineSingletons.UNDECIDED, paramObject)) {}
          }
          else
          {
            if (localObject != IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
              break;
            }
            if (RESULT.compareAndSet(this, IntrinsicsKt.getCOROUTINE_SUSPENDED(), CoroutineSingletons.RESUMED))
            {
              this.delegate.resumeWith(paramObject);
              return;
            }
          }
        }
        throw ((Throwable)new IllegalStateException("Already resumed"));
      }
    
    

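    When the continuation is resumed, the state can move:

    • CoroutineSingletons.UNDECIDED -> paramObject

    If the coroutine was already suspended (the state is COROUTINE_SUSPENDED), the state becomes CoroutineSingletons.RESUMED and this.delegate.resumeWith(paramObject) is called.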

    getOrThrow
    @Nullable
     public final Object getOrThrow()
     {
       Object localObject = this.result;
       if (localObject == CoroutineSingletons.UNDECIDED)
       {
         if (RESULT.compareAndSet(this, CoroutineSingletons.UNDECIDED, IntrinsicsKt.getCOROUTINE_SUSPENDED())) {
           return IntrinsicsKt.getCOROUTINE_SUSPENDED();
         }
         localObject = this.result;
       }
       if (localObject == CoroutineSingletons.RESUMED) {
         return IntrinsicsKt.getCOROUTINE_SUSPENDED();
       }
       if ((localObject instanceof Result.Failure)) {
         throw ((Result.Failure)localObject).exception;
       }
       return localObject;
     }
    
    

    If result is still CoroutineSingletons.UNDECIDED when the result is requested, the coroutine has to suspend, and the state changes to IntrinsicsKt.getCOROUTINE_SUSPENDED().
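The two methods together form a small state machine: whichever of resumeWith and getOrThrow runs first decides whether the caller suspends. Below is a simplified model of it. MiniSafeContinuation, Undecided, Resumed and safeDemo are hypothetical names, and unlike the real class this sketch stores the whole Result rather than the raw value.

```kotlin
import java.util.concurrent.atomic.AtomicReference
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.resume

private object Undecided
private object Resumed

// Hypothetical, simplified stand-in for SafeContinuation.
class MiniSafeContinuation<T>(private val delegate: Continuation<T>) : Continuation<T> {
    private val state = AtomicReference<Any?>(Undecided)
    override val context: CoroutineContext get() = delegate.context

    override fun resumeWith(result: Result<T>) {
        while (true) {
            val current = state.get()
            when {
                // Resumed before getOrThrow ran: just park the result.
                current === Undecided ->
                    if (state.compareAndSet(Undecided, result)) return
                // The caller already suspended: pass the result on to the delegate.
                current === COROUTINE_SUSPENDED ->
                    if (state.compareAndSet(COROUTINE_SUSPENDED, Resumed)) {
                        delegate.resumeWith(result)
                        return
                    }
                else -> throw IllegalStateException("Already resumed")
            }
        }
    }

    fun getOrThrow(): Any? {
        // No result yet: mark the coroutine as suspended.
        if (state.compareAndSet(Undecided, COROUTINE_SUSPENDED)) return COROUTINE_SUSPENDED
        val current = state.get()
        if (current === Resumed) return COROUTINE_SUSPENDED
        return (current as Result<*>).getOrThrow()
    }
}

fun safeDemo(): List<Any?> {
    val delivered = mutableListOf<Any?>()
    val delegate = object : Continuation<String> {
        override val context: CoroutineContext = EmptyCoroutineContext
        override fun resumeWith(result: Result<String>) { delivered += result.getOrThrow() }
    }
    val resumedEarly = MiniSafeContinuation(delegate)
    resumedEarly.resume("early")                  // resume first...
    val first = resumedEarly.getOrThrow()         // ...so the value comes back directly

    val resumedLate = MiniSafeContinuation(delegate)
    val second = resumedLate.getOrThrow()         // nothing yet: COROUTINE_SUSPENDED
    resumedLate.resume("late")                    // now the delegate is resumed
    return listOf(first, second === COROUTINE_SUSPENDED, delivered)
}

fun main() {
    println(safeDemo())
}
```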

    CombinedContext

    We often see:

    context get() = job() + IO....
    
    

    Here the + operator is overloaded to build a CombinedContext.

    IntrinsicsKt__IntrinsicsJvmKt

    createCoroutineUnintercepted

    Let's just look at the simpler path:

    if ((paramFunction2 instanceof BaseContinuationImpl)) {
        return ((BaseContinuationImpl)paramFunction2).create(paramR, localContinuation);
      }
    
    
    private static final <T> Object startCoroutineUninterceptedOrReturn(@NotNull Function1<? super Continuation<? super T>, ? extends Object> paramFunction1, Continuation<? super T> paramContinuation)
      {
        if (paramFunction1 == null) {
          throw new TypeCastException("null cannot be cast to non-null type (kotlin.coroutines.Continuation<T>) -> kotlin.Any?");
        }
        return ((Function1)TypeIntrinsics.beforeCheckcastToFunctionOfArity(paramFunction1, 1)).invoke(paramContinuation);
      }
    
    

    In essence this is just function.invoke().
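Both intrinsics are public in kotlin.coroutines.intrinsics, so the difference can be tried directly. A small sketch, where runIntrinsics is a hypothetical helper: startCoroutineUninterceptedOrReturn invokes the block and hands back its value without touching the completion when nothing suspends, while createCoroutineUnintercepted only builds the continuation, which must then be resumed.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.intrinsics.createCoroutineUnintercepted
import kotlin.coroutines.intrinsics.startCoroutineUninterceptedOrReturn
import kotlin.coroutines.resume

fun runIntrinsics(): List<String> {
    val log = mutableListOf<String>()
    val completion = object : Continuation<String> {
        override val context: CoroutineContext = EmptyCoroutineContext
        override fun resumeWith(result: Result<String>) {
            log += "completion: ${result.getOrThrow()}"
        }
    }
    val block: suspend () -> String = { "done" }

    // Plain invoke: the block never suspends, so its value is returned directly.
    val direct = block.startCoroutineUninterceptedOrReturn(completion)
    log += "direct: $direct"

    // Create, then resume: the value now reaches the completion instead.
    block.createCoroutineUnintercepted(completion).resume(Unit)
    return log
}

fun main() {
    runIntrinsics().forEach(::println)
}
```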

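    A concrete example

    import kotlin.coroutines.AbstractCoroutineContextElement
    import kotlin.coroutines.Continuation
    import kotlin.coroutines.CoroutineContext
    import kotlin.coroutines.resume
    import kotlin.coroutines.startCoroutine
    import kotlin.coroutines.suspendCoroutine

    // Assumed helper (not shown in the original): log with the current thread name
    fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

    fun main(args: Array<String>) {
        log("before coroutine")
        // Start our coroutine
        asyncCalcMd5("test.zip") {
            log("in coroutine. Before suspend.")
            // Suspend the coroutine and start a time-consuming operation
            val result: String = suspendCoroutine { continuation ->
                log("in suspend block.")
                continuation.resume(calcMd5(continuation.context[FilePath]!!.path))
                log("after resume.")
            }
            log("in coroutine. After suspend. result = $result")
        }
        log("after coroutine")
    }

    /**
     * Context element that carries the information we need; it can be customized freely
     */
    class FilePath(val path: String) : AbstractCoroutineContextElement(FilePath) {
        companion object Key : CoroutineContext.Key<FilePath>
    }

    fun asyncCalcMd5(path: String, block: suspend () -> Unit) {
        val continuation = object : Continuation<Unit> {
            override fun resumeWith(result: Result<Unit>) {
                log("resume: $result")
            }

            override val context: CoroutineContext
                get() = FilePath(path)
        }
        block.startCoroutine(continuation)
    }

    fun calcMd5(path: String): String {
        log("calc md5 for $path.")
        // Simulate a time-consuming operation for now
        Thread.sleep(1000)
        // Pretend this is the MD5 value we computed
        return System.currentTimeMillis().toString()
    }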
    
    

    Here is what happens to block:

    The continuation passed in is the argument of block.startCoroutine(continuation). The compiler turns this extension call into the static startCoroutine(Function, Continuation), where:

    • Function: block
    • Continuation: val continuation

    startCoroutine

     @SinceKotlin(version="1.3")
    public static final <R, T> void startCoroutine(@NotNull Function2<? super R, ? super Continuation<? super T>, ? extends Object> paramFunction2, R paramR, @NotNull Continuation<? super T> paramContinuation)
    {
      Intrinsics.checkParameterIsNotNull(paramFunction2, "receiver$0");
      Intrinsics.checkParameterIsNotNull(paramContinuation, "completion");
      Continuation localContinuation = IntrinsicsKt.intercepted(IntrinsicsKt.createCoroutineUnintercepted(paramFunction2, paramR, paramContinuation));
      Unit localUnit = Unit.INSTANCE;
      localContinuation.resumeWith(Result.constructor-impl(localUnit));
    }
    
    

    Create first, then resume.

    Creation takes two steps:

    IntrinsicsKt.createCoroutineUnintercepted(paramFunction2, paramR, paramContinuation)

    if ((paramFunction1 instanceof BaseContinuationImpl)) {
      return ((BaseContinuationImpl)paramFunction1).create(localContinuation);
    }
    
    

    IntrinsicsKt.intercepted
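The "create, then resume" split is also available through the public API: createCoroutine builds the continuation without running it, and resuming it with Unit starts the body. A short sketch, where runBoth and completion are hypothetical helpers, comparing that with startCoroutine:

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.createCoroutine
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine

fun runBoth(): List<String> {
    val log = mutableListOf<String>()

    // Completion that records which entry point produced the result.
    fun completion(tag: String): Continuation<Int> = object : Continuation<Int> {
        override val context: CoroutineContext = EmptyCoroutineContext
        override fun resumeWith(result: Result<Int>) {
            log += "$tag: ${result.getOrThrow()}"
        }
    }

    val block: suspend () -> Int = { 21 * 2 }

    // One call: create and resume immediately.
    block.startCoroutine(completion("startCoroutine"))

    // Two calls: nothing runs until the created continuation is resumed.
    val created = block.createCoroutine(completion("createCoroutine"))
    log += "created, not started"
    created.resume(Unit)
    return log
}

fun main() {
    runBoth().forEach(::println)
}
```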

    So let's look at what kind of paramFunction block is turned into:

    
    final class block extends SuspendLambda
      implements Function1<Continuation<? super Unit>, Object>{
          @NotNull
      public final Continuation<Unit> create(@NotNull Continuation<?> paramContinuation)
      {
        // create builds a new BaseContinuationImpl instance
        Intrinsics.checkParameterIsNotNull(paramContinuation, "completion");
        return new 1(this.this$0, paramContinuation);
      }
    
      public final Object invoke(Object paramObject)
      {
        return ((1)create((Continuation)paramObject)).invokeSuspend(Unit.INSTANCE);
      }
    
      // after resumeWith, invokeSuspend runs first
      @Nullable
      public final Object invokeSuspend(@NotNull Object paramObject)
      {
        Object localObject1 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
        Object localObject2;
        switch (this.label)
        {
        default: 
          throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
        case 1: 
          ((1)this.L$0);
          if (!(paramObject instanceof Result.Failure)) {
            localObject2 = paramObject;
          } else {
            throw ((Result.Failure)paramObject).exception;
          }
          break;
        case 0: 
          if ((paramObject instanceof Result.Failure)) {
            break label276;
          }
          this.this$0.log("in coroutine. Before suspend.");
          this.L$0 = this;
          this.label = 1;
          SafeContinuation localSafeContinuation = new SafeContinuation(IntrinsicsKt.intercepted(this));
          Continuation localContinuation = (Continuation)localSafeContinuation;
          this.this$0.log("in suspend block.");
          HongMoActivity localHongMoActivity1 = this.this$0;
          CoroutineContext.Element localElement = localContinuation.getContext().get((CoroutineContext.Key)HongMoActivity.FilePath.Key);
          if (localElement == null) {
            Intrinsics.throwNpe();
          }
          String str1 = localHongMoActivity1.calcMd5(((HongMoActivity.FilePath)localElement).getPath());
          localContinuation.resumeWith(Result.constructor-impl(str1));
          this.this$0.log("after resume.");
          localObject2 = localSafeContinuation.getOrThrow();
          if (localObject2 == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
            DebugProbesKt.probeCoroutineSuspended(this);
          }
          if (localObject2 == localObject1) {
            return localObject1;
          }
          break;
        }
        String str2 = (String)localObject2;
        HongMoActivity localHongMoActivity2 = this.this$0;
        StringBuilder localStringBuilder = new StringBuilder();
        localStringBuilder.append("in coroutine. After suspend. result = ");
        localStringBuilder.append(str2);
        localHongMoActivity2.log(localStringBuilder.toString());
        return Unit.INSTANCE;
        label276:
        throw ((Result.Failure)paramObject).exception;
      }
    }
    
    

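    Once the continuation is wrapped in a SafeContinuation, the result is computed, goes through resumeWith + getOrThrow, and is then passed on to completion.

    The key code is: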

    public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T =
        suspendCoroutineUninterceptedOrReturn { c: Continuation<T> ->
            val safe = SafeContinuation(c.intercepted())
            block(safe)
            safe.getOrThrow()
        }
    
    

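    This is converted into CPS code:

    • When suspendCoroutine is used, the continuation is first wrapped in a SafeContinuation.
    • If getOrThrow returns COROUTINE_SUSPENDED, invokeSuspend returns immediately.
    • Otherwise invokeSuspend runs to the end and returns to the BaseContinuationImpl loop shown earlier. Because completion is not a BaseContinuationImpl, the loop exits and completion.resumeWith finishes the coroutine.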
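The synchronous path in the bullets above can be observed from ordinary Kotlin. In this sketch, runSync is a hypothetical helper and "md5" a placeholder value. The continuation is resumed inside the suspendCoroutine block, so getOrThrow finds a value, nothing actually suspends, and everything finishes before startCoroutine returns.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

fun runSync(): List<String> {
    val log = mutableListOf<String>()
    val block: suspend () -> Unit = {
        log += "before suspend"
        val value = suspendCoroutine<String> { continuation ->
            continuation.resume("md5")   // result is parked in the SafeContinuation
            log += "after resume"        // the block keeps running after resume
        }
        log += "after suspend: $value"
    }
    block.startCoroutine(Continuation<Unit>(EmptyCoroutineContext) { log += "completion" })
    log += "after startCoroutine"
    return log
}

fun main() {
    runSync().forEach(::println)
}
```

Note that "after resume" is logged before "after suspend": resume only stores the value, and the coroutine body continues once the block has returned.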

    The asynchronous case

    case 0: 
          if ((paramObject instanceof Result.Failure)) {
            break label228;
          }
          this.this$0.log("in coroutine. Before suspend.");
          this.L$0 = this;
          this.label = 1;
          SafeContinuation localSafeContinuation = new SafeContinuation(IntrinsicsKt.intercepted(this));
          Continuation localContinuation = (Continuation)localSafeContinuation;
          this.this$0.log("in suspend block.");
          HongMoActivity.access$getExecutor$p(this.this$0).submit((Runnable)new HongMoActivity.main.1.invokeSuspend..inlined.suspendCoroutine.lambda.1(localContinuation, this));
          localObject2 = localSafeContinuation.getOrThrow();
          if (localObject2 == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
            DebugProbesKt.probeCoroutineSuspended(this);
          }
          if (localObject2 == localObject1) {
            return localObject1;
          }
          break;
        }
    
    

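    As you can see, getOrThrow is called right after submit. At that point it returns COROUTINE_SUSPENDED, so invokeSuspend returns immediately.

    Inside the runnable: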

    public final void run()
      {
        Continuation localContinuation = this.$continuation;
        HongMoActivity localHongMoActivity = this.this$0.this$0;
        CoroutineContext.Element localElement = this.$continuation.getContext().get((CoroutineContext.Key)HongMoActivity.FilePath.Key);
        if (localElement == null) {
          Intrinsics.throwNpe();
        }
        String str = localHongMoActivity.calcMd5(((HongMoActivity.FilePath)localElement).getPath());
        localContinuation.resumeWith(Result.constructor-impl(str));
        this.this$0.this$0.log("after resume.");
      }
    
    

    When resumeWith is later called on the SafeContinuation, execution re-enters invokeSuspend at label = 1 and the value is handed to completion.
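The asynchronous path can be sketched the same way. Here runAsync is a hypothetical helper, and the release latch exists only to make the log order deterministic. The continuation is handed to an executor, startCoroutine returns while the coroutine is suspended, and the worker thread later resumes it.

```kotlin
import java.util.Collections
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

fun runAsync(): List<String> {
    val log: MutableList<String> = Collections.synchronizedList(mutableListOf())
    val executor = Executors.newSingleThreadExecutor()
    val release = CountDownLatch(1)  // holds the worker until startCoroutine has returned
    val done = CountDownLatch(1)     // signalled by the completion

    val block: suspend () -> Unit = {
        log += "before suspend"
        val value = suspendCoroutine<String> { continuation ->
            executor.execute {
                release.await()
                continuation.resume("md5")   // resumes the coroutine on the worker thread
            }
        }
        log += "after suspend: $value"
    }

    block.startCoroutine(Continuation<Unit>(EmptyCoroutineContext) {
        log += "completion"
        done.countDown()
    })
    log += "after startCoroutine"    // reached while the coroutine is still suspended
    release.countDown()
    done.await()
    executor.shutdown()
    return log.toList()
}

fun main() {
    runAsync().forEach(::println)
}
```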

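    Summary

    suspend marks a suspension point: when a thread reaches it, execution may simply break out and return. A continuation is also added to represent the point to continue from, which is easy to understand: the next time it is resumed, execution picks up from that continuation.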

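    To be continued

    • How async, launch and friends simplify working with coroutines
    • How intercept works
    • What CombinedContext is really for
    • How asynchronous code gets hold of the continuation and carries on
    • What can be used in place of suspendCoroutine

    Next time I'll keep working through these open questions. By the way, happy new year. This post has been delayed far too long, which is embarrassing.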


        Article link: https://www.haomeiwen.com/subject/bmopsqtx.html