美文网首页
MyBatis印象阅读之Executor解析

MyBatis印象阅读之Executor解析

作者: 向光奔跑_ | 来源:发表于2019-08-09 10:47 被阅读0次

    在前面的内容中,我们分析了MyBatis解析资源文件的一些操作,接下来我们来看MyBatis这一套框架是如何运行的。

    首先我们需要再回顾下官网的入门实例:

    
    String resource = "org/mybatis/example/mybatis-config.xml";
    InputStream inputStream = Resources.getResourceAsStream(resource);
    SqlSessionFactory sqlSessionFactory = new SqlSessionFactoryBuilder().build(inputStream);
    
    SqlSession session = sqlSessionFactory.openSession();
    try {
      Blog blog = (Blog) session.selectOne("org.mybatis.example.BlogMapper.selectBlog", 101);
    } finally {
      session.close();
    }
    

    这里的流程是我们通过通过sqlSessionFactory获取SqlSession。下面我们进行分析。

    1. sqlSessionFactory源码分析

    sqlSessionFactory本身就是一个接口,他的内部方法也比较单一:

    public interface SqlSessionFactory {
    
      SqlSession openSession();
    
      SqlSession openSession(boolean autoCommit);
    
      SqlSession openSession(Connection connection);
    
      SqlSession openSession(TransactionIsolationLevel level);
    
      SqlSession openSession(ExecutorType execType);
    
      SqlSession openSession(ExecutorType execType, boolean autoCommit);
    
      SqlSession openSession(ExecutorType execType, TransactionIsolationLevel level);
    
      SqlSession openSession(ExecutorType execType, Connection connection);
    
      Configuration getConfiguration();
    
    }
    

    我们拿他的实现类DefaultSqlSessionFactory来看他的最简单的一个openSession方法:

    
     //DefaultSqlSessionFactory
    
      @Override
      public SqlSession openSession() {
        return openSessionFromDataSource(configuration.getDefaultExecutorType(), null, false);
      }
    
    
    
      private SqlSession openSessionFromDataSource(ExecutorType execType, TransactionIsolationLevel level, boolean autoCommit) {
        Transaction tx = null;
        try {
          final Environment environment = configuration.getEnvironment();
          final TransactionFactory transactionFactory = getTransactionFactoryFromEnvironment(environment);
          tx = transactionFactory.newTransaction(environment.getDataSource(), level, autoCommit);
          // 默认execType为ExecutorType.SIMPLE
          final Executor executor = configuration.newExecutor(tx, execType);
          return new DefaultSqlSession(configuration, executor, autoCommit);
        } catch (Exception e) {
          closeTransaction(tx); // may have fetched a connection so lets call close()
          throw ExceptionFactory.wrapException("Error opening session.  Cause: " + e, e);
        } finally {
          ErrorContext.instance().reset();
        }
      }
    

    这里最关键的就是根据ExecutorType生成对应的Executor,我们先来看看ExecutorType有哪些:

    public enum ExecutorType {
      SIMPLE, REUSE, BATCH
    }
    

    这三个type分别对应的Executor为:


    Executor部分子类

    那么这三个分别对应什么功能?

    • SimpleExecutor: 每次开始读或写操作,都创建对应的 Statement 对象。执行完成后,关闭该 Statement 对象。
    • ReuseExecutor: 每次开始读或写操作,优先从缓存中获取对应的 Statement 对象。如果不存在,才进行创建。执行完成后,不关闭该 Statement 对象。其它的,和 SimpleExecutor 是一致的。
    • BatchExecutor: 执行update(没有select,JDBC批处理不支持select),将所有sql都添加到批处理中(addBatch()),等待统一执行(executeBatch()),它缓存了多个Statement对象,每个Statement对象都是addBatch()完毕后,等待逐一执行executeBatch()批处理的;BatchExecutor相当于维护了多个桶,每个桶里都装了很多属于自己的SQL,就像苹果蓝里装了很多苹果,番茄蓝里装了很多番茄,最后,再统一倒进仓库。(可以是Statement或PrepareStatement对象)

    其实我们经常用的也就SimpleExecutor,所以我们拿这个来进行分析。

    2. Executor源码解析

    Executor是一个接口,包含了CRUD的基本数据库操作:

    
    public interface Executor {
    
      // 空 ResultHandler 对象的枚举
      ResultHandler NO_RESULT_HANDLER = null;
    
      // 更新 or 插入 or 删除,由传入的 MappedStatement 的 SQL 所决定
      int update(MappedStatement ms, Object parameter) throws SQLException;
    
      // 查询,带 ResultHandler + CacheKey + BoundSql
      <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey cacheKey, BoundSql boundSql) throws SQLException;
    
      // 查询,带 ResultHandler
      <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException;
    
      // 查询,返回值为 Cursor
      <E> Cursor<E> queryCursor(MappedStatement ms, Object parameter, RowBounds rowBounds) throws SQLException;
    
      // 刷入批处理语句
      List<BatchResult> flushStatements() throws SQLException;
    
      void commit(boolean required) throws SQLException;
    
      void rollback(boolean required) throws SQLException;
    
      CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql);
    
      boolean isCached(MappedStatement ms, CacheKey key);
    
      void clearLocalCache();
    
      void deferLoad(MappedStatement ms, MetaObject resultObject, String property, CacheKey key, Class<?> targetType);
    
      Transaction getTransaction();
    
      void close(boolean forceRollback);
    
      boolean isClosed();
    
      void setExecutorWrapper(Executor executor);
    
    }
    
    

    下面一段我节选自芋道源码中的说明:

    每当我们使用 MyBatis 开启一次和数据库的会话,MyBatis 会创建出一个 SqlSession 对象表示一次数据库会话,而每个 SqlSession 都会创建一个 Executor 对象。
    在对数据库的一次会话中,我们有可能会反复地执行完全相同的查询语句,如果不采取一些措施的话,每一次查询都会查询一次数据库,而我们在极短的时间内做了完全相同的查询,那么它们的结果极有可能完全相同,由于查询一次数据库的代价很大,这有可能造成很大的资源浪费。
    为了解决这一问题,减少资源的浪费,MyBatis 会在表示会话的SqlSession 对象中建立一个简单的缓存,将每次查询到的结果结果缓存起来,当下次查询的时候,如果判断先前有个完全一样的查询,会直接从缓存中直接将结果取出,返回给用户,不需要再进行一次数据库查询了。😈 注意,这个“简单的缓存”就是一级缓存,且默认开启,无法关闭。
    如下图所示,MyBatis 会在一次会话的表示 —— 一个 SqlSession 对象中创建一个本地缓存( localCache ),对于每一次查询,都会尝试根据查询的条件去本地缓存中查找是否在缓存中,如果在缓存中,就直接从缓存中取出,然后返回给用户;否则,从数据库读取数据,将查询结果存入缓存并返回给用户。


    MyBatis一级缓存示意

    这里我们再来看创建:

    
     /**
       * 创建 Executor 对象
       */
      public Executor newExecutor(Transaction transaction, ExecutorType executorType) {
        // 获得执行器类型,默认是SIMPLE
        executorType = executorType == null ? defaultExecutorType : executorType;
        executorType = executorType == null ? ExecutorType.SIMPLE : executorType;
        Executor executor;
        if (ExecutorType.BATCH == executorType) {
          executor = new BatchExecutor(this, transaction);
        } else if (ExecutorType.REUSE == executorType) {
          executor = new ReuseExecutor(this, transaction);
        } else {
          executor = new SimpleExecutor(this, transaction);
        }
        if (cacheEnabled) {
          executor = new CachingExecutor(executor);
        }
        executor = (Executor) interceptorChain.pluginAll(executor);
        return executor;
      }
    

    这里代码逻辑比较简单,根据执行器类型创建执行器,默认是SimpleExecutor,如果开启了二级缓存,则用CachingExecutor来代理,最后是通过插件来封装。

    接下来我们先从最基础的BaseExecutor源码开始:

    public abstract class BaseExecutor implements Executor {
    
      private static final Log log = LogFactory.getLog(BaseExecutor.class);
    
      /**
       * 事务对象
       */
      protected Transaction transaction;
      /**
       * 包装的 Executor 对象
       */
      protected Executor wrapper;
    
      /**
       * DeferredLoad( 延迟加载 ) 队列
       */
      protected ConcurrentLinkedQueue<DeferredLoad> deferredLoads;
      /**
       * 本地缓存,即一级缓存
       */
      protected PerpetualCache localCache;
      /**
       * 本地输出类型的参数的缓存
       */
      protected PerpetualCache localOutputParameterCache;
      protected Configuration configuration;
    
      /**
       * 记录嵌套查询的层级
       */
      protected int queryStack;
      private boolean closed;
    
      protected BaseExecutor(Configuration configuration, Transaction transaction) {
        this.transaction = transaction;
        this.deferredLoads = new ConcurrentLinkedQueue<>();
        this.localCache = new PerpetualCache("LocalCache");
        this.localOutputParameterCache = new PerpetualCache("LocalOutputParameterCache");
        this.closed = false;
        this.configuration = configuration;
        this.wrapper = this;
      }
    }
    
    

    我们先来看下缓存类:PerpetualCache

    
    public class PerpetualCache implements Cache {
    
      private final String id;
    
      private Map<Object, Object> cache = new HashMap<>();
    
      public PerpetualCache(String id) {
        this.id = id;
      }
    
      @Override
      public String getId() {
        return id;
      }
    
      @Override
      public int getSize() {
        return cache.size();
      }
    
      @Override
      public void putObject(Object key, Object value) {
        cache.put(key, value);
      }
    
      @Override
      public Object getObject(Object key) {
        return cache.get(key);
      }
    
      @Override
      public Object removeObject(Object key) {
        return cache.remove(key);
      }
    
      @Override
      public void clear() {
        cache.clear();
      }
    
      @Override
      public ReadWriteLock getReadWriteLock() {
        return null;
      }
    
      @Override
      public boolean equals(Object o) {
        if (getId() == null) {
          throw new CacheException("Cache instances require an ID.");
        }
        if (this == o) {
          return true;
        }
        if (!(o instanceof Cache)) {
          return false;
        }
    
        Cache otherCache = (Cache) o;
        return getId().equals(otherCache.getId());
      }
    
      @Override
      public int hashCode() {
        if (getId() == null) {
          throw new CacheException("Cache instances require an ID.");
        }
        return getId().hashCode();
      }
    
    }
    

    下面介绍CRUD操作是如何的:
    下面是更新操作:

    
      @Override
      public int update(MappedStatement ms, Object parameter) throws SQLException {
        ErrorContext.instance().resource(ms.getResource()).activity("executing an update").object(ms.getId());
        if (closed) {
          throw new ExecutorException("Executor was closed.");
        }
        clearLocalCache();
        return doUpdate(ms, parameter);
      }
      
        /**
       * 更新交给子类实现
       */
      protected abstract int doUpdate(MappedStatement ms, Object parameter)
          throws SQLException;
    

    这里的代码规范和spring很像,正在的操作方法都是do开头。

    我们再来看查询操作:

    
      @Override
      public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException {
        BoundSql boundSql = ms.getBoundSql(parameter);
        CacheKey key = createCacheKey(ms, parameter, rowBounds, boundSql);
        return query(ms, parameter, rowBounds, resultHandler, key, boundSql);
      }
    
    

    其中创建CacheKey的方法是:

    
    @Override
      public CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql) {
        if (closed) {
          throw new ExecutorException("Executor was closed.");
        }
        CacheKey cacheKey = new CacheKey();
        cacheKey.update(ms.getId());
        cacheKey.update(rowBounds.getOffset());
        cacheKey.update(rowBounds.getLimit());
        cacheKey.update(boundSql.getSql());
        List<ParameterMapping> parameterMappings = boundSql.getParameterMappings();
        TypeHandlerRegistry typeHandlerRegistry = ms.getConfiguration().getTypeHandlerRegistry();
        // mimic DefaultParameterHandler logic
        for (ParameterMapping parameterMapping : parameterMappings) {
          //关于存储过程,不做展开
          if (parameterMapping.getMode() != ParameterMode.OUT) {
            Object value;
            String propertyName = parameterMapping.getProperty();
            if (boundSql.hasAdditionalParameter(propertyName)) {
              value = boundSql.getAdditionalParameter(propertyName);
            } else if (parameterObject == null) {
              value = null;
            } else if (typeHandlerRegistry.hasTypeHandler(parameterObject.getClass())) {
              value = parameterObject;
            } else {
              MetaObject metaObject = configuration.newMetaObject(parameterObject);
              value = metaObject.getValue(propertyName);
            }
            cacheKey.update(value);
          }
        }
        if (configuration.getEnvironment() != null) {
          // issue #176
          cacheKey.update(configuration.getEnvironment().getId());
        }
        return cacheKey;
      }
    
    
    

    这里的逻辑还是比较清晰的,通过ms.getId()、rowBounds.getOffset()、rowBounds.getLimit()、boundSql.getSql()、configuration.getEnvironment().getId()组成cacheKey

    然后查询:

    
    @Override
      public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
        ErrorContext.instance().resource(ms.getResource()).activity("executing a query").object(ms.getId());
        if (closed) {
          throw new ExecutorException("Executor was closed.");
        }
        if (queryStack == 0 && ms.isFlushCacheRequired()) {
          clearLocalCache();
        }
        List<E> list;
        try {
          queryStack++;
          list = resultHandler == null ? (List<E>) localCache.getObject(key) : null;
          if (list != null) {
            handleLocallyCachedOutputParameters(ms, key, parameter, boundSql);
          } else {
            list = queryFromDatabase(ms, parameter, rowBounds, resultHandler, key, boundSql);
          }
        } finally {
          queryStack--;
        }
        if (queryStack == 0) {
          for (DeferredLoad deferredLoad : deferredLoads) {
            deferredLoad.load();
          }
          // issue #601
          deferredLoads.clear();
          if (configuration.getLocalCacheScope() == LocalCacheScope.STATEMENT) {
            // issue #482
            clearLocalCache();
          }
        }
        return list;
      }
    
    
    
      private <E> List<E> queryFromDatabase(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
        List<E> list;
        localCache.putObject(key, EXECUTION_PLACEHOLDER);
        try {
          list = doQuery(ms, parameter, rowBounds, resultHandler, boundSql);
        } finally {
          localCache.removeObject(key);
        }
        localCache.putObject(key, list);
        if (ms.getStatementType() == StatementType.CALLABLE) {
          localOutputParameterCache.putObject(key, parameter);
        }
        return list;
      }
      
      
      
        protected abstract <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql)
          throws SQLException;
    
    

    一样的套路,具体实现交给子类。只不过中间加入了一级缓存,而缓存是由简单的HashMap实现的。

    大概有了了解之后,我们来看实现类:
    我们先看SimpleExecutor类:

      @Override
      public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException {
        Statement stmt = null;
        try {
          Configuration configuration = ms.getConfiguration();
          StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, resultHandler, boundSql);
          stmt = prepareStatement(handler, ms.getStatementLog());
          return handler.query(stmt, resultHandler);
        } finally {
          closeStatement(stmt);
        }
      }
    

    这里是不是有点回归到我们使用传统JDBC调用数据库的感觉了。

    
      private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
        Statement stmt;
        Connection connection = getConnection(statementLog);
        stmt = handler.prepare(connection, transaction.getTimeout());
        handler.parameterize(stmt);
        return stmt;
      }
      
      
      protected Connection getConnection(Log statementLog) throws SQLException {
        Connection connection = transaction.getConnection();
        if (statementLog.isDebugEnabled()) {
          return ConnectionLogger.newInstance(connection, statementLog, queryStack);
        } else {
          return connection;
        }
      }
    

    这边我们看到引入了StatementHandler,具体这个类,我们先欠下,最后通过调用handler.query(stmt, resultHandler);

    今天我们只是做一个大概的了解,我们可以看到这里涉及到了很多新的技术债知识,之后我们会一一分析。

    3. 今日总结

    今天我们大概的看了关于Executor执行器是如何进行操作的流程,虽然其中很多细节我们目前还不太了解,但是今天也得有个MyBatis运行脉络的印象~~

    相关文章

      网友评论

          本文标题:MyBatis印象阅读之Executor解析

          本文链接:https://www.haomeiwen.com/subject/qfccjctx.html