美文网首页Android开发Android开发经验谈
Room是怎样和LiveData结合使用的?(源码分析)

Room是怎样和LiveData结合使用的?(源码分析)

作者: 渔船Mr_Liu | 来源:发表于2019-03-14 00:18 被阅读10次

    前言

    之前写项目的时候,对于数据库的操作不是特别多,能避免就尽量避免,并且一直想不到比较好的方法去组织网络数据、本地数据的逻辑。所以在最近的面试中时,问及项目中的数据库实现,以及比较好用的数据库的框架及其实现原理时,我就只答道之前在《第一行代码》中看到了的LitePal,但源码就...所以这次来恶补一次数据库。几经搜索,云比较,比较青睐官方Jetpack组件中的Room

    Room简介

    Room框架是使用生成代码的方式在编译时生成CRUD的代码,因此性能是远远好过通过反射实现的ORM框架。但是事实上,Room最吸引我的地方不止是性能,Room对架构组件(LiveData)、RxJava等流行框架做了适配。例如,Room中的查询操作可以返回一个LiveData<XXX>,并且,每一次RUD操作,都会更新LiveData。这可以大大简化本地、内存、网络多级缓存的实现,具体官方也给出了一系列Demo,并且随时都在随着框架或者根据PR更新,强烈推荐研究这些Demo!

    本文主要是对Room中与LiveData的联动作出分析,阅读本文前建议先熟悉Room的基本使用,建议看一下与LiveData配合使用的Demo。

    正文

    创建相关类

    AppDatabase.kt

    @Database(entities = [Book::class], version = 1)
    abstract class AppDatabase : RoomDatabase() {
        abstract fun bookDao(): BookDao
    }
    

    Book.kt

    @Dao
    interface BookDao {
        @Insert
        fun insert(book: Book): Long
    
        @Delete
        fun delete(book: Book)
        
        @Query("select * from book where id = :id")
        fun queryById(id: Long): LiveData<Book>
    }
    

    使用数据库:

    val db = Room.databaseBuilder(applicationContext, AppDatabase::class.java, "test.db")
                .build()
            db.bookDao().queryById(1).observe(this, Observer {
                // do something when book update
            })
    

    这样在Observer里面就可以接收到任何时候数据库id=1的数据修改操作了。

    生成代码并分析

    Build -> Make Project 编译,会生成Room相关代码,如果是kapt的话,生成代码目录应该是{项目目录}/app/build/generated/source/kapt/debug/{包路径}/下。
    我们可以看到生成了AppDatabase_Impl和BookDao_Impl两个代码文件,分别对应前面贴出来的AppDatabase的实现类和BookDao的实现类。

    image

    AppDatabase_Impl则是表的创建、删除相关代码,Dao则是具体表的CRUD操作。这里我们重点关系生成的查询方法。
    BookDao_Impl#

    @Override
    public LiveData<Book> queryById(final long id) {
        final String _sql = "select * from book where id = ?";
        final RoomSQLiteQuery _statement = RoomSQLiteQuery.acquire(_sql, 1);
        int _argIndex = 1;
        _statement.bindLong(_argIndex, id);
        return __db.getInvalidationTracker().createLiveData(new String[]{"book"}, new Callable<Book>() {
            @Override
            public Book call() throws Exception {
                final Cursor _cursor = DBUtil.query(__db, _statement, false);
                try {
                    final int _cursorIndexOfId = CursorUtil.getColumnIndexOrThrow(_cursor, "id");
                    final int _cursorIndexOfName = CursorUtil.getColumnIndexOrThrow(_cursor, "name");
                    final int _cursorIndexOfAuthor = CursorUtil.getColumnIndexOrThrow(_cursor, "author");
                    final int _cursorIndexOfPrice = CursorUtil.getColumnIndexOrThrow(_cursor, "price");
                    final Book _result;
                    if (_cursor.moveToFirst()) {
                        final long _tmpId;
                        _tmpId = _cursor.getLong(_cursorIndexOfId);
                        final String _tmpName;
                        _tmpName = _cursor.getString(_cursorIndexOfName);
                        final String _tmpAuthor;
                        _tmpAuthor = _cursor.getString(_cursorIndexOfAuthor);
                        final float _tmpPrice;
                        _tmpPrice = _cursor.getFloat(_cursorIndexOfPrice);
                        _result = new Book(_tmpId, _tmpName, _tmpAuthor, _tmpPrice);
                    } else {
                        _result = null;
                    }
                    return _result;
                } finally {
                    _cursor.close();
                }
            }
    
            @Override
            protected void finalize() {
                _statement.release();
            }
        });
    }
    

    注意这一行

    return __db.getInvalidationTracker().createLiveData(...);
    

    我们跟进去,最终创建的是一个RoomTrackingLiveData,是一个继承了LiveData的类。下面是它的构造方法。从构造方法来看,比较可疑的对象的是InvalidationTracker.Observer这个类,并且实现十有八九是观察者模式。而最后的回调也多半是onInvalidated方法。

    @SuppressLint("RestrictedApi")
    RoomTrackingLiveData(
            RoomDatabase database,
            InvalidationLiveDataContainer container,
            Callable<T> computeFunction,
            String[] tableNames) {
        mDatabase = database;
        mComputeFunction = computeFunction;
        mContainer = container;
        mObserver = new InvalidationTracker.Observer(tableNames) {
            @Override
            public void onInvalidated(@NonNull Set<String> tables) {
                ArchTaskExecutor.getInstance().executeOnMainThread(mInvalidationRunnable);
            }
        };
    }
    

    而在RoomTrackingLiveData中,重写了onActive方法。其中mContainer是InvalidationLiveDataContainer,文档上有写仅仅是维护LiveData的强引用,防止正在使用的LiveData被回收,跟本文目标没关系,可忽略。而后面的就有意思了,通过Excutor执行了一个任务,所以,我们来看一下这个任务把。

    @Override
    protected void onActive() {
        super.onActive();
        mContainer.onActive(this);
        mDatabase.getQueryExecutor().execute(mRefreshRunnable);
    }
    

    mRefreshRunnable#run()

    // mRegisteredObserver是否注册的标志
    if (mRegisteredObserver.compareAndSet(false, true)) {
        mDatabase.getInvalidationTracker().addWeakObserver(mObserver);
    }
    boolean computed;
    do {
        computed = false;
        if (mComputing.compareAndSet(false, true)) {
            try {
                T value = null;
                while (mInvalid.compareAndSet(true, false)) {
                    computed = true;
                    try {
                        // Dao实现类中返回LiveData时传入的一个参数,用于查询,并将数据组装成一个实体类
                        value = mComputeFunction.call();
                    } catch (Exception e) {
                        throw new RuntimeException("Exception while computing database"
                                + " live data.", e);
                    }
                }
                if (computed) {
                    postValue(value);
                }
            } finally {
                mComputing.set(false);
            }
        }
    } while (computed && mInvalid.get());
    

    这段代码后段通过CAS去完成一次数据库的查询,组装成实体类并postValue,即更新LiveData。
    注意到这个代码前段调用了InvalidationTracker的addWeakObserver,这个方法就应该就是订阅了。

    InvalidationTracker#addWeakObserver

    public void addWeakObserver(Observer observer) {
        addObserver(new WeakObserver(this, observer));
    }
    

    InvalidationTracker#addObserver

    public void addObserver(@NonNull Observer observer) {
        final String[] tableNames = resolveViews(observer.mTables);
        int[] tableIds = new int[tableNames.length];
        final int size = tableNames.length;
    
        for (int i = 0; i < size; i++) {
            Integer tableId = mTableIdLookup.get(tableNames[i].toLowerCase(Locale.US));
            if (tableId == null) {
                throw new IllegalArgumentException("There is no table with name " + tableNames[i]);
            }
            tableIds[i] = tableId;
        }
        ObserverWrapper wrapper = new ObserverWrapper(observer, tableIds, tableNames);
        ObserverWrapper currentObserver;
        synchronized (mObserverMap) {
            currentObserver = mObserverMap.putIfAbsent(observer, wrapper);
        }
        if (currentObserver == null && mObservedTableTracker.onAdded(tableIds)) {
            syncTriggers();
        }
    }
    

    InvalidationTracker$WeakObserver

    static class WeakObserver extends Observer {
        final InvalidationTracker mTracker;
        final WeakReference<Observer> mDelegateRef;
    
        WeakObserver(InvalidationTracker tracker, Observer delegate) {
            super(delegate.mTables);
            mTracker = tracker;
            mDelegateRef = new WeakReference<>(delegate);
        }
    
        @Override
        public void onInvalidated(@NonNull Set<String> tables) {
            final Observer observer = mDelegateRef.get();
            if (observer == null) {
                mTracker.removeObserver(this);
            } else {
                observer.onInvalidated(tables);
            }
        }
    }
    

    可以看到,WeakObserver就是对Observer一个弱引用的包装。而在addObserver中,根据observer中tableNames,对更新了InvalidationTracker的订阅记录。添加成功后,最后会调用onAdded。

    boolean onAdded(int... tableIds) {
        boolean needTriggerSync = false;
        synchronized (this) {
            for (int tableId : tableIds) {
                final long prevObserverCount = mTableObservers[tableId];
                mTableObservers[tableId] = prevObserverCount + 1;
                if (prevObserverCount == 0) {
                    mNeedsSync = true;
                    needTriggerSync = true;
                }
            }
        }
        return needTriggerSync;
    }
    

    这里mTableObservers是对每个table的observer进行计数。为什么要计数呢?我们接着看。在发现了订阅数从0->1的table时,这个方法会返回true,如果它返回true,会执行syncTriggers()方法,经过调用会执行这一段代码:

    final int[] tablesToSync = mObservedTableTracker.getTablesToSync();
    if (tablesToSync == null) {
        return;
    }
    final int limit = tablesToSync.length;
    try {
        database.beginTransaction();
        for (int tableId = 0; tableId < limit; tableId++) {
            switch (tablesToSync[tableId]) {
                case ObservedTableTracker.ADD:
                    startTrackingTable(database, tableId);
                    break;
                case ObservedTableTracker.REMOVE:
                    stopTrackingTable(database, tableId);
                    break;
            }
        }
        database.setTransactionSuccessful();
    } finally {
        database.endTransaction();
    }
    

    InvalidationTracker#getTablesToSync()

    int[] getTablesToSync() {
        synchronized (this) {
            if (!mNeedsSync || mPendingSync) {
                return null;
            }
            final int tableCount = mTableObservers.length;
            for (int i = 0; i < tableCount; i++) {
                final boolean newState = mTableObservers[i] > 0;
                if (newState != mTriggerStates[i]) {
                    mTriggerStateChanges[i] = newState ? ADD : REMOVE;
                } else {
                    mTriggerStateChanges[i] = NO_OP;
                }
                mTriggerStates[i] = newState;
            }
            mPendingSync = true;
            mNeedsSync = false;
            return mTriggerStateChanges;
        }
    }
    

    这个getTablesToSync方法很短,但这里就体现了observer计数的作用,它遍历这个表,找出计数与之前不一样的,如果由一个大于0的数变为->0,表明现在没有observer订阅它,返回REMOVE,0->n,返回ADD,否则NO_OP。对于返回ADD的表,就应该是会监听变化的表了。它会执行startTrackingTable方法。

    private void startTrackingTable(SupportSQLiteDatabase writableDb, int tableId) {
        final String tableName = mTableNames[tableId];
        StringBuilder stringBuilder = new StringBuilder();
        for (String trigger : TRIGGERS) {
            stringBuilder.setLength(0);
            stringBuilder.append("CREATE TEMP TRIGGER IF NOT EXISTS ");
            appendTriggerName(stringBuilder, tableName, trigger);
            stringBuilder.append(" AFTER ")
                    .append(trigger)
                    .append(" ON `")
                    .append(tableName)
                    .append("` BEGIN INSERT OR REPLACE INTO ")
                    .append(UPDATE_TABLE_NAME)
                    .append(" VALUES(null, ")
                    .append(tableId)
                    .append("); END");
            writableDb.execSQL(stringBuilder.toString());
        }
    }
    

    到这里我们就很清楚了:实现监听修改的方法是触发器。 (不过我之前仅仅是听说过触发器,很少用过,如果不了解,这里有一份简易的教程)。而触发器关心的操作是这一些:

    private static final String[] TRIGGERS = new String[]{"UPDATE", "DELETE", "INSERT"};
    

    对应着更新、删除、插入。当有这些操作时,根据上述触发器语句,会更新一个由InvalidationTracker维护的表"UPDATE_TABLE_NAME"。
    InvalidationTracker#UPDATE_TABLE_NAME

    private static final String UPDATE_TABLE_NAME = "room_table_modification_log";
    

    InvalidationTracker#internalInit

    void internalInit(SupportSQLiteDatabase database) {
        synchronized (this) {
            if (mInitialized) {
                Log.e(Room.LOG_TAG, "Invalidation tracker is initialized twice :/.");
                return;
            }
    
            database.beginTransaction();
            try {
                database.execSQL("PRAGMA temp_store = MEMORY;");
                database.execSQL("PRAGMA recursive_triggers='ON';");
                database.execSQL(CREATE_TRACKING_TABLE_SQL);
                database.setTransactionSuccessful();
            } finally {
                database.endTransaction();
            }
            syncTriggers(database);
            mCleanupStatement = database.compileStatement(RESET_UPDATED_TABLES_SQL);
            mInitialized = true;
        }
    }
    

    注意到表中有这样一列:

    INVALIDATED_COLUMN_NAME + " INTEGER NOT NULL DEFAULT 0
    

    在触发器设置的是更新操作时会被设置为1。所以,应该就是检验这个值来判断表是否有更新。那么是哪里进行判断呢?我们可以从一个更新操作开始找,例如BookDao_Impl#insert()

    @Override
    public long insert(final Book book) {
        __db.beginTransaction();
        try {
            long _result = __insertionAdapterOfBook.insertAndReturnId(book);
            __db.setTransactionSuccessful();
            return _result;
        } finally {
            __db.endTransaction();
        }
    }
    

    最后发现在endTransaction中调用了InvalidationTracker的refreshVersionsAsync方法。而在这个方法中,最终会运行InvalidationTracker的mRefreshRunnable对象的run方法。(注意,和上文的mRefreshRunnbale属于不同类,不是同一个对象。)
    RoomDatabase#endTransaction()

    public void endTransaction() {
        mOpenHelper.getWritableDatabase().endTransaction();
        if (!inTransaction()) {
            // enqueue refresh only if we are NOT in a transaction. Otherwise, wait for the last
            // endTransaction call to do it.
            mInvalidationTracker.refreshVersionsAsync();
        }
    }
    

    InvalidationTracker#mRefreshRunnable#run()

    inal Lock closeLock = mDatabase.getCloseLock();
    boolean hasUpdatedTable = false;
    try {
        ... 省略
    
        if (mDatabase.mWriteAheadLoggingEnabled) {
            // This transaction has to be on the underlying DB rather than the RoomDatabase
            // in order to avoid a recursive loop after endTransaction.
            SupportSQLiteDatabase db = mDatabase.getOpenHelper().getWritableDatabase();
            db.beginTransaction();
            try {
                hasUpdatedTable = checkUpdatedTable();
                db.setTransactionSuccessful();
            } finally {
                db.endTransaction();
            }
        } else {
            hasUpdatedTable = checkUpdatedTable();
        }
    } catch (IllegalStateException | SQLiteException exception) {
        // may happen if db is closed. just log.
        Log.e(Room.LOG_TAG, "Cannot run invalidation tracker. Is the db closed?",
                exception);
    } finally {
        closeLock.unlock();
    }
    if (hasUpdatedTable) {
        // 分发给Observer,最终会更新LiveData
        synchronized (mObserverMap) {
            for (Map.Entry<Observer, ObserverWrapper> entry : mObserverMap) {
                entry.getValue().notifyByTableVersions(mTableInvalidStatus);
            }
        }
        // Reset invalidated status flags.
        mTableInvalidStatus.clear();
    }
    

    注意,hasUpdatedTable = checkUpdatedTable();

    private boolean checkUpdatedTable() {
        boolean hasUpdatedTable = false;
        Cursor cursor = mDatabase.query(new SimpleSQLiteQuery(SELECT_UPDATED_TABLES_SQL));
        //noinspection TryFinallyCanBeTryWithResources
        try {
            while (cursor.moveToNext()) {
                final int tableId = cursor.getInt(0);
                mTableInvalidStatus.set(tableId);
                hasUpdatedTable = true;
            }
        } finally {
            cursor.close();
        }
        if (hasUpdatedTable) {
            mCleanupStatement.executeUpdateDelete();
        }
        return hasUpdatedTable;
    }
    
    @VisibleForTesting
    static final String SELECT_UPDATED_TABLES_SQL = "SELECT * FROM " + UPDATE_TABLE_NAME
            + " WHERE " + INVALIDATED_COLUMN_NAME + " = 1;";
    

    果然,是查找"UPDATE_TABLE_NAME"这个表中"INVALIDATED_COLUMN_NAME"这列为1的记录,然后设置自己的状态。完成这个过程就分发给自己的Observers。

    void notifyByTableVersions(BitSet tableInvalidStatus) {
        ...
        if (invalidatedTables != null) {
            mObserver.onInvalidated(invalidatedTables);
        }
    }
    

    而在前文中有说到,注册的Observer实际上是RoomTrackingLiveData的mObserver的包装,最终会调用到它的onInvalidated。

    mObserver = new InvalidationTracker.Observer(tableNames) {
        @Override
        public void onInvalidated(@NonNull Set<String> tables) {
            ArchTaskExecutor.getInstance().executeOnMainThread(mInvalidationRunnable);
        }
    }
    
    final Runnable mInvalidationRunnable = new Runnable() {
        @MainThread
        @Override
        public void run() {
            boolean isActive = hasActiveObservers();
            if (mInvalid.compareAndSet(false, true)) {
                if (isActive) {
                    mDatabase.getQueryExecutor().execute(mRefreshRunnable);
                }
            }
        }
    };
    

    可见,最后会在线程池中执行RoomTrackingLiveData的mRefreshRunnable任务。这个任务前文已经分析过了,通过CAS的方式查询数据,并post给LiveData,这样就实现了数据更新的通知。到这里,Room和LiveData联动的工作原理就大致分析完毕。

    写文章不易,转载请注明出处@渔船Mr_Liu

    相关文章

      网友评论

        本文标题:Room是怎样和LiveData结合使用的?(源码分析)

        本文链接:https://www.haomeiwen.com/subject/hqhfmqtx.html