协程

作者: 纳兰沫 | 来源:发表于2021-05-13 14:36 被阅读0次

    摘录自即学即用Kotlin - 协程

    1.概念

    协程是轻量级的线程,是因为它基于线程池API,所以,处理并发任务这件事上游刃有余
    协程可以使用阻塞的方式写出非阻塞的代码,触发并发时常见的回调地狱

    2.使用

    GlobalScope.launch(Dispatchers.Main) {
        val res = getResult(2)
        mNumTv.text = res.toString()
    }
    

    GlobalScope是协程的作用域
    Dispatchers 是调度器
    launch 协程构建器

    协程的作用域
    • runBlocking 顶层函数,和coroutineScope不一样,会阻塞当前线程来等待,在业务中并不适用
    • GlobalScope 全局协程作用域,可以在整个应用的什么周期中操作,扔不适合用业务开发
    • 自定义作用域 自定义协程的作用域, 不会造成内存泄漏
    class MainActivity : AppCompatActivity() {
    
        val scope = MainScope()
    
        override fun onCreate(savedInstanceState: Bundle?) {
            super.onCreate(savedInstanceState)
            setContentView(R.layout.activity_main)
    
            //启动协程
            scope.launch(Dispatchers.Unconfined) {
                val one  = getResult(20)
                val two = getResult(40)
                val str = (one + two).toString()
            }
        }
    
        override fun onDestroy() {
            super.onDestroy()
            scope.cancel()
        }
    
    
    
        private suspend fun getResult(num: Int) : Int {
            delay(5000)
            return num * num
        }
    }
    
    调度器

    调度器的作用是将协程限制在特定的线程执行,主要的类型有

    • Dispatcher.Main 指定执行的线程是主线程
    • Dispatcher.IO 指定执行的线程是IO线程
    • Dispatcher.Default 默认的调度器 适合执行CPU密集型的任务
    • Dispatcher.Unconfined 非限制的调度器 指定的线程可能会随着挂起的函数发生变化


      image.png

    lauch

    lauch启动一个新的协程 返回的是一个Job对象,可以使用Job#cancel()取消这个协程
    async是创建一个协程,返回的是一个Deferred<T>对象,可以调用Deferred#await()去获取返回的值

    class MainActivity : AppCompatActivity() {
    
        val scope = MainScope()
    
        override fun onCreate(savedInstanceState: Bundle?) {
            super.onCreate(savedInstanceState)
            setContentView(R.layout.activity_main)
    
            //启动协程
            scope.launch(Dispatchers.Unconfined) {
                val one  = async { getResult(20) }
                val two = async { getResult(40) }
                val str = (one.await()+ two.await()).toString()
            }
        }
    
        override fun onDestroy() {
            super.onDestroy()
            scope.cancel()
        }
    
        private suspend fun getResult(num: Int) : Int {
            delay(5000)
            return num * num
        }
    }
    

    async能执行并发任务,执行任务的时间缩短了一半

    //启动协程
    scope.launch(Dispatchers.Unconfined) {
      val one = async(start = CoroutineStart.LAZY) { getResult(20) }
      val two = async(start = CoroutineStart.LAZY) { getResult(40) }
      val str = (one.await()+ two.await()).toString()
    }
    

    可以在使用async调用任务的时候设置为懒加载,只有在调用它的时候才为它分配资源

    suspend

    suspend 修饰函数的关键字,意思是当前函数可以挂起,但是仅仅是提醒的作用
    使用挂起函数的常用场景

    • 耗时操作 使用withContext切换到指定的IO线程去进行网络或者数据库请求
    • 等待操作 使用delay方法去等待某个事件
     private suspend fun getResult(num: Int) : Int {
           return withContext(Dispatchers.IO) {
               num * num
           }
      }
    
    private suspend fun getResult(num: Int) : Int {
            delay(5000)
            return num * num
    }
    
    结合 Android Jetpack
        dependencies {
            def lifecycle_version = "2.2.0"
            
            // ViewModel
            implementation "androidx.lifecycle:lifecycle-viewmodel-ktx:$lifecycle_version"
            // LiveData
            implementation "androidx.lifecycle:lifecycle-livedata-ktx:$lifecycle_version"
            // Lifecycles only (without ViewModel or LiveData)
            implementation "androidx.lifecycle:lifecycle-runtime-ktx:$lifecycle_version"
        }
    
    lifecycleScope.launch {
        // 代表当前生命周期处于 Resumed 的时候才会执行(选择性使用)
        whenResumed { 
            // ... 具体的协程代码
        }
    }
    

    二、流

    1.基础

    lifecycleScope.launch { 
              createFlow()
                      .collect { num ->
                            //具体的操作
                      }
    }
    
    fun createFlow(): Flow<Int> = flow {
            for (i in 1..10)
                emit(i)
     }
    

    collect函数是一个suspend方法,它必须发生在协程或者带有suspend的方法里面

    2.线程切换

    lifecycleScope.launch { 
                createFlow()
                        //将数据发射的操作放到IO线程中的协程
                        .flowOn(Dispatchers.IO)
                        .collect { num ->
                            //具体的操作
                        }
      }
    
    改变数据发射的线程

    flowOn 使用的参数是协程对应的调度器 实质改变的是协程对应的线程

    改变消费数据的线程

    Flow 的消费线程在我们启动协程指定调度器的时候就确认好了,对应着启动协程的调度器

    3. 异常和完成

    异常捕获
    lifecycleScope.launch {
                flow<Int> {
                    //...
                }.catch { e->
    
                }.collect(
    
                )
    }
    
    完成
    lifecycleScope.launch {
                createFlow()
                    .onCompletion {  }
                    .collect {  }
    
    }
    

    4.Flow的特点

    • 冷流
      当触发collect方法发时候,数据才开始发射
    lifecycleScope.launch {
                val flow = (1..10).asFlow().flowOn(Dispatchers.Main)
                flow.collect { num ->
                    
                }
    }
    
    • 有序
    lifecycleScope.launch { 
                flow<Int> { 
                    for (i in 1..3) {
                        Log.e("Flow","$i emit")
                        emit(i)
                    }
                }.filter {
                    Log.e("Flow","$it filter")
                    it % 2 != 0
                }.map {
                    Log.e("Flow","$it map")
                    "${it * it} money"
                }.collect { 
                    Log.e("Flow","i get $it")
                }
    }
    

    所有的数据都是经过emit filter map和collect一整套完整的处理流程之后,下一个数据才会开始处理

    • 协作取消
      Flow的collect只能在可取消的挂起函数中挂起的时候取消,否则不能取消
     lifecycleScope.launch {
                val f = flow<Int> {
                    for (i in 1..3) {
                        delay(500)
                        Log.e("TAG","emit $i")
                        emit(i)
                    }
                }
                withTimeoutOrNull(1600) {
                    f.collect {
                        delay(5000)
                        Log.e("TAG","consume $it")
                    }
                }
                Log.e("TAG","cancel")
    }
    

    5. 操作符对比

    普通操作符
    image.png
    特殊操作符
    image.png
    组合操作符
    image.png
    扁平流操作符
    image.png
    末端操作符
    image.png

    三、通道

    Channel是一个面向多协程之间数据传输的BlockQueue

           lifecycleScope.launch { 
                //生成一个Channel
                val channel = Channel<Int>()
                
                //Channel发送数据
                launch { 
                    for (i in 1..5) {
                        delay(200)
                        channel.send(i * i)
                    }
                    channel.close()
                }
                //channel接收数据
                launch { 
                    for (y in channel)
                        Log.e("TAG","get $y")
                }
            }
    

    1.创建Channel

    • 直接创建对象
    • 扩展函数produce
    lifecycleScope.launch {
        // 1. 生成一个 Channel
        val channel = produce<Int> {
            for(i in 1..5){
                delay(200)
                send(i * i)
            }
            close()
        }
    
        // 2. 接收数据
    }
    

    2.发送数据

    发送数据使用的 Channel#send() 方法,当我们数据发送完毕的时候,可以使用 Channel#close() 来表明通道已经结束数据的发送。

    3.接收数据

    正常情况下,我们仅需要调用 Channel#receive() 获取数据,但是该方法只能获取一次传递的数据

    repeat(4) {
          Log.e("TAG","get ${channel.receive()}")
    }
    

    当发送的数据不可预估,迭代Channel

    for (y in channel)
           Log.e("TAG","get $y")
    

    相关文章

      网友评论

        本文标题:协程

        本文链接:https://www.haomeiwen.com/subject/pfsadltx.html