美文网首页
Android Datastore 动态创建与源码解析

Android Datastore 动态创建与源码解析

作者: 一个冬季 | 来源:发表于2023-10-03 15:04 被阅读0次

    涉及到的知识点

    1、协程原理---->很好的博客介绍,一个小故事讲明白进程、线程、Kotlin 协程到底啥关系?
    2、Channel知识点---->Android—kotlin-Channel超详细讲解
    3、Coroutines : CompletableDeferred and structured concurrency

    封装的DataStoreUtils工具--->gitHub

    本篇博客目的

    公司使用SharedPreferences容易导致ANR,调研能否使用DataStore替换公司目前的SharedPreferences解决ANR问题,所以需要先研究一下源码

    目录

    • 版本引入
    • 迁移SharedPreferences数据到dataStore
    • 动态创建DataStore
    • 存储参数
    • 总结

    版本引入

    implementation "androidx.datastore:datastore-preferences:1.0.0"
    

    迁移SharedPreferences数据到dataStore

    既然是迁移数据,那么需要将SharedPreferences已存储的数据迁移到dataStore,所以需要先构建dataStore。
    目前网上构建迁移DataStore的案例Demo如下

    //迁移使用
    private val Context.dataStore: DataStore<Preferences> by preferencesDataStore(
        name = "userSharePreFile",
        produceMigrations = { context ->
            listOf(
                SharedPreferencesMigration(
                    context,
                     "userSharePreFile"
                )
            )
        }
    )
    
    //或 
    //这种构建DataStore写法是alpha版本有的,在1.0.0版本就找不到了
    var dataStore: DataStore<Preferences> = context.createDataStore(
            name = "userSharePreFile"
     )
    //或
    //直接构建
    private val Context.dataStore: DataStore<Preferences> by preferencesDataStore(
            name = "userSharePreFile"
    )
    

    上面3种写法都是对Context进行扩展创建的DataStore,所以上面创建的方式,都有一个缺点,就是需要提前知道name才能创建,如果你之前创建SharedPreferences的方式,是通过外部传递进来name构建的话,上面直接创建DataStore方式就显然不适合你了。

    翻阅旧版本(alpha版本)源码,一探究竟如何构建DataStore
    //alpha版本构建方式
    var dataStore: DataStore<Preferences> = context.createDataStore(
            name = "userSharePreFile"
     )
    
    fun Context.createDataStore(
        name: String,
        corruptionHandler: ReplaceFileCorruptionHandler<Preferences>? = null,
        //①
        migrations: List<DataMigration<Preferences>> = listOf(),
        //②
        scope: CoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob())
    ): DataStore<Preferences> =
        PreferenceDataStoreFactory.create(
            //③
            produceFile = {
                File(this.filesDir, "datastore/$name.preferences_pb")
            },
            corruptionHandler = corruptionHandler,
            migrations = migrations,
            scope = scope
        )
    

    可以明显看到是使用PreferenceDataStoreFactory.create返回DataStore<Preferences>
    ① 是构建需要迁移SharedPreferences文件名称
    ② 指明协程是在IO运行
    ③ 新文件存储的位置
    再看看另外一种通过 by preferencesDataStore 创建DataStore方式

    private val Context.dataStore: DataStore<Preferences> by preferencesDataStore(
            name = "userSharePreFile"
    )
    
    public fun preferencesDataStore(
        name: String,
        corruptionHandler: ReplaceFileCorruptionHandler<Preferences>? = null,
        //①
        produceMigrations: (Context) -> List<DataMigration<Preferences>> = { listOf() },
        //②
        scope: CoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob())
    ): ReadOnlyProperty<Context, DataStore<Preferences>> {
        return PreferenceDataStoreSingletonDelegate(name, corruptionHandler, produceMigrations, scope)
    }
    
    internal class PreferenceDataStoreSingletonDelegate internal constructor(
        private val name: String,
        private val corruptionHandler: ReplaceFileCorruptionHandler<Preferences>?,
        private val produceMigrations: (Context) -> List<DataMigration<Preferences>>,
        private val scope: CoroutineScope
    ) : ReadOnlyProperty<Context, DataStore<Preferences>> {
    
        private val lock = Any()
    
        @GuardedBy("lock")
        @Volatile
        private var INSTANCE: DataStore<Preferences>? = null
    
        override fun getValue(thisRef: Context, property: KProperty<*>): DataStore<Preferences> {
            return INSTANCE ?: synchronized(lock) {
                if (INSTANCE == null) {
                    val applicationContext = thisRef.applicationContext
    
                    INSTANCE = PreferenceDataStoreFactory.create(
                        corruptionHandler = corruptionHandler,
                        migrations = produceMigrations(applicationContext),
                        scope = scope
                    ) {
                        applicationContext.preferencesDataStoreFile(name)
                    }
                }
                INSTANCE!!
            }
        }
    }
    
    //文件存储位置
    public fun Context.preferencesDataStoreFile(name: String): File =
        this.dataStoreFile("$name.preferences_pb")
    

    题外话:这里有利用kotlin委托属性by关键字语法
    ① 需要迁移的SharedPreferences文件
    ② 协程运行在IO

    可以看出旧版本(alpha) 与 by preferencesDataStore 2种方案,都最终通过PreferenceDataStoreFactory.create,返回DataStore,我们就继续再看看PreferenceDataStoreFactory.kt的具体实现逻辑

    //PreferenceDataStoreFactory.kt
     public fun create(
            corruptionHandler: ReplaceFileCorruptionHandler<Preferences>? = null,
            //迁移的share文件集合
            migrations: List<DataMigration<Preferences>> = listOf(),
             //IO
            scope: CoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob()),
            //dataStore文件存储的目录位置
            produceFile: () -> File 
        ): DataStore<Preferences> {
            val delegate = DataStoreFactory.create(//创建SingleProcessDataStore
                serializer = PreferencesSerializer,
                corruptionHandler = corruptionHandler,
                migrations = migrations,
                scope = scope
            ) {
                //省略代码
            } 
            //传入SingleProcessDataStore
            return PreferenceDataStore(delegate)
        }
    
    //这里有主动的去调用updateData 方法,如果不去主动调用,就不会触发迁移的逻辑
    //下文的扩展函数DataStore<Preferences>.edit会说到这里
    internal class PreferenceDataStore(private val delegate: DataStore<Preferences>) :
        DataStore<Preferences> by delegate {
        override suspend fun updateData(transform: suspend (t: Preferences) -> Preferences):
            Preferences {
                return delegate.updateData {
                    val transformed = transform(it)
                    (transformed as MutablePreferences).freeze()
                    transformed
                }
            }
    }
    
    

    继续看DataStoreFactory.create

    //DataStoreFactory.kt
    fun <T> create(
            produceFile: () -> File,
            serializer: Serializer<T>,
            corruptionHandler: ReplaceFileCorruptionHandler<T>? = null,
            migrations: List<DataMigration<T>> = listOf(),
            scope: CoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob())
        ): DataStore<T> =
            //找到最终创建的类
            SingleProcessDataStore(
                produceFile = produceFile,
                serializer = serializer,
                corruptionHandler = corruptionHandler ?: NoOpCorruptionHandler(),
                initTasksList = listOf(DataMigrationInitializer.getInitializer(migrations)),
                scope = scope
            )
    

    到目前为止已经知道真相了,最终是通过SingleProcessDataStore返回DataStore。

    下面我们通过一张图片来小结一下,旧版本alpha版本的创建与新版本 by preferencesDataStore的调用逻辑链

    DataStore.jpg

    好,已经知道这么多了,那么我们就开始动态构建DataStore

    动态创建DataStore

     fun preferencesMigrationDataStore(sharedPreferName: String) {
        val dataStore = PreferenceDataStoreFactory.create(
          corruptionHandler =  ReplaceFileCorruptionHandler<Preferences>(
            produceNewData = { emptyPreferences() }
          ),
        //需要迁移的sharePrefer文件的名称
         migrations = listOf(SharedPreferencesMigration(mContext, sharedPreferName)),
        //IO
         scope = CoroutineScope(Dispatchers.IO + SupervisorJob())) {
        //dataStore文件名称
         mContext.preferencesDataStoreFile(sharedPreferName)
         }
      
        runBlocking {
            //必须要执行这行代码,否是不会走迁移的逻辑
             dataStore.updateData {
                  it.toPreferences()
               }
          }
        }
    

    migrations:表示你要迁移的sharedPreference文件
    scope :表示写数据是在IO
    执行完上述代码后,.xml就会消失,然后会在files目录下多出一个/datastore/xxx.preferences_pb文件
    切勿重复对某个SharedPreferences执行文件迁移方案,否则会报错。比如你前一秒在执行迁移,后一秒又继续执行迁移


    SharedPrefs.png
    dataStore_migrate.jpg

    存储参数

    /**
     * @key 参数
     * @value 具体的值
     */
     private fun putInt(key:String, value: Int) {
        runBlocking {
             dataStore.edit {//①
                    it[intPreferencesKey(key)] = value
              }
           }
       }
    //类似的还有如下,这些都是google提供的参数
    intPreferencesKey
    doublePreferencesKey
    stringPreferencesKey
    ....
    

    看①详情,点击edit,发现他是一个扩展函数

    public suspend fun DataStore<Preferences>.edit(
        transform: suspend (MutablePreferences) -> Unit
    ): Preferences {
        return this.updateData {//调用的是PreferenceDataStore.updateData()
            //it.toMutablePreferences() 返回类似map
            it.toMutablePreferences().apply { transform(this) }
        }
    }
    

    transform 就是调用者{}里面的内容,接下来我们看看 PreferenceDataStore 类的代码

    //由前部分的代码,可以得知,delegate = SingleProcessDataStore 
    internal class PreferenceDataStore(private val delegate: DataStore<Preferences>) :
        DataStore<Preferences> by delegate {
        override suspend fun updateData(transform: suspend (t: Preferences) -> Preferences):
            Preferences {
                return delegate.updateData {//调用SingleProcessDataStore.updateData 
                    //返回给上一个{}也就是  it.toMutablePreferences().apply { transform(this) }
                    val transformed = transform(it)
                    (transformed as MutablePreferences).freeze()
                    transformed //拿到用户的需要更改的内容数据
                }
            }
    }
    

    代码里调用了delegate.updateData(), 所以继续看SingleProcessDataStore的updateData

    SingleProcessDataStore.kt
     override suspend fun updateData(transform: suspend (t: T) -> T): T {
            val ack = CompletableDeferred<T>()
            val currentDownStreamFlowState = downstreamFlow.value
            //协程体封装进Message.Update,coroutineContext 是协程的上下文,就是我们的 runBlocking 启动的线程,我这里是main
            val updateMsg = Message.Update(transform, ack, currentDownStreamFlowState, coroutineContext)
            //对消息进行分发,他的类是 SimpleActor
            actor.offer(updateMsg)
            //这里会拿到Preferences,如何拿?后面会有一个update.ack.completeWith方法,会返回回来
            return ack.await()
        }
    
    internal class SimpleActor<T>(
        private val scope: CoroutineScope,//Dispatchers.IO + SupervisorJob()
        onComplete: (Throwable?) -> Unit,
        onUndeliveredElement: (T, Throwable?) -> Unit,
        private val consumeMessage: suspend (T) -> Unit
    ) {
        private val messageQueue = Channel<T>(capacity = UNLIMITED)
        private val remainingMessages = AtomicInteger(0)
        //......  省去
        //这里就是将刚刚封装的消息体,添加进这里
        fun offer(msg: T) {
            check(
                //发送封装的消息体
                messageQueue.trySend(msg)
                    .onClosed { throw it ?: ClosedSendChannelException("Channel was closed normally") }
                    .isSuccess
            )
            if (remainingMessages.getAndIncrement() == 0) {
                scope.launch {
                    check(remainingMessages.get() > 0)
                    do {
                       // scope = Dispatchers.IO + SupervisorJob()
                        scope.ensureActive()
                        //取出封装的消息体,然后进行任务处理
                        consumeMessage(messageQueue.receive())
                    } while (remainingMessages.decrementAndGet() != 0)
                }
            }
        }
    }
    

    tip:这里有利用Channel进行协程通信,Channel是可以处理并发的情况
    到这里,我们可以知道,我们由runBlocking(main主线程) 协程 到 Dispatchers.IO的任务分发

    private val actor = SimpleActor<Message<T>>(
            scope = scope,// CoroutineScope(Dispatchers.IO + SupervisorJob())
            onComplete = {//.....省略},
            onUndeliveredElement = { msg, ex ->
              //.....省略
          ) { msg ->
            //处理分发的任务,msg 为刚刚封装的updateMsg 
            when (msg) { 
                is Message.Read -> {//读取
                    handleRead(msg)
                }
                is Message.Update -> {//更新
                    handleUpdate(msg)
                }
            }
        }
    
     private suspend fun handleUpdate(update: Message.Update<T>) {
            update.ack.completeWith(
                runCatching {
                    when (val currentState = downstreamFlow.value) {
                        is Data -> {
                            //写数据到file
                            transformAndWrite(update.transform, update.callerContext)
                        }
                        is ReadException, is UnInitialized -> {
                            if (currentState === update.lastState) {           
                                //读取file文件      ①          
                                readAndInitOrPropagateAndThrowFailure()
                                //写数据到file       ②
                                transformAndWrite(update.transform, update.callerContext)
                            } else {
                                throw (currentState as ReadException).readException
                            }
                        }
    
                        is Final -> throw currentState.finalException // won't happen
                    }
                }
            )
        }
    

    第一次使用 downstreamFlow.value = UnInitialized 。
    这里要注意一下update.ack.completeWith这个函数,他是拿到结果成功返回

    这里再次展示出来,是告诉大家,在哪里会等待结果返回
     override suspend fun updateData(transform: suspend (t: T) -> T): T {
            val ack = CompletableDeferred<T>()
            val currentDownStreamFlowState = downstreamFlow.value
            val updateMsg =
                Message.Update(transform, ack, currentDownStreamFlowState, coroutineContext)
            actor.offer(updateMsg)
            return ack.await() //这里就是等待 update.ack.completeWith的结果返回,所以如果不加这行,是不会卡主线程的
        }
    

    所以使用runBlocking是会卡主线程的,如果你还有UI刷新情况,严重的情况会导致ANR问题

    不扯之前的了,我们继续继续,看① 的读取

     private suspend fun readAndInitOrPropagateAndThrowFailure() {
            try {
                readAndInit()
            } catch (throwable: Throwable) {
                downstreamFlow.value = ReadException(throwable)
                throw throwable
            }
        }
    
     private suspend fun readAndInit() {
            check(downstreamFlow.value == UnInitialized || downstreamFlow.value is ReadException)
            //这个是锁,协程里面专有的,详情可以看 https://www.kotlincn.net/docs/reference/coroutines/shared-mutable-state-and-concurrency.html
            val updateLock = Mutex()
            //读取dataStore文件
            var initData = readDataOrHandleCorruption()
            var initializationComplete: Boolean = false
            
            //这里就是shareprefence转dataStore
            val api = object : InitializerApi<T> {
                override suspend fun updateData(transform: suspend (t: T) -> T): T {
                    return updateLock.withLock() {
                        if (initializationComplete) {
                            throw IllegalStateException(
                                "InitializerApi.updateData should not be " +
                                    "called after initialization is complete."
                            )
                        }
                        //transform里面就是去迁移数据的方法
                        val newData = transform(initData)
                        //这里有做,新 旧值比较,如果不同,就去写入
                        if (newData != initData) {
                            //写文件
                            writeData(newData)
                            initData = newData
                        }
                        initData
                    }
                }
            }
            //initTasks 里面装的就是需要转换的 SharedPreferences集合
            initTasks?.forEach { it(api) }
            initTasks = null
            updateLock.withLock {
                initializationComplete = true
            }
            //这里有将迁移完成后的数据,存储在flow.value里面
            downstreamFlow.value = Data(initData, initData.hashCode())
        }
    
    //读取dataStore文件
    private suspend fun readDataOrHandleCorruption(): T {
            try {
                return readData()
            } catch (ex: CorruptionException) {
                val newData: T = corruptionHandler.handleCorruption(ex)
                try {
                    writeData(newData)
                } catch (writeEx: IOException) {
                    ex.addSuppressed(writeEx)
                    throw ex
                }
                return newData
            }
        }
    
     private suspend fun readData(): T {
            try {
                FileInputStream(file).use { stream ->
                    return serializer.readFrom(stream)
                }
            } catch (ex: FileNotFoundException) {
                if (file.exists()) {
                    throw ex
                }
                return serializer.defaultValue
            }
        }
    

    file就是我们存储的dataStore,目录是在 "datastore/$name.preferences_pb"

    看完了①,再来看看② 写入数据到file,写数据的方法是 transformAndWrite()

    //....
    transformAndWrite(update.transform, update.callerContext)
    //...
    
     private suspend fun transformAndWrite(
             //来源于 Message.Update.transform封装
            transform: suspend (t: T) -> T,
            //来源于 Message.Update.callerContext封装
            callerContext: CoroutineContext
        ): T {
            val curDataAndHash = downstreamFlow.value as Data<T>
            curDataAndHash.checkHashCode()
    
            val curData = curDataAndHash.value
            //这里callerContext  就是我们的 runBlocking,main(主线程)
            //这里是将旧的值给回调用者,然后从调用者获取到新参数
            val newData = withContext(callerContext) { transform(curData) }
    
            curDataAndHash.checkHashCode()
            //这里有做数据比较
            return if (curData == newData) {
                curData
            } else {
                //写入数据
                writeData(newData)
                //保存到flow.value里面
                downstreamFlow.value = Data(newData, newData.hashCode())
                newData
            }
        }
    
    private val SCRATCH_SUFFIX = ".tmp"
    //写入数据
    internal suspend fun writeData(newData: T) {
            file.createParentDirectories()
            //这里创建出来的文件是"datastore/$name.preferences_pb.tmp"
            val scratchFile = File(file.absolutePath + SCRATCH_SUFFIX)
            try {
                FileOutputStream(scratchFile).use { stream ->
                    serializer.writeTo(newData, UncloseableOutputStream(stream))
                    stream.fd.sync()
                }
                //重新命名回去file,这里的file是我们目标的文件dataStore名称
                if (!scratchFile.renameTo(file)) {
                    //重新命名失败,抛出异常
                    throw IOException(
                        "Unable to rename $scratchFile." +
                            "This likely means that there are multiple instances of DataStore " +
                            "for this file. Ensure that you are only creating a single instance of " +
                            "datastore for this file."
                    )
                }
            } catch (ex: IOException) {
                if (scratchFile.exists()) {
                    scratchFile.delete() 
                }
                throw ex
            }
        }
    

    到此,更新值的操作,我们已经全部走完了流程

    总结

    1、文件的写入是发生在IO层面
    2、使用runBlocking是会卡主线程,如果此时存在需要刷新UI的情况,严重会ANR

    
    /**
     * @key 参数
     * @value 具体的值
     */
     private fun putInt(key:String, value: Int) {
        runBlocking {
             dataStore.edit {
                    it[intPreferencesKey(key)] = value
              }
           }
       }
    
    public suspend fun DataStore<Preferences>.edit(
        transform: suspend (MutablePreferences) -> Unit
    ): Preferences {
        return this.updateData {
            it.toMutablePreferences().apply { transform(this) }
        }
    }
    
    //更新逻辑
     private suspend fun handleUpdate(update: Message.Update<T>) {
            update.ack.completeWith(//通知结果回调
                //.....省去
            )
        }
    
    //transform 就是上面的{}里面的内容
     override suspend fun updateData(transform: suspend (t: T) -> T): T {
            val ack = CompletableDeferred<T>()
            val currentDownStreamFlowState = downstreamFlow.value
            val updateMsg =
                Message.Update(transform, ack, currentDownStreamFlowState, coroutineContext)
            actor.offer(updateMsg)
            return ack.await() //这里就是等待 update.ack.completeWith的结果返回,所以如果不加这行,是不会卡主线程的
            //题外话不加ack.await() 代码也会执行
        }
    

    所以,可以考虑使用withContext(IO){读取/更新等待操作}

    3、更新参数的时候,是会跟旧的值比较,如果值相同就不写入了,否则就写入到文件里面,并且更新flow.value的值

     return if (curData == newData) {
                curData
            } else {
                writeData(newData)
                downstreamFlow.value = Data(newData, newData.hashCode())
                newData
            }
    

    4、解决并发问题,使用channel解决协程之间沟通与并发,单线程的IO更新文件与并发

    5、如果已将SharedPreference迁移到DataStore,你就不要继续使用SharedPreferences了,如果继续使用SharedPreferences,会与DataStore的值不同了

    相关文章

      网友评论

          本文标题:Android Datastore 动态创建与源码解析

          本文链接:https://www.haomeiwen.com/subject/vemkbdtx.html