美文网首页Android进阶笔记
干掉RxJava系列--2. 手写FlowBus替代RxBus/

干掉RxJava系列--2. 手写FlowBus替代RxBus/

作者: 今阳说 | 来源:发表于2021-12-31 10:09 被阅读0次

    LiveData的不足

    • LiveData 是一个专用于 Android 的具备自主生命周期感知能力的可观察的数据存储器类,被有意简化设计,这使得开发者很容易上手,但其不足有如下两点:
    1. LiveData只能在主线程更新数据(postValue底层也是切换到主线程的,而且可能会有丢数据的问题);
    2. LiveData操作符不够强大, 对于较为复杂的交互数据流场景,建议使用 RxJava 或 Flow;
    3. LiveData与Android平台紧密相连,虽然LiveData在表现层中运行良好,但它并不适合领域层,因为领域层最好是独立于平台的;
    • LiveData 对于 Java 开发者、初学者或是一些简单场景而言仍是可行的解决方案。对于MVVM架构而言,View和ViewModel之间可以通过LiveData交互(看了下面就知道其实也可以用StateFlow), ViewModel和Repository之间就可以通过Flow交互;

    RxJava的不足

    • RxJava还是相当强大的,基于事件流的链式调用,进行耗时任务,线程切换,是一个很好的异步操作库, 但是对于Android开发来说其也有一些不足之处
    1. 强大意味着复杂,其繁多的操作符简直是初学者的噩梦;
    2. 它是非官方的,google自然也就不会花大力气去推广和优化;
    3. 为项目的包体积带来了额外的增加;

    Flow

    • Flow 是一种 "冷流"(Cold Stream)。"冷流" 是一种数据源,该类数据源的生产者会在每个监听者开始消费事件的时候执行(即不消费则不生产数据,而LiveData的发送端并不依赖于接收端),从而在每个订阅上创建新的数据流(有多个订阅者的时候,他们各自的事件是独立的)。一旦消费者停止监听或者生产者的阻塞结束,数据流将会被自动关闭。
    • Flow 是 Kotlin 协程与响应式编程模型结合的产物,支持线程切换、背压,通过协程取消功能提供自动清理功能,因此倾向于执行一些重型任务。
    • 使用 take, first, toList 等操作符可以简化 Flow 的相关代码测试。
    • Flow本身并不了解Android的生命周期,也不提供Android生命周期状态变化时收集器的自动暂停和恢复,可以使用LifecycleCoroutineScope的扩展,如 launchWhenStarted来启动coroutine来收集我们的Flow--这些收集器将自动暂停,并与组件的Lifecycle同步恢复。
    • 相较于 Channel,Flow 末端操作符 会触发数据流的执行,同时会根据生产者一侧流操作来决定是成功完成操作还是抛出异常,因此 Flows 会自动地关闭数据流,不会在生产者一侧泄漏资源;而一旦 Channel 没有正确关闭,生产者可能不会清理大型资源,因此 Channels 更容易造成资源泄漏。

    Flow的一些常用操作符

    //        val flow = flowOf(1,2,3,4,5)
    //        val flow: Flow<Int> = flow {
    //            List(20) {
    //                emit(it)//发送数据
    //                delay(300)
    //            }
    //        }
    val flow = (1..10).asFlow()
    lifecycleScope.launch {
        flow.flowOn(Dispatchers.IO)//设定它运行时所使用的调度器,设置的调度器只对它之前的操作有影响
            .onStart { log("onStart") }
            .flowOn(Dispatchers.Main)
            .onEach {
                log("onEach:$it")
                delay(300)
            }
            .filter {//过滤
                it % 2 == 0
            }
            .map {//变换
                log("map:$it*$it")
                it * it
            }
            .transform<Int,String> {
                "num=$it"
    //                    emit("num1=$it")
    //                    emit("num2=$it")
            }
            .flowOn(Dispatchers.IO)
            .onCompletion {//订阅流的完成,执行在流完成时的逻辑
                log("onCompletion: $it")
            }
            .catch {//捕获 Flow 的异常,catch 函数只能捕获它的上游的异常
                log("catch: $it")
            }
            .flowOn(Dispatchers.Main)
            .collect {//消费Flow
                log("collect1_1: $it")
            }
        //Flow 可以被重复消费
        flow.collect { log("collect1_2: $it") }
        //除了可以在 collect 处消费 Flow 的元素以外,还可以通过 onEach 来做到这一点。
        // 这样消费的具体操作就不需要与末端操作符放到一起,collect 函数可以放到其他任意位置调用
        flow.onEach {
            log("onEach2:$it")
        }
        withContext(Dispatchers.IO) {
            delay(1000)
            flow.collect()
        }
        //除了使用子协程执行上流外,我们还可以使用launchIn函数来让Flow使用全新的协程上下文
        flow.onEach {
            log("onEach2:$it")
        }.launchIn(CoroutineScope(Dispatchers.IO))
            .join()//主线程等待这个协程执行结束
    

    Flow的取消

    lifecycleScope.launch(Dispatchers.IO) {
        val flow2 = (1..10).asFlow().onEach { delay(1000) }
        val job: Job = lifecycleScope.launch {
            log("lifecycleScope.launch")
            flow2.flowOn(Dispatchers.IO)//设定它运行时所使用的调度器
                .collect {//消费Flow
                    log("flow2:$it")
                }
        }
        delay(2000)
        job.cancelAndJoin()
    }
    

    Flow 的背压

    • 只要是响应式编程,就一定会有背压问题,我们先来看看背压究竟是什么。
    • 背压问题在生产者的生产速率高于消费者的处理速率的情况下出现。为了保证数据不丢失,我们也会考虑添加缓存来缓解问题:
    //为 Flow 添加缓冲
    flow {
        List(5) {
            emit(it)
        }
    }.buffer().collect {
        log("flow buffer collect:$it")
    }
    
    • 也可以为 buffer 指定一个容量。不过,如果我们只是单纯地添加缓存,而不是从根本上解决问题就始终会造成数据积压。
    • 问题产生的根本原因是生产和消费速率的不匹配,除直接优化消费者的性能以外,我们也可以采取一些取舍的手段。
    • 第一种是 conflate。与 Channel 的 Conflate 模式一致,新数据会覆盖老数据,
    flow {
        List(10) {
            emit(it)
        }
    }
    .conflate()
    .collect { value ->
        log("flow conflate Collecting $value")
        delay(100)
        log("$value collected flow conflate ")
    }
    
    • 第二种是 collectLatest。顾名思义,只处理最新的数据,这看上去似乎与 conflate 没有区别,其实区别大了:它并不会直接用新数据覆盖老数据,而是每一个都会被处理,只不过如果前一个还没被处理完后一个就来了的话,处理前一个数据的逻辑就会被取消除 collectLatest 之外还有 mapLatest、flatMapLatest 等等,都是这个作用。
    flow {
        List(10) {
            emit(it)
        }
    }.collectLatest { value ->
        log("flow collectLatest Collecting $value")
        delay(100)
        log("$value collected flow collectLatest ")
    }
    

    使用更为安全的方式收集 Android UI 数据流

    • 在 Android 开发中,请使用 LifecycleOwner.addRepeatingJob、suspend Lifecycle.repeatOnLifecycle 或 Flow.flowWithLifecycle 从 UI 层安全地收集数据流。(lifecycle-runtime-ktx:2.4.+ 库中所提供的)
    lifecycleScope.launch {
        delay(500)
        repeatOnLifecycle(Lifecycle.State.STARTED) {
            flow.collect { log("collect2: $it") }
        }
    }
    lifecycleScope.launchWhenStarted {
        delay(1000)
        flow.collect { log("collect3: $it") }
    }
    lifecycleScope.launch {
        delay(1500)
        flow.flowWithLifecycle(lifecycle,Lifecycle.State.STARTED)
            .collect { log("collect4: $it") }
    }
    

    SharedFlow

    • 冷流和订阅者只能是一对一的关系,当我们要实现一个流,多个订阅者的需求时,就需要热流了,SharedFlow就是一种热流
    • 其构造函数如下
    public fun <T> MutableSharedFlow(
        replay: Int = 0,//当新的订阅者Collect时,发送几个已经发送过的数据给它,默认为0,即默认新订阅者不会获取以前的数据
        extraBufferCapacity: Int = 0,//表示减去replay,MutableSharedFlow还缓存多少数据,默认为0
        onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND//表示缓存策略,即缓冲区满了之后Flow如何处理
        //BufferOverflow.SUSPEND 策略,也就是挂起策略, 默认为挂起
        //BufferOverflow.DROP_OLDEST: 丢弃旧数据
        //BufferOverflow.DROP_LATEST: 丢弃最新的数据
    )
    
    • 简单使用如下
    val sharedFlow = MutableSharedFlow<String>()
    lifecycleScope.launch(Dispatchers.IO) {
        delay(1000)
        sharedFlow.emit("aaa")
        delay(1000)
        sharedFlow.emit("bbb")
        delay(1000)
        sharedFlow.emit("ccc")
    }
    lifecycleScope.launch {
        delay(500)
        sharedFlow.collect { log("collect1:$it") }
    }
    lifecycleScope.launch {
        delay(1500)
        sharedFlow.collect { log("collect2:$it") }
    }
    lifecycleScope.launch {
        delay(2500)
        sharedFlow.collect { log("collect3:$it") }
    }
    lifecycleScope.launch {
        delay(3500)
        sharedFlow.collect { log("collect4:$it") }
    }
    
    • 将冷流Flow转化为SharedFlow
    lifecycleScope.launch {
        (1..5).asFlow().shareIn(
            //1. 共享开始时所在的协程作用域范围
            scope = lifecycleScope,
            //2. 控制共享的开始和结束的策略
            // started = SharingStarted.Lazily,//当首个订阅者出现时开始,在scope指定的作用域被结束时终止
            // started = SharingStarted.Eagerly,//立即开始,而在scope指定的作用域被结束时终止
            //对于那些只执行一次的操作,您可以使用Lazily或者Eagerly。然而,如果您需要观察其他的流,就应该使用WhileSubscribed来实现细微但又重要的优化工作
            //WhileSubscribed策略会在没有收集器的情况下取消上游数据流
            started = SharingStarted.WhileSubscribed(
                500,//stopTimeoutMillis 控制一个以毫秒为单位的延迟值,指的是最后一个订阅者结束订阅与停止上游流的时间差。默认值是 0(比如当用户旋转设备时,原来的视图会先被销毁,然后数秒钟内重建)
                Long.MAX_VALUE//replayExpirationMillis表示数据重播的过时时间,如果用户离开应用太久,此时您不想让用户看到陈旧的数据,你可以用到这个参数
            ),
            //3. 状态流的重播个数
            replay = 0
        ).collect { log("shareIn.collect:$it") }
    }
    

    StateFlow

    • StateFlow继承于SharedFlow,是SharedFlow的一个特殊变种
    • 构造函数如下,只需要传入一个默认值
    public fun <T> MutableStateFlow(value: T): MutableStateFlow<T> = StateFlowImpl(value ?: NULL)
    
    • StateFlow本质上是一个replay为1,并且没有缓冲区的SharedFlow,因此第一次订阅时会先获得默认值
    • StateFlow仅在值已更新,并且值发生了变化时才会返回,即如果更新后的值没有变化,也不会回调Collect方法,这点与LiveData不同
    • StateFlow 与 LiveData是最接近的,因为:
    1. 它始终是有值的。
    2. 它的值是唯一的。
    3. 它允许被多个观察者共用 (因此是共享的数据流)。
    4. 它永远只会把最新的值重现给订阅者,这与活跃观察者的数量是无关的。
    
    • 简单使用
    log("StateFlow 默认值:111")
    val stateFlow = MutableStateFlow("111")
    
    lifecycleScope.launch {
        delay(500)
        stateFlow.collect { log("StateFlow collect1:$it") }
    }
    lifecycleScope.launch {
        delay(1500)
        stateFlow.collect { log("StateFlow collect2:$it") }
    }
    lifecycleScope.launch {
        delay(2500)
        stateFlow.collect { log("StateFlow collect3:$it") }
    }
    
    lifecycleScope.launch(Dispatchers.IO) {
        delay(5000)
        log("StateFlow re emit:111")
        stateFlow.emit("111")
        delay(1000)
        log("StateFlow emit:222")
        stateFlow.emit("222")
    }
    
    • 普通流Flow转化成StateFlow
    val stateFlow2: StateFlow<Int> = flow {
        List(10) {
            delay(300)
            emit(it)
        }
    }.stateIn(
        scope = lifecycleScope,
        started = WhileSubscribed(5000),//等待5秒后仍然没有订阅者存在就终止协程
        initialValue = 666//默认值
    )
    lifecycleScope.launchWhenStarted {//STARTED状态时会开始收集流,并且在RESUMED状态时保持收集,进入STOPPED状态时结束收集过程
        stateFlow2.collect { log("StateFlow shareIn.collect:$it") }
    
    }
    

    StateFlow与SharedFlow 的区别

    1. SharedFlow配置更为灵活,支持配置replay,缓冲区大小等,StateFlow是SharedFlow的特化版本,replay固定为1,缓冲区大小默认为0;
    2. StateFlow与LiveData类似,支持通过myFlow.value获取当前状态,如果有这个需求,必须使用StateFlow;
    3. SharedFlow支持发出和收集重复值,而StateFlow当value重复时,不会回调collect;
    4. 对于新的订阅者,StateFlow只会重播当前最新值,SharedFlow可配置重播元素个数(默认为0,即不重播);

    基于SharedFlow封装FlowBus

    创建消息类EventMessage

    class EventMessage {
        /**
         * 消息的key
         */
        var key: Int
    
        /**
         * 消息的主体message
         */
        var message: Any? = null
        private var messageMap: HashMap<String, Any?>? = null
    
        constructor(key: Int, message: Any?) {
            this.key = key
            this.message = message
        }
    
        constructor(key: Int) {
            this.key = key
        }
    
        fun put(key: String, message: Any?) {
            if (messageMap == null) {
                messageMap = HashMap<String, Any?>()
            }
            messageMap?.set(key, message)
        }
    
        operator fun <T> get(key: String?): T? {
            if (messageMap != null) {
                try {
                    return messageMap!![key] as T?
                } catch (e: ClassCastException) {
                    e.printStackTrace()
                }
            }
            return null
        }
    }
    

    创建FlowBus

    class FlowBus : ViewModel() {
        companion object {
            val instance by lazy(mode = LazyThreadSafetyMode.SYNCHRONIZED) { FlowBus() }
        }
    
        //正常事件
        private val events = mutableMapOf<String, Event<*>>()
    
        //粘性事件
        private val stickyEvents = mutableMapOf<String, Event<*>>()
    
        fun with(key: String, isSticky: Boolean = false): Event<Any> {
            return with(key, Any::class.java, isSticky)
        }
    
        fun <T> with(eventType: Class<T>, isSticky: Boolean = false): Event<T> {
            return with(eventType.name, eventType, isSticky)
        }
    
        @Synchronized
        fun <T> with(key: String, type: Class<T>?, isSticky: Boolean): Event<T> {
            val flows = if (isSticky) stickyEvents else events
            if (!flows.containsKey(key)) {
                flows[key] = Event<T>(key, isSticky)
            }
            return flows[key] as Event<T>
        }
    
    
        class Event<T>(private val key: String, isSticky: Boolean) {
    
            // private mutable shared flow
            private val _events = MutableSharedFlow<T>(
                replay = if (isSticky) 1 else 0,
                extraBufferCapacity = Int.MAX_VALUE
            )
    
            // publicly exposed as read-only shared flow
            val events = _events.asSharedFlow()
    
            /**
             * need main thread execute
             */
            fun observeEvent(
                lifecycleOwner: LifecycleOwner,
                dispatcher: CoroutineDispatcher = Dispatchers.Main.immediate,
                minActiveState: Lifecycle.State = Lifecycle.State.STARTED,
                action: (t: T) -> Unit
            ) {
                lifecycleOwner.lifecycle.addObserver(object : DefaultLifecycleObserver {
                    override fun onDestroy(owner: LifecycleOwner) {
                        super.onDestroy(owner)
                        LjyLogUtil.d("EventBus.onDestroy:remove key=$key")
                        val subscriptCount = _events.subscriptionCount.value
                        if (subscriptCount <= 0)
                            instance.events.remove(key)
                    }
                })
                lifecycleOwner.lifecycleScope.launch(dispatcher) {
                    lifecycleOwner.lifecycle.whenStateAtLeast(minActiveState) {
                        events.collect {
                            try {
                                action(it)
                            } catch (e: Exception) {
                                LjyLogUtil.d("ker=$key , error=${e.message}")
                            }
                        }
                    }
                }
            }
    
            /**
             * send value
             */
            suspend fun setValue(
                event: T,
                dispatcher: CoroutineDispatcher = Dispatchers.Main.immediate
            ) {
                withContext(dispatcher) {
                    _events.emit(event)
                }
    
            }
        }
    }
    

    使用FlowBus

    FlowBus.instance.with(EventMessage::class.java).observeEvent(this) {
        LjyLogUtil.d("FlowBus.register1:${GsonUtils.toJson(it)}_${Thread.currentThread().name}")
    }
    lifecycleScope.launch(Dispatchers.IO) {
        withContext(Dispatchers.Main) {//不创建新的协程,指定协程上运行代码块,可以切换线程
            FlowBus.instance.with(EventMessage::class.java)
                .observeEvent(this@EventBusActivity) {
                    LjyLogUtil.d("FlowBus.register2:${GsonUtils.toJson(it)}_${Thread.currentThread().name}")
                }
        }
    }
    FlowBus.instance.with(EventMessage::class.java).observeEvent(this) {
        LjyLogUtil.d("FlowBus.register3:${GsonUtils.toJson(it)}_${Thread.currentThread().name}")
    }
    lifecycleScope.launch(Dispatchers.Main) {
    
        val event = EventMessage(111)
        LjyLogUtil.d(
            "FlowBus:send1_${Thread.currentThread().name}_${
                GsonUtils.toJson(
                    event
                )
            }"
        )
        FlowBus.instance.with(EventMessage::class.java).setValue(event)
        delay(2000)
        FlowBus.instance.with(EventMessage::class.java)
            .setValue(EventMessage(101))
        FlowBus.instance.with(EventMessage::class.java)
            .setValue(EventMessage(102))
        FlowBus.instance.with(EventMessage::class.java)
            .setValue(EventMessage(103))
        FlowBus.instance.with(EventMessage::class.java)
            .setValue(EventMessage(104))
        FlowBus.instance.with(EventMessage::class.java)
            .setValue(EventMessage(105))
    }
    lifecycleScope.launch(Dispatchers.IO) {
        delay(4000)
        val event = EventMessage(222, "bbb")
        LjyLogUtil.d(
            "FlowBus:send2_${Thread.currentThread().name}_${
                GsonUtils.toJson(
                    event
                )
            }"
        )
        FlowBus.instance.with(EventMessage::class.java).setValue(event)
    }
    lifecycleScope.launch(Dispatchers.Default) {
        delay(6000)
        withContext(Dispatchers.Main) {
            val event = EventMessage(333, "ccc")
            event.put("key1", 123)
            event.put("key2", "abc")
            LjyLogUtil.d(
                "FlowBus:send3_${Thread.currentThread().name}_${
                    GsonUtils.toJson(
                        event
                    )
                }"
            )
            FlowBus.instance.with(EventMessage::class.java).setValue(event)
        }
    }
    

    进一步优化

    • 利用扩展函数,ViewModelStoreOwner,及预传EventMessage::class.javas是当前项目中的使用更加简单
    //利用扩展函数
    fun LifecycleOwner.observeEvent(
        dispatcher: CoroutineDispatcher = Dispatchers.Main.immediate,
        minActiveState: Lifecycle.State = Lifecycle.State.STARTED,
        isSticky: Boolean = false,
        action: (t: EventMessage) -> Unit
    ) {
        ApplicationScopeViewModelProvider
            .getApplicationScopeViewModel(FlowBus::class.java)
            .with(EventMessage::class.java, isSticky = isSticky)
            .observeEvent(this@observeEvent, dispatcher, minActiveState, action)
    }
    
    fun postValue(
        event: EventMessage,
        delayTimeMillis: Long = 0,
        isSticky: Boolean = false,
        dispatcher: CoroutineDispatcher = Dispatchers.Main.immediate,
    ) {
        LjyLogUtil.d("FlowBus:send_${Thread.currentThread().name}_${GsonUtils.toJson(event)}")
        ApplicationScopeViewModelProvider
            .getApplicationScopeViewModel(FlowBus::class.java)
            .viewModelScope
            .launch(dispatcher) {
                delay(delayTimeMillis)
                ApplicationScopeViewModelProvider
                    .getApplicationScopeViewModel(FlowBus::class.java)
                    .with(EventMessage::class.java, isSticky = isSticky)
                    .setValue(event)
            }
    }
    
    private object ApplicationScopeViewModelProvider : ViewModelStoreOwner {
    
        private val eventViewModelStore: ViewModelStore = ViewModelStore()
    
        override fun getViewModelStore(): ViewModelStore {
            return eventViewModelStore
        }
    
        private val mApplicationProvider: ViewModelProvider by lazy {
            ViewModelProvider(
                ApplicationScopeViewModelProvider,
                ViewModelProvider.AndroidViewModelFactory.getInstance(FlowBusInitializer.application)
            )
        }
    
        fun <T : ViewModel> getApplicationScopeViewModel(modelClass: Class<T>): T {
            return mApplicationProvider[modelClass]
        }
    
    }
    
    object FlowBusInitializer {
        lateinit var application: Application
        //在Application中初始化
        fun init(application: Application) {
            FlowBusInitializer.application = application
        }
    }
    
    • 使用
    lifecycleScope.launch(Dispatchers.IO) {
        observeEvent {
            LjyLogUtil.d("FlowBus.register1:${GsonUtils.toJson(it)}_${Thread.currentThread().name}")
        }
        observeEvent(Dispatchers.IO) {
            LjyLogUtil.d("FlowBus.register2:${GsonUtils.toJson(it)}_${Thread.currentThread().name}")
        }
    
        observeEvent(Dispatchers.Main) {
            LjyLogUtil.d("FlowBus.register3:${GsonUtils.toJson(it)}_${Thread.currentThread().name}")
        }
    }
    
    lifecycleScope.launch(Dispatchers.IO) {
        delay(1000)
        postValue(EventMessage(100))
        postValue(EventMessage(101), 1000)
        postValue(EventMessage(102, "bbb"), dispatcher = Dispatchers.IO)
        val event3 = EventMessage(103, "ccc")
        event3.put("key1", 123)
        event3.put("key2", "abc")
        postValue(event3, 2000, dispatcher = Dispatchers.Main)
    }
    

    参考

    我是今阳,如果想要进阶和了解更多的干货,欢迎关注微信公众号 “今阳说” 接收我的最新文章

    相关文章

      网友评论

        本文标题:干掉RxJava系列--2. 手写FlowBus替代RxBus/

        本文链接:https://www.haomeiwen.com/subject/ttdgqrtx.html