美文网首页Kotlin-Coroutines
解决Android Flow无法收到更新的数据的问题

解决Android Flow无法收到更新的数据的问题

作者: XFY9326 | 来源:发表于2020-12-18 10:20 被阅读0次

使用场景

最近在做项目的时候遇到了一个需求。从DataStore获取最新的ID,并根据ID到Room数据库内获取相应的内容。由于DataStore和Room都可以通过Flow获取到最新的数据更新,因此我设想是否能通过数据驱动界面的方式,抛开以往的通知界面更新方法,直接对数据源的变动进行监听并设置界面。
需要实现以上的需求就需要对DataStore的ID进行监听的同时,还要对Room数据库内相应ID的数据进行同步监听,保证两个数据源的数据的变化都可以监听到。

Tips: 以下部分大都是问题发现与解决的步骤,符合从认识到实践的二次飞跃过程,因此想要直接看最终可用解决方案的可以直接翻到最后。

初步实现

最简单的设想是在transform方法中进行嵌套实现,代码如下:

// 使用时转换为LiveData后,在Activity中进行数据变化监听
val data = flow1.transform { id ->
    val flow2 = getFlow2(id)
    emitAll(flow2)
}.asLiveData()

实际使用以上代码时却出了大问题,A界面获取数据并初始化视图成功,但是在B界面修改数据后返回A界面后,数据更新并没有被监听到。以上的数据不更新情况还是不确定的,因为如果在B界面修改完数据后等待一段时间再返回A界面,数据更新就能够被监听到了。

初步解决方案

无法监听数据变化的情况让我抓狂了一阵子,最终“修改完数据后等待一段时间能够被监听到”还是在asLiveData方法的源码中找到了原因。源码如下:

// CoroutineLiveData.kt
internal const val DEFAULT_TIMEOUT = 5000L

// FlowLiveData.kt
@JvmOverloads
fun <T> Flow<T>.asLiveData(
    context: CoroutineContext = EmptyCoroutineContext,
    timeoutInMs: Long = DEFAULT_TIMEOUT
): LiveData<T> = liveData(context, timeoutInMs) {
    collect {
        emit(it)
    }
}

好家伙,这里有个DEFAULT_TIMEOUT的超时设定。掐指一算确实只需要在B界面等待5秒后再返回A界面,仅能够成功更新界面了。因此在使用asLiveData(timeoutInMs=0)后问题貌似就解决了。那么此处的超时时间是用来做什么的呢?继续看源码:

// CoroutineLiveData.kt
internal class BlockRunner<T>(
    private val liveData: CoroutineLiveData<T>,
    private val block: Block<T>,
    private val timeoutInMs: Long,
    private val scope: CoroutineScope,
    private val onDone: () -> Unit
) {
    ··· ···
    @MainThread
    fun cancel() {
        ··· ···
        cancellationJob = scope.launch(Dispatchers.Main.immediate) {
            delay(timeoutInMs)
            if (!liveData.hasActiveObservers()) {
                // one last check on active observers to avoid any race condition between starting
                // a running coroutine and cancelation
                runningJob?.cancel()
                runningJob = null
            }
        }
    }
}

首先asLiveData中的调用的Flow的collect方法是在一个lambda函数中的,CoroutineLiveData使用BlockRunner作为collect方法所在的lambda函数的运行器,监听LifeCycle的变化,以启动与取消collect方法。
而超时时间设置的原因,简单理解一下就是,由于担心界面变化太快(例如屏幕旋转)导致BlockRunner重复的启动与取消带来的任何竞争问题,所以此处等待指定时间后再检查是否还有活动的观察器,并最后取消collect的运行。
显然,超时时间并不是造成无法收到更新数据的主要原因,因为设置这个超时时间的出发点以及功能显然是好的。
但是以上源码却给我带来了启示,因为等待时间超过5秒后取消了对于collect方法的执行,后面再重新执行collect方法就可以获取到更新的数据了。

查找根本原因

此时,我突然想起collect方法是一个suspend方法,会挂起当前运行的协程,而对数据变化进行监听的话,collect方法显然会挂起当前运行的协程。会不会就是collect方法带来的协程挂起导致了以上的问题?开始试验:

val flow1 = flow {
    emit("F1-A")
    delay(2000L)
    emit("F1-B")
    delay(2000L)
    emit("F1-C")
    delay(10000000L)
}.transform {
    val flow2 = flow {
        emit("$it-F2")
        delay(10000000L)
    }
    emitAll(flow2)
}
runBlocking {
    launch {
        flow1.collect {
            println("${System.currentTimeMillis()} 1 $it")
        }
    }
    delay(3000L)
    launch {
        flow1.collect {
            println("${System.currentTimeMillis()} 2 $it")
        }
    }
}

delay(2000L)表示数据的更新间隔,delay(10000000L)模拟长时间不关闭的监听,两个独立的collect协程表示监听的先后。最后得到的结果如下:

1608255907350 1 F1-A-F2
1608255910333 2 F1-A-F2
// 后面就卡住了

实锤了collect方法的嵌套导致协程挂起,导致了无法收到更新数据的问题。

寻找解决方案

此时我突然想到了combine方法,同样都是将两个Flow合并在一起,为什么combine方法就没有问题?查找后源码如下:

@PublishedApi
internal suspend fun <R, T> FlowCollector<R>.combineInternal(
    flows: Array<out Flow<T>>,
    arrayFactory: () -> Array<T?>?, // Array factory is required to workaround array typing on JVM
    transform: suspend FlowCollector<R>.(Array<T>) -> Unit
): Unit = flowScope { // flow scope so any cancellation within the source flow will cancel the whole scope
    ··· ···
        // Coroutine per flow that keeps track of its value and sends result to downstream
        launch {
            try {
                flows[i].collect { value ->
                    resultChannel.send(Update(i, value))
                    yield() // Emulate fairness, giving each flow chance to emit
                }
            }
··· ···

恍然大悟,原来combine方法是将每个对collect的监听放到单独的Scope中才能够实现对多个flow的监听与同步更新。
那么这个flowScope以及combineInternal能不能改造后为我所用呢?不行,因为全都是内部方法,所以无法调用。那么只能模仿其写一个了。

fun <T1, T2> Flow<T1>.dependCombine(transform: (T1) -> Flow<T2>) = flow {
    var oldJob: Job? = null
    collect {
        oldJob?.cancel()
        oldJob = GlobalScope.launch {
            emitAll(transform(it))
        }
    }
}

然而,最终却出错了?!!

Exception in thread "DefaultDispatcher-worker-1 @coroutine#3" java.lang.IllegalStateException: Flow invariant is violated:
        Emission from another coroutine is detected.
        Child of "coroutine#3":StandaloneCoroutine{Active}@5bbc3ad9, expected child of "coroutine#2":StandaloneCoroutine{Active}@41f88e7d.
        FlowCollector is not thread-safe and concurrent emissions are prohibited.
        To mitigate this restriction please use 'channelFlow' builder instead of 'flow'

好吧,flow中不能够运行其他的协程,所以只能使用官方建议的channelFlow了。

最终解决方案

直接放代码:

fun <T1, T2> Flow<T1>.combine(transform: suspend (T1) -> Flow<T2>): Flow<T2> =
    combineTransform(transform) { _, t2 ->
        t2
    }

fun <T1, T2, R> Flow<T1>.combineTransform(combineTransform: suspend (T1) -> Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> =
    channelFlow {
        var oldJob: Job? = null
        collect { t1 ->
            oldJob?.cancel()
            oldJob = launch {
                combineTransform(t1).collect { t2 ->
                    send(transform(t1, t2))
                }
            }
        }
        awaitClose {
            oldJob?.cancel()
        }
    }

通过channelFlow即可正确的解决问题了
以上只是为本人的解决方案,并不代表只能这么解决,如果有更好的方法望不吝赐教。

One more thing

对于一个需要在多处被collect的Flow,与shareIn方法一起使用效果更佳。


Made By XFY9326

相关文章

网友评论

    本文标题:解决Android Flow无法收到更新的数据的问题

    本文链接:https://www.haomeiwen.com/subject/iefxnktx.html