美文网首页
kotlin协程

kotlin协程

作者: taoyyyy | 来源:发表于2022-11-22 08:59 被阅读0次

协程基础

  • 轻量级线程。在一个线程中可以启动多个协程。
  • 在协程中使用同步方式写出异步代码(协程挂起时不会阻塞线程),解决回调地狱。
image.png
GlobalScope.launch(Dispatchers.Main) {//开始协程:主线程
   val result = userApi.getUserSuspend("suming")//网络请求(IO 线程)
   tv_name.text = result?.name //更新 UI(主线程)
}

在主线程中创建协程A中执行整个业务流程,如果遇到异步调用任务则协程A被挂起,切换到IO线程中创建子协程B,获取结果后再恢复到主线程的协程A上,然后继续执行剩下的流程。

xxxScope.launch()、runBlocking:T与async

  • runBlocking:T:顶层函数,创建一个新的协程同时阻塞当前线程,直到其内部所有逻辑以及子协程所有逻辑全部执行完成,返回值是泛型T,一般在项目中不会使用,主要是为main函数和测试设计的。
  • launch:创建一个新的协程,不会阻塞当前线程,必须在协程作用域中才可以调用。它返回的是一个该协程任务的引用,即Job对象。这是最常用的用于启动协程的方式。
  • async:创建一个新的协程,不会阻塞当前线程,必须在协程作用域中才可以调用。并返回Deffer对象,可通过调用Deffer.await()方法等待该子协程执行完成并获取结果。常用于并发执行-同步等待和获取返回值的情况。
# Builders.common.kt
    
public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

Job与Deffered

  • Job与Deffered的api设计类似于Thread
  • Job实例作为协程的唯一标识,用于处理协程,并且负责管理协程的生命周期
public interface Job : CoroutineContext.Element {
    //活跃的,是否仍在执行
    public val isActive: Boolean


    //启动协程,如果启动了协程,则为true;如果协程已经启动或完成,则为false
    public fun start(): Boolean
    
    //取消Job,可通过传入Exception说明具体原因
    public fun cancel(cause: CancellationException? = null)
    
    //挂起协程直到此Job完成
    public suspend fun join()
    
    //取消任务并等待任务完成,结合了[cancel]和[join]的调用
    public suspend fun Job.cancelAndJoin()


    //给Job设置一个完成通知,当Job执行完成的时候会同步执行这个函数
    public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle
}

Job 还可以有层级关系,一个Job可以包含多个子Job,当父Job被取消后,所有的子Job也会被自动取消;当子Job被取消或者出现异常后父Job也会被取消。具有多个子 Job 的父Job 会等待所有子Job完成(或者取消)后,自己才会执行完成。

public interface Deferred<out T> : Job {
    //等待协程执行完成并获取结果
    public suspend fun await(): T
}

协程作用域

  • runBlocking:顶层函数,它的第二个参数为接收者是CoroutineScope的函数字面量,可启动协程。但是它会阻塞当前线程,主要用于测试。
  • GlobalScope:全局协程作用域,通过GlobalScope创建的协程不会有父协程,可以把它称为根协程。它启动的协程的生命周期只受整个应用程序的生命周期的限制,且不能取消,在运行时会消耗一些内存资源,这可能会导致内存泄露,所以仍不适用于业务开发。
  • coroutineScope:创建一个独立的协程作用域,直到所有启动的协程都完成后才结束自身。它是一个挂起函数,需要运行在协程内或挂起函数内。当这个作用域中的任何一个子协程失败时,这个作用域失败,所有其他的子程序都被取消。为并行分解工作而设计的。
  • supervisorScope:与coroutineScope类似,不同的是子协程的异常不会影响父协程,也不会影响其他子协程。(作用域本身的失败(在block或取消中抛出异常)会导致作用域及其所有子协程失败,但不会取消父协程。)
  • MainScope:为UI组件创建主作用域。一个顶层函数,上下文是SupervisorJob() + Dispatchers.Main,说明它是一个在主线程执行的协程作用域,通过cancel对协程进行取消。推荐使用。
  • lifecycleScope:Lifecycle Ktx库提供的具有生命周期感知的协程作用域,与Lifecycle绑定生命周期,生命周期被销毁时,此作用域将被取消。会与当前的UI组件绑定生命周期,界面销毁时该协程作用域将被取消,不会造成协程泄漏,推荐使用。
  • viewModelScope:与lifecycleScope类似,与ViewModel绑定生命周期,当ViewModel被清除时,这个作用域将被取消。推荐使用。
fun launchTest2() {
    print("start")
    //开启一个IO模式的协程,通过协程上下文创建一个CoroutineScope对象,需要一个类型为CoroutineContext的参数
    val job = CoroutineScope(Dispatchers.IO).launch {
        delay(1000)//1秒无阻塞延迟(默认单位为毫秒)
        print("CoroutineScope.launch")
    }
    print("end")//主线程继续,而协程被延迟
}
private suspend fun testSupervisorScope() = supervisorScope {    
    launch { throw IllegalArgumentException("随便抛一个异常") }    
    launch {        
        delay(1000)        
        Log.e("crx", "另一个协程")
    }
}


private suspend fun testCoroutineScope() = coroutineScope {    
    launch { throw IllegalArgumentException("随便抛一个异常") }    
    launch {        
        delay(1000)        
        Log.e("crx", "另一个协程")    
    }
}

//执行testSupervisorScope方法打印的结果
E/crx: 异常信息: 随便抛一个异常
E/crx: 另一个协程
E/crx: 在执行完了Scope之后


//执行testCoroutineScope方法打印的结果
E/crx: 异常信息: 随便抛一个异常
class MainActivity : AppCompatActivity() {


    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)


        btn_data.setOnClickListener {
            lifecycleScope.launch {//使用lifecycleScope创建协程
                //协程执行体
            }
        }
    }
}


class MainViewModel : ViewModel() {
    fun getData() {
        viewModelScope.launch {//使用viewModelScope创建协程
            //执行协程
        }
    }
}

协程异常

当协程作用域中的一个协程发生异常时,此时的异常流程如下所示:

  • 发生异常的协程被cancel
  • 异常传递到它的父协程
  • 父协程cancel(取消其所有子协程)
  • 将异常在协程树上进一步向上传播

被封装到deferred对象中的异常才会在调用await时抛出。

    private val job: Job = Job()
    private val scope = CoroutineScope(Dispatchers.Default + job)

    private fun doWork(): Deferred<String> = scope.async { throw NullPointerException("自定义空指针异常") }


    private fun loadData() = scope.launch {
        try {
            doWork().await()
        } catch (e: Exception) {
            Log.d("try catch捕获的异常:", e.toString())
        }
    }

Job.cancel 取消任务时会抛出CancellationException 给指定协程,但是不会结构化并发到父协程。

CoroutineExceptionHandler

CoroutineExceptionHandler只能处理当前域内开启的子协程或者当前协程抛出的异常。

supervisorScope 和SupervisorJob

supervisorScope 和 SupervisorJob的原理是:将异常不传播给自己的父协程。

调度器Dispatcher

public abstract class CoroutineDispatcher :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {


    //将可运行块的执行分派到给定上下文中的另一个线程上。这个方法应该保证给定的[block]最终会被调用。
    public abstract fun dispatch(context: CoroutineContext, block: Runnable)


    //返回一个continuation,它封装了提供的[continuation],拦截了所有的恢复。
    public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>


    //CoroutineDispatcher是一个协程上下文元素,而'+'是一个用于协程上下文的集合和操作符。
    public operator fun plus(other: CoroutineDispatcher): CoroutineDispatcher = other
}

使用方式一:

fun dispatchersTest() {
    //创建一个在主线程执行的协程作用域
    val mainScope = MainScope()
    mainScope.launch {
        launch(Dispatchers.Main) {//在协程上下参数中指定调度器
            print("主线程调度器")
        }
        launch(Dispatchers.Default) {
            print("默认调度器")
        }
        launch(Dispatchers.Unconfined) {
            print("任意调度器")
        }
        launch(Dispatchers.IO) {
            print("IO调度器")
        }
    }
}

使用方式二:

//用给定的协程上下文调用指定的挂起块,挂起直到它完成,并返回结果。
public suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T


GlobalScope.launch(Dispatchers.Main) {//开始协程:主线程
    val result: User = withContext(Dispatchers.IO) {//网络请求(IO 线程)
        userApi.getUserSuspend("FollowExcellence")
    }
    tv_title.text = result.name //更新 UI(主线程)
}

启动模式

CoroutineStart是一个枚举类,为协程构建器定义启动选项。在协程构建的start参数中使用。

image.png

协程上下文

协程使用以下几种元素集定义协程的行为,它们均继承自CoroutineContext:

  • Job:协程的句柄,对协程的控制和管理生命周期。
  • CoroutineName:协程的名称,可用于调试。
  • CoroutineDispatcher:调度器,确定协程在指定的线程来执行。
  • CoroutineExceptionHandler:协程异常处理器,处理未捕获的异常。

suspend本质

//Continuation接口表示挂起点之后的延续,该挂起点返回类型为“T”的值。
public interface Continuation<in T> {
    //对应这个Continuation的协程上下文
    public val context: CoroutineContext


    //恢复相应协程的执行,传递一个成功或失败的结果作为最后一个挂起点的返回值。
    public fun resumeWith(result: Result<T>)
}


//将[value]作为最后一个挂起点的返回值,恢复相应协程的执行。
fun <T> Continuation<T>.resume(value: T): Unit =
    resumeWith(Result.success(value))


//恢复相应协程的执行,以便在最后一个挂起点之后重新抛出[异常]。
fun <T> Continuation<T>.resumeWithException(exception: Throwable): Unit =
    resumeWith(Result.failure(exception))

Kotlin 使用堆栈帧来管理要运行哪个函数以及所有局部变量。挂起(暂停)协程时,会复制并保存当前的堆栈帧以供稍后使用,将信息保存到Continuation对象中。恢复协程时,会将堆栈帧从其保存位置复制回来,对应的Continuation通过调用resumeWith函数才会恢复协程的执行,然后函数再次开始运行。同时返回Result<T>类型的成功或者异常的结果。

@GET("users/{login}")
suspend fun getUserSuspend(@Path("login") login: String): User

反编译后
public abstract getUserSuspend(Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public interface Continuation<in T> {
    //协程上下文
    public val context: CoroutineContext


    //恢复相应协程的执行,传递一个成功或失败的[result]作为最后一个挂起点的返回值。
    public fun resumeWith(result: Result<T>)
}

协程原理

https://mp.weixin.qq.com/s/nXfweTaOCpm6Bj34rW-wLA 协程的本质和原理:基于CPS( Continuation-Passing-Style Transformation)和状态机

  • 挂起函数,在执行的时候并不一定都会挂起,挂起函数里包含其他挂起函数的时候,它才会真正被挂起
  • 挂起函数只能在其他挂起函数中被调用(or 协程作用域)
    Continuation 则代表了,程序继续运行下去需要执行的代码,接下来要执行的代码 或者 剩下的代码。
image.png image.png
internal abstract class BaseContinuationImpl(...) {
// 实现 Continuation 的 resumeWith,并且是 final 的,不可被重写
public final override fun resumeWith(result: Result<Any?>) {
...
val outcome = invokeSuspend(param)
...
}
// 由编译生成的协程相关类来实现,例如 postItem$1
protected abstract fun invokeSuspend(result: Result<Any?>): Any?
}
suspend fun testCoroutine() {
    log("start")
    val user = getUserInfo()
    log(user)
    val friendList = getFriendList(user)
    log(friendList)
    val feedList = getFeedList(friendList)
    log(feedList)
}
fun testCoroutine(completion: Continuation<Any?>): Any? {

    //completion表示运行完testCoroutine挂起函数后需要运行的代码
    class TestContinuation(completion: Continuation<Any?>?) : ContinuationImpl(completion) {
        // 表示协程状态机当前的状态
        var label: Int = 0
        // 协程返回结果
        var result: Any? = null


        // 用于保存之前协程的计算结果
        var mUser: Any? = null
        var mFriendList: Any? = null


        // invokeSuspend 是协程的关键(Continuation#resumeWith中会调用invokeSuspend)
        // 它最终会调用 testCoroutine(this) 开启协程状态机
        // 状态机相关代码就是后面的 when 语句
        // 协程的本质,可以说就是 CPS + 状态机
        override fun invokeSuspend(_result: Result<Any?>): Any? {
            result = _result
            label = label or Int.Companion.MIN_VALUE
            return testCoroutine(this)
        }
    }

    //说明在运行期间只会生成一个Continuation对象
    val continuation = if (completion is TestContinuation) {
        completion
    } else {
        //                作为参数
        //                   ↓
        //1.初次运行
        //2.用一个新的Continuation包装了旧的Continuation
        TestContinuation(completion)
    }


        // 三个变量,对应原函数的三个变量
    lateinit var user: String
    lateinit var friendList: String
    lateinit var feedList: String


    // result 接收协程的运行结果
    var result = continuation.result


    // suspendReturn 接收挂起函数的返回值
    var suspendReturn: Any? = null


    // CoroutineSingletons 是个枚举类
    // COROUTINE_SUSPENDED 代表当前函数被挂起了
    val sFlag = CoroutineSingletons.COROUTINE_SUSPENDED

    when (continuation.label) {
    0 -> {
        // 检测异常
        throwOnFailure(result)


        log("start")
        // 将 label 置为 1,准备进入下一次状态
        continuation.label = 1


        // 执行 getUserInfo
        suspendReturn = getUserInfo(continuation)


        // 判断是否挂起
        if (suspendReturn == sFlag) {
            return suspendReturn
        } else {
            result = suspendReturn
            //go to next state
        }
    }


    1 -> {
        throwOnFailure(result)


        // 获取 user 值
        user = result as String
        log(user)
        // 将协程结果存到 continuation 里
        continuation.mUser = user
        // 准备进入下一个状态
        continuation.label = 2


        // 执行 getFriendList
        suspendReturn = getFriendList(user, continuation)


        // 判断是否挂起
        if (suspendReturn == sFlag) {
            return suspendReturn
        } else {
            result = suspendReturn
            //go to next state
        }
    }


    2 -> {
        throwOnFailure(result)


        user = continuation.mUser as String


        // 获取 friendList 的值
        friendList = result as String
        log(friendList)


        // 将协程结果存到 continuation 里
        continuation.mUser = user
        continuation.mFriendList = friendList


        // 准备进入下一个状态
        continuation.label = 3


        // 执行 getFeedList
        suspendReturn = getFeedList(friendList, continuation)


        // 判断是否挂起
        if (suspendReturn == sFlag) {
            return suspendReturn
        } else {
            result = suspendReturn
            //go to next state
        }
    }


    3 -> {
        throwOnFailure(result)


        user = continuation.mUser as String
        friendList = continuation.mFriendList as String
        feedList = continuation.result as String
        log(feedList)
        loop = false
    }
}

}

协程线程切换原理

https://mp.weixin.qq.com/s/iitYHxn6vPpE_wMsiPoplg

协程的并发处理

https://mp.weixin.qq.com/s/6paEFQDD-lHYMjcWmwZdhw

非阻塞式锁Mutex:拿不到锁时协程就挂起

线程中锁都是阻塞式,在没有获取锁时无法执行其他逻辑,而协程可以通过挂起函数解决这个,没有获取锁就挂起协程,获取后再恢复协程,协程挂起时线程并没有阻塞可以执行其他逻辑。这种互斥锁就是Mutex,它与synchronized关键字有些类似,还提供了withLock扩展函数,替代常用的mutex.lock; try {...} finally { mutex.unlock() }:

image.png

Mutex大致逻辑还是非常清晰的,协程先获取锁,然后执行代码块,然后释放锁,其他协程如果进入,必须先获取锁,获取不到协程执行挂起方法suspend fun lockSuspend(owner: Any?), 加入等待队列,挂起协程。等待其他协程释放锁之后,恢复协程。

整体还是建立在CAS基础上,封装的一套解决方案。

协程的异常处理方式

https://juejin.cn/post/6935472332735512606/

如果协程本身不使用try-catch子句自行处理异常,则不会重新抛出该异常,因此无法通过外部try-catch子句进行处理。

异常会在“Job层次结构中传播”,可以由已设置的CoroutineExceptionHandler处理。 如果未设置,则调用该线程的未捕获异常处理程序。如下代码依然会崩溃

fun main() {

    val topLevelScope = CoroutineScope(Job())

    topLevelScope.launch {

        try {

            launch {

                throw RuntimeException("RuntimeException in nested coroutine")

            }

        } catch (exception: Exception) {

            println("Handle $exception")

        }

    }

    Thread.sleep(100)

}

为了使CoroutineExceptionHandler起作用,必须将其设置在CoroutineScope或顶级协程中, 给子协程设置CoroutineExceptionHandler是没有效果的。

// ...

val topLevelScope = CoroutineScope(Job() + coroutineExceptionHandler)

// ...

// ...

topLevelScope.launch(coroutineExceptionHandler) {

// ...

在代码的特定部分处理异常,可使用try-catch。

全局捕获异常,并且其中一个任务异常,其他任务不执行,可使用CoroutineExceptionHandler,节省资源消耗。

并行任务间互不干扰,任何一个任务失败,其他任务照常运行,可使用SupervisorScope+async模式。

协程的结构化并发

1、父作用域的生命周期持续到所有子作用域执行完;

2、当结束父作用域结束时,同时结束它的各个子作用域;

3、子作用域未捕获到的异常将不会被重新抛出,而是一级一级向父作用域传递,这种异常传播将导致父作用域失败,进而导致其子作用域的所有请求被取消。

相关文章

网友评论

      本文标题:kotlin协程

      本文链接:https://www.haomeiwen.com/subject/yibgxdtx.html