协程基础
- 轻量级线程。在一个线程中可以启动多个协程。
- 在协程中使用同步方式写出异步代码(协程挂起时不会阻塞线程),解决回调地狱。
GlobalScope.launch(Dispatchers.Main) {//开始协程:主线程
val result = userApi.getUserSuspend("suming")//网络请求(IO 线程)
tv_name.text = result?.name //更新 UI(主线程)
}
在主线程中创建协程A中执行整个业务流程,如果遇到异步调用任务则协程A被挂起,切换到IO线程中创建子协程B,获取结果后再恢复到主线程的协程A上,然后继续执行剩下的流程。
xxxScope.launch()、runBlocking:T与async
- runBlocking:T:顶层函数,创建一个新的协程同时阻塞当前线程,直到其内部所有逻辑以及子协程所有逻辑全部执行完成,返回值是泛型T,一般在项目中不会使用,主要是为main函数和测试设计的。
- launch:创建一个新的协程,不会阻塞当前线程,必须在协程作用域中才可以调用。它返回的是一个该协程任务的引用,即Job对象。这是最常用的用于启动协程的方式。
- async:创建一个新的协程,不会阻塞当前线程,必须在协程作用域中才可以调用。并返回Deffer对象,可通过调用Deffer.await()方法等待该子协程执行完成并获取结果。常用于并发执行-同步等待和获取返回值的情况。
# Builders.common.kt
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
Job与Deffered
- Job与Deffered的api设计类似于Thread
- Job实例作为协程的唯一标识,用于处理协程,并且负责管理协程的生命周期
public interface Job : CoroutineContext.Element {
//活跃的,是否仍在执行
public val isActive: Boolean
//启动协程,如果启动了协程,则为true;如果协程已经启动或完成,则为false
public fun start(): Boolean
//取消Job,可通过传入Exception说明具体原因
public fun cancel(cause: CancellationException? = null)
//挂起协程直到此Job完成
public suspend fun join()
//取消任务并等待任务完成,结合了[cancel]和[join]的调用
public suspend fun Job.cancelAndJoin()
//给Job设置一个完成通知,当Job执行完成的时候会同步执行这个函数
public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle
}
Job 还可以有层级关系,一个Job可以包含多个子Job,当父Job被取消后,所有的子Job也会被自动取消;当子Job被取消或者出现异常后父Job也会被取消。具有多个子 Job 的父Job 会等待所有子Job完成(或者取消)后,自己才会执行完成。
public interface Deferred<out T> : Job {
//等待协程执行完成并获取结果
public suspend fun await(): T
}
协程作用域
- runBlocking:顶层函数,它的第二个参数为接收者是CoroutineScope的函数字面量,可启动协程。但是它会阻塞当前线程,主要用于测试。
- GlobalScope:全局协程作用域,通过GlobalScope创建的协程不会有父协程,可以把它称为根协程。它启动的协程的生命周期只受整个应用程序的生命周期的限制,且不能取消,在运行时会消耗一些内存资源,这可能会导致内存泄露,所以仍不适用于业务开发。
- coroutineScope:创建一个独立的协程作用域,直到所有启动的协程都完成后才结束自身。它是一个挂起函数,需要运行在协程内或挂起函数内。当这个作用域中的任何一个子协程失败时,这个作用域失败,所有其他的子程序都被取消。为并行分解工作而设计的。
- supervisorScope:与coroutineScope类似,不同的是子协程的异常不会影响父协程,也不会影响其他子协程。(作用域本身的失败(在block或取消中抛出异常)会导致作用域及其所有子协程失败,但不会取消父协程。)
- MainScope:为UI组件创建主作用域。一个顶层函数,上下文是SupervisorJob() + Dispatchers.Main,说明它是一个在主线程执行的协程作用域,通过cancel对协程进行取消。推荐使用。
- lifecycleScope:Lifecycle Ktx库提供的具有生命周期感知的协程作用域,与Lifecycle绑定生命周期,生命周期被销毁时,此作用域将被取消。会与当前的UI组件绑定生命周期,界面销毁时该协程作用域将被取消,不会造成协程泄漏,推荐使用。
- viewModelScope:与lifecycleScope类似,与ViewModel绑定生命周期,当ViewModel被清除时,这个作用域将被取消。推荐使用。
fun launchTest2() {
print("start")
//开启一个IO模式的协程,通过协程上下文创建一个CoroutineScope对象,需要一个类型为CoroutineContext的参数
val job = CoroutineScope(Dispatchers.IO).launch {
delay(1000)//1秒无阻塞延迟(默认单位为毫秒)
print("CoroutineScope.launch")
}
print("end")//主线程继续,而协程被延迟
}
private suspend fun testSupervisorScope() = supervisorScope {
launch { throw IllegalArgumentException("随便抛一个异常") }
launch {
delay(1000)
Log.e("crx", "另一个协程")
}
}
private suspend fun testCoroutineScope() = coroutineScope {
launch { throw IllegalArgumentException("随便抛一个异常") }
launch {
delay(1000)
Log.e("crx", "另一个协程")
}
}
//执行testSupervisorScope方法打印的结果
E/crx: 异常信息: 随便抛一个异常
E/crx: 另一个协程
E/crx: 在执行完了Scope之后
//执行testCoroutineScope方法打印的结果
E/crx: 异常信息: 随便抛一个异常
class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
btn_data.setOnClickListener {
lifecycleScope.launch {//使用lifecycleScope创建协程
//协程执行体
}
}
}
}
class MainViewModel : ViewModel() {
fun getData() {
viewModelScope.launch {//使用viewModelScope创建协程
//执行协程
}
}
}
协程异常
当协程作用域中的一个协程发生异常时,此时的异常流程如下所示:
- 发生异常的协程被cancel
- 异常传递到它的父协程
- 父协程cancel(取消其所有子协程)
- 将异常在协程树上进一步向上传播
被封装到deferred对象中的异常才会在调用await时抛出。
private val job: Job = Job()
private val scope = CoroutineScope(Dispatchers.Default + job)
private fun doWork(): Deferred<String> = scope.async { throw NullPointerException("自定义空指针异常") }
private fun loadData() = scope.launch {
try {
doWork().await()
} catch (e: Exception) {
Log.d("try catch捕获的异常:", e.toString())
}
}
Job.cancel 取消任务时会抛出CancellationException 给指定协程,但是不会结构化并发到父协程。
CoroutineExceptionHandler
CoroutineExceptionHandler只能处理当前域内开启的子协程或者当前协程抛出的异常。
supervisorScope 和SupervisorJob
supervisorScope 和 SupervisorJob的原理是:将异常不传播给自己的父协程。
调度器Dispatcher
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
//将可运行块的执行分派到给定上下文中的另一个线程上。这个方法应该保证给定的[block]最终会被调用。
public abstract fun dispatch(context: CoroutineContext, block: Runnable)
//返回一个continuation,它封装了提供的[continuation],拦截了所有的恢复。
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
//CoroutineDispatcher是一个协程上下文元素,而'+'是一个用于协程上下文的集合和操作符。
public operator fun plus(other: CoroutineDispatcher): CoroutineDispatcher = other
}
使用方式一:
fun dispatchersTest() {
//创建一个在主线程执行的协程作用域
val mainScope = MainScope()
mainScope.launch {
launch(Dispatchers.Main) {//在协程上下参数中指定调度器
print("主线程调度器")
}
launch(Dispatchers.Default) {
print("默认调度器")
}
launch(Dispatchers.Unconfined) {
print("任意调度器")
}
launch(Dispatchers.IO) {
print("IO调度器")
}
}
}
使用方式二:
//用给定的协程上下文调用指定的挂起块,挂起直到它完成,并返回结果。
public suspend fun <T> withContext(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T
): T
GlobalScope.launch(Dispatchers.Main) {//开始协程:主线程
val result: User = withContext(Dispatchers.IO) {//网络请求(IO 线程)
userApi.getUserSuspend("FollowExcellence")
}
tv_title.text = result.name //更新 UI(主线程)
}
启动模式
CoroutineStart是一个枚举类,为协程构建器定义启动选项。在协程构建的start参数中使用。
协程上下文
协程使用以下几种元素集定义协程的行为,它们均继承自CoroutineContext:
- Job:协程的句柄,对协程的控制和管理生命周期。
- CoroutineName:协程的名称,可用于调试。
- CoroutineDispatcher:调度器,确定协程在指定的线程来执行。
- CoroutineExceptionHandler:协程异常处理器,处理未捕获的异常。
suspend本质
//Continuation接口表示挂起点之后的延续,该挂起点返回类型为“T”的值。
public interface Continuation<in T> {
//对应这个Continuation的协程上下文
public val context: CoroutineContext
//恢复相应协程的执行,传递一个成功或失败的结果作为最后一个挂起点的返回值。
public fun resumeWith(result: Result<T>)
}
//将[value]作为最后一个挂起点的返回值,恢复相应协程的执行。
fun <T> Continuation<T>.resume(value: T): Unit =
resumeWith(Result.success(value))
//恢复相应协程的执行,以便在最后一个挂起点之后重新抛出[异常]。
fun <T> Continuation<T>.resumeWithException(exception: Throwable): Unit =
resumeWith(Result.failure(exception))
Kotlin 使用堆栈帧来管理要运行哪个函数以及所有局部变量。挂起(暂停)协程时,会复制并保存当前的堆栈帧以供稍后使用,将信息保存到Continuation对象中。恢复协程时,会将堆栈帧从其保存位置复制回来,对应的Continuation通过调用resumeWith函数才会恢复协程的执行,然后函数再次开始运行。同时返回Result<T>类型的成功或者异常的结果。
@GET("users/{login}")
suspend fun getUserSuspend(@Path("login") login: String): User
反编译后
public abstract getUserSuspend(Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public interface Continuation<in T> {
//协程上下文
public val context: CoroutineContext
//恢复相应协程的执行,传递一个成功或失败的[result]作为最后一个挂起点的返回值。
public fun resumeWith(result: Result<T>)
}
协程原理
https://mp.weixin.qq.com/s/nXfweTaOCpm6Bj34rW-wLA 协程的本质和原理:基于CPS( Continuation-Passing-Style Transformation)和状态机
- 挂起函数,在执行的时候并不一定都会挂起,挂起函数里包含其他挂起函数的时候,它才会真正被挂起
- 挂起函数只能在其他挂起函数中被调用(or 协程作用域)
Continuation 则代表了,程序继续运行下去需要执行的代码,接下来要执行的代码 或者 剩下的代码。
internal abstract class BaseContinuationImpl(...) {
// 实现 Continuation 的 resumeWith,并且是 final 的,不可被重写
public final override fun resumeWith(result: Result<Any?>) {
...
val outcome = invokeSuspend(param)
...
}
// 由编译生成的协程相关类来实现,例如 postItem$1
protected abstract fun invokeSuspend(result: Result<Any?>): Any?
}
suspend fun testCoroutine() {
log("start")
val user = getUserInfo()
log(user)
val friendList = getFriendList(user)
log(friendList)
val feedList = getFeedList(friendList)
log(feedList)
}
fun testCoroutine(completion: Continuation<Any?>): Any? {
//completion表示运行完testCoroutine挂起函数后需要运行的代码
class TestContinuation(completion: Continuation<Any?>?) : ContinuationImpl(completion) {
// 表示协程状态机当前的状态
var label: Int = 0
// 协程返回结果
var result: Any? = null
// 用于保存之前协程的计算结果
var mUser: Any? = null
var mFriendList: Any? = null
// invokeSuspend 是协程的关键(Continuation#resumeWith中会调用invokeSuspend)
// 它最终会调用 testCoroutine(this) 开启协程状态机
// 状态机相关代码就是后面的 when 语句
// 协程的本质,可以说就是 CPS + 状态机
override fun invokeSuspend(_result: Result<Any?>): Any? {
result = _result
label = label or Int.Companion.MIN_VALUE
return testCoroutine(this)
}
}
//说明在运行期间只会生成一个Continuation对象
val continuation = if (completion is TestContinuation) {
completion
} else {
// 作为参数
// ↓
//1.初次运行
//2.用一个新的Continuation包装了旧的Continuation
TestContinuation(completion)
}
// 三个变量,对应原函数的三个变量
lateinit var user: String
lateinit var friendList: String
lateinit var feedList: String
// result 接收协程的运行结果
var result = continuation.result
// suspendReturn 接收挂起函数的返回值
var suspendReturn: Any? = null
// CoroutineSingletons 是个枚举类
// COROUTINE_SUSPENDED 代表当前函数被挂起了
val sFlag = CoroutineSingletons.COROUTINE_SUSPENDED
when (continuation.label) {
0 -> {
// 检测异常
throwOnFailure(result)
log("start")
// 将 label 置为 1,准备进入下一次状态
continuation.label = 1
// 执行 getUserInfo
suspendReturn = getUserInfo(continuation)
// 判断是否挂起
if (suspendReturn == sFlag) {
return suspendReturn
} else {
result = suspendReturn
//go to next state
}
}
1 -> {
throwOnFailure(result)
// 获取 user 值
user = result as String
log(user)
// 将协程结果存到 continuation 里
continuation.mUser = user
// 准备进入下一个状态
continuation.label = 2
// 执行 getFriendList
suspendReturn = getFriendList(user, continuation)
// 判断是否挂起
if (suspendReturn == sFlag) {
return suspendReturn
} else {
result = suspendReturn
//go to next state
}
}
2 -> {
throwOnFailure(result)
user = continuation.mUser as String
// 获取 friendList 的值
friendList = result as String
log(friendList)
// 将协程结果存到 continuation 里
continuation.mUser = user
continuation.mFriendList = friendList
// 准备进入下一个状态
continuation.label = 3
// 执行 getFeedList
suspendReturn = getFeedList(friendList, continuation)
// 判断是否挂起
if (suspendReturn == sFlag) {
return suspendReturn
} else {
result = suspendReturn
//go to next state
}
}
3 -> {
throwOnFailure(result)
user = continuation.mUser as String
friendList = continuation.mFriendList as String
feedList = continuation.result as String
log(feedList)
loop = false
}
}
}
协程线程切换原理
https://mp.weixin.qq.com/s/iitYHxn6vPpE_wMsiPoplg
协程的并发处理
https://mp.weixin.qq.com/s/6paEFQDD-lHYMjcWmwZdhw
非阻塞式锁Mutex:拿不到锁时协程就挂起
线程中锁都是阻塞式,在没有获取锁时无法执行其他逻辑,而协程可以通过挂起函数解决这个,没有获取锁就挂起协程,获取后再恢复协程,协程挂起时线程并没有阻塞可以执行其他逻辑。这种互斥锁就是Mutex,它与synchronized关键字有些类似,还提供了withLock扩展函数,替代常用的mutex.lock; try {...} finally { mutex.unlock() }:
image.pngMutex大致逻辑还是非常清晰的,协程先获取锁,然后执行代码块,然后释放锁,其他协程如果进入,必须先获取锁,获取不到协程执行挂起方法suspend fun lockSuspend(owner: Any?), 加入等待队列,挂起协程。等待其他协程释放锁之后,恢复协程。
整体还是建立在CAS基础上,封装的一套解决方案。
协程的异常处理方式
https://juejin.cn/post/6935472332735512606/
如果协程本身不使用try-catch子句自行处理异常,则不会重新抛出该异常,因此无法通过外部try-catch子句进行处理。
异常会在“Job层次结构中传播”,可以由已设置的CoroutineExceptionHandler处理。 如果未设置,则调用该线程的未捕获异常处理程序。如下代码依然会崩溃
fun main() {
val topLevelScope = CoroutineScope(Job())
topLevelScope.launch {
try {
launch {
throw RuntimeException("RuntimeException in nested coroutine")
}
} catch (exception: Exception) {
println("Handle $exception")
}
}
Thread.sleep(100)
}
为了使CoroutineExceptionHandler起作用,必须将其设置在CoroutineScope或顶级协程中, 给子协程设置CoroutineExceptionHandler是没有效果的。
// ...
val topLevelScope = CoroutineScope(Job() + coroutineExceptionHandler)
// ...
// ...
topLevelScope.launch(coroutineExceptionHandler) {
// ...
在代码的特定部分处理异常,可使用try-catch。
全局捕获异常,并且其中一个任务异常,其他任务不执行,可使用CoroutineExceptionHandler,节省资源消耗。
并行任务间互不干扰,任何一个任务失败,其他任务照常运行,可使用SupervisorScope+async模式。
协程的结构化并发
1、父作用域的生命周期持续到所有子作用域执行完;
2、当结束父作用域结束时,同时结束它的各个子作用域;
3、子作用域未捕获到的异常将不会被重新抛出,而是一级一级向父作用域传递,这种异常传播将导致父作用域失败,进而导致其子作用域的所有请求被取消。
网友评论