Kotlin flow实践总结

作者: Android技术圈 | 来源:发表于2022-08-11 21:26 被阅读0次

    Flow是什么

    按顺序发出多个值的数据流。 本质就是一个生产者消费者模型,生产者发送数据给消费者进行消费。

    • 冷流:当执行collect的时候(也就是有消费者的时候),生产者才开始发射数据流。 生产者与消费者是一对一的关系。当生产者发送数据的时候,对应的消费者才可以收到数据。
    • 热流:不管有没有执行collect(也就是不管有没有消费者),生产者都会发射数据流到内存中。 生产者与消费者是一对多的关系。当生产者发送数据的时候,多个消费者都可以收到数据

    实践场景

    场景一:简单列表数据的加载状态

    简单的列表显示场景,可以使用onStart,onEmpty,catch,onCompletion等回调操作符,监听数据流的状态,显示相应的加载状态UI。

    • onStart:在数据发射之前触发,onStart所在的线程,是数据产生的线程
    • onCompletion:在数据流结束时触发,onCompletion所在的线程,是数据产生的线程
    • onEmpty:当数据流结束了,缺没有发出任何元素的时候触发。
    • catch:数据流发生错误的时候触发
    • flowOn:指定上游数据流的CoroutineContext,下游数据流不会受到影响
    private fun coldFlowDemo() {
        //创建一个冷流,在3秒后发射一个数据
        val coldFlow = flow<Int> {
            delay(3000)
            emit(1)
        }
        lifecycleScope.launch(Dispatchers.IO) {
            coldFlow.onStart {
                Log.d(TAG, "coldFlow onStart, thread:${Thread.currentThread().name}")
                mBinding.progressBar.isVisible = true
                mBinding.tvLoadingStatus.text = "加载中"
            }.onEmpty {
                Log.d(TAG, "coldFlow onEmpty, thread:${Thread.currentThread().name}")
                mBinding.progressBar.isVisible = false
                mBinding.tvLoadingStatus.text = "数据加载为空"
            }.catch {
                Log.d(TAG, "coldFlow catch, thread:${Thread.currentThread().name}")
                mBinding.progressBar.isVisible = false
                mBinding.tvLoadingStatus.text = "数据加载错误:$it"
            }.onCompletion {
                Log.d(TAG, "coldFlow onCompletion, thread:${Thread.currentThread().name}")
                mBinding.progressBar.isVisible = false
                mBinding.tvLoadingStatus.text = "加载完成"
            }
                //指定上游数据流的CoroutineContext,下游数据流不会受到影响
                .flowOn(Dispatchers.Main)
                .collect {
                    Log.d(TAG, "coldFlow collect:$it, thread:${Thread.currentThread().name}")
                }
        }
    }
    
    

    比如上面的例子。 使用flow构建起函数,创建一个冷流,3秒后发送一个值到数据流中。 使用onStart,onEmpty,catch,onCompletion操作符,监听数据流的状态。

    日志输出:

    coldFlow onStart, thread:main
    coldFlow onCompletion, thread:main
    coldFlow collect:1, thread:DefaultDispatcher-worker-1
    
    

    场景二:同一种数据,需要加载本地数据和网络数据

    在实际的开发场景中,经常会将一些网络数据保存到本地,下次加载数据的时候,优先使用本地数据,再使用网络数据。 但是本地数据和网络数据的加载完成时机不一样,所以可能会有下面几种场景。

    1. 本地数据比网络数据先加载完成:那先使用本地数据,再使用网络数据
    2. 网络数据比本地数据先加载完成:
    • 网络数据加载成功,那只使用网络数据即可,不需要再使用本地数据了。
    • 网络数据加载失败,可以继续尝试使用本地数据进行兜底。
    1. 本地数据和网络数据都加载失败:通知上层数据加载失败

    实现CacheRepositity

    将上面的逻辑进行简单封装成一个基类,CacheRepositity。 相应的子类,只需要实现两个方法即可。

    • CResult:代表加载结果,Success 或者 Error。
    • fetchDataFromLocal(),实现本地数据读取的逻辑
    • fetchDataFromNetWork(),实现网络数据获取的逻辑
    abstract class CacheRepositity<T> {
        private val TAG = "CacheRepositity"
    
        fun getData() = channelFlow<CResult<T>> {
            supervisorScope {
                val dataFromLocalDeffer = async {
                    fetchDataFromLocal().also {
                        Log.d(TAG,"fetchDataFromLocal result:$it , thread:${Thread.currentThread().name}")
                        //本地数据加载成功  
                        if (it is CResult.Success) {
                            send(it)
                        }
                    }
                }
    
                val dataFromNetDeffer = async {
                    fetchDataFromNetWork().also {
                        Log.d(TAG,"fetchDataFromNetWork result:$it , thread:${Thread.currentThread().name}")
                        //网络数据加载成功  
                        if (it is CResult.Success) {
                            send(it)
                            //如果网络数据已加载,可以直接取消任务,就不需要处理本地数据了
                            dataFromLocalDeffer.cancel()
                        }
                    }
                }
    
                //本地数据和网络数据,都加载失败的情况
                val localData = dataFromLocalDeffer.await()
                val networkData = dataFromNetDeffer.await()
                if (localData is CResult.Error && networkData is CResult.Error) {
                    send(CResult.Error(Throwable("load data error")))
                }
            }
        }
    
        protected abstract suspend fun fetchDataFromLocal(): CResult<T>
    
        protected abstract suspend fun fetchDataFromNetWork(): CResult<T>
    
    }
    
    sealed class CResult<out R> {
        data class Success<out T>(val data: T) : CResult<T>()
        data class Error(val throwable: Throwable) : CResult<Nothing>()
    }
    
    

    测试验证

    写个TestRepositity,实现CacheRepositity的抽象方法。 通过delay延迟耗时来模拟各种场景,观察日志的输出顺序。

    private fun cacheRepositityDemo(){
        val repositity=TestRepositity()
        lifecycleScope.launch {
            repositity.getData().onStart {
                Log.d(TAG, "TestRepositity: onStart")
            }.onCompletion {
                Log.d(TAG, "TestRepositity: onCompletion")
            }.collect {
                Log.d(TAG, "collect: $it")
            }
        }
    }
    
    

    本地数据比网络数据加载快

    class TestRepositity : CacheRepositity<String>() {
        override suspend fun fetchDataFromLocal(): CResult<String> {
            delay(1000)
            return CResult.Success("data from fetchDataFromLocal")
        }
    
        override suspend fun fetchDataFromNetWork(): CResult<String> {
            delay(2000)
            return CResult.Success("data from fetchDataFromNetWork")
        }
    }
    
    

    模拟数据:本地加载delay1秒,网络加载delay2秒 日志输出:collect 执行两次,先收到本地数据,再收到网络数据。

    onStart
    fetchDataFromLocal result:Success(data=data from fetchDataFromLocal) , thread:main
    collect: Success(data=data from fetchDataFromLocal)
    fetchDataFromNetWork result:Success(data=data from fetchDataFromNetWork) , thread:main
    collect: Success(data=data from fetchDataFromNetWork)
    onCompletion
    
    

    网络数据比本地数据加载快

    class TestRepositity : CacheRepositity<String>() {
        override suspend fun fetchDataFromLocal(): CResult<String> {
            delay(2000)
            return CResult.Success("data from fetchDataFromLocal")
        }
    
        override suspend fun fetchDataFromNetWork(): CResult<String> {
            delay(1000)
            return CResult.Success("data from fetchDataFromNetWork")
        }
    }
    
    

    模拟数据:本地加载delay 2秒,网络加载delay 1秒 日志输出:collect 只执行1次,只收到网络数据。

    onStart
    fetchDataFromNetWork result:Success(data=data from fetchDataFromNetWork) , thread:main
    collect: Success(data=data from fetchDataFromNetWork)
    onCompletion
    
    

    网络数据加载失败,使用本地数据

    class TestRepositity : CacheRepositity<String>() {
        override suspend fun fetchDataFromLocal(): CResult<String> {
            delay(2000)
            return CResult.Success("data from fetchDataFromLocal")
        }
    
        override suspend fun fetchDataFromNetWork(): CResult<String> {
            delay(1000)
            return CResult.Error(Throwable("fetchDataFromNetWork Error"))
        }
    }
    

    模拟数据:本地加载delay 2秒,网络数据加载失败 日志输出:collect 只执行1次,只收到本地数据。

    onStart
    fetchDataFromNetWork result:Error(throwable=java.lang.Throwable: fetchDataFromNetWork Error) , thread:main
    fetchDataFromLocal result:Success(data=data from fetchDataFromLocal) , thread:main
    collect: Success(data=data from fetchDataFromLocal)
    onCompletion
    

    网络数据和本地数据都加载失败

    class TestRepositity : CacheRepositity<String>() {
        override suspend fun fetchDataFromLocal(): CResult<String> {
            delay(2000)
            return CResult.Error(Throwable("fetchDataFromLocal Error"))
        }
    
        override suspend fun fetchDataFromNetWork(): CResult<String> {
            delay(1000)
            return CResult.Error(Throwable("fetchDataFromNetWork Error"))
        }
    }
    

    模拟数据:本地数据加载失败,网络数据加载失败 日志输出: collect 只执行1次,结果是CResult.Error,代表加载数据失败。

    onStart
    fetchDataFromNetWork result:Error(throwable=java.lang.Throwable: fetchDataFromNetWork Error) , thread:main
    fetchDataFromLocal result:Error(throwable=java.lang.Throwable: fetchDataFromLocal Error) , thread:main
    collect: Error(throwable=java.lang.Throwable: load data error)
    onCompletion
    

    场景三:多种数据源,按照顺序合并进行展示

    在实际的开发场景中,经常一个页面的数据,是需要发起多个网络请求之后,组合数据之后再进行显示。 比如类似这种页面,3种数据,需要由3个网络请求获取得到,然后再进行相应的显示。

    实现目标:

    1. 接口间不需要互相等待,哪些数据先回来,就先展示哪部分
    2. 控制数据的显示顺序

    flow combine操作符

    可以合并多个不同的 Flow 数据流,生成一个新的流。 只要其中某个子 Flow 数据流有产生新数据的时候,就会触发 combine 操作,进行重新计算,生成一个新的数据。

    例子

    class HomeViewModel : ViewModel() {
    
        //暴露给View层的列表数据
        val list = MutableLiveData<List<String?>>()
    
        //多个子Flow,这里简单都返回String,实际场景根据需要,返回相应的数据类型即可
        private val bannerFlow = MutableStateFlow<String?>(null)
        private val channelFlow = MutableStateFlow<String?>(null)
        private val listFlow = MutableStateFlow<String?>(null)
    
    
        init {
            //使用combine操作符
            viewModelScope.launch {
                combine(bannerFlow, channelFlow, listFlow) { bannerData, channelData, listData ->
                    Log.d("HomeViewModel", "combine  bannerData:$bannerData,channelData:$channelData,listData:$listData")
                    //只要子flow里面的数据不为空,就放到resultList里面
                    val resultList = mutableListOf<String?>()
                    if (bannerData != null) {
                        resultList.add(bannerData)
                    }
                    if (channelData != null) {
                        resultList.add(channelData)
                    }
                    if (listData != null) {
                        resultList.add(listData)
                    }
                    resultList
                }.collect {
                    //收集combine之后的数据,修改liveData的值,通知UI层刷新列表
                    Log.d("HomeViewModel", "collect: ${it.size}")
                    list.postValue(it)
                }
            }
        }
    
        fun loadData() {
            viewModelScope.launch(Dispatchers.IO) {
                //模拟耗时操作
                async {
                    delay(1000)
                    Log.d("HomeViewModel", "getBannerData success")
                    bannerFlow.emit("Banner")
                }
                async {
                    delay(2000)
                    Log.d("HomeViewModel", "getChannelData success")
                    channelFlow.emit("Channel")
                }
                async {
                    delay(3000)
                    Log.d("HomeViewModel", "getListData success")
                    listFlow.emit("List")
                }
            }
        }
    }
    
    

    HomeViewModel

    1. 提供一个 LiveData 的列表数据给View层使用
    2. 内部有3个子 flow ,分别负责相应数据的生产。(这里简单都返回String,实际场景根据需要,返回相应的数据类型即可)。
    3. 通过 combine 操作符,组合这3个子flow的数据。
    4. collect 接收生成的新数据,并修改liveData的数据,通知刷新UI

    View层使用

    private fun flowCombineDemo() {
        val homeViewModel by viewModels<HomeViewModel>()
        homeViewModel.list.observe(this) {
            Log.d("HomeViewModel", "observe size:${it.size}")
        }
        homeViewModel.loadData()
    }
    
    

    简单的创建一个 ViewModel ,observe 列表数据对应的 LiveData。 通过输出的日志发现,触发数据加载之后,每次子 Flow 流生产数据的时候,都会触发一次 combine 操作,生成新的数据。

    日志输出:
    combine  bannerData:null,channelData:null,listData:null
    collect: 0
    observe size:0
    
    getBannerData success
    combine  bannerData:Banner,channelData:null,listData:null
    collect: 1
    observe size:1
    
    getChannelData success
    combine  bannerData:Banner,channelData:Channel,listData:null
    collect: 2
    observe size:2
    
    getListData success
    combine  bannerData:Banner,channelData:Channel,listData:List
    collect: 3
    observe size:3
    
    

    总结

    具体场景,具体分析。刚好这几个场景,配合Flow进行使用,整体实现也相对简单了一些。

    作者:入魔的冬瓜
    链接:https://juejin.cn/post/7065327938064875534
    来源于掘金

    相关文章

      网友评论

        本文标题:Kotlin flow实践总结

        本文链接:https://www.haomeiwen.com/subject/ccvtgrtx.html