美文网首页
Android单向数据流——MvRx核心源码解析

Android单向数据流——MvRx核心源码解析

作者: 珞泽珈群 | 来源:发表于2020-05-07 16:25 被阅读0次

    前言

    背景知识,Android真响应式架构——MvRx

    MvRx是什么?最简单的解释是,Mv(ModelView)和Rx(ReactiveX),MvRx是AAC ModelView和RxJava的结合,但这只是表象,MvRx最核心的价值在于,它是React理念在原生Android开发领域的一种实现,并且这种实现充分利用了AAC和Kotlin的能力,以一种相对简单的方式实现,降低了它的理解和使用门槛,对客户端是比较友好的。那什么是React的理念呢?我认为关键有两点:

    1. View由状态(State)表征
    2. 状态由事件(Event,或者称为动作、意图,不同地方有不同的叫法)单向驱动

    如上图所示,使用State数据渲染View,View产生Event事件,发送给StateStore,StateStore根据旧的State构造出新的State,再传递给View。这种单向数据流的架构在其它地方也被称作MVI(Model-View-Intent)架构。

    那么实现这么一套架构复杂吗?其实还是很复杂的,其中涉及到State的管理,对State的观察,View如何由State表征等等一系列问题。但是,如果借助现有的一些库,整个实现也并不复杂。State用Kotlin data class来表示,从View到StateStore的连接借助ViewModel来实现,State的管理StateStore使用RxJava中的Subject来完成,对State的观察使用LifecycleObserver+RxJava Observer来实现,从State到View的渲染借助Epoxy来完成。这就是MvRx的设计思路,所以MvRx本身的源码并不多,也不复杂。本文主要对MvRx的核心源码StateStore和State Observer进行解析。源码版本com.airbnb.android:mvrx:1.5.1

    1. 太长不看

    每个MvRxViewModel都包含一个MvRxStateStore,通过MvRxViewModel对MvRxStateStore进行的setState,getState的操作会被分别放进两个队列中,并且会给MvRxStateStore中的一个BehaviorSubject发送一个信号,BehaviorSubject收到信号后,会在一个单独的线程从setState,getState的队列中取元素,先从setState队列中取,之后是getState队列(也就是说异步getState获取的State是之前还没执行的setState执行之后的最新的State),直至getState队列都为空;每次从setState队列取一个元素(元素类型State.()->State,一般称之为reducer)后,都会执行这个reducer,完成从旧State到新State的更新,然后通知StateObservable,这样外部就能观察到State的变化了。

    上图出自Unidirectional data flow on Android using Kotlin,他把Event称为Action,这篇文章从概念上解释了什么是Android单向数据流,如果你不关心具体的源码实现,只是想从概念上对单向数据流建立一个图像,那么推荐你看看这篇文章。

    2. MvRxStateStore

    如上所述,StateStore的职责就是对State的管理,主要包括接收外部输入,由旧生新;让外部可以观察State的变化,基本上就这些。MvRx定义了一个接口MvRxStateStore明确了这些职责:

    interface MvRxStateStore<S : Any> : Disposable {
        val state: S //同步方式获取 state
        fun get(block: (S) -> Unit) //异步方式获取 state
        fun set(stateReducer: S.() -> S) // 更新 state,由旧生新
        val observable: Observable<S> //外部观察 state
    }
    

    获取State的方式有两种:同步和异步,获取State并不等同于观察State,一般是为了根据当前State决定下一步的动作,是一种一次性的行为。

    每个MvRxViewModel都必须明确它的State,并且每个MvRxViewModel也都包含一个MvRxStateStore:

    interface MvRxState
    
    abstract class BaseMvRxViewModel<S : MvRxState>(
        initialState: S,
        debugMode: Boolean,
        private val stateStore: MvRxStateStore<S> = RealMvRxStateStore(initialState)
    ) : ViewModel() {
        //通过stateStore同步获取state
        internal val state: S
            get() = stateStore.state
        
        
        protected fun setState(reducer: S.() -> S) {
            if (debugMode) {
                //debug模式下会进行一些验证
                //核心逻辑就是,reducer运行两遍,两遍运行得到的 state要求相等,这基本上保证了reducer是个“纯函数”
            } else {
                stateStore.set(reducer)
            }
        }
    
        
        protected fun withState(block: (state: S) -> Unit) {
            stateStore.get(block)
        }
    }
    

    MvRxState只是一个标记接口,所以State可以是任意类,只要它实现了MvRxState接口。但是,BaseMvRxViewModel包含了对State的验证逻辑,要求State必须是Kotlin data class,并且其所有属性必须是不可变的(immutable),不能在data class中定义var属性,并且不能使用诸如MutableListMutableMap之类的可变集合,这是为了方便MvRxStateStore对State的管理,强制要求改变State必须通过MvRxStateStore由旧生新。
    每个MvRxViewModel都包含一个MvRxStateStore,默认值是RealMvRxStateStore,并且构造时就传入了初始State,初始State对于RealMvRxStateStore而言是很重要的。
    BaseMvRxViewModelMvRxStateStore的使用是很直接的,其实对MvRxStateStore.observable也是很直接的使用,但是observable的观察者比较复杂,我们之后再看。

    3. RealMvRxStateStore

    class RealMvRxStateStore<S : Any>(initialState: S) : MvRxStateStore<S> {
        //State Subject
        private val subject: BehaviorSubject<S> = BehaviorSubject.createDefault(initialState)
    
        //刷新“队列”的信号
        private val flushQueueSubject = BehaviorSubject.create<Unit>()
    
        //“队列”实体,Jobs类的定义之后会看到
        private val jobs = Jobs<S>()
    
        //State Observable
        override val observable: Observable<S> = subject
        //同步获取 state
        override val state: S
            get() = subject.value!! //必然不为null,因为subject创建的时候提供了初始值 initialState
    
        init {
            //在一个单独的线程刷新队列,避免setState的竞争
            flushQueueSubject.observeOn(Schedulers.newThread())
                //在flushQueueSubject收到信号的时候,刷新队列,state由旧生新的核心逻辑就在flushQueues方法中
                .subscribe({ flushQueues() }, ::handleError)
                .registerDisposable()
        }
    
        //异步获取 state
        override fun get(block: (S) -> Unit) {
            //入getState队列,然后发送信号
            jobs.enqueueGetStateBlock(block)
            flushQueueSubject.onNext(Unit)
        }
    
        override fun set(stateReducer: S.() -> S) {
            //入setState队列,然后发送信号
            jobs.enqueueSetStateBlock(stateReducer)
            flushQueueSubject.onNext(Unit)
        }
    }
    

    一个小的背景知识,Subject是RxJava中一种既是Observable又是Observer的类,而BehaviorSubjectSubject几个子类中最“正常”的那一个,把它自己接收到的数据,再发送出去。RealMvRxStateStore中包含两个BehaviorSubject,一个就是我们的State Subject,另一个是信号Subject。当set,get State的时候,会把要执行的内容入队列,然后向信号Subject发送信号,在RealMvRxStateStore的构造函数中就已经注册了信号Subject的观察者——flushQueues(),信号会在一个新的线程中被接收到,接收到信号就会在这个新线程中执行flushQueues(),虽然我们还没有看flushQueues()的具体内容,但想想也知道肯定就是从队列中取内容然后执行,如果是setState会设置“新State”给State Subject,这样新的state就传递了出去,Done。

    来看看“队列”的实体类Jobs:

    class Jobs<S> {
    
        private val getStateQueue = LinkedList<(state: S) -> Unit>()
        private var setStateQueue = LinkedList<S.() -> S>()
    
        @Synchronized
        fun enqueueGetStateBlock(block: (state: S) -> Unit) {
            getStateQueue.add(block)
        }
    
        @Synchronized
        fun enqueueSetStateBlock(block: S.() -> S) {
            setStateQueue.add(block)
        }
    
        @Synchronized
        fun dequeueGetStateBlock(): ((state: S) -> Unit)? {
            //getStateQueue为空,就会返回 null
            return getStateQueue.poll()
        }
    
        //出队所有setStateQueue
        @Synchronized
        fun dequeueAllSetStateBlocks(): List<(S.() -> S)>? {
            if (setStateQueue.isEmpty()) return null
    
            val queue = setStateQueue
            setStateQueue = LinkedList()
            return queue
        }
    }
    

    相当之无聊,就两个LinkedList,进进队出出队。RealMvRxStateStoresetget方法一般都会是在后台线程中执行,对setStateQueue,getStateQueue进出队是会有多线程同步的问题,所以这些方法都加了锁。

    好了,现在一切都准备好了,就剩下flushQueues()的具体实现了:

    class RealMvRxStateStore<S : Any>(initialState: S) : MvRxStateStore<S> {
        //不常用的 tailrec功能,即尾递归优化,会把递归调用改为循环的模式
        private tailrec fun flushQueues() {
            //先出队 setStateQueue并且执行
            flushSetStateQueue()
            //然后再出队 getStateQueue
            val block = jobs.dequeueGetStateBlock() ?: return
            block(state)
            //递归调用,直至getStateQueue为空
            flushQueues()
        }
    
        private fun flushSetStateQueue() {
            //所有setStateQueue全部出队,挨个执行
            val blocks = jobs.dequeueAllSetStateBlocks() ?: return
            for (block in blocks) {
                val newState = state.block()
                // state不同才发送
                if (newState != state) {
                    subject.onNext(newState)
                }
            }
        }
    }
    

    正如我们预期的那样,flushQueues()核心逻辑就是出队然后执行,只不过执行顺序是有讲究的。在MvRxModelView中,我们经常这么写:

    getState { state ->
      if (state.isLoading) return
      //或者是发送一个网络请求,总之最后会调用 setState
      setState { state ->
        state.copy(...)
      }
    }
    

    也就是说,getStateQueue队列中的内容时常包含着入队setStateQueue,那么考虑下面一个问题:

    getStateA {
      setStateA {}
    }
    getStateB {
      setStateB {}
    }
    

    如果是像上面这样的调用顺序,那么我们期望的执行顺序是getStateA->setStateA->getStateB->setStateB,你仔细思考一下flushQueues()的出队顺序,得到的执行顺序正是我们期望的那样。
    最后,调用subject.onNext(newState),通过RealMvRxStateStoreoverride val observable: Observable<S> = subject,新state状态就被传递了出去。注意,flushQueues()是在一个单独的新线程中执行,每个RealMvRxStateStore都会新建一个线程,多个RealMvRxStateStore之间不存在竞争。

    这里补充一个小细节,flushSetStateQueue方法中会依次遍历setStateQueue出队的各个元素,并且对每个元素执行完reducer操作之后,都会把得到的新的State传递出去subject.onNext(newState),其实还有另外一种选择,那就是“状态折叠”,假设setStateQueue出队元素有两个A,B,我们可以A,B依次执行,但是只对外传递最终的newStateB,A作为中间状态就不对外传递了,这样可以提高效率,但是这样会引发一个问题,oldState->newStateA->newStateB,如果只传递最终的状态newStateBnewStateA就会消失了,至于这是否是我们想要的结果,这取决于实际情况,所以“状态折叠”会提高效率,但是可能会有问题,不进行“状态折叠”会些许降低效率,但是总是一个不会出错方案,MvRx也是从最初的“状态折叠”调整为现在的不再折叠。

    3. MvRxLifecycleAwareObserver

    经过RealMvRxStateStore的一番操作,新的State通过Observable被传递了出去,要想观察State的变化就需要一个Observer,同时,这个Observe还需要感知生命周期,所以这个这个观察者应该是RxJava Observer + LifecycleObserver,在MvRx中的实现就是MvRxLifecycleAwareObserver
    有一点需要明确,无论我们以何种方式观察State的变化,观察整个State,还是观察State中的某几个属性,在View中观察,还是在ViewModel中观察,最终都会调用BaseMvRxViewModelsubscribeLifecycle方法:

    abstract class BaseMvRxViewModel<S : MvRxState> {
        
        private fun <T : Any> Observable<T>.subscribeLifecycle(
            lifecycleOwner: LifecycleOwner? = null,
            deliveryMode: DeliveryMode,
            subscriber: (T) -> Unit
        ): Disposable {
            //观察者逻辑在主线程执行
            return observeOn(AndroidSchedulers.mainThread())
                .resolveSubscription(lifecycleOwner, deliveryMode, subscriber)
                .disposeOnClear()
        }
    
        private fun <T : Any> Observable<T>.resolveSubscription(
            lifecycleOwner: LifecycleOwner? = null,
            deliveryMode: DeliveryMode, //忽略这个参数
            subscriber: (T) -> Unit
        ): Disposable = if (lifecycleOwner == null || FORCE_DISABLE_LIFECYCLE_AWARE_OBSERVER) {
            //没有提供生命周期,或者调试状态,直接观察
            this.subscribe(subscriber)
        } else {
            //绑定生命周期,那么观察者会被包装成 MvRxLifecycleAwareObserver
            this.subscribeWith(
                MvRxLifecycleAwareObserver(
                    lifecycleOwner,
                    onNext = Consumer { value ->
                        subscriber(value)
                    }
                )
            )
        }
    }
    

    如果提供的观察者subscriber绑定了生命周期,那么它会被包装成MvRxLifecycleAwareObserver。既然我们提供了生命周期,那么就是想实现类似LiveData那样的数据观察模式,生命周期≥STARTED时,才通知观察者。MvRxLifecycleAwareObserver就是实现这样的逻辑。

    //实现了三个接口 LifecycleObserver, Observer<T>, Disposable,我们忽略Disposable的相关逻辑,主要看LifecycleObserver, Observer
    internal class MvRxLifecycleAwareObserver<T : Any>(
        private var owner: LifecycleOwner?,
        private val activeState: Lifecycle.State = State.STARTED,
        private val deliveryMode: DeliveryMode = RedeliverOnStart,
        private var lastDeliveredValueFromPriorObserver: T?,
        private var sourceObserver: Observer<T>?,
        private val onDispose: () -> Unit
    ) : AtomicReference<Disposable>(), LifecycleObserver, Observer<T>, Disposable {
        
        //次构造函数,就是把sourceObserver一个参数拆分成onNext, onError, onComplete, onSubscribe四个参数
        constructor(
            owner: LifecycleOwner,
            activeState: Lifecycle.State = DEFAULT_ACTIVE_STATE,
            deliveryMode: DeliveryMode = RedeliverOnStart,
            lastDeliveredValue: T? = null,
            onComplete: Action = Functions.EMPTY_ACTION,
            onSubscribe: Consumer<in Disposable> = Functions.emptyConsumer(),
            onError: Consumer<in Throwable> = Functions.ON_ERROR_MISSING,
            onNext: Consumer<T> = Functions.emptyConsumer(),
            onDispose: () -> Unit
        ) : this(owner, activeState, deliveryMode, lastDeliveredValue, LambdaObserver<T>(onNext, onError, onComplete, onSubscribe), onDispose)
    
        //上次未传输的值,未传输是因为 Lifecycle未活跃
        private var lastUndeliveredValue: T? = null
        //上次的值,可能已经传输过了
        private var lastValue: T? = null
        //Lifecycle处于未活跃状态时则上锁
        private val locked = AtomicBoolean(true)
        private val isUnlocked
            get() = !locked.get()
    
        override fun onSubscribe(d: Disposable) {
            if (DisposableHelper.setOnce(this, d)) {
                //开始观察时,也会对生命周期进行观察
                owner!!.lifecycle.addObserver(this)
                sourceObserver!!.onSubscribe(this)
            }
        }
    
        //观察 Lifecycle ON_DESTROY事件
        @OnLifecycleEvent(Lifecycle.Event.ON_DESTROY)
        fun onDestroy() {
            owner!!.lifecycle.removeObserver(this)
            if (!isDisposed) {
                dispose()
            }
            owner = null
            sourceObserver = null
        }
    
        //观察 Lifecycle任意事件
        @OnLifecycleEvent(Event.ON_ANY)
        fun onLifecycleEvent() {
            updateLock()
        }
    
        private fun updateLock() {
            //每个Lifecycle事件都更新锁,处于活跃状态时解锁,否则加锁
            if (owner?.lifecycle?.currentState?.isAtLeast(activeState) == true) {
                unlock()
            } else {
                lock()
            }
        }
    
        override fun onNext(nextValue: T) {
            if (isUnlocked) {
                sourceObserver!!.onNext(nextValue)
            } else {
                //记录下未传输的值
                lastUndeliveredValue = nextValue
            }
            lastValue = nextValue
        }
    
        override fun onError(e: Throwable) {
            if (!isDisposed) {
                lazySet(DisposableHelper.DISPOSED)
                sourceObserver!!.onError(e)
            }
        }
    
        override fun onComplete() {
            sourceObserver!!.onComplete()
        }
    
        private fun unlock() {
            if (!locked.getAndSet(false)) {
                return
            }
            if (!isDisposed) {
                val valueToDeliverOnUnlock = //根据 deliveryMode确定要传输的值
                lastUndeliveredValue = null
                //只有Lifecycle处于活跃状态,并且没被dispose,才会传输值
                if (valueToDeliverOnUnlock != null) {
                    onNext(valueToDeliverOnUnlock)
                }
            }
        }
    
        private fun lock() {
            locked.set(true)
        }
    }
    

    MvRxLifecycleAwareObserver既是RxJava Observer又是LifecycleObserver,每个Lifecycle的Event事件,都会导致MvRxLifecycleAwareObserver锁标记的更新,Lifecycle处于活跃时locked = false,否则locked = true,只有在Lifecycle处于活跃时才会调用我们定义的观察逻辑sourceObserver,否则只是把值记录下来,待到活跃时再传输。最终sourceObserver接收到的值还和deliveryMode的取值有关,这是个sealed class,只有两种类型:RedeliverOnStartUniqueOnly,顾名思义,在RedeliverOnStart模式下,值会被重新传输给我们的观察者sourceObserver,哪怕这个值之前已经传输过;而UniqueOnly模式下则只会传输还未传输的值。比如手机不断的息屏亮屏,假设这期间State的值没有变化,那么在RedeliverOnStart模式下,每次亮屏时,sourceObserver都会接收到之前的值;UniqueOnly模式下则不会接收到任何值(所以UniqueOnly模式一般适用于一次性的行为,例如Toast,Snackbar,Log等)。简明起见,关于deliveryMode的逻辑,在上述代码中都被删除了。

    4. 总结

    MvRx最核心的逻辑就是MvRxStateStore了,这是实现单向数据流的关键,MvRx利用了RxJava Subject的特性,非常简明地完成了StateStore的功能。State管理的核心逻辑其实就是一个多向输入,单向输出的过程,而Subject的功能正契合这一需求,这使得RxJava Subject成了Android平台实现单向数据流的不二之选——直到Kotlin coroutines的出现。

    关于Android单向数据流/MvRx更多的思考请看Android单向数据流——Side Effect

    相关文章

      网友评论

          本文标题:Android单向数据流——MvRx核心源码解析

          本文链接:https://www.haomeiwen.com/subject/vbpaghtx.html