CachingExecutor

作者: 93张先生 | 来源:发表于2020-09-11 09:26 被阅读0次

    CachingExecutor

    CachingExecutor 是一个 Executor 接口的装饰器,它为 Executor 对象增加了二级缓存的相关功能。
    它封装了一个用于执行数据库操作的 Executor 对象,以及一个用于管理缓存的 TransactionalCacheManager 对象。
    TransactionalCache 和 TransactionalCacheManager 是 CachingExecutor 依赖的两个组件。

    组件以及二级缓存关系

    • CachingExecutor 为 Executor 装饰了二级缓存功能
    • TransactionalCacheManager 管理了所有的 以 namespace 为存储单位的二级缓存
    • TransactionalCache 是二级缓存 Cache 的代理类,对外提供了二级缓存的功能,里面包装了二级缓存 Cache 对象(delegate),这个delegate 提供了真实的数据缓存功能。
    image.png
    public class CachingExecutor implements Executor {
    
      private final Executor delegate;
      private final TransactionalCacheManager tcm = new TransactionalCacheManager();
    
      public CachingExecutor(Executor delegate) {
        this.delegate = delegate;
        delegate.setExecutorWrapper(this);
      }
    
      @Override
      public Transaction getTransaction() {
        return delegate.getTransaction();
      }
    
      @Override
      public void close(boolean forceRollback) {
        try {
          //issues #499, #524 and #573
          if (forceRollback) {
            tcm.rollback();
          } else {
            tcm.commit();
          }
        } finally {
          delegate.close(forceRollback);
        }
      }
    
      @Override
      public boolean isClosed() {
        return delegate.isClosed();
      }
    
      @Override
      public int update(MappedStatement ms, Object parameterObject) throws SQLException {
        flushCacheIfRequired(ms);
        return delegate.update(ms, parameterObject);
      }
    
      /**
       * 查询过程
       * (1)获取 BoundSql 对象,创建查询语句对应的 CacheKey 对象
       * (2)检测是否开启了二级缓存,如果没有开启二级缓存,则直接调用底层 Executor 对象的 query() 方法查询数据库。如果开启了二级缓存,则继续后面的步骤
       * (3)检测查询操作是否包含输出类型的参数,如果是这种情况,则报错
       * (4)调用 TransactionalCacheManager.getObject()方法查询二级缓存,如果二级缓存中查找到相应的结果对象,则直接将该结果对象返回。
       * (5)如果二级缓存没有相应的结果对象,则调用底层 Executor 对象的 query() 方法,正如前面介绍的 ,它会先查询一级缓存,一级缓存未命中时,才会查询数据库。
       * 最后还会将得到的结果对象放入 TransactionalCache.entriesToAddOnCommit 集合中保存。
       *
       * @param ms
       * @param parameterObject
       * @param rowBounds
       * @param resultHandler
       * @param <E>
       * @return
       * @throws SQLException
       */
    
      @Override
      public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException {
        // 步骤1:获取 BoundSql 对象,解析 BoundSql
        BoundSql boundSql = ms.getBoundSql(parameterObject);
        // 创建 CacheKey 对象
        CacheKey key = createCacheKey(ms, parameterObject, rowBounds, boundSql);
        return query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
      }
    
      @Override
      public <E> Cursor<E> queryCursor(MappedStatement ms, Object parameter, RowBounds rowBounds) throws SQLException {
        flushCacheIfRequired(ms);
        return delegate.queryCursor(ms, parameter, rowBounds);
      }
    
      @Override
      public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql)
          throws SQLException {
        // 获取查询语句所在命名空间对应的二级缓存
        Cache cache = ms.getCache();
        // 步骤2:是否开启了二级缓存功能
        if (cache != null) {
          flushCacheIfRequired(ms);   // 根据 <select> 节点的配置,决定是否需要清空二级缓存
          // 检测 SQL 节点的 useCache 配置以及是否使用了 resultHandler 配置
          if (ms.isUseCache() && resultHandler == null) {
            //步骤3: 二级缓存不能保存输出类型的参数 如果查询操作调用了包含输出参数的存储过程,则报错
            ensureNoOutParams(ms, boundSql);
            // 步骤4:查询二级缓存
            @SuppressWarnings("unchecked")
            List<E> list = (List<E>) tcm.getObject(cache, key);
            if (list == null) {
              // 步骤5:二级缓存没用相应的结果对象,调用封装的 Executor 对象的 query() 方法,这个 query() 方法会先查询一级缓存
              list = delegate.query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
              // 将查询结果保存到 TransactionalCache.entriesToAddOnCommit 集合中
              tcm.putObject(cache, key, list); // issue #578 and #116
            }
            return list;
          }
        }
        // 没有启动二级缓存,直接调用底层 Executor 执行数据数据库查询操作
        return delegate.query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
      }
    
      /**
       * 调用底层 Executor 的 flushStatements() 方法
       * @return
       * @throws SQLException
       */
      @Override
      public List<BatchResult> flushStatements() throws SQLException {
        return delegate.flushStatements();
      }
    
      /**
       * 首先调用底层 Executor 对象对应的方法完成事务的提交,然后再调用 TransactionalCacheManager 的对应方法完成对二级缓存的相应操作
       * @param required
       * @throws SQLException
       */
      @Override
      public void commit(boolean required) throws SQLException {
        delegate.commit(required);  // 调用底层的 Executor 提交事务
        tcm.commit();   //遍历所有相关的 TransactionalCache 对象执行 commit() 方法
      }
      /**
       * 首先调用底层 Executor 对象对应的方法完成事务的回滚,然后再调用 TransactionalCacheManager 的对应方法完成对二级缓存的相应操作
       * @param required
       * @throws SQLException
       */
      @Override
      public void rollback(boolean required) throws SQLException {
        try {
          delegate.rollback(required); // 调用底层的 Executor 回滚事务
        } finally {
          if (required) {
            tcm.rollback(); //遍历所有相关的 TransactionalCache 对象执行 rollback() 方法
          }
        }
      }
    
      /**
       * 二级缓存不能保存输出类型的参数 如果查询操作调用了包含输出参数的存储过程,则报错
       * @param ms
       * @param boundSql
       */
      private void ensureNoOutParams(MappedStatement ms, BoundSql boundSql) {
        if (ms.getStatementType() == StatementType.CALLABLE) {
          for (ParameterMapping parameterMapping : boundSql.getParameterMappings()) {
            if (parameterMapping.getMode() != ParameterMode.IN) {
              throw new ExecutorException("Caching stored procedures with OUT params is not supported.  Please configure useCache=false in " + ms.getId() + " statement.");
            }
          }
        }
      }
    
      @Override
      public CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql) {
        return delegate.createCacheKey(ms, parameterObject, rowBounds, boundSql);
      }
    
      @Override
      public boolean isCached(MappedStatement ms, CacheKey key) {
        return delegate.isCached(ms, key);
      }
    
      @Override
      public void deferLoad(MappedStatement ms, MetaObject resultObject, String property, CacheKey key, Class<?> targetType) {
        delegate.deferLoad(ms, resultObject, property, key, targetType);
      }
    
      /**
       * 清空一级缓存
       */
      @Override
      public void clearLocalCache() {
        delegate.clearLocalCache();
      }
    
      /**
       * 是否要求刷新 一级和二级缓存
       * @param ms
       */
      private void flushCacheIfRequired(MappedStatement ms) {
        // 获取 二级缓存
        Cache cache = ms.getCache();
        if (cache != null && ms.isFlushCacheRequired()) {
          tcm.clear(cache);
        }
      }
    
      @Override
      public void setExecutorWrapper(Executor executor) {
        throw new UnsupportedOperationException("This method should not be called");
      }
    
    }
    

    TransactionalCacheManager

    TransactionalCacheManager 用于管理 CachingExecutor 使用的二级缓存对象,其中只定义了一个 transactionalCaches,它的 key 是对应的 CachingExecutor 使用的二级缓存对象,value 是相应的 TransactionalCache 对象,在该 TransactionalCache 中封装了对应的二级缓存对象,也就是这里的 key。

    public class TransactionalCacheManager {
    
      private final Map<Cache, TransactionalCache> transactionalCaches = new HashMap<>();
    
      /**
       * 调用指定二级缓存对应的 TransactionalCache 对象对应的方法
       * @param cache
       */
      public void clear(Cache cache) {
        getTransactionalCache(cache).clear();
      }
    
      /**
       * 调用指定二级缓存对应的 TransactionalCache 对象对应的方法
       * @param cache
       */
      public Object getObject(Cache cache, CacheKey key) {
        return getTransactionalCache(cache).getObject(key);
      }
    
      /**
       * 调用指定二级缓存对应的 TransactionalCache 对象对应的方法
       * @param cache
       * @param key
       * @param value
       */
      public void putObject(Cache cache, CacheKey key, Object value) {
        getTransactionalCache(cache).putObject(key, value);
      }
    
      /**
       * 遍历 transactionalCaches 集合,调用 TransactionalCache 相应的方法
       */
      public void commit() {
        for (TransactionalCache txCache : transactionalCaches.values()) {
          txCache.commit();
        }
      }
      /**
       * 遍历 transactionalCaches 集合,调用 TransactionalCache 相应的方法
       */
      public void rollback() {
        for (TransactionalCache txCache : transactionalCaches.values()) {
          txCache.rollback();
        }
      }
    
      /**
       * 如果集合中不包含 TransactionalCache,则创建一个,并放入 transactionalCaches 中
       * @param cache
       * @return
       */
      private TransactionalCache getTransactionalCache(Cache cache) {
        return transactionalCaches.computeIfAbsent(cache, TransactionalCache::new);
      }
    
    }
    

    TransactionalCache

    TransactionalCache 主要用于保存 某个 SqlSession 的 某个事务中需要向 某个二级缓存中添加的缓存 的数据。
    名字为 TransactionalCache 主要是针对解决了二级缓存中一些事务的问题,所以起名字为 TransactionalCache。

    The 2nd level cache transactional buffer.
    This class holds all cache entries that are to be added to the 2nd level cache during a Session.

    public class TransactionalCache implements Cache {
    
      private static final Log log = LogFactory.getLog(TransactionalCache.class);
      // 底层封装的二级缓存所对应的 Cache 对象,以存储 namespace 为单位的 Cache 对象,默认为 PerpetualCache
      private final Cache delegate;
      // 当改字段为 true 时,则表示当前 TransactionalCache 不可查询,且提交事务时会将底层 Cache 清空
      private boolean clearOnCommit;
      // 暂时记录添加到 TransactionalCache 中的数据。在事务提交时,会将其中的数据添加到二级缓存中
      private final Map<Object, Object> entriesToAddOnCommit;
      // 记录缓存未命中的 CacheKey 对象
      private final Set<Object> entriesMissedInCache;
    
      public TransactionalCache(Cache delegate) {
        this.delegate = delegate;
        this.clearOnCommit = false;
        this.entriesToAddOnCommit = new HashMap<>();
        this.entriesMissedInCache = new HashSet<>();
      }
    
      @Override
      public String getId() {
        return delegate.getId();
      }
    
      @Override
      public int getSize() {
        return delegate.getSize();
      }
    
      /**
       * 它首先会查询底层的二级缓存,并将为命中的 key 记录到 entriesMissedInCache,之后根据 clearOnCommit 字段的值决定具体的返回值
       * @param key The key
       * @return
       */
      @Override
      public Object getObject(Object key) {
        // issue #116
        // 查询底层的 Cache 是否包含了指定的 key
        Object object = delegate.getObject(key);
        // 如果底层缓存对象中不包含改缓存项,则将该 key 记录到 entriesMissedInCache 集合中
        if (object == null) {
          entriesMissedInCache.add(key);
        }
        // issue #146
        if (clearOnCommit) {
          return null;
        } else {
          // 返回层底层 Cache 中查询到的对象
          return object;
        }
      }
    
      /**
       * 该方法并没有直接将结果对象记录到其封装的二级缓存中,而是暂时保存在 entriesToAddOnCommit 集合中,
       * 在事务提交时才会将这些结果对象从 entriesToAddOnCommit 集合 添加到二级缓存中。
       * @param key Can be any object but usually it is a {@link CacheKey}
       * @param object
       */
      @Override
      public void putObject(Object key, Object object) {
        entriesToAddOnCommit.put(key, object);
      }
    
      @Override
      public Object removeObject(Object key) {
        return null;
      }
    
      /**
       * 清空 entriesToAddOnCommit 集合,并设置 clearOnCommit 为 true
       */
      @Override
      public void clear() {
        clearOnCommit = true;
        entriesToAddOnCommit.clear();
      }
    
      /**
       * 会根据 clearOnCommit 字段的值决定是否清空二级缓存,然后调用 flushPendingEntries() 方法将 entriesToAddOnCommit 集合中记录的结果对象保存到二级缓存中
       */
      public void commit() {
        // 在事务提交前,清空二级缓存
        if (clearOnCommit) {
          delegate.clear();
        }
        // 将 entriesToAddOnCommit 集合中记录的结果对象保存到二级缓存中
        flushPendingEntries();
        // 重置 clearOnCommit 值,清空 entriesToAddOnCommit 和 entriesMissedInCache 集合
        reset();
      }
    
      /**
       * 将 entriesMissedInCache 集合中记录的缓存项从二级缓存中删除,并清空 entriesToAddOnCommit 和 entriesMissedInCache 集合
       */
      public void rollback() {
        // 将 entriesMissedInCache 集合中记录的缓存项从二级缓存中删除
        unlockMissedEntries();
        // 清空 entriesToAddOnCommit 和 entriesMissedInCache 集合
        reset();
      }
    
      /**
       * 重置 clearOnCommit 值,清空 entriesToAddOnCommit 和 entriesMissedInCache 集合
       */
      private void reset() {
        clearOnCommit = false;
        entriesToAddOnCommit.clear();
        entriesMissedInCache.clear();
      }
    
      /**
       * 将 entriesToAddOnCommit 集合中记录的结果对象保存到二级缓存中
       */
      private void flushPendingEntries() {
        for (Map.Entry<Object, Object> entry : entriesToAddOnCommit.entrySet()) {
          delegate.putObject(entry.getKey(), entry.getValue());
        }
        // 遍历 entriesMissedInCache 集合,缓存为命中的,且 entriesToAddOnCommit 集合中不包含的缓存项, 添加到二级缓冲中
        for (Object entry : entriesMissedInCache) {
          if (!entriesToAddOnCommit.containsKey(entry)) {
            delegate.putObject(entry, null);
          }
        }
      }
    
      /**
       * 将 entriesMissedInCache 集合中记录的缓存项从二级缓存中删除
       */
      private void unlockMissedEntries() {
        for (Object entry : entriesMissedInCache) {
          try {
            delegate.removeObject(entry);
          } catch (Exception e) {
            log.warn("Unexpected exception while notifiying a rollback to the cache adapter."
                + "Consider upgrading your cache adapter to the latest version.  Cause: " + e);
          }
        }
      }
    
    }
    
    线程安全

    不同的 CachingExecutor 对象由不同的线程操作,二级缓存会不会存在线程不安全的问题呢?

    CacheBuilder.build() 方法,其中回调用 setStandardDecorators() 方法,为 PerpetualCache 类型的 Cache 对象添加 SynchronizedCache 装饰器,从而保证了二级线程安全。

    事务提交

    为什么要在事务提交时才将 TransactionalCache. entriesToAddOnCommit 集合中缓存的数据写入到二级缓存,而不是像一级缓存那样,将每次查询结果都直接写入二级缓存呢?

    这是为了防止出现“脏读” 情况 最终实现的效果有点类似于“不可重复读”的事务隔离级别。假设当前数据库的隔离级别是“不可重复读”,先
    后开启 Tl、T2 两个事务,如图 3- 52 示,在事务 Tl 添加了记录A, 之后查询A记录,最后提交事务,事务T2 会查询记录A。 如果事务 Tl 查询记录A时,就将A对应的结果对象放人二级缓存,则在事务 T2 第一次查询记录A时即可从二级缓存中直接获取其对应的结果对象。此时 Tl 仍然未提交,这就出现了"脏读"的情况,显然不是用户期的结果。


    image.png

    按照 CacheExecutor 的本身实现,事务T1查询记录A时二级缓存未命中,会查询数据库,因为是同一事务,所以可以查询到记录A 并得到相应的结果对象,并且会将记录 A 保存到 TransactionalCache .entriesToAddOnCommit 集合中。而事务T2第一次查询记录A时,二级缓存未命中,则会访问数据库,因为是不同个事务,数据库的“不可重复读”隔离级别会保证事务T2无法查询到记录A,这样就避免了上面的“脏读”的场景。在图 3-52 中,事务T1提交时会将 entriesToAddOnCommit 集合中的数据添加到二级缓存中,所以事务T2第二次查询记录A时,二级缓存才会命中,这就导致了同一事务中多次读取的结果不一致,也就是 “不可重复读”的场景。

    相关文章

      网友评论

        本文标题:CachingExecutor

        本文链接:https://www.haomeiwen.com/subject/cjtlektx.html