美文网首页
花式Kotlin协程用法 - Paging 3源码进阶

花式Kotlin协程用法 - Paging 3源码进阶

作者: 九心_ | 来源:发表于2021-01-18 08:06 被阅读0次

    技术不止,文章有料,加 JiuXinDev 入群,Android 搬砖路上不孤单

    前言

    在上一篇文章中,我和大家简单交流了一下关于 Paging 3 的用法,有同学就立即表示,Paging 3 有啥好学的,不就是一个分页库吗?

    其实不然,Paging 3 看上去是一个分页的库,但实际你看到的只是表象,除去分页这个外衣,Paging 可以帮助我们管控数据的生命周期,从获取,到展示,提供了一套自动化啊和标准化的管控流程,加上Android Jetpack 中其他组件的加持:

    • LiveData 可以为我们解决组件生命周期异常带来的困扰
    • 比如 Room 提供的 PagingSource 在数据库新增数据的时候会立刻在UI中刷新

    别问,问就一个字,香~

    众所周知,研究源码最好的方法,是带着问题去研究源码。

    所以,我们从 Paging 3 入手,褪去分页这层外衣,看看它是如何处理了:

    1. 分页的整个流程是什么样的?
    2. PagingSource 和 RemoteMediator 是如何结合的?
    3. 如何实现它的状态管理?

    目录

    目录

    一、从使用角度看结构

    关于使用的代码我们就不过多介绍了,感兴趣的可以阅读 《即学即用Android Jetpack - Paging 3》

    从之前的文章,我们了解到如下几个重要的类:

    说明
    PagingSource Paing 的数据源。
    RemoteMediator 如果你既有本地的数据源,又有远程的数据源,这个时候你就可以使用 PagingSource 充当本地的数据源,RemoteMediator充当远程的数据源。
    Pager 获取数据的入口,它可以向外提供一个 Flow,它是一个响应式的数据源,数据的接收方可以获取到 PagingData
    PagingData 这个我们不会接触到,但是有必要了解。官方注释说Container for Paged data from a single generation of loadssingle generation 我理解就是数据源没有发生变化的时候就是一代,也就是没有发生增删改这种情况。
    RecyclerView 我们的老朋友,不用介绍
    PagingDataAdapter Paging 3 中为 RecyclerView 量身定做的适配器

    从这几个类中,先建立一个大概的结构图:

    Paging 3使用结构

    注意一下,这个 PagingAdapter 连接 Flow<PagingData> 这个过程,我前面说过,PagingData 提供的是一代数据,你可以理解它为一份数据快照,当数据源发生变化的时候,它才会生成一个新的 PagingSource

    阅读此篇文章之前,你应该具有一定的协程知识,建议阅读我的文章:《即学即用Kotlin - 协程》

    二、分析准备

    在讲下文之前,我觉得有必要和大家探讨一下 Paging 3 中的状态管理机制和事件消费机制。

    1 状态管理和事件管理

    1.1 状态管理

    状态管理的场景:

    data class LoadStates(
        /** [LoadState] corresponding to [LoadType.REFRESH] loads. */
        val refresh: LoadState,
        /** [LoadState] corresponding to [LoadType.PREPEND] loads. */
        val prepend: LoadState,
        /** [LoadState] corresponding to [LoadType.APPEND] loads. */
        val append: LoadState
    )
    

    可以看出状态管理场景分为三种:

    1. refresh:刷新状态,顾名思义,就是 Pager 初始化数据时候的状态。
    2. append:向后加载的状态,常见于加载更多的场景。
    3. prepend:向前加载的状态,常见于数据从指定位置开始,但是这个位置之前的数据并没有加载,所以需要加载之前的数据。

    有了这个三个场景,它会有哪些状态呢?状态对应的类为 LoadState, 它为我们提供三种状态:

    1. NotLoading:未加载,并且未加载分为加载已完成和加载未完成,由成员变量 endOfPaginationReached 控制。
    2. Loading:加载中
    3. Error:错误

    有了这些状态,我们就可以做更多的事情,比如和UI交互,比如,管理请求事件。

    1.2 事件管理

    除了状态管理,我们还需要事件管理,比如,数据回来了,我需用通知一个 Insert 事件,并包含状态的变化,所以,事件管理,其实是包含了状态管理的。

    事件管理也分为三种:

    internal sealed class PageEvent<T : Any> {
        // Intentional to prefer Refresh, Prepend, Append constructors from Companion.
        @Suppress("DataClassPrivateConstructor")
        data class Insert<T : Any> private constructor(
            val loadType: LoadType,
            val pages: List<TransformablePage<T>>,
            val placeholdersBefore: Int,
            val placeholdersAfter: Int,
            val combinedLoadStates: CombinedLoadStates
        ) : PageEvent<T>() {
            // ...
        }
    
        data class Drop<T : Any>(
            val loadType: LoadType,
            val minPageOffset: Int,
            val maxPageOffset: Int,
            val placeholdersRemaining: Int
        ) : PageEvent<T>() {
            // ...
        }
    
        data class LoadStateUpdate<T : Any>(
            val loadType: LoadType,
            val fromMediator: Boolean,
            val loadState: LoadState // TODO: consider using full state object here
        ) : PageEvent<T>() {
            //...
        }
    }
    

    具体是:

    1. Insert:插入事件,包括具体的数据 pages、加载类型(加载类型对应的是Refresh\Append\Prepend)loadType和组合状态 combinedLoadStates(包含Refresh\Append\Prepend加载状态)。
    2. Drop:删除事件
    3. LoadStateUpdate:加载状态变化的事件。

    三、数据的产生

    有了上面的基础,我们开始讲解整个流程。

    入口是 PagingAdapterPager 建立关系的时候:

    lifecycleScope.launch {
        // 注意:这里需要用 collectLatest,只用 collect 的话筛选会不生效
        viewModel.shoes.collectLatest {
            adapter.submitData(it)
        }
    }
    

    我们先讨论一下数据如何产生。

    上面的 viewModel.shoes 就是一个 Pager 提供的 Flow:

    // 使用的时候构建Flow的代码
    Pager(config = PagingConfig(
        pageSize = 20,
        enablePlaceholders = false,
        initialLoadSize = 20
    ), pagingSourceFactory = {
        brand?.let { shoeRepository.getShoesByBrandPagingSource(it) }
            ?: shoeRepository.getAllShoesPagingSource()
    }).flow 
    
    class Pager<Key : Any, Value : Any>
    @JvmOverloads constructor(
        config: PagingConfig,
        initialKey: Key? = null,
        @OptIn(ExperimentalPagingApi::class)
        remoteMediator: RemoteMediator<Key, Value>? = null,
        pagingSourceFactory: () -> PagingSource<Key, Value>
    ) {
        /**
         * A cold [Flow] of [PagingData], which emits new instances of [PagingData] once they become
         * invalidated by [PagingSource.invalidate] or calls to [AsyncPagingDataDiffer.refresh] or
         * [PagingDataAdapter.refresh].
         */
        val flow: Flow<PagingData<Value>> = PageFetcher(
            pagingSourceFactory,
            initialKey,
            config,
            remoteMediator
        ).flow
    }
    

    第一段是我们使用 Paging 3 的代码,不再多讲,列出来只是希望你知道,它有这个过程。

    第二段是 Pager 的源码,它其实只是一个壳,在构造函数中,最后一个参数是一个闭包,它会返回一个 PagingSource。除此以外,Pager 中还提供了一个 类型是 Flow<PagingData<Value>>flow ,它来自 PageFetcher 中的 flow

    1. PageFetcher

    PagerFlow 部分如下:

    internal class PageFetcher<Key : Any, Value : Any>(
        // ... 构造参数省略
    ) {
        // 用来控制刷新的 Channel
        private val refreshChannel = ConflatedBroadcastChannel<Boolean>()
        // 失败重试的Channel
        private val retryChannel = ConflatedBroadcastChannel<Unit>()
    
        // The object built by paging builder can maintain the scope so that on rotation we don't stop
        // the paging.
        val flow: Flow<PagingData<Value>> = channelFlow {
            // 1. 构建 RemoteMediatorAccessor
            val remoteMediatorAccessor = remoteMediator?.let {
                RemoteMediatorAccessor(this, it)
            }
            // 2. 将refreshChannel转成为Flow
            refreshChannel.asFlow()
                .onStart {
                    // 3. collect之前触发的操作
                    @OptIn(ExperimentalPagingApi::class)
                    emit(remoteMediatorAccessor?.initialize() == LAUNCH_INITIAL_REFRESH)
                }
                .scan(null) {
                    // 4. 计算新结果前,可以处理一下老的结果
                    // ...
                }
                .filterNotNull()
                .mapLatest { generation ->
                    // 5. 只处理最新的值,构建PagingData
                    // ...
                }
                .collect { send(it) }
        }
        // ... 
    }
    

    鉴于比较长,省略了很多代码,具体方法处再放上代码。

    1.1 两个属性

    先看两个属性 refreshChannelretryChannel,他们两个其实就是用来发送信号的,refreshChannel 比较重要,发送刷新信号,retryChannel 用来发送重新请求的信号。

    1.2 外层

    返回的 Flow 用了一层 channelFlow 来包裹,使用 channelFlow 要么是为了在多个协程中传送数据,要么是数据数量具有不确定性,我们看看后面是不是这样的。

    1.3 构建远程数据源相关

    构建一个 remoteMediatorAccessor,它包裹了远程数据源的 RemoteMediator

    后面我们会以 Flow 的每个扩展方法为一部分,涉及到具体的扩展方法我们不会讲它的原理,只会讲它的作用,感兴趣的同学可以自己看一下它的实现。

    1.4 创建Flow

    refreshChannel 转化为 Flow,然后调用了 Flow#onStart 方法,这个 onStart 方法会在 Flow 进行 collect 操作之前调用。这个方法做了什么呢?只有一行代码:

    remoteMediatorAccessor?.initialize() == LAUNCH_INITIAL_REFRESH
    

    验证 remoteMediatorAccessor 的初始化行为,之前我们提过,remoteMediatorAccessorRemoteMediator 的壳,这次我们进代码看看:

    // 第一处的调用点
    val remoteMediatorAccessor = remoteMediator?.let {
        RemoteMediatorAccessor(this, it)
    }
    
    // RemoteMediatorAccessor 方法
    internal fun <Key : Any, Value : Any> RemoteMediatorAccessor(
        scope: CoroutineScope,
        delegate: RemoteMediator<Key, Value>
    ): RemoteMediatorAccessor<Key, Value> = RemoteMediatorAccessImpl(scope, delegate)
    
    private class RemoteMediatorAccessImpl<Key : Any, Value : Any>(
        private val scope: CoroutineScope,
        private val remoteMediator: RemoteMediator<Key, Value>
    ) : RemoteMediatorAccessor<Key, Value> {
        // ...
    
        override suspend fun initialize(): RemoteMediator.InitializeAction {
            return remoteMediator.initialize().also { action ->
                if (action == RemoteMediator.InitializeAction.LAUNCH_INITIAL_REFRESH) {
                    // 如果当前在collect之前,RemoteMediator有默认的初始化行为,设置状态
                    accessorState.use {
                        it.setBlockState(LoadType.APPEND, REQUIRES_REFRESH)
                        it.setBlockState(LoadType.PREPEND, REQUIRES_REFRESH)
                    }
                }
            }
        }
    
        // ...
    }
    

    从我列出的代码来看,RemoteMediatorAccessor 包裹了 RemoteMediator,并且 RemoteMediatorAccessImpl#initialize 也调用了 RemoteMediator#initialize 方法,该方法会返回一个枚举 InitializeAction,这个枚举有两种:

    1. LAUNCH_INITIAL_REFRESH:在初始化的时候,会发射一个刷新的信号
    2. SKIP_INITIAL_REFRESH:初始化的时候不发射刷新信号,等待UI请求的时候发送

    再回到 PageFetcher 中的 flow,可以看到,在回到onStart 方法中,它会有两种情况:

    1. 如果你有 RemoteMediator,默认情况下它会发射 true
    2. 没有 RemoteMediator 或者初始化默认不请求远程的数据源,发射 false

    我们其实可以理解为它要不要在初始化的时候刷新远程的数据源。

    1.5 Scan

    进行 Flow#scan 方法,这个方法的作用就是每一次上流发射新的信号的时候,你可以获取新的信号计算新的结果,在此之前,你还可拿到老的结果,方便处理老的结果。

    从这个方法的参数你就可以看出来:

    • previousGeneration:上一次计算得出来的结果
    • triggerRemoteRefresh:上面提到的 onStart 方法发射出来的值,或者是别处调用 refreshChannel 发射的信号,是否触发刷新远程的数据源。
    internal class PageFetcher<Key : Any, Value : Any>(
        private val pagingSourceFactory: () -> PagingSource<Key, Value>,
        private val initialKey: Key?,
        private val config: PagingConfig,
        @OptIn(ExperimentalPagingApi::class)
        private val remoteMediator: RemoteMediator<Key, Value>? = null
    ) {
        // ... 
    
        // The object built by paging builder can maintain the scope so that on rotation we don't stop
        // the paging.
        val flow: Flow<PagingData<Value>> = channelFlow {
            val remoteMediatorAccessor = remoteMediator?.let {
                RemoteMediatorAccessor(this, it)
            }
            refreshChannel.asFlow()
                .onStart {
                    @OptIn(ExperimentalPagingApi::class)
                    emit(remoteMediatorAccessor?.initialize() == LAUNCH_INITIAL_REFRESH)
                }
                .scan(null) {
                    previousGeneration: PageFetcherSnapshot<Key, Value>?, triggerRemoteRefresh ->
                    // 1. 产生新的数据源
                    var pagingSource = generateNewPagingSource(previousGeneration?.pagingSource)
                    while (pagingSource.invalid) {
                        pagingSource = generateNewPagingSource(previousGeneration?.pagingSource)
                    }
    
                    @OptIn(ExperimentalPagingApi::class)
                    val initialKey: Key? = previousGeneration?.refreshKeyInfo()
                        ?.let { pagingSource.getRefreshKey(it) }
                        ?: initialKey
                    // 2. 释放旧的数据源
                    previousGeneration?.close()
                    // 3. 生成新的 PageFetcherSnapshot
                    PageFetcherSnapshot<Key, Value>(
                        initialKey = initialKey,
                        pagingSource = pagingSource,
                        config = config,
                        retryFlow = retryChannel.asFlow(),
                        // Only trigger remote refresh on refresh signals that do not originate from
                        // initialization or PagingSource invalidation.
                        triggerRemoteRefresh = triggerRemoteRefresh,
                        remoteMediatorConnection = remoteMediatorAccessor,
                        invalidate = this@PageFetcher::refresh
                    )
                }
                .filterNotNull()
                .mapLatest { generation ->
                    // ...
                }
                .collect { send(it) }
        }
        //...
    }
    

    这个方法干了三件事:

    1. 生成一个新的数据源 PageSourcePageFetcher#generateNewPagingSource 这个方法调用了 PageFetcher 构造函数中的 pagingSourceFactory 创建了一个新的数据源,并做了一些监听处理。
    2. 释放旧的数据源。
    3. 返回一个新的 PageFetcherSnapshot 对象,它是数据快照的持有类。

    1.6 过滤空值

    Flow#filterNotNull方法过滤发射过来空的值。

    1.7 处理最新值

    Flow#mapLatest 只处理最新的值,当这个方法正在工作的时候,上游发了一个新的值过来,这时,它会停止手上的工作,处理新的值。

    internal class PageFetcher<Key : Any, Value : Any>(
        private val pagingSourceFactory: () -> PagingSource<Key, Value>,
        private val initialKey: Key?,
        private val config: PagingConfig,
        @OptIn(ExperimentalPagingApi::class)
        private val remoteMediator: RemoteMediator<Key, Value>? = null
    ) {
        // ... 
    
        // The object built by paging builder can maintain the scope so that on rotation we don't stop
        // the paging.
        val flow: Flow<PagingData<Value>> = channelFlow {
            val remoteMediatorAccessor = remoteMediator?.let {
                RemoteMediatorAccessor(this, it)
            }
            refreshChannel.asFlow()
                .onStart {
                    @OptIn(ExperimentalPagingApi::class)
                    emit(remoteMediatorAccessor?.initialize() == LAUNCH_INITIAL_REFRESH)
                }
                .scan(null) {
                    // ...
                }
                .filterNotNull()
                .mapLatest { generation ->
                    val downstreamFlow = if (remoteMediatorAccessor == null) {
                        generation.pageEventFlow
                    } else {
                        generation.injectRemoteEvents(remoteMediatorAccessor)
                    }
                    PagingData(
                        flow = downstreamFlow,
                        receiver = PagerUiReceiver(generation, retryChannel)
                    )
                }
                .collect { send(it) }
        }
        //...
    }
    

    Flow#mapLatest 方法中,它做了两件事:

    1. 得到一个事件流 pageEventFlow
    2. 讲这个事件流封装成 PagingData
    1.8 发送PagingData

    将上面得到的 PagingData 发送出去,最终被 PagingDataAdapter 消费,回到我们一开始写的代码:

    // viewModel.shoes 就是 Flow<PagingData<T>>
    viewModel.shoes.collectLatest {
        adapter.submitData(it)
    }
    

    总结一下,虽然上面的过程很多,其实目的就是:

    1. 得到 PagingData,而 PagingData 中最重要的就是事件流 Flow<PageEvent<T>>,它来自 PageFetcherSnapshot
    2. 根据代码是否启用 RemoteMediator

    2 PagingData

    从 1 中,我们了解到事件流 Flow<PageEvent<T>>,它来自 PageFetcherSnapshot,这是跟数据相关的核心代码。

    好家伙,又是一大段代码,最重要的是 pageEventFlow

    internal class PageFetcherSnapshot<Key : Any, Value : Any>(
        internal val initialKey: Key?,
        internal val pagingSource: PagingSource<Key, Value>,
        private val config: PagingConfig,
        private val retryFlow: Flow<Unit>,
        private val triggerRemoteRefresh: Boolean = false,
        val remoteMediatorConnection: RemoteMediatorConnection<Key, Value>? = null,
        private val invalidate: () -> Unit = {}
    ) {
        // ...
        @OptIn(ExperimentalCoroutinesApi::class)
        private val pageEventChCollected = AtomicBoolean(false)
        private val pageEventCh = Channel<PageEvent<Value>>(Channel.BUFFERED)
        private val stateLock = Mutex()
        private val state = PageFetcherSnapshotState<Key, Value>(
            config = config
        )
        private val pageEventChannelFlowJob = Job()
    
        @OptIn(ExperimentalCoroutinesApi::class)
        val pageEventFlow: Flow<PageEvent<Value>> = cancelableChannelFlow(pageEventChannelFlowJob) {
            // 1. 建立一个协程pageEventCh收到的事件发送出去
            launch {
                pageEventCh.consumeAsFlow().collect {
                    // Protect against races where a subsequent call to submitData invoked close(),
                    // but a pageEvent arrives after closing causing ClosedSendChannelException.
                    try {
                        send(it)
                    } catch (e: ClosedSendChannelException) {
                        // Safe to drop PageEvent here, since collection has been cancelled.
                    }
                }
            }
    
            // 2. 接受重试的信息,是不是为了缓存
            val retryChannel = Channel<Unit>(Channel.RENDEZVOUS)
            launch { retryFlow.collect { retryChannel.offer(it) } }
    
            // 3. 重试的动作
            launch {
                retryChannel.consumeAsFlow()
                    .collect {
                        // 重试后处理对应的状态
                        // ...
                    }
            }
    
            // 4. 如果刷新的时候需要远程更新,就让remoteMediator加载数据
            if (triggerRemoteRefresh) {
                remoteMediatorConnection?.let {
                    val pagingState = stateLock.withLock { state.currentPagingState(null) }
                    it.requestLoad(LoadType.REFRESH, pagingState)
                }
            }
    
            // 5. PageSource初始化数据
            doInitialLoad(state)
    
            // 6. 消费hint
            if (stateLock.withLock { state.sourceLoadStates.get(LoadType.REFRESH) } !is LoadState.Error) {
                startConsumingHints()
            }
        }
    
        // ...
    }
    

    pageEventFlow 又被分为了6个部分,我们着重去了解1、4、5和6。

    2.1 发射PageEvent<Value>

    创建了一个协程,用来转发 pageEventCh 接收到的 PageEvent<Value>

    2.2 请求远程数据源

    如果创建了远程的数据源,并且需要在初始化的时候加载远程的数据,开始请求远程的数据,

    2.3 PagingSource初始化

    这里面发生了 PagingSource 的第一次数据初始化,来看看发生了什么?

    internal class PageFetcherSnapshot<Key : Any, Value : Any>(
        // ...
    ) {
        // ...
    
        private suspend fun doInitialLoad(
            state: PageFetcherSnapshotState<Key, Value>
        ) {
            // 1. 设置当前加载的状态 - 刷新
            stateLock.withLock { state.setLoading(LoadType.REFRESH) }
    
            // 构建参数
            val params = loadParams(LoadType.REFRESH, initialKey)
            // 2. 数据加载,得到结果 result
            when (val result = pagingSource.load(params)) {
                is PagingSource.LoadResult.Page<Key, Value> -> {
                    // 3. 处理一下得到的结果 pages
                    val insertApplied = stateLock.withLock { state.insert(0, LoadType.REFRESH, result) }
    
                    // 4. 处理一下各种状态
                    stateLock.withLock {
                        state.setSourceLoadState(LoadType.REFRESH, LoadState.NotLoading.Incomplete)
                        if (result.prevKey == null) {
                            state.setSourceLoadState(
                                type = PREPEND,
                                newState = when (remoteMediatorConnection) {
                                    null -> LoadState.NotLoading.Complete
                                    else -> LoadState.NotLoading.Incomplete
                                }
                            )
                        }
                        if (result.nextKey == null) {
                            state.setSourceLoadState(
                                type = APPEND,
                                newState = when (remoteMediatorConnection) {
                                    null -> LoadState.NotLoading.Complete
                                    else -> LoadState.NotLoading.Incomplete
                                }
                            )
                        }
                    }
    
                    // 5. 发送PageEvent
                    if (insertApplied) {
                        stateLock.withLock {
                            with(state) {
                                pageEventCh.send(result.toPageEvent(LoadType.REFRESH))
                            }
                        }
                    }
    
                    // 6. 是否有必要发生远程数据的请求
                    if (remoteMediatorConnection != null) {
                        if (result.prevKey == null || result.nextKey == null) {
                            val pagingState =
                                stateLock.withLock { state.currentPagingState(lastHint) }
    
                            if (result.prevKey == null) {
                                remoteMediatorConnection.requestLoad(PREPEND, pagingState)
                            }
    
                            if (result.nextKey == null) {
                                remoteMediatorConnection.requestLoad(APPEND, pagingState)
                            }
                        }
                    }
                }
                is PagingSource.LoadResult.Error -> stateLock.withLock {
                    // 错误状态的请求
                    val loadState = LoadState.Error(result.throwable)
                    if (state.setSourceLoadState(LoadType.REFRESH, loadState)) {
                        pageEventCh.send(PageEvent.LoadStateUpdate(LoadType.REFRESH, false, loadState))
                    }
                }
            }
        }
    }
    

    从数据第一次初始化的时候,可以看到很多东西:

    1. 数据加载状态的变化:Refresh 场景 Incomplete -- Loading - 根据返回的结果设置 CompleteIncomplete,并且一些状态都会通过第一部分的 pageEventCh 发送状态更新事件。
    2. Refresh 场景设置 Loading 状态以后,会构建加载的参数,放到 pageSource 进行数据请求,终于见到 pagingSource 了。
    3. 因为 pagingSource.load(params) 可能得到两种结果,如果是错误就直接处理错误。
    4. 如果是正常的结果,会先处理一下结果。再变更一下状态,之后统一发射一个 Insert 事件。
    5. 因为有的时候 pageSource 没有获取到结果 ,又设置了 remoteMediator,这个时候就需要再使用 remoteMediator进行下一步的数据请求

    这个时候可以回答一开始第二个问题:

    pageSource 获取不到结果的时候,如果存在 remoteMediator,会使用 remoteMediator 进行数据请求。

    2.4 如何加载更多的数据

    如果一开始刷新没有出现纰漏即最开始的刷新没有出现错误,这里会调用下一步 startConsumingHints方法:

    internal class PageFetcherSnapshot<Key : Any, Value : Any>(
        // ...
    ) {
        // ...
        private val state = PageFetcherSnapshotState<Key, Value>(
            config = config
        )
    
        @OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
        private fun CoroutineScope.startConsumingHints() {
            // ...
    
            // 监听Prepend消息
            launch {
                state.consumePrependGenerationIdAsFlow()
                    .collectAsGenerationalViewportHints(PREPEND)
            }
    
            // 监听Append消息
            launch {
                state.consumeAppendGenerationIdAsFlow()
                    .collectAsGenerationalViewportHints(APPEND)
            }
        }
    
        private suspend fun Flow<Int>.collectAsGenerationalViewportHints(
            loadType: LoadType
        ) = flatMapLatest { generationId ->
            // 处理状态
            stateLock.withLock {
                // ...
            }
    
            @OptIn(FlowPreview::class)
            hintChannel.asFlow()
                .drop(if (generationId == 0) 0 else 1)
                .map { hint -> GenerationalViewportHint(generationId, hint) }
        }
            .runningReduce { previous, next ->
                if (next.shouldPrioritizeOver(previous, loadType)) next else previous
            }
            .conflate()
            .collect { generationalHint ->
                // doLoad 方法跟 doInitialLoad 类似
                // 1. 变更状态
                // 2. 使用PageSource进行请求
                // 3. 获取结果发送 Insert 事件
                // 4. 根据需要是否使用RemoteMediator
                doLoad(state, loadType, generationalHint)
            }
    }
    

    最后一部分的 doLoad 跟第三部分的 doInitialLoad 类似,当然也有一些不一样,比如加载数据的时候,它会判断当前的数据位置距已加载数据的末尾或者头部是否小于一个阈值(创建PagerConfig 时设置),这个条件成立的时候才会加载更多。

    看到这里,你可能有点晕,没关系,我们用一张图来总结我们之前讲解的部分:

    数据生产流程

    看了这部分,似乎还剩下一些问题没搞清:

    1. 除了初始化的那部分数据加载外,Ui是如何驱动加载更多的数据的?

    四、 数据的消费

    1 具体的消费行为

    从上游返回的数据中,我们得到了 PagingData<T>,来看看适配器 PagingDataAdapter 是如何处理这些数据的:

    abstract class PagingDataAdapter<T : Any, VH : RecyclerView.ViewHolder> @JvmOverloads constructor(
        diffCallback: DiffUtil.ItemCallback<T>,
        mainDispatcher: CoroutineDispatcher = Dispatchers.Main,
        workerDispatcher: CoroutineDispatcher = Dispatchers.Default
    ) : RecyclerView.Adapter<VH>() {
        private val differ = AsyncPagingDataDiffer(
            diffCallback = diffCallback,
            updateCallback = AdapterListUpdateCallback(this),
            mainDispatcher = mainDispatcher,
            workerDispatcher = workerDispatcher
        )
    
        // ...
    
        suspend fun submitData(pagingData: PagingData<T>) {
            differ.submitData(pagingData)
        }
    
        // ...
    }
    

    PagingDataAdapter 也没有亲自处理,而是把它交给了 AsyncPagingDataDiffer,就像 PagingDataAdapter 注释中所说的,PagingDataAdapter 只是 AsyncPagingDataDiffer 的壳,同样,AsyncPagingDataDiffer又把这个烫手的山芋交给了 PagingDataDiffer

    abstract class PagingDataDiffer<T : Any>(
        private val differCallback: DifferCallback,
        private val mainDispatcher: CoroutineDispatcher = Dispatchers.Main
    ) {
        // ...
    
        suspend fun collectFrom(pagingData: PagingData<T>) = collectFromRunner.runInIsolation {
            // 1. 给当前的receiver赋值
            receiver = pagingData.receiver
    
            pagingData.flow.collect { event ->
                withContext<Unit>(mainDispatcher) {
                    // 切换到主线程
                    if (event is PageEvent.Insert && event.loadType == LoadType.REFRESH) {
                        // 1. 当前是插入事件并且当前是刷新的场景
                        lastAccessedIndexUnfulfilled = false
    
                        // 2. PagePresenter负责管理本地数据,生成一个新的PagePresenter
                        val newPresenter = PagePresenter(event)
                        // 3. 重新计算加载数据的位置
                        val transformedLastAccessedIndex = presentNewList(
                            previousList = presenter,
                            newList = newPresenter,
                            newCombinedLoadStates = event.combinedLoadStates,
                            lastAccessedIndex = lastAccessedIndex
                        )
                        presenter = newPresenter
    
                        // Dispatch LoadState + DataRefresh updates as soon as we are done diffing,
                        // but after setting presenter.
                        dataRefreshedListeners.forEach { listener ->
                            listener(event.pages.all { page -> page.data.isEmpty() })
                        }
                        // 4. 通知状态的变化
                        dispatchLoadStates(event.combinedLoadStates)
    
                        // 5. 如果数据加载位置发生变化了,则使用receiver发送通知
                        transformedLastAccessedIndex?.let { newIndex ->
                            lastAccessedIndex = newIndex
                            receiver?.accessHint(
                                newPresenter.viewportHintForPresenterIndex(newIndex)
                            )
                        }
                    } else {
                        // ...
                    }
                }
            }
        }
    }
    

    我们以 Refresh 为例,简单讲解一下如何消费数据:

    1.1 缓存数据,并通知UI更新

    Refresh 的情况下,会先建一个数据管理器,这里对应的是 PagePresenter

    接着就是通知数据刷新,这里的 presentNewList 方法是交给子类去实现的:

    private val differBase = object : PagingDataDiffer<T>(differCallback, mainDispatcher) {
        override suspend fun presentNewList(
            previousList: NullPaddedList<T>,
            newList: NullPaddedList<T>,
            newCombinedLoadStates: CombinedLoadStates,
            lastAccessedIndex: Int
        ) = when {
            // fast path for no items -> some items
            previousList.size == 0 -> {
                // 第一次刷新的通知新的数据插入
                differCallback.onInserted(0, newList.size)
                null
            }
            // fast path for some items -> no items
            newList.size == 0 -> {
                differCallback.onRemoved(0, previousList.size)
                null
            }
            else -> {
                val diffResult = withContext(workerDispatcher) {
                    previousList.computeDiff(newList, diffCallback)
                }
                previousList.dispatchDiff(updateCallback, newList, diffResult)
                previousList.transformAnchorIndex(
                    diffResult = diffResult,
                    newList = newList,
                    oldPosition = lastAccessedIndex
                )
            }
        }
    

    第一次刷新如果有数据回来就是第一种情况,直接使用 differCallback 去通知有数据新增了,当然,这些都会通知到我们的适配器 PagingAdapter 去调用对应的方法。

    这里你可能会有一点疑问,适配器 PagingAdapter 并不持有任何数据,那它怎么获取到数据呢?

    其实 PagingAdapter 复写了 getItem 方法,去除层层嵌套,最后也使用了 PagingDataDiffer

    abstract class PagingDataDiffer<T : Any>(
        private val differCallback: DifferCallback,
        private val mainDispatcher: CoroutineDispatcher = Dispatchers.Main
    ) {
        
        // ... 
        operator fun get(@IntRange(from = 0) index: Int): T? {
            lastAccessedIndexUnfulfilled = true
            lastAccessedIndex = index
    
            receiver?.accessHint(presenter.viewportHintForPresenterIndex(index))
            return presenter.get(index)
        }
    }
    

    所以 getItem 方法还是通过数据管理者 PagePresenter 实现的,除此以外,每次获取数据的时候,都会调用 UiReceiver#accessHint 的方法,当你仍有数据需要加载并且当前展示位置距数据末尾小于一定的阈值的时候,这时会触发 doLoad 方法,这会让 Pager 加载更多的数据。

    1.2 通知数据状态更新

    回到 PagingDataDiffer#collect 方法中, 处理完上述的事以后,会更新状态:

    abstract class PagingDataDiffer<T : Any>(
        private val differCallback: DifferCallback,
        private val mainDispatcher: CoroutineDispatcher = Dispatchers.Main
    ) {
        // ...
        private fun dispatchLoadStates(states: CombinedLoadStates) {
            if (combinedLoadStates.snapshot() == states) return
    
            combinedLoadStates.set(states)
            loadStateListeners.forEach { it(states) }
        }
    }
    

    这些状态可以用来干嘛呢?可以处理与用户的状态,比如,刷新错误可以切换错误的界面等等。

    1.3 计算数据加载位置是否发生变化

    如果不是第一次刷新,并且一些数据源发生变化的时候,比如删除或者新增数据,原来的一些位置信息就不准确了,则需要让调用 receiver.accessHint 方法发送通知。

    Refresh 的情况最终所做的事情跟 Refresh 类似,就不再赘述了。

    总结一下消费过程,用一张图描述:

    数据消费

    两张图总结一下,我们开始的第一、三问题就清晰了:

    整个 Paging 3 的流程都是围绕这个 Flow<PageEvent<T>> 的,状态和数据变化了通过它发送,UI则通过它监听到数据,最后通知到数据和状态的观察者。

    总结

    看完 Paging 3 的源码,感受有两点:

    • 第一点:原来协程可以使用的这么出神入化,第一次感觉到,我使用的协程和大佬使用协程不是一个东西,哭了~
    我和大佬
    • 第二点:整个协程中的消息传送都是通过 Channel 中实现的,结合 Flow 来写,确实也比较简洁流畅。

    下一篇文章中,我会和大家讨论我是如何在起点读书的业务中使用 Paging 3 的。

    感谢阅读,如果你有其他的看法,欢迎下方留言讨论,如果觉得本文不错,三连是对作者最大的鼓励~

    相关文章

      网友评论

          本文标题:花式Kotlin协程用法 - Paging 3源码进阶

          本文链接:https://www.haomeiwen.com/subject/ctsbiktx.html