Jetpack DataStore Source Code Analysis


Author: Knight_Davion | Published 2021-11-29 14:23

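    DataStore is a data storage solution that stores key-value pairs or typed objects using protocol buffers. DataStore uses Kotlin coroutines and Flow to store data asynchronously, consistently, and transactionally.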

    If you currently use SharedPreferences to store data, consider migrating to DataStore.

    Preferences DataStore and Proto DataStore

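    • Preferences DataStore stores and accesses data using keys. This implementation does not require a predefined schema, and it does not provide type safety.
    • Proto DataStore stores data as instances of a custom data type. This implementation requires you to define a schema using protocol buffers, but it provides type safety.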

    Basic usage

    Adding the dependencies

    implementation("androidx.datastore:datastore:1.0.0")
    implementation("androidx.datastore:datastore-preferences:1.0.0")
    

    Using Preferences DataStore

    (1) Create the DataStore instance

    // Declare this at the top level of a Kotlin file, as an extension property on Context
    val Context.dataStore: DataStore<Preferences> by preferencesDataStore(name = "settings")
    

    DataStore saves the data to a file in the app's internal storage. The file is named after the name set above (here settings) and has the extension preferences_pb.

    Note: in real projects, keep the dataStore above as a singleton. There should be only one DataStore instance per file.

    (2) Writing data

    DataStore saves data asynchronously with the edit function. edit is a suspend function, so it must be called from a coroutine.

    val EXAMPLE_COUNTER = intPreferencesKey("example_counter")
    
    dataStore.edit { settings ->
        val currentCounterValue = settings[EXAMPLE_COUNTER] ?: 0
        settings[EXAMPLE_COUNTER] = currentCounterValue + 1
    }
    

    Here EXAMPLE_COUNTER is the key, and it must be a Preferences.Key<T>. DataStore currently provides the following functions for creating keys:

    • intPreferencesKey

    • doublePreferencesKey

    • stringPreferencesKey

    • booleanPreferencesKey

    • floatPreferencesKey

    • longPreferencesKey

    • stringSetPreferencesKey
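    In Preferences DataStore, type safety comes from the key. A Preferences.Key<T> carries the value type, while the storage underneath is effectively an untyped map. The stdlib-only sketch below illustrates that idea. TypedKey, intKey, stringKey and MiniPreferences are hypothetical names and are not the DataStore API.

```kotlin
// Hypothetical typed key: T exists only at compile time; at runtime a key is just a name.
data class TypedKey<T>(val name: String)

fun intKey(name: String) = TypedKey<Int>(name)
fun stringKey(name: String) = TypedKey<String>(name)

// Hypothetical stand-in for MutablePreferences: an untyped map behind typed accessors.
class MiniPreferences {
    private val map = mutableMapOf<String, Any>()

    // The cast is unchecked; the key's type parameter is what makes call sites type-safe.
    @Suppress("UNCHECKED_CAST")
    operator fun <T> get(key: TypedKey<T>): T? = map[key.name] as T?

    operator fun <T : Any> set(key: TypedKey<T>, value: T) {
        map[key.name] = value
    }
}
```

    A call such as prefs[counter] = (prefs[counter] ?: 0) + 1 follows the same read-with-default, then write-back pattern used inside edit in the example above.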

    (3) Reading data

    DataStore exposes its data as a Flow, so you observe changes by collecting it.

    val exampleCounterFlow: Flow<Int> = dataStore.data
        .map { preferences ->
            preferences[EXAMPLE_COUNTER] ?: 0
        }
    
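    // GlobalScope is used only for brevity; in an app, prefer a lifecycle-bound scope such as lifecycleScope
    GlobalScope.launch {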
        exampleCounterFlow.collectLatest {
            Log.i(TAG, "read value:$it")
        }
    }
    

    Using Proto DataStore

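    Proto DataStore stores typed objects. Using it requires familiarity with protocol buffers and how to set them up on Android. That is not covered here.
    Using Proto DataStore is largely similar to using Preferences DataStore. To write data, submit it with updateData: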

    dataStore.updateData { settings ->
        settings.toBuilder()
            .setExampleCounter(settings.exampleCounter + 1)
            .setId(settings.id + 1)
            .build()
    }
    

    Reading works the same way as with Preferences DataStore, so it is not repeated here.

    That covers basic usage. Next, let's analyze the DataStore source code.

    Source code analysis

    We use reading and writing with Preferences DataStore as the example. The flow for Proto DataStore is similar.

    Let's start with how the DataStore is instantiated:

    val Context.dataStore: DataStore<Preferences> by preferencesDataStore(name = "settings")
    

    The dataStore property is initialized by delegation. preferencesDataStore returns a property delegate, and that delegate supplies the DataStore<Preferences> instance.

    Here is the implementation of preferencesDataStore:

    public fun preferencesDataStore(
        name: String,
        corruptionHandler: ReplaceFileCorruptionHandler<Preferences>? = null,
        produceMigrations: (Context) -> List<DataMigration<Preferences>> = { listOf() },
        scope: CoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob())
    ): ReadOnlyProperty<Context, DataStore<Preferences>> {
        return PreferenceDataStoreSingletonDelegate(name, corruptionHandler, produceMigrations, scope)
    }
    

    The parameters of preferencesDataStore:

    • name: as explained above, the name of the file the data is saved to.

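    • corruptionHandler: invoked when DataStore encounters a CorruptionException while reading the file, meaning the data cannot be deserialized. A ReplaceFileCorruptionHandler produces replacement data that is written in place of the corrupted contents. If it is null, DataStore uses a NoOpCorruptionHandler, which rethrows the exception.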

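    • produceMigrations: builds the list of data migrations. It is needed, for example, when migrating data from SharedPreferences to DataStore (for instance with SharedPreferencesMigration).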

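    • scope: the CoroutineScope in which DataStore's internal work, such as file I/O, runs. The default is CoroutineScope(Dispatchers.IO + SupervisorJob()), so this work runs on the IO dispatcher.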

    preferencesDataStore returns an instance of PreferenceDataStoreSingletonDelegate. Here is its implementation:

    internal class PreferenceDataStoreSingletonDelegate internal constructor(
        private val name: String,
        private val corruptionHandler: ReplaceFileCorruptionHandler<Preferences>?,
        private val produceMigrations: (Context) -> List<DataMigration<Preferences>>,
        private val scope: CoroutineScope
    ) : ReadOnlyProperty<Context, DataStore<Preferences>> {
    
        private val lock = Any()
    
        @GuardedBy("lock")
        @Volatile
        private var INSTANCE: DataStore<Preferences>? = null
    
        /**
         * Gets the instance of the DataStore.
         *
         * @param thisRef must be an instance of [Context]
         * @param property not used
         */
        override fun getValue(thisRef: Context, property: KProperty<*>): DataStore<Preferences> {
            return INSTANCE ?: synchronized(lock) {
                if (INSTANCE == null) {
                    val applicationContext = thisRef.applicationContext
    
                    INSTANCE = PreferenceDataStoreFactory.create(
                        corruptionHandler = corruptionHandler,
                        migrations = produceMigrations(applicationContext),
                        scope = scope
                    ) {
                        applicationContext.preferencesDataStoreFile(name)
                    }
                }
                INSTANCE!!
            }
        }
    }
    

    The code is easy to follow. PreferenceDataStoreSingletonDelegate implements the ReadOnlyProperty interface, which declares a single operator function, getValue, that the delegate overrides:

    public fun interface ReadOnlyProperty<in T, out V> {
        public operator fun getValue(thisRef: T, property: KProperty<*>): V
    }
    

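    In PreferenceDataStoreSingletonDelegate, getValue returns a singleton DataStore<Preferences>. It uses double-checked locking. If the @Volatile INSTANCE is already set, it is returned directly. Otherwise INSTANCE is checked again inside synchronized(lock) and created from the receiver's applicationContext.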
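    The same double-checked locking can be written as a generic property delegate using only the Kotlin standard library. SingletonDelegate below is a hypothetical class, not part of DataStore. Its create lambda stands in for the PreferenceDataStoreFactory.create call.

```kotlin
import kotlin.properties.ReadOnlyProperty
import kotlin.reflect.KProperty

// Hypothetical delegate: creates the value once and returns the same instance afterwards.
class SingletonDelegate<R, T : Any>(
    private val create: (R) -> T
) : ReadOnlyProperty<R, T> {
    private val lock = Any()

    @Volatile
    private var instance: T? = null

    override fun getValue(thisRef: R, property: KProperty<*>): T {
        // Fast path: once the instance exists, no lock is taken.
        instance?.let { return it }
        return synchronized(lock) {
            // Second check: another thread may have created it while we waited for the lock.
            instance ?: create(thisRef).also { instance = it }
        }
    }
}
```

    In DataStore the receiver type is Context and only its applicationContext is used. Here the receiver type is left generic.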

    INSTANCE is created by PreferenceDataStoreFactory.create. Let's first look at this line:

    applicationContext.preferencesDataStoreFile(name)
    

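    preferencesDataStoreFile is an extension function on Context that takes the name parameter passed in earlier. It presumably determines the path of the file the data is saved to.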

    public fun Context.preferencesDataStoreFile(name: String): File =
        this.dataStoreFile("$name.preferences_pb")
        
    public fun Context.dataStoreFile(fileName: String): File =
        File(applicationContext.filesDir, "datastore/$fileName")
    
    

    preferencesDataStoreFile appends the .preferences_pb extension and calls dataStoreFile, which builds the concrete path under filesDir.

    So we now know that DataStore saves its data in the following file:

    data/data/<package name>/files/datastore/xxx.preferences_pb


    Back in PreferenceDataStoreSingletonDelegate, let's continue with how INSTANCE is created.

    // PreferenceDataStoreFactory.kt
    public object PreferenceDataStoreFactory {
    
        public fun create(
            corruptionHandler: ReplaceFileCorruptionHandler<Preferences>? = null,
            migrations: List<DataMigration<Preferences>> = listOf(),
            scope: CoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob()),
            produceFile: () -> File
        ): DataStore<Preferences> {
            val delegate = DataStoreFactory.create(
                serializer = PreferencesSerializer,
                corruptionHandler = corruptionHandler,
                migrations = migrations,
                scope = scope
            ) {
                val file = produceFile()
                check(file.extension == PreferencesSerializer.fileExtension) {
                    "File extension for file: $file does not match required extension for" +
                        " Preferences file: ${PreferencesSerializer.fileExtension}"
                }
                file
            }
            return PreferenceDataStore(delegate)
        }
    }
    

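    PreferenceDataStoreFactory.create has little logic of its own. It calls DataStoreFactory.create with PreferencesSerializer to get a delegate DataStore<Preferences>. It wraps produceFile so that the file extension is checked against PreferencesSerializer.fileExtension. Finally, it wraps the delegate in a PreferenceDataStore instance.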
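    Note that the extension check does not run when create is called. It runs only when the wrapped produceFile lambda is invoked. The sketch below isolates that wrapping pattern. checkedProducer is a hypothetical helper, not a DataStore function.

```kotlin
import java.io.File

// Hypothetical helper: returns a new producer that validates the extension of the produced file.
fun checkedProducer(requiredExtension: String, produceFile: () -> File): () -> File = {
    val file = produceFile()
    // File.extension is the part of the name after the last '.'.
    check(file.extension == requiredExtension) {
        "File extension for file: $file does not match required extension: $requiredExtension"
    }
    file
}
```

    Deferring the check this way means a misnamed file fails on first use instead of at construction, which matches the original code, where the check sits inside the lambda passed to DataStoreFactory.create.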

    Now let's look at DataStoreFactory.create:

    public object DataStoreFactory {
        public fun <T> create(
            serializer: Serializer<T>,
            corruptionHandler: ReplaceFileCorruptionHandler<T>? = null,
            migrations: List<DataMigration<T>> = listOf(),
            scope: CoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob()),
            produceFile: () -> File
        ): DataStore<T> =
            SingleProcessDataStore(
                produceFile = produceFile,
                serializer = serializer,
                corruptionHandler = corruptionHandler ?: NoOpCorruptionHandler(),
                initTasksList = listOf(DataMigrationInitializer.getInitializer(migrations)),
                scope = scope
            )
    }
    

    Judging by its name, DataStoreFactory looks like a factory class, but it is really just a singleton object. Its create method directly returns a SingleProcessDataStore instance.

    SingleProcessDataStore is the class that is ultimately instantiated, and DataStore's key read and write logic is implemented there.

    That completes the analysis of how DataStore is instantiated. Next, let's look at how data is written and read.

    DataStore updates data through edit:

    dataStore.edit { settings ->
        val currentCounterValue = settings[EXAMPLE_COUNTER] ?: 0
        settings[EXAMPLE_COUNTER] = currentCounterValue + 1
    }
    

    The implementation of edit simply calls DataStore's updateData function:

    
    public suspend fun DataStore<Preferences>.edit(
        transform: suspend (MutablePreferences) -> Unit
    ): Preferences {
        return this.updateData {
            // It's safe to return MutablePreferences since we freeze it in
            // PreferencesDataStore.updateData()
            it.toMutablePreferences().apply { transform(this) }
        }
    }
    

    From the instantiation analysis above, we know the DataStore object is actually a SingleProcessDataStore. So let's go straight to SingleProcessDataStore.updateData:

    override suspend fun updateData(transform: suspend (t: T) -> T): T {
    
        val ack = CompletableDeferred<T>()
        val currentDownStreamFlowState = downstreamFlow.value
    
        val updateMsg =
            Message.Update(transform, ack, currentDownStreamFlowState, coroutineContext)
    
        actor.offer(updateMsg)
    
        return ack.await()
    }
    

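    updateData relies on actors, the approach Kotlin coroutines offer for handling shared mutable state under concurrency. If you are not familiar with actors, see https://www.kotlincn.net/docs/reference/coroutines/shared-mutable-state-and-concurrency.html . For our purposes, just remember that the actor serializes the writes, so updates are applied one at a time.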
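    This serialization can be illustrated without coroutines. The sketch below replaces the coroutine actor with a different technique: a single worker thread draining a LinkedBlockingQueue, with a CompletableFuture playing the role of the ack. This was chosen only so the example runs on the plain JVM. SerialStore and its members are hypothetical names.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.LinkedBlockingQueue
import kotlin.concurrent.thread

// Hypothetical thread-based analogue of the actor: all updates go through one worker.
class SerialStore<T>(initial: T) {
    private class Update<V>(val transform: (V) -> V, val ack: CompletableFuture<V>)

    private val queue = LinkedBlockingQueue<Update<T>>()

    @Volatile
    private var current: T = initial

    private val worker = thread(isDaemon = true, name = "serial-store") {
        while (true) {
            val msg = queue.take()
            // Like handleUpdate: always complete the ack, never let a failure kill the worker.
            try {
                val newValue = msg.transform(current)
                current = newValue
                msg.ack.complete(newValue)
            } catch (t: Throwable) {
                msg.ack.completeExceptionally(t)
            }
        }
    }

    val value: T get() = current

    // Like updateData: enqueue the message, then wait for the worker's answer.
    fun updateData(transform: (T) -> T): T {
        val ack = CompletableFuture<T>()
        queue.put(Update(transform, ack))
        return ack.get()
    }
}
```

    Only the worker thread runs transform. As a result, concurrent read-modify-write updates from many threads do not lose increments, even though callers take no lock.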

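    actor.offer(updateMsg) submits the task to the actor. updateMsg carries the transform, the ack to complete, the state observed when the update was issued, and the caller's coroutine context. Here is the actor implementation: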

    private val actor = SimpleActor<Message<T>>(
        scope = scope,
        onComplete = {
            it?.let {
                downstreamFlow.value = Final(it)
            }
            // We expect it to always be non-null but we will leave the alternative as a no-op
            // just in case.
    
            synchronized(activeFilesLock) {
                activeFiles.remove(file.absolutePath)
            }
        },
        onUndeliveredElement = { msg, ex ->
            if (msg is Message.Update) {
                // TODO(rohitsat): should we instead use scope.ensureActive() to get the original
                //  cancellation cause? Should we instead have something like
                //  UndeliveredElementException?
                msg.ack.completeExceptionally(
                    ex ?: CancellationException(
                        "DataStore scope was cancelled before updateData could complete"
                    )
                )
            }
        }
    ) { msg ->
        when (msg) {
            is Message.Read -> {
                handleRead(msg)
            }
            is Message.Update -> {
                handleUpdate(msg)
            }
        }
    }
    

    The actor reads or writes depending on the message type. Since we sent a Message.Update, it calls handleUpdate(msg). Let's step into handleUpdate:

    private suspend fun handleUpdate(update: Message.Update<T>) {
        // All branches of this *must* complete ack either successfully or exceptionally.
        // We must *not* throw an exception, just propagate it to the ack.
        update.ack.completeWith(
            runCatching {
    
                when (val currentState = downstreamFlow.value) {
                    is Data -> {
                        // We are already initialized, we just need to perform the update
                        transformAndWrite(update.transform, update.callerContext)
                    }
                    is ReadException, is UnInitialized -> {
                        if (currentState === update.lastState) {
                            // we need to try to read again
                            readAndInitOrPropagateAndThrowFailure()
    
                            // We've successfully read, now we need to perform the update
                            transformAndWrite(update.transform, update.callerContext)
                        } else {
                            // Someone else beat us to read but also failed. We just need to
                            // signal the writer that is waiting on ack.
                            // This cast is safe because we can't be in the UnInitialized
                            // state if the state has changed.
                            throw (currentState as ReadException).readException
                        }
                    }
    
                    is Final -> throw currentState.finalException // won't happen
                }
            }
        )
    }
    

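    Let's follow the main path. handleUpdate branches on the current state. If the very first operation is a write, downstreamFlow.value is still in its initial UnInitialized state, so this branch runs (read first, then update):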

    if (currentState === update.lastState) {
        // we need to try to read again
        readAndInitOrPropagateAndThrowFailure()
    
        // We've successfully read, now we need to perform the update
        transformAndWrite(update.transform, update.callerContext)
    }
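    The state handling in handleUpdate and transformAndWrite can be condensed into a small single-threaded model. The sketch below is hypothetical. State, MiniStore, readFile and writeFile are illustrative names. It leaves out the Final state, coroutines and callerContext, and its hash check is a simplified version of checkHashCode().

```kotlin
sealed class State<out T>
object UnInitialized : State<Nothing>()
class Data<T>(val value: T, val hash: Int) : State<T>()
class ReadException(val readException: Throwable) : State<Nothing>()

// Hypothetical single-threaded model; readFile/writeFile stand in for the file I/O.
class MiniStore<T>(private val readFile: () -> T, private val writeFile: (T) -> Unit) {
    var state: State<T> = UnInitialized
        private set

    // Like readAndInitOrPropagateAndThrowFailure: record a read failure, then rethrow it.
    private fun readAndInit() {
        try {
            val data = readFile()
            state = Data(data, data.hashCode())
        } catch (t: Throwable) {
            state = ReadException(t)
            throw t
        }
    }

    fun handleUpdate(lastState: State<T>, transform: (T) -> T): T {
        when (val current = state) {
            is Data -> Unit // already initialized: go straight to the update
            is UnInitialized, is ReadException ->
                // Retry the read only if nobody else changed the state since the caller looked.
                if (current === lastState) readAndInit()
                else throw (current as ReadException).readException
        }
        return transformAndWrite(transform)
    }

    private fun transformAndWrite(transform: (T) -> T): T {
        @Suppress("UNCHECKED_CAST")
        val cur = state as Data<T>
        val newData = transform(cur.value)
        // Simplified checkHashCode(): fail if the current value was mutated in place.
        check(cur.value.hashCode() == cur.hash) { "Data was mutated" }
        if (newData == cur.value) return cur.value // unchanged: skip the write
        writeFile(newData)
        state = Data(newData, newData.hashCode())
        return newData
    }
}
```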
    

    Next, let's look at transformAndWrite:

    private suspend fun transformAndWrite(
        transform: suspend (t: T) -> T,
        callerContext: CoroutineContext
    ): T {
        // value is not null or an exception because we must have the value set by now so this cast
        // is safe.
        val curDataAndHash = downstreamFlow.value as Data<T>
        curDataAndHash.checkHashCode()
    
        val curData = curDataAndHash.value
        val newData = withContext(callerContext) { transform(curData) }
    
        // Check that curData has not changed...
        curDataAndHash.checkHashCode()
    
        return if (curData == newData) {
            curData
        } else {
            writeData(newData)
            downstreamFlow.value = Data(newData, newData.hashCode())
            newData
        }
    }
    

    The key logic is here:

    return if (curData == newData) {
        curData
    } else {
        writeData(newData)
        downstreamFlow.value = Data(newData, newData.hashCode())
        newData
    }
    

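    If the new value equals the current value, the current value is returned and nothing is written. Otherwise writeData is called, and downstreamFlow.value is set to a new Data state holding the new value and its hash code. The checkHashCode() calls before and after the transform guard against the current data being mutated in place, which is why DataStore works only with immutable types. Let's look at writeData: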

    internal suspend fun writeData(newData: T) {
        file.createParentDirectories()
    
        val scratchFile = File(file.absolutePath + SCRATCH_SUFFIX)
        try {
            FileOutputStream(scratchFile).use { stream ->
                serializer.writeTo(newData, UncloseableOutputStream(stream))
                stream.fd.sync()
                // TODO(b/151635324): fsync the directory, otherwise a badly timed crash could
                //  result in reverting to a previous state.
            }
    
            if (!scratchFile.renameTo(file)) {
                throw IOException(
                    "Unable to rename $scratchFile." +
                        "This likely means that there are multiple instances of DataStore " +
                        "for this file. Ensure that you are only creating a single instance of " +
                        "datastore for this file."
                )
            }
        } catch (ex: IOException) {
            if (scratchFile.exists()) {
                scratchFile.delete() // Swallow failure to delete
            }
            throw ex
        }
    }
    

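    This is where the data is actually persisted. The data is first written to a temporary .tmp file (scratchFile), and then the following is called: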

    scratchFile.renameTo(file)
    

    This renames scratchFile to file. At this point the data has been saved.

    Two details here deserve attention.

     stream.fd.sync()
    

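    (1) This call flushes the data through the file descriptor. FileDescriptor.sync() forces all system buffers for the file to be written to the underlying storage device (an fsync on Linux). Once it returns, the new contents are on disk, not only in memory, and this happens before the file is renamed into place. As the TODO in the code notes, the parent directory is not fsynced, so a badly timed crash could still revert to the previous state.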

    scratchFile.renameTo(file)
    

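    (2) renameTo renames the file. In the author's tests on Android, it succeeded whether or not the target file already existed, and it failed only when scratchFile did not exist. The Java documentation describes renameTo as platform-dependent; for example, it may fail if the destination already exists. On Linux-based systems such as Android, however, it is backed by rename(2), which atomically replaces an existing target on the same file system.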
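    The write pattern in writeData can be reproduced with plain java.io: write a scratch file, sync it, then rename it over the target. atomicWrite below is a hypothetical helper, not DataStore code. It writes raw bytes instead of going through a Serializer, and, like the original, it does not fsync the parent directory.

```kotlin
import java.io.File
import java.io.FileOutputStream
import java.io.IOException

// Hypothetical helper mirroring writeData: scratch file + fd.sync() + renameTo.
fun atomicWrite(file: File, bytes: ByteArray) {
    file.parentFile?.mkdirs()
    val scratch = File(file.absolutePath + ".tmp")
    try {
        FileOutputStream(scratch).use { stream ->
            stream.write(bytes)
            stream.fd.sync() // force the bytes to the storage device before renaming
        }
        if (!scratch.renameTo(file)) {
            throw IOException("Unable to rename $scratch to $file")
        }
    } catch (ex: IOException) {
        // Clean up the scratch file, then propagate the original failure.
        if (scratch.exists()) scratch.delete()
        throw ex
    }
}
```

    Because readers only ever see either the old file or the fully written new one, a crash during the write leaves at worst a stray .tmp file rather than a truncated data file.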

    Now let's look at how data is read.

    As the earlier example showed, reading means collecting the dataStore.data flow. Here is the corresponding code in SingleProcessDataStore:

    override val data: Flow<T> = flow {
        val currentDownStreamFlowState = downstreamFlow.value
        if (currentDownStreamFlowState !is Data) {
            // We need to send a read request because we don't have data yet.
            actor.offer(Message.Read(currentDownStreamFlowState))
        }
        emitAll(
            downstreamFlow.dropWhile {
                if (currentDownStreamFlowState is Data<T> ||
                    currentDownStreamFlowState is Final<T>
                ) {
                    false
                } else {
                    it === currentDownStreamFlowState
                }
            }.map {
                when (it) {
                    is ReadException<T> -> throw it.readException
                    is Final<T> -> throw it.finalException
                    is Data<T> -> it.value
                    is UnInitialized -> error(
                        "This is a bug in DataStore. Please file a bug at: " +
                            "https://issuetracker.google.com/issues/new?" +
                            "component=907884&template=1466542"
                    )
                }
            }
        )
    }
    

    If no data has been loaded yet (for example, on the first read with no preceding write), this runs first:

    if (currentDownStreamFlowState !is Data) {
        // We need to send a read request because we don't have data yet.
        actor.offer(Message.Read(currentDownStreamFlowState))
    }
    

    The initial value of downstreamFlow.value is UnInitialized.

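    If a write has already happened, however, the latest value is served straight from memory, because after writing, transformAndWrite stores the new value in downstreamFlow.value: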

    private suspend fun transformAndWrite(
        transform: suspend (t: T) -> T,
        callerContext: CoroutineContext
    ): T {
        ...
        return if (curData == newData) {
            curData
        } else {
            writeData(newData)
            downstreamFlow.value = Data(newData, newData.hashCode())
            ...
        }
    }
    

    Now let's look at how the actor handles a read, in handleRead:

    private suspend fun handleRead(read: Message.Read<T>) {
        when (val currentState = downstreamFlow.value) {
            ...
            UnInitialized -> {
                readAndInitOrPropagateFailure()
            }
            ...
        }
    }
    

    Step into readAndInitOrPropagateFailure:

    private suspend fun readAndInitOrPropagateFailure() {
        try {
            readAndInit()
        } catch (throwable: Throwable) {
            downstreamFlow.value = ReadException(throwable)
        }
    }
    

    readAndInit in turn calls readDataOrHandleCorruption to perform the read:

    var initData = readDataOrHandleCorruption()
    
    private suspend fun readDataOrHandleCorruption(): T {
        try {
            return readData()
        } catch (ex: CorruptionException) {
        ...
        }
    }
    

    The actual read finally happens in readData. If the file does not exist yet, the serializer's default value is returned:

    private suspend fun readData(): T {
        try {
            FileInputStream(file).use { stream ->
                return serializer.readFrom(stream)
            }
        } catch (ex: FileNotFoundException) {
            if (file.exists()) {
                throw ex
            }
            return serializer.defaultValue
        }
    }
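    readData treats a missing file as "no data yet" and returns the default value, while any other failure propagates. The stdlib-only sketch below applies the same rule. readOrDefault is a hypothetical helper, and its parse lambda stands in for Serializer.readFrom.

```kotlin
import java.io.File
import java.io.FileInputStream
import java.io.FileNotFoundException

// Hypothetical helper mirroring readData: a missing file yields the default value.
fun <T> readOrDefault(file: File, defaultValue: T, parse: (ByteArray) -> T): T {
    try {
        FileInputStream(file).use { stream ->
            return parse(stream.readBytes())
        }
    } catch (ex: FileNotFoundException) {
        // FileInputStream also throws this for files that exist but cannot be opened
        // (e.g. a directory), so only a truly missing file maps to the default.
        if (file.exists()) throw ex
        return defaultValue
    }
}
```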
    

    Once the data has been read, downstreamFlow.value becomes Data:

    private suspend fun readAndInit() {
        ...
        downstreamFlow.value = Data(initData, initData.hashCode())
    }
    

    Finally, emitAll emits the value to the collector:

    emitAll(
        downstreamFlow.dropWhile {
            ...
        }.map {
            when (it) {
                ...
                is Data<T> -> it.value
                ...
            }
        }
    )
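    In SingleProcessDataStore, downstreamFlow is a MutableStateFlow, so a new collector first receives whatever state is current. That is why the data flow drops the stale non-Data state it captured at the start, then maps each later state to a value or an exception. The sketch below models this, with a plain Sequence standing in for the flow. All names are hypothetical, and this is not the DataStore code.

```kotlin
sealed class State<out T>
object UnInitialized : State<Nothing>()
class Data<T>(val value: T) : State<T>()
class ReadException(val readException: Throwable) : State<Nothing>()
class Final(val finalException: Throwable) : State<Nothing>()

// Hypothetical model of the data flow: skip the stale state seen at subscription, map the rest.
fun <T> valuesFor(observedAtStart: State<T>, updates: Sequence<State<T>>): Sequence<T> =
    updates
        .dropWhile { observedAtStart !is Data && observedAtStart !is Final && it === observedAtStart }
        .map {
            when (it) {
                is Data -> it.value
                is ReadException -> throw it.readException
                is Final -> throw it.finalException
                is UnInitialized -> error("UnInitialized should have been dropped")
            }
        }
```

    If the collector starts while a Data state is already cached, nothing is dropped and the cached value is emitted immediately, which matches the fast path described above.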
    

    That is the main read/write flow of DataStore. Other details are left out for reasons of space; interested readers can explore the source code themselves.


          Article link: https://www.haomeiwen.com/subject/hgclxrtx.html