美文网首页Mybatis
MyBatis原理(二)——执行器Executor

MyBatis原理(二)——执行器Executor

作者: Lnstark | 来源:发表于2021-11-30 23:16 被阅读0次

    一、 执行器的分类

    Executor类图.png

    Mybatis里的执行器主要是一个接口Executor,一个抽象类BaseExecutor,以及3个实现类:

    • SimpleExecutor:每次执行SQL都会预编译
    • ReuseExecutor:可以重用编译过的SQL语句(用完后不关闭statement,用map缓存)
    • BatchExecutor:里面可缓存多个statement,可执行批量更新操作,但需要在结尾执行doFlushStatements然后提交,才能生效。

    Executor提供了查询、更新(包含增删改)、提交、回滚、获取事务、关闭等方法。
    抽象类BaseExecutor里实现了获取连接、一级缓存。
    如果开了二级缓存,那么SqlSession会用CachingExecutor来处理二级缓存的逻辑,它里面包含一个BaseExecutor delegate,来处理正常SQL逻辑。即装饰者模式。

    二、 缓存

    我们知道myBatis里有一级缓存和二级缓存,一级缓存是会话级的,存在内存里,无法跨线程使用,二级缓存是应用级的,是可以跨线程的,可以存在内存里,也可以存在硬盘,或者第三方集成。开了二级缓存的话,会先使用二级缓存,再使用一级。

    • 一级缓存

      我们看下BaseExecutor的query方法
    @Override
    public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
      ErrorContext.instance().resource(ms.getResource()).activity("executing a query").object(ms.getId());
      if (closed) {
        throw new ExecutorException("Executor was closed.");
      }
      // 如果查询栈是0且设置了每次查询前清空缓存,那么这里会执行清理一级缓存
      if (queryStack == 0 && ms.isFlushCacheRequired()) {
        clearLocalCache();
      }
      List<E> list;
      try {
        queryStack++;
        // 从缓存中拿数据
        list = resultHandler == null ? (List<E>) localCache.getObject(key) : null;
        if (list != null) {
          // 处理存储过程的出参逻辑
          handleLocallyCachedOutputParameters(ms, key, parameter, boundSql);
        } else {
          // 如果缓存没数据,那就去查库,并且将结果放到缓存里
          list = queryFromDatabase(ms, parameter, rowBounds, resultHandler, key, boundSql);
        }
      } finally {
        queryStack--;
      }
      if (queryStack == 0) {
        for (DeferredLoad deferredLoad : deferredLoads) {
          deferredLoad.load();
        }
        deferredLoads.clear();
        // 如果localCacheScope设置为statement的话,那么每次查询完都会清空一级缓存
        if (configuration.getLocalCacheScope() == LocalCacheScope.STATEMENT) {
          clearLocalCache();
        }
      }
      return list;
    }
    

    我们可以看到1级缓存默认是存在的,且无法关闭,但我们可以设置localCacheScope为statement来变相的禁用1级缓存。
    一级缓存命中条件:

    运行时参数

    1. 同一个会话
    2. SQL语句、参数相同
    3. 相同的statementID
    4. RowBounds相同

    操作相关

    1. 未手动清空缓存
    2. 未配置flushCache=true
    3. 未执行update语句
    4. 缓存作用域不是statement
    • 二级缓存

    二级缓存的设计牵扯到很多缓存相关的特点,比如溢出淘汰、过期清理、线程安全、序列化等。这么多功能如何设计呢?MyBatis把每个功能写成一个组件类,然后用装饰者加责任链的模式,将各个组件进行串联。在执行缓存的基本功能时,其它的缓存逻辑会沿着这个责任链依次往下传递。


    二级缓存架构.png

    如何启用?
    我们看Configuration的newExecutor方法

    public Executor newExecutor(Transaction transaction, ExecutorType executorType) {
      executorType = executorType == null ? defaultExecutorType : executorType;
      executorType = executorType == null ? ExecutorType.SIMPLE : executorType;
      Executor executor;
      if (ExecutorType.BATCH == executorType) {
        executor = new BatchExecutor(this, transaction);
      } else if (ExecutorType.REUSE == executorType) {
        executor = new ReuseExecutor(this, transaction);
      } else {
        executor = new SimpleExecutor(this, transaction);
      }
      if (cacheEnabled) {
        executor = new CachingExecutor(executor);
      }
      executor = (Executor) interceptorChain.pluginAll(executor);
      return executor;
    }
    

    可以看到设置cacheEnabled字段可以控制SqlSession的执行器是BaseExecutor还是CachingExecutor,cacheEnabled默认为true。开了cacheEnabled然后还要指定缓存空间,要么mapper.xml里加<cache/>或<cache-ref/>元素,要么mapper.java上加@CacheNamespace或@CacheNamespaceRef注解。一个mapper对应一个缓存空间。

    如何用自定义缓存?
    二级缓存的默认实现是PerpetualCache(其实一级缓存也是),里面就是简单的用HashMap来做。我们要实现自定义缓存的话,先创建一个类如RedisCache实现Cache接口,然后配置
    <cache type="packagename.RedisCache">或者@CacheNamespace(implementation = RedisCache.class)就可以了。

    命中条件
    二级缓存的命中场景与一级缓存类似,不同点在于他不用在一个会话内,必须提交之后才生效。为什么呢?比如两个会话在修改同一数据,当会话二修改后,再将其查询出来,假如它实时填充到二级缓存,而会话一就能获取修改之后的数据,但实际上修改的数据回滚了,并没真正的提交到数据库。类似数据库的脏读。

    为何二级缓存能够会话间共享?
    我们先了解下缓存过程,首先看具体的CachingExecutor的query方法

    @Override
    public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql)
        throws SQLException {
      // 先从MappedStatement里取出二级缓存,这里的cache就是mapper对应的全局缓存空间
      Cache cache = ms.getCache();
      // 如果有的话
      if (cache != null) {
        // 如果设置了每次查询前清空缓存,那么这里会执行清理二级缓存
        flushCacheIfRequired(ms);
        // 如果开启了缓存并且自定义结果处理器为空,那么继续走缓存逻辑
        if (ms.isUseCache() && resultHandler == null) {
          ensureNoOutParams(ms, boundSql);
          @SuppressWarnings("unchecked")
          // 从缓存事务管理器里取出真正的缓存数据
          List<E> list = (List<E>) tcm.getObject(cache, key);
          // 如果没有的话那么执行查询
          if (list == null) {
            list = delegate.query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
            tcm.putObject(cache, key, list); // issue #578 and #116
          }
          return list;
        }
      }
      // 如果MappedStatement没有缓存的话就直接查询
      return delegate.query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
    }
    

    可以看到缓存由事务缓存管理器TransactionalCacheManager(简称tcm)来管理。

    public class TransactionalCacheManager {
    
      private final Map<Cache, TransactionalCache> transactionalCaches = new HashMap<>();
    
      // 根据全局缓存空间拿到暂存空间,并清楚暂存数据
      public void clear(Cache cache) {
        getTransactionalCache(cache).clear();
      }
    
      // 根据全局缓存空间拿到暂存空间,然后取出数据
      public Object getObject(Cache cache, CacheKey key) {
        return getTransactionalCache(cache).getObject(key);
      }
    
      // 根据全局缓存空间拿到暂存空间,然后存进去数据
      public void putObject(Cache cache, CacheKey key, Object value) {
        getTransactionalCache(cache).putObject(key, value);
      }
    
      // 会话提交时执行CachingExecutor的commit,它里面执行tcm的这个方法
      // TransactionalCache会把暂存缓存刷到全局缓存空间
      public void commit() {
        for (TransactionalCache txCache : transactionalCaches.values()) {
          txCache.commit();
        }
      }
    
      // ...
    
      // 根据全局缓存空间拿到TransactionalCache
      // 没有的话创建一个TransactionalCache(delegate为全局缓存空间),它里面有暂存空间
      private TransactionalCache getTransactionalCache(Cache cache) {
        return transactionalCaches.computeIfAbsent(cache, TransactionalCache::new);
      }
    
    }
    

    它里面包含了一个Map<Cache, TransactionalCache>,key其实就是全局缓存空间。TransactionalCache就是我们上面提到的传递链模式中的一环

    它里面有一个Cache delegate(即下一环,这里其实就是全局缓存空间),一个Map<Object, Object> entriesToAddOnCommit,即暂存空间,等会话提交或者关闭时,这里的数据会flush到delegate里,即全局缓存空间里,也就是二级缓存。

    public class TransactionalCache implements Cache {
      private static final Log log = LogFactory.getLog(TransactionalCache.class);
    
      // 下一环,现在这里就是全局缓存空间
      private final Cache delegate;
      private boolean clearOnCommit;
      // 暂存空间
      private final Map<Object, Object> entriesToAddOnCommit;
      private final Set<Object> entriesMissedInCache;
    
      // ...
    
      // tcm的commit会执行这个方法
      public void commit() {
        if (clearOnCommit) {
          delegate.clear();
        }
        flushPendingEntries();
        reset();
      }
    
      // ...
      // 将暂存空间刷到delegate里
      private void flushPendingEntries() {
        for (Map.Entry<Object, Object> entry : entriesToAddOnCommit.entrySet()) {
          delegate.putObject(entry.getKey(), entry.getValue());
        }
        for (Object entry : entriesMissedInCache) {
          if (!entriesToAddOnCommit.containsKey(entry)) {
            delegate.putObject(entry, null);
          }
        }
      }
    }
    

    大致结构如下图:

    二级缓存结构及原理.png
    何时清空二级缓存?
    mapper里的SQL语句有个叫flushCache的配置,他对应MappedStatement的flushCacheRequired字段,表示是否清空缓存。查询语句默认false,增删改语句默认true。上面可以看到在query方法里有一行flushCacheIfRequired(ms),如果查询开启了这个字段,那么每次查询前都会清空缓存。更新的时候也会调用flushCacheIfRequired方法。
    private void flushCacheIfRequired(MappedStatement ms) {
      Cache cache = ms.getCache();
      // 如果缓存不为空,且开启了flushCache,那就清空tcm里对应的暂存区
      if (cache != null && ms.isFlushCacheRequired()) {
        tcm.clear(cache);
      }
    }
    

    再来看tcm里的clear:

    public void clear(Cache cache) {
      // 获取TransactionalCache然后clear
      getTransactionalCache(cache).clear();
    }
    

    再看TransactionalCache的clear方法:

    public void clear() {
      clearOnCommit = true;
      entriesToAddOnCommit.clear();
    }
    

    它设置clearOnCommit 为true,然后清空暂存区。等到提交的时候会将全局缓存空间里的内容清除掉。

    public void commit() {
      // 上面将clearOnCommit 设置为true,这里就清除delegate的缓存了
      if (clearOnCommit) {
        delegate.clear();
      }
      flushPendingEntries();
      // reset方法再将clearOnCommit设置为false
      reset();
    }
    

    另外有一个细节,在查询二级缓存的时候,假如clearOnCommit为true,那么就不返回数据,因为该缓存即将被清除。

    public Object getObject(Object key) {
      // issue #116
      Object object = delegate.getObject(key);
      if (object == null) {
        entriesMissedInCache.add(key);
      }
      // issue #146
      // clearOnCommit为true说明提交后要清除缓存,直接返回null
      if (clearOnCommit) {
        return null;
      } else {
        return object;
      }
    }
    

    参考资料

    B站——MyBatis源码解析大合集
    源码阅读网
    深入浅出mybatis之缓存机制

    相关文章

      网友评论

        本文标题:MyBatis原理(二)——执行器Executor

        本文链接:https://www.haomeiwen.com/subject/gupjxrtx.html