前段时间面试聊到了协程 , 但自己又很久没去阅读协程相关的源码 ,所以回答的并不是很好。
过程:
面试官 : 项目中怎么处理多线程相关的
xxx: 使用Kolint协程来处理的
面试官:那你说说协程是怎么使用的
xxx : Activity 和 ViewModel 都有协程域,直接调用launch方法使用就行
面试官 :那在Application 或者 service 中如何使用呢
xxx:可以使用GlobalScope
面试官 : 那你说说GlobalScope 有什么注意事项?
xxx : emm。。。
想要回答好这个问题,那肯定是对协程必须要知根知底。 正好这几天看了些相关文章,所以想写篇文章总结一下,希望能帮到大家。
协程是什么?
用轻量级线程来回答其实并不准确,因为协程并不是继承自线程,而是运行在线程之上的。每一个协程都实现了Continuation接口 , 这个接口里面有个resumeWith 方法和context (这个context不是android 中activity继承那个context)。Continuation有很多子类,最低级的子类是SuspendLambda。协程域里 launch/async/withContext等 里面的代码经过编译器编译之后都存在SuspendLambda中。说了这么多,所以协程也可以理解成若干个Continuation协作构成的程序。
协程方法必须使用suspend 标记,suspend标记的方法经过kt编译器CPS转换后,会在方法末尾添加一个参数Continuation,和将方法的返回值修改为Object 。
SuspendLambda
SuspendLambda会实现Function2接口, 因为 suspend CoroutineScope.() -> Unit 经过kt编译器在java对应着的就是Function2<CoroutineScope, Continuation<? super Unit>, Object> ,所以CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
)方法最后的那个传入block,在java就是 MainActivityonCreateonCreateonCreate1,如下:
final class MainActivity$onCreate$1 extends SuspendLambda implements Function2<CoroutineScope, Continuation<? super Unit>, Object> {
int label;
MainActivity$onCreate$1(Continuation<? super MainActivity$onCreate$1> continuation) {
super(2, continuation);
}
public final Continuation<Unit> create(Object obj, Continuation<?> continuation) {
return new MainActivity$onCreate$1<>(continuation);
}
public final Object invoke(CoroutineScope coroutineScope, Continuation<? super Unit> continuation) {
return ((MainActivity$onCreate$1) create(coroutineScope, continuation)).invokeSuspend(Unit.INSTANCE);
}
public final Object invokeSuspend(Object $result) {
MainActivity$onCreate$1 mainActivity$onCreate$1;
Object coroutine_suspended = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch (this.label) {
case 0:
ResultKt.throwOnFailure($result);
mainActivity$onCreate$1 = this;
long r2 = LiveLiterals$MainActivityKt.INSTANCE.m135xec86fb79();
Continuation continuation = mainActivity$onCreate$1;
mainActivity$onCreate$1.label = 1;
if (DelayKt.delay(r2, continuation) == coroutine_suspended) {
return coroutine_suspended;
}
break;
case 1:
mainActivity$onCreate$1 = this;
ResultKt.throwOnFailure($result);
break;
case 2:
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
System.out.println(LiveLiterals$MainActivityKt.INSTANCE.m139x894324b7() + Thread.currentThread().getName());
long r22 = LiveLiterals$MainActivityKt.INSTANCE.m136x7689ba5d();
Continuation continuation2 = mainActivity$onCreate$1;
mainActivity$onCreate$1.label = 2;
if (DelayKt.delay(r22, continuation2) == coroutine_suspended) {
return coroutine_suspended;
}
MainActivity$onCreate$1 mainActivity$onCreate$12 = mainActivity$onCreate$1;
return Unit.INSTANCE;
}
}
如果需要切换线程则使用DispatchedContinuation , 这个也是Continuation的子类,而且这个类是继承自Runnable的,如果需要切换线程则把SuspendLambda包装在这个类里面。
如果在当前线程执行则直接调用SuspendLambda#resumeWith方法就行了,resumeWith方法实现在BaseContinuationImpl中。好了暂时先粗略了解这两个子类,关于协程是什么就先说到这里了。
协程作用域
GlobalScope是不支持cancel的,但是GlobalScope.launch会返回一个job,这个job是支持取消的。因为GlobalScope 的EmptyCoroutineContext里是没有Job的。所以更推荐使用 CoroutineScope(Dispatchers.Default) ,这个会在context上加上Job 。
MainScope 是在主线程使用的协程作用域,因此在这个域里不能执行耗时操作的,如果要执行耗时操作必须要启动子协程并且指定调度器。
协程的启动
协程必须在协程域里面启动,有四种启动方式DEFAULT 、 ATOMIC、UNDISPATCHED、LAZY ,DEFAULT 启动 是支持取消的。协程启动之前先要经过协程调度器去调度到对应的线程,之后才执行协程体内的代码。
launch启动协程不是lazy情况,每次都会新建StandaloneCoroutine,StandaloneCoroutine继承AbstractCoroutine,AbstractCoroutine继承自JobSupport和实现Continuation,这个可以理解为顶级协程,这个协程支持cancel或者其他job支持的操作。
withContext 启动协程,会挂起当前协程直到获取到返回值,才恢复当前协程执行。
async 启动协程不会挂起当前协程 ,会返回一个Deferred,调用Deferred#await方法如果返回值还没准备好会挂起当前协程。
所以总结下: 这么多启动子协程无非就两种方式,一种挂起当前协程启动,另一种是不挂起当前协程启动。
协程调度器
DEFAULT 调度器 ,通过CoroutineScope.launch启动的时候会先构建出协程上下文,调度器为 Dispatchers.Default 即默认调度器 ,Dispatchers.Default 是一个单例 ,里面的线程数量和当前手机的cpu核数相等。如果是双核的话,调度器为默认调度器的情况,协程里面的代码只能在两个线程跑(不信可以通过Thread.sleep去测试),所以请求网络只用这个调度器肯定不行,两个线程不够跑。
IO调度器,里面最少有64个线程,网络请求、IO操作都可以使用这个调度器,并且这个调度器也会用到默认调度器中的线程(资源利用最大化)。
调度器中的Worker数量即线程数量,每个Worker有它自己的本地队列,这个队列是一个生产者消费者队列,最大的缓冲阈值为128。
协程执行流程
回到最开始那个代码片段,无论通过什么scope.launch启动协程 ,其实都是调用CoroutineScope的扩展方法launch。通过withContext开始协程会调用到suspendCoroutineUninterceptedOrReturn会挂起当前协程。
以哪种方式启动协程最终都会执行代码(以默认启动为例), block.startCoroutineCancellable(completion) ,这个又会执行createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))
。 createCoroutineUnintercepted 这个会执行上面那个代码的create方法获取到MainActivityonCreateonCreateonCreate1,completion就是顶级协程,在当前协程执行后,即invokeSuspend方法执行完,会调用顶级协程的resumeWith方法。顶级协程的invokeSuspend方法执行完当前协程域的所有协程就结束了。
intercepted()方法如果需要调度线程则会将协程包装成DispatchedContinuation,所有前提都准备好了会调用当前协程的resumeWith方法。
internal abstract class BaseContinuationImpl(
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
public final override fun resumeWith(result: Result<Any?>) {
var current = this
var param = result
while (true) {
probeCoroutineResumed(current)
with(current) {
val completion = completion!! // fail fast when trying to resume continuation without completion
val outcome: Result<Any?> =
try {
val outcome = invokeSuspend(param)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
// unrolling recursion via loop
current = completion
param = outcome
} else {
// 调用父协程的resumeWith的方法
completion.resumeWith(outcome)
return
}
}
}
}
//要执行的协程代码体
protected abstract fun invokeSuspend(result: Result<Any?>): Any?
resumeWith里面有个死循环,执行完invokeSuspend 方法,返回为COROUTINE_SUSPENDED则需要挂起,挂起则直接调用return 退出当前方法,所以协程挂起也没多少神秘就是return结束当前方法去执行子协程,并把当前协程传给子协程,子协程resumeWith方法中,因为是while(true),在执行完自身invokeSuspend方法后把 current = completion ,又恢复到当前协程执行当前协程的invokeSuspend方法。completion 如果不是BaseContinuationImpl则是顶级协程,顶级协程继承自AbstractCoroutine,所有子协程都是继承自SuspendLambda, SuspendLambda又是继承自BaseContinuationImpl。所以调用完顶级协程的completion.resumeWith(outcome),return当前协程域的协程就执行完了。
总结一下:resumeWith这个死循环要跳出只能是挂起当前协程或者是执行完顶级协程的resumeWith()方法。
再来说一下其他的几个协程中常用的api
delay 方法
这个方法就是用来挂起当前的协程的,并且支持取消挂起。但是delay方法挂起并不会阻塞主线程,因为这个内部通过另开一个线程配合DelayedTaskQueue队列来实现的,并不会影响主线程。
delay内部也是通过suspendCancellableCoroutine实现。
suspendCancellableCoroutine、suspendCoroutine
这两个方法会挂起当前协程,去执行耗时操作,当耗时操作执行完恢复当前协程执行的时候就可以获取到suspendCancellableCoroutine、suspendCoroutine的返回值,所以一般用于和其他库做适配比如retrofit,注意这两个方法内部并不会开启子协程 。
retrofit 中使用如下
@JvmName("awaitNullable")
suspend fun <T : Any> Call<T?>.await(): T? {
return suspendCancellableCoroutine { continuation ->
continuation.invokeOnCancellation {
cancel()
}
enqueue(object : Callback<T?> {
override fun onResponse(call: Call<T?>, response: Response<T?>) {
if (response.isSuccessful) {
continuation.resume(response.body())
} else {
continuation.resumeWithException(HttpException(response))
}
}
override fun onFailure(call: Call<T?>, t: Throwable) {
continuation.resumeWithException(t)
}
})
}
}
enqueue 是异步方法,这里把异步处理完请求后通过continuation.resume 系列方法回到当前协程,执行当前协程的invokeSuspend方法。
GlobalScope正确使用
如果在很多处通过GlobalScope.launch启动协程,这样会造成协程非常难管理,因为不能通过顶级域GlobalScope去取消协程,而且这种方式启动的生命周期跟随应用的生命周期,非常容易造成内存泄漏。
如果真要使用GlobalScope的话,可以把GlobalScope.launch启动协程的返回值job都保存在map中,自己管理这些job的状态,在协程需要取消的时候从map移除job并调用其cancel方法。
网友评论