美文网首页
从Room源码看抽象与封装——数据流

从Room源码看抽象与封装——数据流

作者: 珞泽珈群 | 来源:发表于2019-07-09 18:46 被阅读0次

    目录

    源码解析目录
    从Room源码看抽象与封装——SQLite的抽象
    从Room源码看抽象与封装——数据库的创建
    从Room源码看抽象与封装——数据库的升降级
    从Room源码看抽象与封装——Dao
    从Room源码看抽象与封装——数据流

    前言

    之前关于Room源码分析的四篇文章基本上涵盖了Room使用的全流程,从抽象基础到数据库创建再到增删改查的实现,可谓很全面了。你以为这就完了吗?Too young!如果Room只是这些内容的话,那只能说它是个还不错的ORM框架,对SQLite进行了良好的抽象,并且效率也很高。Room最大的不同在于它是Jetpack包的一部分,它与Jetpack包中其它部分有着非常好的配合,尤其是LiveData。以之前定义的Dao为例:

    @Dao
    interface UserDao {
        @Query("SELECT * FROM user WHERE uid = :userId")
        fun getUserById(userId: Int): User?
    
        @Query("SELECT * FROM user WHERE uid = :userId")
        fun getUserLiveDataById(userId: Int): LiveData<User>
        
        @Query("SELECT * FROM user WHERE uid = :userId")
        fun getUserObservableById(userId: Int): Observable<User>
        
        @Query("SELECT * FROM user WHERE uid = :userId")
        fun getUserFlowableById(userId: Int): Flowable<User>
    }
    

    如上所示,Dao中的查询不仅可以返回Entity对象User,还可以以数据流的形式返回LiveData或者是RxJava中的Observable,Flowable。当我们“增删改”User表时,如果在此之前以如上形式获取到了User数据流,那么该User数据流就会“更新”,即相应查询会重新执行一遍,以把最新的User数据传递出去,至于User是否真的有变化,这需要我们自行判断。
    这篇文章会介绍Room是如何实现数据流的。

    1. 概述

    如果说之前对Room方方面面的解析只是“繁”的话,那么Room对于数据流的实现真的是有点“难”了。整个流程比较复杂,为了防止你看的云里雾里,我先大致描述一下整个流程,然后再对各个部分进行详细讲解。
    Room实现数据流大致包含如下几个步骤:

    1. 创建一个数据库临时表(表名为room_table_modification_log,保存在内存中,非本地存储),表中包含两列table_idinvalidated,这个表记录了当前数据库哪个Table被修改了(“增删改”都属于修改)。
    2. 如果Dao中的查询涉及到了数据流返回,那么生成代码就会帮我们向数据库中添加一个关于当前被查询表(对于上面的例子来说就是User这个表,可能会涉及到多个表)的TRIGGER(触发器),这个触发器的主要任务是当有操作修改了被查询表中的数据时,就把room_table_modification_log表中对应的table_idinvalidated置为1,表示“失效”了。
    3. Room中所有“增删改”都是放在Transaction中的,在endTransaction时,Room都会查看room_table_modification_log表中是否有invalidated为1的,如果有,再次执行相应查询(invalidated会被重置为0),并把查询的数据通过相应数据流传递出去。
    4. 在数据流不再使用时(例如Observable被dispose),相应触发器也会被丢弃,这样就不会再对这个表进行“追踪”了。

    2. 创建临时表

    Room中关于数据流的实现几乎全部包含在InvalidationTracker类中,这个类是在RoomDatabase中被创建的,只是在之前的分析中被刻意忽略了。

    public abstract class RoomDatabase {
        private final InvalidationTracker mInvalidationTracker;
        
        public RoomDatabase() {
            mInvalidationTracker = createInvalidationTracker();
        }
        
        /**
         * 由生成代码实现
         */
        @NonNull
        protected abstract InvalidationTracker createInvalidationTracker();
        
        /**
         * 由生成代码调用,初始化 InvalidationTracker
         */
        protected void internalInitInvalidationTracker(@NonNull SupportSQLiteDatabase db) {
            mInvalidationTracker.internalInit(db);
        }
    
        @NonNull
        public InvalidationTracker getInvalidationTracker() {
            return mInvalidationTracker;
        }
    }
    
    public final class AppDatabase_Impl extends AppDatabase {
        @Override
        protected SupportSQLiteOpenHelper createOpenHelper(DatabaseConfiguration configuration) {
            final SupportSQLiteOpenHelper.Callback _openCallback = new RoomOpenHelper(configuration, new RoomOpenHelper.Delegate(1) {
                @Override
                public void onOpen(SupportSQLiteDatabase _db) {
                    mDatabase = _db;
                    _db.execSQL("PRAGMA foreign_keys = ON");
                    //数据库打开时初始化 InvalidationTracker
                    internalInitInvalidationTracker(_db);
                    //...
                }
                //...
            });
            //...
        }
          
        @Override
        protected InvalidationTracker createInvalidationTracker() {
            //...
            return new InvalidationTracker(this, _shadowTablesMap, _viewTables, "User"); //User表示要观察的表名,可能包含很多个
        }
    }
    

    注解处理器生成了AppDatabase_Impl,其中不仅实现了createOpenHelper,还实现了createInvalidationTracker,并且在数据库打开时(onOpen回调被调用),InvalidationTracker被初始化,也是在初始化时,临时表被创建,踏出万里长征第一步。

    public class InvalidationTracker {
    
        private static final String UPDATE_TABLE_NAME = "room_table_modification_log";
    
        private static final String TABLE_ID_COLUMN_NAME = "table_id";
    
        private static final String INVALIDATED_COLUMN_NAME = "invalidated";
    
        //建表
        private static final String CREATE_TRACKING_TABLE_SQL = "CREATE TEMP TABLE " + UPDATE_TABLE_NAME
                + "(" + TABLE_ID_COLUMN_NAME + " INTEGER PRIMARY KEY, "
                + INVALIDATED_COLUMN_NAME + " INTEGER NOT NULL DEFAULT 0)";
    
        //重置
        static final String RESET_UPDATED_TABLES_SQL = "UPDATE " + UPDATE_TABLE_NAME
                + " SET " + INVALIDATED_COLUMN_NAME + " = 0 WHERE " + INVALIDATED_COLUMN_NAME + " = 1 ";
    
        //找出哪个表改变了
        static final String SELECT_UPDATED_TABLES_SQL = "SELECT * FROM " + UPDATE_TABLE_NAME
                + " WHERE " + INVALIDATED_COLUMN_NAME + " = 1;";
                
        //根据表名找表名对应的ID
        final ArrayMap<String, Integer> mTableIdLookup;
        //哪些表需要被观察
        final String[] mTableNames;
        
        //tableNames是可变参数,表示可能有多个要被“追踪”的表
        public InvalidationTracker(RoomDatabase database, Map<String, String> shadowTablesMap,
                Map<String, Set<String>> viewTables, String... tableNames) {
            //shadowTablesMap, viewTables可以忽略,一般都为空
            final int size = tableNames.length;
            mTableNames = new String[size];
            for (int id = 0; id < size; id++) {
                final String tableName = tableNames[id].toLowerCase(Locale.US);
                //表对应的ID其实就是它的位置,因为是临时表存储在内存中,所以这个ID只需要能分别谁是谁就可以了
                mTableIdLookup.put(tableName, id);
                mTableNames[id] = tableName;
            }
            //...
        }
        
        /**
         * 初始化临时表
         */
        void internalInit(SupportSQLiteDatabase database) {
            synchronized (this) {
                database.execSQL("PRAGMA temp_store = MEMORY;"); //临时表保存在内存中
                database.execSQL("PRAGMA recursive_triggers='ON';");
                //创建临时表
                database.execSQL(CREATE_TRACKING_TABLE_SQL);
                syncTriggers(database);
                //重置临时表是需要经常执行的,把它编译成SQLiteStatement方便执行
                mCleanupStatement = database.compileStatement(RESET_UPDATED_TABLES_SQL);
            }
        }
    }
    

    临时表的表名是room_table_modification_log,创建它的目的是追踪我们需要观察的那些表中(例如User表)数据是否发生了变化。把建表的SQL翻译以下就是

    CREATE TEMP TABLE room_table_modification_log
        (table_id INTEGER PRIMARY KEY,  invalidated INTEGER NOT NULL DEFAULT 0)
    

    就像下面这样

    table_id invalidated
    0 0

    创建表之后不会有数据,数据(0,0)只是方便你理解。

    因为这个表是临时表,并且在创建它之前设置了PRAGMA temp_store = MEMORY,因此room_table_modification_log表被放在了内存中,并不会被保存在本地文件中(当然也没有必要)。因为要放在内存中,所以说room_table_modification_log选择使用把要追踪的表名转成ID进行存储,这样节省内存,而这个ID并没有什么特殊含义,只需要跟要追踪的表名一一对应就可以,所以使用0,1,2这样的位置作为ID就可以。
    需要明确一点,room_table_modification_log创建后,其中是没有数据的。创建InvalidationTracker时传入的tableNames参数只是表明,返回数据流的查询涉及到了这些表,但是在数据库使用过程中,可能根本就没有执行相关查询,所以也就不需要追踪。也就是说room_table_modification_log的追踪是“懒”的,直到返回数据流的查询方法被调用时,相应数据才会被插入到room_table_modification_log表中,开启追踪。至于数据是如何被插入的,invalidated又是如何在01之间翻转的,下面会介绍。

    3. 添加/移除触发器

    触发器是数据库中的一个概念,你可以把它简单理解为在特定条件下“触发”的一系列行为,这种特定条件在我们这里就是UPDATE, DELETE, INSERT这三种操作,而“一系列行为”就是把room_table_modification_log表中的invalidated置为1。看来触发器的确非常适合于追踪表中数据被修改这种情形。
    那么触发器是由谁添加的呢?源头就是返回数据流的查询方法,当这些方法被调用时,触发器就会被添加。然后,从Dao中查询方法被调用,到触发器被添加,这中间又经历了很多转换,我们暂且把这流程省略直接看触发器相关的内容。

    public class InvalidationTracker {
        /**
         * An observer that can listen for changes in the database.
         */
        public abstract static class Observer {
            final String[] mTables;
    
            public Observer(@NonNull String[] tables) {
                mTables = Arrays.copyOf(tables, tables.length);
            }
    
            public abstract void onInvalidated(@NonNull Set<String> tables);
        }
    }
    

    别管我们查询是以怎样的数据流返回,最终都是向数据库添加了一个观察者,这个观察者的onInvalidated会在我们观察的Table“失效”时被回调,因此,我们可以再次进行查询,进而把最新的数据传递出去,形成数据流。而onInvalidated之所以被回调,依赖的就是room_table_modification_log表。
    当向数据库添加一个观察者时(其实是向InvalidationTracker添加),就会看要观察者提供的mTables中是否还存在没有被“追踪”的Table,如果有则会向room_table_modification_log表中插入一条数据,以开启对该Table的“追踪”,另外还会添加一个触发器,在UPDATE, DELETE, INSERT时设置对应table_idinvalidated为1。

    public class InvalidationTracker {
        //主要职责是记录哪些 Table 已经被“追踪”了
        private ObservedTableTracker mObservedTableTracker;
    
        final SafeIterableMap<Observer, ObserverWrapper> mObserverMap = new SafeIterableMap<>();
    
        //添加观察者
        @WorkerThread
        public void addObserver(@NonNull Observer observer) {
            //一般情况下,与 mTables一致
            final String[] tableNames = resolveViews(observer.mTables);
            int[] tableIds = new int[tableNames.length];
            final int size = tableNames.length;
    
            for (int i = 0; i < size; i++) {
                //查找 Table对应的ID
                Integer tableId = mTableIdLookup.get(tableNames[i].toLowerCase(Locale.US));
                if (tableId == null) {
                    throw new IllegalArgumentException("There is no table with name " + tableNames[i]);
                }
                tableIds[i] = tableId;
            }
            //会进行一下包装
            ObserverWrapper wrapper = new ObserverWrapper(observer, tableIds, tableNames);
            ObserverWrapper currentObserver;
            synchronized (mObserverMap) {
                //保存下来,如果之前添加过,啥也不干
                currentObserver = mObserverMap.putIfAbsent(observer, wrapper);
            }
            //如果之前没有添加过该观察者,并且 tableId之前没有添加过,就会添加触发器
            if (currentObserver == null && mObservedTableTracker.onAdded(tableIds)) {
                //需要“同步”
                syncTriggers();
            }
        }
        
        //移除观察者,在Observable dispose,LiveData被回收时调用
        @WorkerThread
        public void removeObserver(@NonNull final Observer observer) {
            ObserverWrapper wrapper;
            synchronized (mObserverMap) {
                wrapper = mObserverMap.remove(observer);
            }
            if (wrapper != null && mObservedTableTracker.onRemoved(wrapper.mTableIds)) {
                //需要“同步”
                syncTriggers();
            }
        }
    }
    

    InvalidationTracker添加观察者的主要流程是,根据Observer要观察的mTables,转换成对应的tableIds,之后把Observer包装成ObserverWrapperObserverWrapper的主要作用是记录下被观察表的信息,诸如tableIds, tableNames。最后,把观察者保存下来,并且在必要时syncTriggers,顾名思义就是同步触发器的意思(可能添加也可能移除触发器)。
    之前说过,只有在被观察的表还未被“追踪”时,才会添加触发器,如果某个表在之前就已经被“追踪”了,那自然不需要重复添加触发器。而哪些表已经被“追踪”,是否需要停止追踪,这些信息保存在ObservedTableTracker中,这里就不展开其代码了。
    移除观察者就更简单了,一目了然,就不废话了。
    我接着来看主线流程syncTriggers

    public class InvalidationTracker {
        void syncTriggers() {
            if (!mDatabase.isOpen()) {
                return;
            }
            syncTriggers(mDatabase.getOpenHelper().getWritableDatabase());
        }
        
        void syncTriggers(SupportSQLiteDatabase database) {
            if (database.inTransaction()) {
                return;
            }
            try {
                // This method runs in a while loop because while changes are synced to db, another
                // runnable may be skipped. If we cause it to skip, we need to do its work.
                while (true) {
                    Lock closeLock = mDatabase.getCloseLock();
                    closeLock.lock();
                    try {
                        //上文提到的 ObservedTableTracker
                        //返回为null,证明没有要同步的,否则就是“有活要干”
                        final int[] tablesToSync = mObservedTableTracker.getTablesToSync();
                        if (tablesToSync == null) {
                            return;
                        }
                        final int limit = tablesToSync.length;
                        database.beginTransaction();
                        try {
                            for (int tableId = 0; tableId < limit; tableId++) {
                                switch (tablesToSync[tableId]) {
                                    //活来了,可能是开始追踪表
                                    case ObservedTableTracker.ADD:
                                        startTrackingTable(database, tableId);
                                        break;
                                    //也可能是停止追踪表
                                    case ObservedTableTracker.REMOVE:
                                        stopTrackingTable(database, tableId);
                                        break;
                                }
                            }
                            database.setTransactionSuccessful();
                        } finally {
                            database.endTransaction();
                        }
                        mObservedTableTracker.onSyncCompleted();
                    } finally {
                        closeLock.unlock();
                    }
                }
            } catch (IllegalStateException | SQLiteException exception) {
                Log.e(Room.LOG_TAG, "Cannot run invalidation tracker. Is the db closed?",
                        exception);
            }
        }
    }
    

    其实Room实现数据流的整体流程并不复杂,关键就是其中需要处理非常多的线程安全的问题,导致源码很复杂。syncTriggers就是这样的,其主干就是startTrackingTable或者stopTrackingTable或者啥也不干,但是考虑到线程安全的问题,就需要加锁,循环查询等等。总之,开启追踪是在startTrackingTable,停止追踪是在stopTrackingTable

    public class InvalidationTracker {
        private static final String[] TRIGGERS = new String[]{"UPDATE", "DELETE", "INSERT"};
        
        //开启追踪,向room_table_modification_log插入数据,并添加触发器
        private void startTrackingTable(SupportSQLiteDatabase writableDb, int tableId) {
            writableDb.execSQL(
                    "INSERT OR IGNORE INTO " + UPDATE_TABLE_NAME + " VALUES(" + tableId + ", 0)");
            final String tableName = mTableNames[tableId];
            StringBuilder stringBuilder = new StringBuilder();
            for (String trigger : TRIGGERS) {
                stringBuilder.setLength(0);
                stringBuilder.append("CREATE TEMP TRIGGER IF NOT EXISTS ");
                appendTriggerName(stringBuilder, tableName, trigger);
                stringBuilder.append(" AFTER ")
                        .append(trigger)
                        .append(" ON `")
                        .append(tableName)
                        .append("` BEGIN UPDATE ")
                        .append(UPDATE_TABLE_NAME)
                        .append(" SET ").append(INVALIDATED_COLUMN_NAME).append(" = 1")
                        .append(" WHERE ").append(TABLE_ID_COLUMN_NAME).append(" = ").append(tableId)
                        .append(" AND ").append(INVALIDATED_COLUMN_NAME).append(" = 0")
                        .append("; END");
                writableDb.execSQL(stringBuilder.toString());
            }
        }
        
        //停止追踪,只是丢弃触发器,并不会删除room_table_modification_log中的数据
        private void stopTrackingTable(SupportSQLiteDatabase writableDb, int tableId) {
            final String tableName = mTableNames[tableId];
            StringBuilder stringBuilder = new StringBuilder();
            for (String trigger : TRIGGERS) {
                stringBuilder.setLength(0);
                stringBuilder.append("DROP TRIGGER IF EXISTS ");
                appendTriggerName(stringBuilder, tableName, trigger);
                writableDb.execSQL(stringBuilder.toString());
            }
        }
        
        //触发器的名称
        private static void appendTriggerName(StringBuilder builder, String tableName,
                String triggerType) {
            builder.append("`")
                    .append("room_table_modification_trigger_")
                    .append(tableName)
                    .append("_")
                    .append(triggerType)
                    .append("`");
        }
    }
    

    以User表为例(其table_id为0),那么开启“追踪”:

    INSERT OR IGNORE INTO room_table_modification_log VALUES(0, 0)
    
    CREATE TEMP TRIGGER IF NOT EXISTS `room_table_modification_trigger_user_UPDATE`
    AFTER UPDATE ON `user`
    BEGIN
    UPDATE room_table_modification_log
    SET invalidated = 1 WHERE table_id = 0 AND invalidated = 0;
    END
    

    会为UPDATE, DELETE, INSERT分别创建TRIGGER,TIRGGER中的内容都是一样的。

    停止“追踪”:

    DROP TRIGGER IF EXISTS `room_table_modification_trigger_user_UPDATE`
    DROP TRIGGER IF EXISTS `room_table_modification_trigger_user_DELETE`
    DROP TRIGGER IF EXISTS `room_table_modification_trigger_user_INSERT`
    

    并不会把room_table_modification_log中相应的数据也删了,说不定下次还能用到,只是不再更新invalidated

    4. 数据流

    有了上面这些基础,在看数据流的实现就比较简单了。下面会以RxJava Observable数据流为例,看看数据流究竟是如何实现的。

    @Dao
    interface UserDao {
        @Query("SELECT * FROM user WHERE uid = :userId")
        fun getUserObservableById(userId: Int): Observable<User>
    }
    

    会生成如下代码:

    public final class UserDao_Impl implements UserDao {
      @Override
      public Observable<User> getUserObservableById(final int userId) {
        final String _sql = "SELECT * FROM user WHERE uid = ?";
        final RoomSQLiteQuery _statement = RoomSQLiteQuery.acquire(_sql, 1);
        int _argIndex = 1;
        _statement.bindLong(_argIndex, userId);
        //这里是关键
        return RxRoom.createObservable(__db, false, new String[]{"user"}, new Callable<User>() {
          @Override
          public User call() throws Exception {
            //以下是正常的数据库查询流程
            final Cursor _cursor = DBUtil.query(__db, _statement, false);
            try {
              final int _cursorIndexOfUid = CursorUtil.getColumnIndexOrThrow(_cursor, "uid");
              final int _cursorIndexOfFirstName = CursorUtil.getColumnIndexOrThrow(_cursor, "first_name");
              final int _cursorIndexOfLastName = CursorUtil.getColumnIndexOrThrow(_cursor, "last_name");
              final User _result;
              if(_cursor.moveToFirst()) {
                final int _tmpUid;
                _tmpUid = _cursor.getInt(_cursorIndexOfUid);
                final String _tmpFirstName;
                _tmpFirstName = _cursor.getString(_cursorIndexOfFirstName);
                final String _tmpLastName;
                _tmpLastName = _cursor.getString(_cursorIndexOfLastName);
                _result = new User(_tmpUid,_tmpFirstName,_tmpLastName);
              } else {
                _result = null;
              }
              return _result;
            } finally {
              _cursor.close();
            }
          }
    
          @Override
          protected void finalize() {
            _statement.release();
          }
        });
      }
    }
    

    跟普通查询最大的不同就是以RxRoom.createObservable创建了Observable。来看一下RxRoom

    public class RxRoom {
        //其中参数callable就是要进行的查询
        public static <T> Observable<T> createObservable(final RoomDatabase database,
                final boolean inTransaction, final String[] tableNames, final Callable<T> callable) {
            //不用太在意这个scheduler,就是一个线程池,不同情况下用不同的线程池
            Scheduler scheduler = Schedulers.from(getExecutor(database, inTransaction));
            //fromCallable会天然地阻止 null,如果callable返回null会被过滤掉
            final Maybe<T> maybe = Maybe.fromCallable(callable);
            return createObservable(database, tableNames)
                    .subscribeOn(scheduler)
                    .unsubscribeOn(scheduler)
                    .observeOn(scheduler)
                    .flatMapMaybe(new Function<Object, MaybeSource<T>>() {
                        @Override
                        public MaybeSource<T> apply(Object o) throws Exception {
                            return maybe;
                        }
                    });
        }
        
        public static Observable<Object> createObservable(final RoomDatabase database,
                final String... tableNames) {
            return Observable.create(new ObservableOnSubscribe<Object>() {
                @Override
                public void subscribe(final ObservableEmitter<Object> emitter) throws Exception {
                    //新建一个InvalidationTracker.Observer
                    final InvalidationTracker.Observer observer = new InvalidationTracker.Observer(
                            tableNames) {
                        //在观察的表“失效”时回调
                        @Override
                        public void onInvalidated(@androidx.annotation.NonNull Set<String> tables) {
                            //只是发射一个“信号”,表明需要重新查询
                            emitter.onNext(NOTHING);
                        }
                    };
                    //向InvalidationTracker添加观察者
                    database.getInvalidationTracker().addObserver(observer);
                    emitter.setDisposable(Disposables.fromAction(new Action() {
                        @Override
                        public void run() throws Exception {
                            //dispose时移除观察者
                            database.getInvalidationTracker().removeObserver(observer);
                        }
                    }));
    
                    // emit once to avoid missing any data and also easy chaining
                    emitter.onNext(NOTHING);
                }
            });
        }
    }
    

    很简单,就是向InvalidationTracker添加观察者,在Observabledispose的时候移除观察者。因为RxJava 2不喜欢null,所以RxRoom的做法是仅仅发送数据“失效”的信号,然后使用flatMapMaybe操作符结合Maybe.fromCallable天然地过滤掉了null,很巧妙(巧妙也不是一天达成的,在之前的版本中没有使用这个操作符,还是使用我们总是用的那种“笨拙”的方法过滤null)。


    还差最后一个环节就能形成闭环了,那就是啥时候去查询room_table_modification_log表,看看哪个被“追踪”的表“失效”了。最合适的时机就是“增删改”结束的时候:

    public abstract class RoomDatabase {
        public void beginTransaction() {
            assertNotMainThread();
            SupportSQLiteDatabase database = mOpenHelper.getWritableDatabase();
            //Transaction开始时“同步”Trigger
            mInvalidationTracker.syncTriggers(database);
            database.beginTransaction();
        }
    
        //Transaction结束时refreshVersionsAsync
        public void endTransaction() {
            mOpenHelper.getWritableDatabase().endTransaction();
            if (!inTransaction()) {
                // enqueue refresh only if we are NOT in a transaction. Otherwise, wait for the last
                // endTransaction call to do it.
                mInvalidationTracker.refreshVersionsAsync();
            }
        }
    }
    
    
    public class InvalidationTracker {
        public void refreshVersionsAsync() {
            if (mPendingRefresh.compareAndSet(false, true)) {
                mDatabase.getQueryExecutor().execute(mRefreshRunnable);
            }
        }
        
        Runnable mRefreshRunnable = new Runnable() {
            @Override
            public void run() {
                final Lock closeLock = mDatabase.getCloseLock();
                Set<Integer> invalidatedTableIds = null;
                try {
                    closeLock.lock();
    
                    if (!ensureInitialization()) {
                        return;
                    }
    
                    if (!mPendingRefresh.compareAndSet(true, false)) {
                        // no pending refresh
                        return;
                    }
    
                    if (mDatabase.inTransaction()) {
                        // current thread is in a transaction. when it ends, it will invoke
                        // refreshRunnable again. mPendingRefresh is left as false on purpose
                        // so that the last transaction can flip it on again.
                        return;
                    }
    
                    //查询room_table_modification_log,获取“失效”的表
                    invalidatedTableIds = checkUpdatedTable();
                } catch (IllegalStateException | SQLiteException exception) {
                    // may happen if db is closed. just log.
                    Log.e(Room.LOG_TAG, "Cannot run invalidation tracker. Is the db closed?",
                            exception);
                } finally {
                    closeLock.unlock();
                }
                //如果有“失效”的表
                if (invalidatedTableIds != null && !invalidatedTableIds.isEmpty()) {
                    synchronized (mObserverMap) {
                        for (Map.Entry<Observer, ObserverWrapper> entry : mObserverMap) {
                            //通知观察者,如果是观察者“关心”的表,就会回调 onInvalidated
                            entry.getValue().notifyByTableInvalidStatus(invalidatedTableIds);
                        }
                    }
                }
            }
    
            //查询出room_table_modification_log中invalidated为1的,然后把它重置为0;返回的是 table_id的集合
            private Set<Integer> checkUpdatedTable() {
                //主要就是执行下面SQL
                //SELECT * FROM room_table_modification_log WHERE invalidated = 1
                //UPDATE room_table_modification_log SET invalidated = 0 WHERE invalidated = 1
            }
        };
    
    }
    

    “增删改”都是放在Transaction中的,因此endTransaction是个查询room_table_modification_log的好时机。通过room_table_modification_log获取“失效”的表,如果InvalidationTracker的观察者观察的是“失效”表中的某一个或几个,就会回调观察者的onInvalidated方法。onInvalidated方法正如前面展示的那样,会发送一个信号,然后相应查询就会被再次执行,最新的数据就会被传递出去。至此,整个流程完整地闭合。

    某个数据表“失效”仅仅是说这个表中的数据被修改了,但是,是不是修改的我们查询关心的部分,其实并不知道,有可能重新查询的结果跟之前是一样的。如果不想接收到重复的数据,对于RxJava而言可以使用distinctUntilChanged来进行过滤,对于LiveData而言可以使用Transformations.distinctUntilChanged来过滤。

    以上仅以RxJava Observable为例展示了数据流是如何实现的,对于Flowable而言是类似的。但是对于LiveData而言,跟Observable就不太一样了,中间多了很多处理(主要是防止在我们没有保存LiveData的情况下,LiveData被意外回收),整个流程下来比较繁琐,但是思路是一样的,这里就省略了。

    5. 总结

    Room实现数据流的流程

    图中有一些不合适的地方,removeObserver并不是有Dao调用的,而是查询返回的RaJava Observable或者LiveData“不再用”了的时候被各自调用的。其中,“重新查询”的意思就是数据流被更新,数据流相关的RaJava Observable或者LiveData都没有在包含在图中。
    能从Room分析的第一篇文章看到这里的,应该颁发一个最佳坚持奖,请留言或者点赞让我看见你们的双手。这是这一系列文章的最后一篇,文章题目叫“从Room源码看抽象与封装”,主要是因为写第一篇文章的时候,想从“抽象与封装”作为切入点去分析Room的源码,没有想到会写这么长。前面几篇文章还是比较符合题目的,后面的文章基本上就剩源码解析了,不是说这里面不包含抽象封装的内容,而是源码已经比较复杂了,再去谈抽象封装只会更加混乱。总之,希望你有所收获,欢迎留言与我交流。

    相关文章

      网友评论

          本文标题:从Room源码看抽象与封装——数据流

          本文链接:https://www.haomeiwen.com/subject/hdpfkctx.html