AAC-Room 跨进程分析

作者: ukyoo | 来源:发表于2019-09-26 15:40 被阅读0次

SQLite是安卓数据持久化的重要手段. 为了便捷的操作sql, 衍生了很多ORM (Object/Relation Mapping 对象关系映射). Android端比较流行的有 GreenDao, Realm, LitePal等等.

Google在 AAC 中也推出了自己的 ORM 框架 Room. 除了基础的 runtime 包, apt注解包. 还提供了对 AAC LiveData 和老牌 RxJava 两种观察者模式框架的扩展.

dependencies {
  def room_version = "2.2.0-rc01"

  implementation "androidx.room:room-runtime:$room_version"  //Room

  //编译期注解, kotlin使用kapt代替annotationProcessor
  annotationProcessor "androidx.room:room-compiler:$room_version" 
  kapt "androidx.room:room-compiler:$rroom_version"  

  // ktx扩展和协程
  implementation "androidx.room:room-ktx:$room_version"

  // Room的RxJava扩展
  implementation "androidx.room:room-rxjava2:$room_version"

  // Room的的Google guava扩展
  implementation "androidx.room:room-guava:$room_version"

  // Test helpers
  testImplementation "androidx.room:room-testing:$room_version"
}

和别的 ORM 框架一样, Room 使用了大量编译时注解帮助我们生成模板代码 (Java/annotationProcesser 和 Kotlin/kapt). 包括主要的类注解 :

  • DataBase : 数据库实例
  • Dao : 数据访问对象, 维护了数据库增删改查的方法
  • Entity : 映射表结构的对象

使用方法和别的 ORM 差异不大, 具体参考官方文档. 这里介绍的是 Room 的数据跨进程共享的实现.

ContentProvider

提到数据库跨进程, 首先想到的肯定是ContentProvider, 他用了两套C/S模型:

  1. 多进程操作数据库
    多个进程(客户端)连接ContentProvider(服务端), ContentProvider 提供Binder实现给客户端调用.
  2. 跨进程数据共享
    当一个进程操作ContentProvider变更数据之后,可能希望其他进程能收到通知.
    客户端通过 getContentResolver().registerContentObserver() 注册ContentObserver, 他提供 Binder 并把 Binder 交给系统级别的服务 ContentService, 更新就能通过服务端分发到各个客户端.
Room

Room 也支持跨进程数据共享, 只需在构造对象的时候添加 enableMultiInstanceInvalidation() 选项

@Synchronized
fun getDataBase(): AppDataBase {
    return Room.databaseBuilder(
        App.instance(),
        AppDataBase::class.java, "databasenameXXXX"
    )
        .enableMultiInstanceInvalidation()
        .build()
}

他的实现原理是什么呢? 看官网的一段注释 :

Note: If your app runs in a single process, you should follow the singleton design pattern when instantiating an AppDatabase object. Each RoomDatabase instance is fairly expensive, and you rarely need access to multiple instances within a single process.

如果你的APP运行在单进程中, 你应该设置 Room 为单例, 因为获取 RoomDatabase 实例的成本较高, 且很少需要在一个进程中创建多个实例.

If your app runs in multiple processes, include enableMultiInstanceInvalidation() in your database builder invocation. That way, when you have an instance of AppDatabase in each process, you can invalidate the shared database file in one process, and this invalidation automatically propagates to the instances of AppDatabase within other processes.

如果你的APP是多进程的, 在构造 database 时配置enableMultiInstanceInvalidation()选项, 那么不同进程就会根据同一个db文件创建属于各自进程的单独实例. 并且当一个进程中的实例失效(发生变化)时, 会自动将失效传播到别的进程中.

初始化

db.init() 初始化时, 将多进程的处理交给 InvalidationTracker 对象.

public abstract class RoomDatabase {

    public static class Builder<T extends RoomDatabase> {

        private boolean mMultiInstanceInvalidation;
        @NonNull
        public Builder<T> enableMultiInstanceInvalidation() {
            mMultiInstanceInvalidation = mName != null;
            return this;
        }

        public T build() {
            ....
            //DatabaseConfiguration保存配置项
            DatabaseConfiguration configuration =
                    new DatabaseConfiguration(....mMultiInstanceInvalidation....);
            T db = Room.getGeneratedImplementation(mDatabaseClass, DB_IMPL_SUFFIX);
            db.init(configuration);  //初始化db
            return db;
        }
    }

    @CallSuper
    public void init(@NonNull DatabaseConfiguration configuration) {
        mOpenHelper = createOpenHelper(configuration);
        ....
        //如果配置了多进程选项
        if (configuration.multiInstanceInvalidation) {
            //使用 mInvalidationTracker 来处理多进程的情况
            mInvalidationTracker.startMultiInstanceInvalidation(configuration.context,
                    configuration.name);
        }
    }
}

InvalidationTracker 在apt帮我们生成的 RoomDatabase 实现 AppDataBase_Impl 中创建. 他和 RoomDatabase 对象是一对一的关系.

public final class AppDataBase_Impl extends AppDataBase {
  @Override
  protected InvalidationTracker createInvalidationTracker() {
    final HashMap<String, String> _shadowTablesMap = new HashMap<String, String>(0);
    HashMap<String, Set<String>> _viewTables = new HashMap<String, Set<String>>(0);
    return new InvalidationTracker(this, _shadowTablesMap, _viewTables, "nodemodel","user_profile");
  }
}

mInvalidationTracker.startMultiInstanceInvalidation 中创建了 MultiInstanceInvalidationClient 对象.


public class InvalidationTracker {
    private MultiInstanceInvalidationClient mMultiInstanceInvalidationClient;
    //name是数据库的名字, executor是一个异步的线程池
    void startMultiInstanceInvalidation(Context context, String name) {
        mMultiInstanceInvalidationClient = new MultiInstanceInvalidationClient(context, name, this,
                mDatabase.getQueryExecutor());
    }
}

MultiInstanceInvalidationClient 的构造方法中使用了 bindService() 连接MultiInstanceInvalidationService, 这里使用了绑定服务来进行一个 Service 和多个进程的 Client 的IPC.

/**
 * Handles all the communication from {@link RoomDatabase} and {@link InvalidationTracker} to
 * {@link MultiInstanceInvalidationService}.
 */
class MultiInstanceInvalidationClient {

    MultiInstanceInvalidationClient(Context context, String name,
            InvalidationTracker invalidationTracker, Executor executor) {
        mContext = context.getApplicationContext();
        mName = name;
        mInvalidationTracker = invalidationTracker;
        mExecutor = executor;
        mObserver = new InvalidationTracker.Observer(invalidationTracker.mTableNames) {
            @Override
            public void onInvalidated(@NonNull Set<String> tables) {
                if (mStopped.get()) {
                    return;
                }
                try {
                    mService.broadcastInvalidation(mClientId,
                            tables.toArray(new String[0]));
                } catch (RemoteException e) {
                    Log.w(Room.LOG_TAG, "Cannot broadcast invalidation", e);
                }
            }

            @Override
            boolean isRemote() {
                return true;
            }
        };
        Intent intent = new Intent(mContext, MultiInstanceInvalidationService.class);
        mContext.bindService(intent, mServiceConnection, Context.BIND_AUTO_CREATE);
    }
}

绑定服务介绍 :
创建提供绑定的服务时,您必须提供 IBinder,进而提供编程接口,以便客户端使用此接口与服务进行交互. 您可以通过三种方法定义接口 :

Room 中使用的是 AIDL, 使用必须执行以下步骤:

  1. 创建 aidl 接口文件, 在文件内声明方法
  2. 编译时会根据 adil 生成 XXInterface.java 接口文件 , 内部类 XXInterface.Stub 实现了 IBinder 接口并声明了 aidl 内定义的方法
  3. Service 的 public IBinder onBind(Intent intent) 方法 , 返回一个 XXInterface.Stub 的实现类实例, 其定义了服务的远程过程调用 (RPC) 接口, 给客户端调用.
  4. Client 调用 bindService() 连接 Service, 在连接成功的 onServiceConnected(ComponentName name, IBinder service) 中将 Service 的 IBinder 转换为 XXInterface 对象. 通过这个对象就可以调用 Service 中定义的方法实现多个 Client 和 Service 的跨进程通信.

MultiInstanceInvalidationClient

对于同一个db文件, 每个进程可以创建自己的 RoomDatabase 对象. 在 RoomDataBase 中创建了 InvalidationTracker, 在 InvalidationTracker 创建了 MultiInstanceInvalidationClient.

Client 的构造方法中通过 bindService() 绑定服务,Service#Binder 向 Client 提供跨进程调用的三个方法 :

  • 绑定服务成功后调用 Service#Binder 的 service.registerCallback() 方法.
  • 在构造中的 mObserver 里调用 mService.broadcastInvalidation().
  • 在客户端的 stop() 方法中调用 service.unregisterCallback(mCallback, mClientId)
class MultiInstanceInvalidationClient {
    @Nullable
    IMultiInstanceInvalidationService mService;  //Service中的Binder对象

    MultiInstanceInvalidationClient(Context context, String name,
            InvalidationTracker invalidationTracker, Executor executor) {
        mContext = context.getApplicationContext();
        mName = name;
        mInvalidationTracker = invalidationTracker;
        mExecutor = executor;
        mObserver = new InvalidationTracker.Observer(invalidationTracker.mTableNames) {
            @Override
            public void onInvalidated(@NonNull Set<String> tables) {
                if (mStopped.get()) {
                    return;
                }
                try {
                    //Client 跨进程调用 Service 的传播更新方法
                    mService.broadcastInvalidation(mClientId,
                            tables.toArray(new String[0]));
                } catch (RemoteException e) {
                    Log.w(Room.LOG_TAG, "Cannot broadcast invalidation", e);
                }
            }

            @Override
            boolean isRemote() {
                return true;
            }
        };
        //绑定服务
        Intent intent = new Intent(mContext, MultiInstanceInvalidationService.class);
        mContext.bindService(intent, mServiceConnection, Context.BIND_AUTO_CREATE);
    }

    final ServiceConnection mServiceConnection = new ServiceConnection() {

        @Override
        public void onServiceConnected(ComponentName name, IBinder service) {
            mService = IMultiInstanceInvalidationService.Stub.asInterface(service);
            //连接服务端成功
            mExecutor.execute(mSetUpRunnable);
        }

        @Override
        public void onServiceDisconnected(ComponentName name) {
            mExecutor.execute(mRemoveObserverRunnable);
            mService = null;
            mContext = null;
        }
    };

    final Runnable mSetUpRunnable = new Runnable() {
        @Override
        public void run() {
            try {
                final IMultiInstanceInvalidationService service = mService;
                if (service != null) {
                    //调用Service#Binder的 registerCallback() 方法
                    mClientId = service.registerCallback(mCallback, mName);
                    //mInvalidationTracker添加Observer
                    mInvalidationTracker.addObserver(mObserver);
                }
            } catch (RemoteException e) {  
                Log.w(Room.LOG_TAG, "Cannot register multi-instance invalidation callback", e);
            }
        }
    };

    final Runnable mRemoveObserverRunnable = new Runnable() {
        @Override
        public void run() {
            //mInvalidationTracker移除Observer
            mInvalidationTracker.removeObserver(mObserver);
        }
    };
}

MultiInstanceInvalidationService

MultiInstanceInvalidationService 作为服务端可以向多个进程中的客户端MultiInstanceInvalidationClient 提供绑定服务.
IMultiInstanceInvalidationService.aidl 作为 AIDL 接口, 定义了三个接口方法给 Service 实现 :

interface IMultiInstanceInvalidationService {
    //注册回调
    int registerCallback(IMultiInstanceInvalidationCallback callback, String name);
    //反注册回调
    void unregisterCallback(IMultiInstanceInvalidationCallback callback, int clientId);
    //传播失效, 即进程间同步
    oneway void broadcastInvalidation(int clientId, in String[] tables);
}

服务端中aidl方法的实现都加上了 synchronized, 因为多个客户端调用服务端的方法存在竞态

1. registerCallback() 注册

registerCallback() 涉及到两个重要的变量 mCallbackListmClientNames.

  • mCallbackList 是个列表, 用于保存服务端对多个远程客户端的callBack. 整个通知更新的过程分为两步:
    Client 通过 Service#Binder 和 Service 建立连接并调用 Service#BinderbroadcastInvalidation()方法让 Service 来传播更新.
    Service 通知其他 Client 更新的过程也涉及到 IPC, 这次由每个 Client 提供 Client#Binder 供 Serivce 调用, mCallbackList 就是用于保存每个客户端的 Binder.
public class MultiInstanceInvalidationService extends Service {
    final RemoteCallbackList<IMultiInstanceInvalidationCallback> mCallbackList =
            new RemoteCallbackList<IMultiInstanceInvalidationCallback>() {
                @Override
                public void onCallbackDied(IMultiInstanceInvalidationCallback callback,
                        Object cookie) {
                    //如果Client的Binder挂掉了, 就从mClientNames移除
                    mClientNames.remove((int) cookie);
                }
            };
    
    // Service 提供给Client的 Binder : IMultiInstanceInvalidationService.Stub
    private final IMultiInstanceInvalidationService.Stub mBinder =
            new IMultiInstanceInvalidationService.Stub() {

                // Assigns a client ID to the client.
                @Override
                public int registerCallback(IMultiInstanceInvalidationCallback callback,
                        String name) {
                    if (name == null) {
                        return 0;
                    }
                    synchronized (mCallbackList) {
                        int clientId = ++mMaxClientId;
                        // Use the client ID as the RemoteCallbackList cookie.
                        if (mCallbackList.register(callback, clientId)) {
                            mClientNames.append(clientId, name);
                            return clientId;
                        } else {
                            --mMaxClientId;
                            return 0;
                        }
                    }
                }

}

// 客户端提供Binder: IMultiInstanceInvalidationCallback.Stub 给服务端调用
class MultiInstanceInvalidationClient {
    final IMultiInstanceInvalidationCallback mCallback =
            new IMultiInstanceInvalidationCallback.Stub() {
                @Override
                public void onInvalidation(final String[] tables) {
                    mExecutor.execute(new Runnable() {
                        @Override
                        public void run() {
                            mInvalidationTracker.notifyObserversByTableNames(tables);
                        }
                    });
                }
            };
}

  • mClientNames 使用 SparseArrayCompat<String> 保存所有连接他的客户端名称, 调用 mClientNames.append(clientId, name) 把客户端加入集合.
    key : clientId 由Service分配, 每次+1.
    value : clientName 就是 RoomDatabase 数据库的名字.

2. unregisterCallback() 反注册

mCallbackList, mClientNames 集合移除对应的客户端.

@Override
public void unregisterCallback(IMultiInstanceInvalidationCallback callback,
        int clientId) {
    synchronized (mCallbackList) {
        mCallbackList.unregister(callback);
        mClientNames.remove(clientId);
    }
}

3. broadcastInvalidation 传播更新(将一个客户端的变化传播到别的客户端)

callback 作为 Client#Binder, 当 Client 调用过 Service#Binder 的 registerCallback() 后, 就将 callBack 添加到了 Service 的 mCallbackList 中.

当 Client 调用 Service#Binder 的 broadcastInvalidation() 传播更新时, 遍历 mCallbackList 调用 Client#Binder 的 callback.onInvalidation() 就将更新的逻辑通知到了各个 Client 的 onInvalidation() 实现.

// Broadcasts table invalidation to other instances of the same database file.
// The broadcast is not sent to the caller itself.
@Override
public void broadcastInvalidation(int clientId, String[] tables) {
    //服务端和客户端是1/N的关系, 存在竞态, 方法加锁
    synchronized (mCallbackList) {
        String name = mClientNames.get(clientId);
        if (name == null) {
            Log.w(Room.LOG_TAG, "Remote invalidation client ID not registered");
            return;
        }
        int count = mCallbackList.beginBroadcast();
        try {
            //遍历mCallbackList
            for (int i = 0; i < count; i++) {
                int targetClientId = (int) mCallbackList.getBroadcastCookie(i);
                String targetName = mClientNames.get(targetClientId);  
                //调用这个方法的客户端不需要参与同步, name不同即不是一个数据库文件的客户端不需要同步
                if (clientId == targetClientId // This is the caller itself.
                        || !name.equals(targetName)) { // Not the same file.
                    continue;
                }
                try {
                    //这里涉及到服务端调用客户端的过程, 也是通过aidl
                    IMultiInstanceInvalidationCallback callback =
                            mCallbackList.getBroadcastItem(i);
                    callback.onInvalidation(tables);
                } catch (RemoteException e) {
                    Log.w(Room.LOG_TAG, "Error invoking a remote callback", e);
                }
            }
        } finally {
            mCallbackList.finishBroadcast();
        }
    }
}

遍历时过滤掉不需要同步的客户端

  1. clientId == targetClientId
    发起传播的客户端肯定不需要通知自己
  2. !name.equals(targetName)
    一个APP可以有多个本地db文件, 我们需要同步的是同一个db文件在不同进程中的 RoomDatabase 对象对应的 MultiInstanceInvalidationClient (他的name就是数据库的名字).
    如果 name 不同, 意味着不是同一个db文件的Client, 不需要参与同步.

传播更新

传播更新由 MultiInstanceInvalidationService 调用 MultiInstanceInvalidationClient 中的 Binder 方法来实现, 同样是通过AIDL.

IMultiInstanceInvalidationCallback.aidl 只定义了一个方法 :

interface IMultiInstanceInvalidationCallback {
    oneway void onInvalidation(in String[] tables);
}

MultiInstanceInvalidationClient 中的实现 :

class MultiInstanceInvalidationClient {

    final IMultiInstanceInvalidationCallback mCallback =
            new IMultiInstanceInvalidationCallback.Stub() {
                @Override
                public void onInvalidation(final String[] tables) {
                    mExecutor.execute(new Runnable() {
                        @Override
                        public void run() {
                            //将数据库中表的更新通知给观察者
                            mInvalidationTracker.notifyObserversByTableNames(tables);
                        }
                    });
                }
            };
}

所以 MultiInstanceInvalidationService # broadcastInvalidation()MultiInstanceInvalidationClient # callback.onInvalidation()mInvalidationTracker.notifyObserversByTableNames(tables), 最终更新会交给每个Client 对应的 InvalidationTracker 处理.

接收更新的MultiInstanceInvalidationClient

InvalidationTracker 中掉用 notifyObserversByTableNames(String... tables) 通知观察者的集合 mObserverMap.

public class InvalidationTracker {

    public void notifyObserversByTableNames(String... tables) {
        synchronized (mObserverMap) {
            //mObserverMap 中保存了当前进程的数据库观察者
            for (Map.Entry<Observer, ObserverWrapper> entry : mObserverMap) {
                //!entry.getKey().isRemote() 排除了Client构造方法内的Observer
                if (!entry.getKey().isRemote()) {
                    //这里的tables就是当前进程修改过, 需要同步到别的进程的表
                    entry.getValue().notifyByTableNames(tables);
                }
            }
        }
    }

    static class ObserverWrapper {
        void notifyByTableNames(String[] tables) {
            Set<String> invalidatedTables = null;
            if (mTableNames.length == 1) {
                for (String table : tables) {
                    if (table.equalsIgnoreCase(mTableNames[0])) {
                        // Optimization for a single-table observer
                        invalidatedTables = mSingleTableSet;
                        break;
                    }
                }
            } else {
                ArraySet<String> set = new ArraySet<>();
                for (String table : tables) {  //遍历传播更新的Client修改过的表
                    for (String ourTable : mTableNames) { //遍历当前Client的所有表
                        if (ourTable.equalsIgnoreCase(table)) { //只把需要更新的表加入set集合
                            set.add(ourTable);
                            break;
                        }
                    }
                }
                if (set.size() > 0) {
                    invalidatedTables = set;
                }
            }
            if (invalidatedTables != null) {  //通知观察者更新表
                mObserver.onInvalidated(invalidatedTables);
            }
        }
    }
}

Q: 那么mObserverMap 中的 InvalidationTracker.Observer 在哪里被添加呢?**
A: InvalidationTracker.Observer 就是观察者, 他是一个抽象类, 在 LiveData 和 RxJava 两种观察者模式框架中都提供了他的实现.

我们以 RxJava 为例, 举个例子 :

@Dao
interface NodeModelDao {
    @Query("SELECT * from nodemodel")
    fun getAll(): Flowable<List<NodeModel>>
}

查看 kapt 编译后生成的 NodeModelDaoImpl.java 文件, 可以看到 Room 帮助我们在Callablecall() 方法中生成了查询的Sql操作.

public final class NodeModelDao_Impl implements NodeModelDao {
  @Override
  public Flowable<List<NodeModel>> getAll() {
    final String _sql = "SELECT * from nodemodel";
    final RoomSQLiteQuery _statement = RoomSQLiteQuery.acquire(_sql, 0);
    return RxRoom.createFlowable(__db, new String[]{"nodemodel"}, new Callable<List<NodeModel>>() {
      @Override
      public List<NodeModel> call() throws Exception {
        final Cursor _cursor = DBUtil.query(__db, _statement, false);
        try {
            .....
            _item = new NodeModel(_tmpUid,_tmpId,_tmpName,_tmpTitle,_tmpTitleAlternative,_tmpUrl,_tmpTopics,_tmpHeader,_tmpFooter,_tmpIsCollected);
            _result.add(_item);
          return _result;
        } finally {
          _cursor.close();
        }
      }
  }
}

查看 RxRoom.createFlowable 方法, 他的内部会创建一个 InvalidationTracker.Observer 实例 observer, 接着调用 database.getInvalidationTracker().addObserver(observer)observer 添加到 InvalidationTracker 的 mObserverMap 中. observeronInvalidated() 方法会调用 emitter.onNext(NOTHING) 再次发射, 发射后会调用 flatMapMaybe() 转换流为 maybe 返回给观察者, 而 maybe = Maybe.fromCallable(callable), Callable 内的 call() 实现就是上面提到的Sql查询操作.

public class RxRoom {

    @RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
    public static <T> Flowable<T> createFlowable(final RoomDatabase database,
            final String[] tableNames, final Callable<T> callable) {
        Scheduler scheduler = Schedulers.from(database.getQueryExecutor());
        final Maybe<T> maybe = Maybe.fromCallable(callable);
        return createFlowable(database, tableNames)
                .observeOn(scheduler)
                .flatMapMaybe(new Function<Object, MaybeSource<T>>() {
                    @Override
                    public MaybeSource<T> apply(Object o) throws Exception {
                        return maybe;
                    }
                });
    }

    public static Flowable<Object> createFlowable(final RoomDatabase database,
            final String... tableNames) {
        return Flowable.create(new FlowableOnSubscribe<Object>() {
            @Override
            public void subscribe(final FlowableEmitter<Object> emitter) throws Exception {
                //创建InvalidationTracker.Observer实例
                final InvalidationTracker.Observer observer = new InvalidationTracker.Observer(
                        tableNames) {
                    @Override
                    public void onInvalidated(@androidx.annotation.NonNull Set<String> tables) {
                        if (!emitter.isCancelled()) {
                            //再次发射
                            emitter.onNext(NOTHING);
                        }
                    }
                };
                if (!emitter.isCancelled()) {
                    database.getInvalidationTracker().addObserver(observer);
                    emitter.setDisposable(Disposables.fromAction(new Action() {
                        @Override
                        public void run() throws Exception {
                            database.getInvalidationTracker().removeObserver(observer);
                        }
                    }));
                }

                // emit once to avoid missing any data and also easy chaining
                if (!emitter.isCancelled()) {
                    emitter.onNext(NOTHING);
                }
            }
        }, BackpressureStrategy.LATEST);
    }
}

当别的进程的 Client 修改了数据, Service 会将更新传播到所有进程相同名字的 Client. 每个进程的 Client 对应的 InvalidationTracker 会将需要更新的表 tables 交给内部维护的集合 mObserverMap 处理.

InvalidationTracker.Observer 是个抽象类, 我们使用 LiveData/RxJava 作为 Room 查询的观察者实现时, 会实现 InvalidationTracker.Observer 对象并把它添加到 InvalidationTracker 的 mObserverMap 集合中. 遍历集合并调用 InvalidationTracker.Observer 的 onInvalidated() 实现, 就可以让上游再次发送消息, 再次触发下游观察者的订阅, 到这里更新的流程就结束了.

发起更新的MultiInstanceInvalidationClient

我们已经完成了 Service 通过 Client#Binder 向各个 Client 传播更新的IPC流程, 以及 每个 Client 的 InvalidationTracker 又通过 mObserverMap 通知自己的观察者的流程. 接下来我们只要知道 Service#broadcastInvalidation() 在什么情况下会被调用.

查看Client的构造方法, 这里初始化的 InvalidationTracker.Observer 的 onInvalidated() 中调用了 mService.broadcastInvalidation(). 所以只需要知道这个 observer 的方法什么时候被调用.

MultiInstanceInvalidationClient(Context context, String name,
            InvalidationTracker invalidationTracker, Executor executor) {
        mContext = context.getApplicationContext();
        mName = name;
        mInvalidationTracker = invalidationTracker;
        mExecutor = executor;
        mObserver = new InvalidationTracker.Observer(invalidationTracker.mTableNames) {
            @Override
            public void onInvalidated(@NonNull Set<String> tables) {
                if (mStopped.get()) {
                    return;
                }
                try {
                    mService.broadcastInvalidation(mClientId,
                            tables.toArray(new String[0]));
                } catch (RemoteException e) {
                    Log.w(Room.LOG_TAG, "Cannot broadcast invalidation", e);
                }
            }

            @Override
            boolean isRemote() {
                return true;
            }
        };
        Intent intent = new Intent(mContext, MultiInstanceInvalidationService.class);
        mContext.bindService(intent, mServiceConnection, Context.BIND_AUTO_CREATE);
    }

查看apt编译后生成的XXDao_Impl, 可以看到包括插入,删除,修改等涉及到写的操作都是通过事务 transcation 来完成的. 这很好理解 写入操作要保证原子性.

public final class NodeModelDao_Impl implements NodeModelDao {
  @Override
  public void insertAll(List<NodeModel> userEntities) {
    __db.beginTransaction();
    try {
      __insertionAdapterOfNodeModel.insert(userEntities);
      __db.setTransactionSuccessful();
    } finally {
      __db.endTransaction();
    }
  }

  @Override
  public void deleteAll() {
    final SupportSQLiteStatement _stmt = __preparedStmtOfDeleteAll.acquire();
    __db.beginTransaction();
    try {
      _stmt.executeUpdateDelete();
      __db.setTransactionSuccessful();
    } finally {
      __db.endTransaction();
      __preparedStmtOfDeleteAll.release(_stmt);
    }
  }
}

RoomDatabase # endTransaction() 在事务结束的方法中, 调用了 mInvalidationTracker.refreshVersionsAsync()

public void endTransaction() {
    mOpenHelper.getWritableDatabase().endTransaction();
    if (!inTransaction()) {
        // enqueue refresh only if we are NOT in a transaction. Otherwise, wait for the last
        // endTransaction call to do it.
        mInvalidationTracker.refreshVersionsAsync();
    }
}

InvalidationTracker # refreshVersionsAsync() , 遍历 mObserverMap, map里包含两种Observer

  • Client 构造方法内用于IPC调用 mService.broadcastInvalidation() 的Observer
  • Client 内做查询操作的观察者的Observer
public void refreshVersionsAsync() {
    // TODO we should consider doing this sync instead of async.
    if (mPendingRefresh.compareAndSet(false, true)) {
        mDatabase.getQueryExecutor().execute(mRefreshRunnable);
    }
}

@VisibleForTesting
Runnable mRefreshRunnable = new Runnable() {
    @Override
    public void run() {
        final Lock closeLock = mDatabase.getCloseLock();
        boolean hasUpdatedTable = false;

        if (hasUpdatedTable) {
            synchronized (mObserverMap) {
                for (Map.Entry<Observer, ObserverWrapper> entry : mObserverMap) {
                    entry.getValue().notifyByTableVersions(mTableInvalidStatus);
                }
            }
        }
    }
};

InvalidationTracker.ObserverWrapper # notifyByTableVersions()

void notifyByTableVersions(BitSet tableInvalidStatus) {
    Set<String> invalidatedTables = null;
    final int size = mTableIds.length;
    for (int index = 0; index < size; index++) {
        final int tableId = mTableIds[index];
        if (tableInvalidStatus.get(tableId)) {
            if (size == 1) {
                // Optimization for a single-table observer
                invalidatedTables = mSingleTableSet;
            } else {
                if (invalidatedTables == null) {
                    invalidatedTables = new ArraySet<>(size);
                }
                invalidatedTables.add(mTableNames[index]);
            }
        }
    }
    if (invalidatedTables != null) {
        mObserver.onInvalidated(invalidatedTables);
    }
}

验证

通过 android:process 我们可以最快的模拟跨进程的情况

<activity android:name=".ui.DetailActivity1"/>
<activity android:name=".ui.DetailActivity2"
    android:process=":Process2" />

如图, 当在Process2中对database进行 insert/delete 写入操作时, Process1中的观察者也能响应跨进程的更新, 从而更新UI.


总结

Room 跨进程共享数据使用了两套 C/S 模型, 涉及到两个aidl类. Service 提供 ServiceBinder, 每个 Client 也会提供 ClientBinder. 大概流程 :

  1. 所有 Client 初始化会调用 ServiceBinderregisterCallback() 方法将 ClientBinder 传递给 Service, Service 内维护了 mCallbackList 来保存所有的 ClientBinder .
    当某个 Client 更新时, 会调用 ServiceBinderbroadcastInvalidation() 传播更新.

  2. Service 的 broadcastInvalidation() 会遍历 ClientBinder 的集合 mCallbackList, 将更新交给每个 Client 对应的 InvalidationTracker 处理.

  3. InvalidationTracker 维护了对database数据的观察者集合 mObserverMap, 遍历map调用 observer#onInvalidated() 会触发再次查询的操作, 观察者就可以接收到新的数据.

相关文章

  • AAC-Room 跨进程分析

    SQLite是安卓数据持久化的重要手段. 为了便捷的操作sql, 衍生了很多ORM (Object/Relatio...

  • Android通信v2.0

    跨网络通信 Socket本地通信,进程从zygote进程fork的指令都是用socket本地通信的 跨应用/进程通...

  • 内容提供者与 SQlite 使用注意点笔记

    ContentProvider 是 Android 四大组件之一,方便我们跨应用(跨应用本身就是跨进程)、跨进程访...

  • 进程学习记录1

    跨平台的进程创建模块 1.方法一: 使用 Process 类创建子进程 支持跨平台:windows / linux...

  • 2018-04-15多线程-全局变量-互斥锁

    多线程----threadingthread跨平台较弱,threading可以跨平台调用理解线程:进程里面执行代码...

  • 进程的创建

    进程的创建-multiprocessing multiprocessing模块就是跨平台版本的多进程模块,提供了一...

  • Python多任务-进程

    进程的创建-multiprocessing multiprocessing模块就是跨平台版本的多进程模块,提供了一...

  • python学习日记-2016.7.25

    1.多进程 multiprocessing模块就是跨平台版本的多进程模块。multiprocessing模块提供了...

  • 1.进程-multiprocessing

    1.由于Python是跨平台的,自然也应该提供一个跨平台的多进程支持。multiprocessing模块就是跨平台...

  • windows 匿名管道

    1、匿名管道是进程间通信的一种技术。windows提供的匿名管道技术,不能够跨网络跨机器,只能在同一机器上不同进程...

网友评论

    本文标题:AAC-Room 跨进程分析

    本文链接:https://www.haomeiwen.com/subject/eyubyctx.html