美文网首页
mybatis源码解析七(DefaultSqlSession线程

mybatis源码解析七(DefaultSqlSession线程

作者: 为梦想前进 | 来源:发表于2020-04-03 16:26 被阅读0次

    最近在看源码发现一个问题,在看到DefaultSqlSession这个类的源码的时候,发现这个类上有一句注释,
    Note that this class is not Thread-Safe意思说说,此类不是线程安全的,及既然不是线程安全的,怎么还是默认实现那
    接下来,我们就一起从源码的角度分析一下,我们写一个小案例,然后通过案例一起分析下,这里我们以查询为主,代码很简单,就是一个简单的查询,我们定义了一个线程,通过countDownLauntch让他们同时请求,我们先执行下,看看结果
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class DefaultSqlSessionTest {

    private static final int COUNT = 10;
    private static CountDownLatch count = new CountDownLatch(COUNT);
    private SqlSession sqlSession;
    
    @Autowired
    private SqlSessionFactory sqlSessionFactory;
    
    
    @Before
    public void init(){
                //这里的sqlSession是DefaultSqlSession中的sqlSession
        sqlSession = sqlSessionFactory.openSession();
    }
    
    @After
    public  void  destory(){
                //直接调用DefaultSqlSession,一定记得手动关闭下sqlSession
        sqlSession.close();
    }
    
    @Test
    public void defaultSqlSessionSafeTest() throws InterruptedException {
        for (int i = 0;i<10;i++){
            new Thread(() ->{
                try {
                    count.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                getAccount2();
            }).start();
            count.countDown();
        }
        Thread.sleep(5000);
    }
    
    private void getAccount1() {
        sqlSession.select("selectByPrimaryKey", 1,resultContext ->{
             RyxAccount ryxAccount = (RyxAccount)resultContext.getResultObject();
             System.out.println(ryxAccount);
        });
    }
    
    private void getAccount2(){
        sqlSession.selectList("selectByPrimaryKey",1);
    }
    

    执行完之后,我们看到的是,报错了,报的是一个强转异常,怎嘛会报这个强转异常嘞,我们器跟着源码,看看,根据打印的堆栈信息,我们进入到源码的DefaultSqlSession这个类一探究竟


    image.png

    当我们执行查询的时候,会调用DefaultSqlSession类下的selectList这个方法,我们接着往下看

     @Override
      public <E> List<E> selectList(String statement, Object parameter) {
        return this.selectList(statement, parameter, RowBounds.DEFAULT);
      }
    
    @Override
      public <E> List<E> selectList(String statement, Object parameter, RowBounds rowBounds) {
        try {
          //获取执行的sql语句
          MappedStatement ms = configuration.getMappedStatement(statement);
          //执行查询
          return executor.query(ms, wrapCollection(parameter), rowBounds, Executor.NO_RESULT_HANDLER);
        } catch (Exception e) {
          throw ExceptionFactory.wrapException("Error querying database.  Cause: " + e, e);
        } finally {
          ErrorContext.instance().reset();
        }
      }
    
    
    进入到CacheExecuter类下的  query方法
     @Override
      public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException {
        //获取执行的sql语句
        BoundSql boundSql = ms.getBoundSql(parameterObject);
        //创建缓存,注意,这个地方,调用的BaseExecuter中的createCacheKey方法
        CacheKey key = createCacheKey(ms, parameterObject, rowBounds, boundSql);
        return query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
      }
    
    这个方法具体就是将当前的sql语句,等一些类信息,按照指定规则拼装成一个key,然后返回,具体就不再分析了
     @Override
      public CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql) {
        if (closed) {
          throw new ExecutorException("Executor was closed.");
        }
        CacheKey cacheKey = new CacheKey();
        cacheKey.update(ms.getId());
        cacheKey.update(rowBounds.getOffset());
        cacheKey.update(rowBounds.getLimit());
        cacheKey.update(boundSql.getSql());
        List<ParameterMapping> parameterMappings = boundSql.getParameterMappings();
        TypeHandlerRegistry typeHandlerRegistry = ms.getConfiguration().getTypeHandlerRegistry();
        // mimic DefaultParameterHandler logic
        for (ParameterMapping parameterMapping : parameterMappings) {
          if (parameterMapping.getMode() != ParameterMode.OUT) {
            Object value;
            String propertyName = parameterMapping.getProperty();
            if (boundSql.hasAdditionalParameter(propertyName)) {
              value = boundSql.getAdditionalParameter(propertyName);
            } else if (parameterObject == null) {
              value = null;
            } else if (typeHandlerRegistry.hasTypeHandler(parameterObject.getClass())) {
              value = parameterObject;
            } else {
              MetaObject metaObject = configuration.newMetaObject(parameterObject);
              value = metaObject.getValue(propertyName);
            }
            cacheKey.update(value);
          }
        }
        if (configuration.getEnvironment() != null) {
          // issue #176
          cacheKey.update(configuration.getEnvironment().getId());
        }
        return cacheKey;
      }
    

    接下来,我们拿到了一个这个key,接着往下看,delegate.<E> query这个方法,是真正的查询加添加到缓存中的方法实现,这段代码比较简单,就不做分析了,直接进入到下一个方法,BaseExecuter.query

     @Override
      public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql)
          throws SQLException {
        Cache cache = ms.getCache();
        if (cache != null) {
          flushCacheIfRequired(ms);
          if (ms.isUseCache() && resultHandler == null) {
            ensureNoOutParams(ms, parameterObject, boundSql);
            @SuppressWarnings("unchecked")
            List<E> list = (List<E>) tcm.getObject(cache, key);
            if (list == null) {
              list = delegate.<E> query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
              tcm.putObject(cache, key, list); // issue #578 and #116
            }
            return list;
          }
        }
        return delegate.<E> query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
      }
    
    
    BaseExecuter类
    @SuppressWarnings("unchecked")
      @Override
      public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
        ErrorContext.instance().resource(ms.getResource()).activity("executing a query").object(ms.getId());
        if (closed) {
          throw new ExecutorException("Executor was closed.");
        }
        if (queryStack == 0 && ms.isFlushCacheRequired()) {
          clearLocalCache();
        }
        List<E> list;
        try {
          queryStack++;
          //这是一个三元运算符,resultHandler 是否为空,如果为空,就去缓存中取内容,否则设置为null,
          list = resultHandler == null ? (List<E>) localCache.getObject(key) : null;
          if (list != null) {
            handleLocallyCachedOutputParameters(ms, key, parameter, boundSql);
          } else {
            list = queryFromDatabase(ms, parameter, rowBounds, resultHandler, key, boundSql);
          }
        } finally {
          queryStack--;
        }
        if (queryStack == 0) {
          for (DeferredLoad deferredLoad : deferredLoads) {
            deferredLoad.load();
          }
          // issue #601
          deferredLoads.clear();
          if (configuration.getLocalCacheScope() == LocalCacheScope.STATEMENT) {
            // issue #482
            clearLocalCache();
          }
        }
        return list;
      }
    
    2.1
     private <E> List<E> queryFromDatabase(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
        List<E> list;
        localCache.putObject(key, EXECUTION_PLACEHOLDER);
        try {
          list = doQuery(ms, parameter, rowBounds, resultHandler, boundSql);
        } finally {
          localCache.removeObject(key);
        }
        localCache.putObject(key, list);
        if (ms.getStatementType() == StatementType.CALLABLE) {
          localOutputParameterCache.putObject(key, parameter);
        }
        return list;
      }
    

    我们重点分析下,一个三元代码 list = resultHandler == null ? (List<E>) localCache.getObject(key) : null;
    试想一个场景,当两个线程现在开始执行查询账户的业务,线程T_SQL_1和T_SQL_2
    1:T_SQL_1先拿到线程执行权,会先调用createCacheKey,如果没有,则会创建这个key,此时假如是第一次查询,localCache.getObject(key)中还不存在key,则list为null
    2:执行queryFromDatabase(见2.1)方法,会在这里先给key添加一个默认占位符EXECUTION_PLACEHOLDER
    3:然后在这个时候,T_SQL_2获得了线程执行权,调用上面的localCache.getObject(key),获得value:EXECUTION_PLACEHOLDER
    4:localCache.getObject(key)此时就不是null了,然后程序开始转换啊,就会变成如下代码,我们模拟下这个解析过程,看个demo
    如图,简单模拟了下,如下过程,得到就是强转异常,说明问题就是出现在这里,由于线程争夺资源的问题,这里拿到的key其实是占位符
    而不是具体从数据库查询出来的值,谜底终于解开了,原来问题出现在这里,


    image.png

    我们继续研究这句代码list = resultHandler == null ? (List<E>) localCache.getObject(key) : null;
    resultHandler 这个参数,如果不为空,也就意味着,一级缓存也就失效了,也就不用去缓存中取找了,所以当你使用流式查询的时候,是不会出现这个问题的,因为就不会走缓存,都是查询数据库,不存在缓存的问题
    最终的结论是,我们最好不要自己轻易使用DefaultSqlseesion直接去调用查询sql,很容易因为并发问题导致转换异常
    当然,既然mybatis的源码大神们早都知道这个DefaultSqlSession这个类线程安全的问题,肯定要处理啊,我们接下来看看他们是怎么处理的,我们看源码中sqlSession接口实现类中看到了一共有三个实现类如图


    image.png

    分别是
    1:DefaultSqlSession(已分析)
    2:SqlSessionManager(mybatis处理DefaultSqlSession的线程安全管理类)
    3:SqlSessionTemplete(spring框架处理mybatis的线程安全的处理框架)

    我们先分析下SqlSessionManager,看一下这个类,我们截取一段代码
    我们又看到了熟悉的jdk代理技术,当调用SqlSessioManager的查询语句的时候,会先调用SqlSessionInterceptor
    这里翻译为拦截器很恰当,我们看到,会去ThreadLocal中获取sqSession,获取不到,就去创建一个DefaultSqSession对象
    这样的话,相当于每个线程持有自己的DefaultSqlSession对象,所以,当不同的线程访问的时候,一级缓存也就失效了,

    public class SqlSessionManager implements SqlSessionFactory, SqlSession {
    
      private final SqlSessionFactory sqlSessionFactory;
      private final SqlSession sqlSessionProxy;
    
      private final ThreadLocal<SqlSession> localSqlSession = new ThreadLocal<SqlSession>();
    
      private SqlSessionManager(SqlSessionFactory sqlSessionFactory) {
        this.sqlSessionFactory = sqlSessionFactory;
        this.sqlSessionProxy = (SqlSession) Proxy.newProxyInstance(
            SqlSessionFactory.class.getClassLoader(),
            new Class[]{SqlSession.class},
            new SqlSessionInterceptor());
      }
    
      public static SqlSessionManager newInstance(Reader reader) {
        return new SqlSessionManager(new SqlSessionFactoryBuilder().build(reader, null, null));
      }
    .....省略
    
     private class SqlSessionInterceptor implements InvocationHandler {
        public SqlSessionInterceptor() {
            // Prevent Synthetic Access
        }
    
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
          //去ThreadLocal中获取sqlSesison,如果获取不到,
          final SqlSession sqlSession = SqlSessionManager.this.localSqlSession.get();
          if (sqlSession != null) {
            try {
              return method.invoke(sqlSession, args);
            } catch (Throwable t) {
              throw ExceptionUtil.unwrapThrowable(t);
            }
          } else {
            //当qlSession为null,调用openSession方法,然后去调用sqlSessionFactory.openSession()
            //创建一个DefaultSqlSession的对象,
            final SqlSession autoSqlSession = openSession();
            try {
              final Object result = method.invoke(autoSqlSession, args);
              autoSqlSession.commit();
              return result;
            } catch (Throwable t) {
              autoSqlSession.rollback();
              throw ExceptionUtil.unwrapThrowable(t);
            } finally {
              autoSqlSession.close();
            }
          }
        }
      }
    
    SqlSessionFactory
    private SqlSession openSessionFromDataSource(ExecutorType execType, TransactionIsolationLevel level, boolean autoCommit) {
        Transaction tx = null;
        try {
          final Environment environment = configuration.getEnvironment();
          final TransactionFactory transactionFactory = getTransactionFactoryFromEnvironment(environment);
          tx = transactionFactory.newTransaction(environment.getDataSource(), level, autoCommit);
          final Executor executor = configuration.newExecutor(tx, execType);
          return new DefaultSqlSession(configuration, executor, autoCommit);
        } catch (Exception e) {
          closeTransaction(tx); // may have fetched a connection so lets call close()
          throw ExceptionFactory.wrapException("Error opening session.  Cause: " + e, e);
        } finally {
          ErrorContext.instance().reset();
        }
      }
    

    分析了这么多,我们来个案例,来验证下SqlSessionManager,案例很简单,就不做分析了,我改了下源码,打印了日志,

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class DefaultSqlSessionManagerTest {
    
    
        private static final int COUNT_THREAD = 10;
        private static CountDownLatch count = new CountDownLatch(COUNT_THREAD);
        private SqlSessionManager sqlSessionManager;
    
        @Autowired
        private SqlSessionFactory sqlSessionFactory;
    
        @Before
        public void init(){
            sqlSessionManager = SqlSessionManager.newInstance(sqlSessionFactory);
        }
    
    
    
        @Test
        public  void sqlSessionManagerTest() throws InterruptedException {
            for (int i = 0;i<COUNT_THREAD;i++){
                new Thread(() ->{
                    try {
                        count.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    getAccount1();
                }).start();
                count.countDown();
            }
            Thread.sleep(5000);
        }
    
        private void getAccount1() {
            sqlSessionManager.startManagedSession();
            sqlSessionManager.
            selectList("selectByPrimaryKey",1);
        }
    
    }
    
    image.png

    这是一部分日志,可以看到每个线程生成了新的Slqsession,所以也就保证了线程安全
    SqlSessionManager可以允许我们将sqlSession设置到ThreadLoacl中,这样也可以保证DefaultSqlSession线程安全
    具体就是添加一句如下代码sqlSessionManager.startManagedSession();这样,我们就为每个线程分配了一个SqlSession并存储到
    ThreadLocal中,这样也是一样的效果,通过ThreadLocal,get方法会获取到具体的sqlSession对象,但是这里有个问题,由于这个ThradLocal是私有的,set完之后,在关闭后,清除ThreadLocal中的内容实在关闭sqlSession后,就是在这里

     @Override
      public void close() {
        final SqlSession sqlSession = localSqlSession.get();
        if (sqlSession == null) {
          throw new SqlSessionException("Error:  Cannot close.  No managed session is started.");
        }
        try {
          sqlSession.close();
        } finally {
          //直接将ThreadLocal中的当前线程变量sqlSession设置为null
          localSqlSession.set(null);
        }
      }
    

    ok,SqlSessionManager就分析到这里,代码还是比较简单的,就到这里,下一期,我们一起看下spring到底是怎样保证defaultSqlSession线程安全的,
    Thanks!
    更多博客,请移步到博主技术博客https://renyuanxin.top

    相关文章

      网友评论

          本文标题:mybatis源码解析七(DefaultSqlSession线程

          本文链接:https://www.haomeiwen.com/subject/snycphtx.html