ShardingSphere-JDBC SQL execution process

Author: 甜甜起司猫_ | Published 2021-08-31 01:45

    Execution process

    1. Call the getConnection method of ShardingSphereDataSource
    2. From the STATES map in DriverStateContext, retrieve the OKDriverState that was stored there when the ShardingSphereDataSource was constructed
    3. Construct a ShardingSphereConnection via the getConnection method of OKDriverState
    4. Call the prepareStatement (or createStatement) method of ShardingSphereConnection to construct a ShardingSpherePreparedStatement (or ShardingSphereStatement)
    5. In the constructor of ShardingSpherePreparedStatement, initialize a number of fields:
      5.1 jdbcExecutor — wraps the executorEngine that was created by ShardingSphereDataSource and is held in metaDataContexts
      5.2 ShardingSphereSQLParserEngine — the SQL parser engine, selected according to the database type
      5.3 BatchPreparedStatementExecutor — wraps the JDBCExecutionUnits, i.e. the SQL units to be executed
    6. Call the execute method of jdbcExecutor, which wraps each SQL execution as a task scheduled on the executorEngine's thread pool
    7. Call ListenableFuture.get to obtain the SQL execution results
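The steps above can be sketched as a plain-Java delegation chain. All Demo* class names below are illustrative stand-ins, not the real ShardingSphere types, and real execution (steps 5-7) is collapsed into a single execute call:

```java
import java.util.*;

// Stand-in classes (Demo* names are illustrative, not the real ShardingSphere types)
// modelling the delegation chain: DataSource -> state registry -> Connection -> Statement.
class DemoDriverState {
    DemoConnection getConnection() {
        return new DemoConnection();                        // step 3: build the connection
    }
}

class DemoDataSource {
    // step 2: a state registry keyed by type, like DriverStateContext.STATES
    private final Map<String, DemoDriverState> states = new HashMap<>();

    DemoDataSource() {
        states.put("OK", new DemoDriverState());
    }

    DemoConnection getConnection() {                        // step 1
        return states.get("OK").getConnection();            // steps 2-3
    }
}

class DemoConnection {
    DemoStatement createStatement() {                       // step 4
        return new DemoStatement();
    }
}

class DemoStatement {
    boolean execute(String sql) {                           // steps 5-7, collapsed
        return sql != null && !sql.isEmpty();
    }
}

public class CallChainDemo {
    public static void main(String[] args) {
        DemoConnection connection = new DemoDataSource().getConnection();
        boolean ok = connection.createStatement().execute("SELECT 1");
        System.out.println(ok);  // prints "true"
    }
}
```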

    Method analysis

    In the example module, the following SQL is executed:

    CREATE TABLE IF NOT EXISTS t_order (order_id BIGINT NOT NULL AUTO_INCREMENT, user_id INT NOT NULL, address_id BIGINT NOT NULL, status VARCHAR(50), PRIMARY KEY (order_id))
    

    The call chain is as follows:

        public Connection getConnection() {
            return DriverStateContext.getConnection(schemaName, getDataSourceMap(), contextManager, TransactionTypeHolder.get());
        }
    
        private static final Map<String, DriverState> STATES;
        
        static {
            // TODO add singleton cache with TypedSPI init
            ShardingSphereServiceLoader.register(DriverState.class);
            Collection<DriverState> driverStates = ShardingSphereServiceLoader.getSingletonServiceInstances(DriverState.class);
            STATES = new HashMap<>();
            for (DriverState each : driverStates) {
                STATES.put(each.getType(), each);
            }
        }
    
        public static Connection getConnection(final String schemaName, final Map<String, DataSource> dataSourceMap, final ContextManager contextManager, final TransactionType transactionType) {
            return STATES.get(contextManager.getMetaDataContexts().getStateContext().getCurrentState()).getConnection(schemaName, dataSourceMap, contextManager, transactionType);
        }
    
    public final class OKDriverState implements DriverState {
        
        @Override
        public Connection getConnection(final String schemaName, final Map<String, DataSource> dataSourceMap, final ContextManager contextManager, final TransactionType transactionType) {
            return new ShardingSphereConnection(schemaName, dataSourceMap, contextManager, TransactionTypeHolder.get());
        }
        
        @Override
        public String getType() {
            return "OK";
        }
    }
    

    The STATES map in DriverStateContext is populated in a static initializer via ShardingSphereServiceLoader (Java SPI) when the class is first loaded, which happens while the ShardingSphereDataSource is being created (the TODO in the code notes that a singleton cache with TypedSPI init is planned). Since the current state is "OK", getConnection is dispatched to OKDriverState, which builds a ShardingSphereConnection.
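The registry-and-dispatch pattern can be sketched without the SPI machinery. The DriverStateLike/OkStateLike names below are hypothetical stand-ins; the real code discovers DriverState implementations through ShardingSphereServiceLoader instead of registering them by hand:

```java
import java.util.*;

// Simplified stand-ins for DriverState / OKDriverState (names are illustrative).
interface DriverStateLike {
    String getType();
    String getConnection();  // simplified: returns a label instead of a java.sql.Connection
}

final class OkStateLike implements DriverStateLike {
    public String getType() { return "OK"; }
    public String getConnection() { return "ShardingSphereConnection"; }
}

public class StateRegistryDemo {
    // Mirrors DriverStateContext: a static registry keyed by state type,
    // filled exactly once when the class is loaded.
    private static final Map<String, DriverStateLike> STATES = new HashMap<>();

    static {
        // The real code loads implementations via SPI; here the single state
        // is registered by hand to keep the sketch self-contained.
        for (DriverStateLike each : List.of(new OkStateLike())) {
            STATES.put(each.getType(), each);
        }
    }

    public static String getConnection(String currentState) {
        // Dispatch on the current state, as DriverStateContext.getConnection does.
        return STATES.get(currentState).getConnection();
    }

    public static void main(String[] args) {
        System.out.println(getConnection("OK"));  // prints "ShardingSphereConnection"
    }
}
```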

        public PreparedStatement prepareStatement(final String sql) throws SQLException {
            return new ShardingSpherePreparedStatement(this, sql);
        }
    
        public ShardingSpherePreparedStatement(final ShardingSphereConnection connection, final String sql) throws SQLException {
            this(connection, sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT, false);
        }
    
        private ShardingSpherePreparedStatement(final ShardingSphereConnection connection, final String sql,
                                                final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final boolean returnGeneratedKeys) throws SQLException {
            if (Strings.isNullOrEmpty(sql)) {
                throw new SQLException(SQLExceptionConstant.SQL_STRING_NULL_OR_EMPTY);
            }
            this.connection = connection;
            metaDataContexts = connection.getContextManager().getMetaDataContexts();
            this.sql = sql;
            statements = new ArrayList<>();
            parameterSets = new ArrayList<>();
            ShardingSphereSQLParserEngine sqlParserEngine = new ShardingSphereSQLParserEngine(
                    DatabaseTypeRegistry.getTrunkDatabaseTypeName(metaDataContexts.getMetaData(connection.getSchemaName()).getResource().getDatabaseType()));
            sqlStatement = sqlParserEngine.parse(sql, true);
            parameterMetaData = new ShardingSphereParameterMetaData(sqlStatement);
            statementOption = returnGeneratedKeys ? new StatementOption(true) : new StatementOption(resultSetType, resultSetConcurrency, resultSetHoldability);
            JDBCExecutor jdbcExecutor = new JDBCExecutor(metaDataContexts.getExecutorEngine(), connection.isHoldTransaction());
            driverJDBCExecutor = new DriverJDBCExecutor(connection.getSchemaName(), metaDataContexts, jdbcExecutor);
            rawExecutor = new RawExecutor(metaDataContexts.getExecutorEngine(), connection.isHoldTransaction(), metaDataContexts.getProps());
            // TODO Consider FederateRawExecutor
            federateExecutor = new FederateJDBCExecutor(connection.getSchemaName(), metaDataContexts.getOptimizeContextFactory(), metaDataContexts.getProps(), jdbcExecutor);
            batchPreparedStatementExecutor = new BatchPreparedStatementExecutor(metaDataContexts, jdbcExecutor, connection.getSchemaName());
            kernelProcessor = new KernelProcessor();
        }
    

    Calling the prepareStatement method of ShardingSphereConnection constructs a ShardingSpherePreparedStatement. Its constructor initializes the various fields that are used later during SQL execution.

        public <I, O> List<O> execute(final ExecutionGroupContext<I> executionGroupContext,
                                      final ExecutorCallback<I, O> firstCallback, final ExecutorCallback<I, O> callback, final boolean serial) throws SQLException {
            if (executionGroupContext.getInputGroups().isEmpty()) {
                return Collections.emptyList();
            }
            return serial ? serialExecute(executionGroupContext.getInputGroups().iterator(), firstCallback, callback)
                    : parallelExecute(executionGroupContext.getInputGroups().iterator(), firstCallback, callback);
        }
    
    • executionGroupContext — wraps the SQL to be executed as JDBCExecutionUnits
    • callback — the JDBCExecutorCallback created by createExecuteCallback in ShardingSphereStatement
        private <I, O> List<O> parallelExecute(final Iterator<ExecutionGroup<I>> executionGroups, final ExecutorCallback<I, O> firstCallback, final ExecutorCallback<I, O> callback) throws SQLException {
            ExecutionGroup<I> firstInputs = executionGroups.next();
            Collection<ListenableFuture<Collection<O>>> restResultFutures = asyncExecute(executionGroups, callback);
            return getGroupResults(syncExecute(firstInputs, null == firstCallback ? callback : firstCallback), restResultFutures);
        }
    
        private <I, O> Collection<ListenableFuture<Collection<O>>> asyncExecute(final Iterator<ExecutionGroup<I>> executionGroups, final ExecutorCallback<I, O> callback) {
            Collection<ListenableFuture<Collection<O>>> result = new LinkedList<>();
            while (executionGroups.hasNext()) {
                result.add(asyncExecute(executionGroups.next(), callback));
            }
            return result;
        }
    
        private <I, O> ListenableFuture<Collection<O>> asyncExecute(final ExecutionGroup<I> executionGroup, final ExecutorCallback<I, O> callback) {
            Map<String, Object> dataMap = ExecutorDataMap.getValue();
            return executorServiceManager.getExecutorService().submit(() -> callback.execute(executionGroup.getInputs(), false, dataMap));
        }
    
        private <O> List<O> getGroupResults(final Collection<O> firstResults, final Collection<ListenableFuture<Collection<O>>> restFutures) throws SQLException {
            List<O> result = new LinkedList<>(firstResults);
            for (ListenableFuture<Collection<O>> each : restFutures) {
                try {
                    result.addAll(each.get());
                } catch (final InterruptedException | ExecutionException ex) {
                    return throwException(ex);
                }
            }
            return result;
        }
    
    1. Each SQL execution is wrapped as a task and submitted to the thread pool for scheduling
    2. The results of the submitted tasks are collected as futures
    3. Future.get is called on each future to obtain the SQL execution results
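These three steps can be sketched with a plain ExecutorService and Future (the real code uses Guava's ListenableFuture); executeGroup below is a stand-in for real JDBC execution:

```java
import java.util.*;
import java.util.concurrent.*;

public class ParallelExecuteDemo {

    // Mirrors JDBCExecutor.parallelExecute: run the first execution group on the
    // calling thread, submit the remaining groups to a pool, then merge the
    // results by blocking on each Future (as getGroupResults does).
    static List<String> parallelExecute(List<List<String>> executionGroups) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            Iterator<List<String>> it = executionGroups.iterator();
            List<String> firstResults = executeGroup(it.next());           // like syncExecute
            List<Future<List<String>>> restFutures = new LinkedList<>();
            while (it.hasNext()) {                                         // like asyncExecute
                List<String> group = it.next();
                restFutures.add(pool.submit(() -> executeGroup(group)));
            }
            List<String> result = new LinkedList<>(firstResults);          // like getGroupResults
            for (Future<List<String>> each : restFutures) {
                result.addAll(each.get());                                 // blocks until the task finishes
            }
            return result;
        } finally {
            pool.shutdown();
        }
    }

    // Stand-in for really executing a group of SQL units over JDBC.
    static List<String> executeGroup(List<String> sqlUnits) {
        List<String> results = new ArrayList<>();
        for (String sql : sqlUnits) {
            results.add("executed:" + sql);
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        List<String> merged = parallelExecute(List.of(
                List.of("sql-0"), List.of("sql-1"), List.of("sql-2")));
        System.out.println(merged);  // [executed:sql-0, executed:sql-1, executed:sql-2]
    }
}
```

Results keep a deterministic order because the futures are drained in submission order, even though the groups run concurrently.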
        private <T> List<T> doExecute(final ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext, final Collection<RouteUnit> routeUnits,
                                      final JDBCExecutorCallback<T> callback, final SQLStatement sqlStatement) throws SQLException {
            List<T> results = jdbcExecutor.execute(executionGroupContext, callback);
            refreshMetadata(sqlStatement, routeUnits);
            return results;
        }
    
    • The jdbcExecutor.execute method is where the SQL is actually executed. JDBCExecutor holds the executorEngine constructed by ShardingSphereDataSource, so in essence the executorEngine executes the SQL
    • In the refreshMetadata method, sqlStatement carries metadata from the SQL statement (table columns, constraints, etc.), while routeUnits carry the routing information (data source + actual sharded table)
    • Finally, results contains one entry per route unit — the SQL is effectively executed once in each route unit
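The per-route-unit behavior can be sketched as follows. RouteUnit here is a simplified stand-in (the real RouteUnit carries data source and table mappers), and plain string rewriting stands in for the real SQL rewrite engine:

```java
import java.util.*;

public class RouteUnitDemo {

    // A route unit pairs a data source with an actual (physical) table name.
    static final class RouteUnit {
        final String dataSourceName;
        final String actualTable;
        RouteUnit(String dataSourceName, String actualTable) {
            this.dataSourceName = dataSourceName;
            this.actualTable = actualTable;
        }
    }

    // The logical SQL is rewritten and "executed" once per route unit,
    // yielding one result per unit (execution here is a stand-in string).
    static List<String> executePerRoute(String logicalSql, List<RouteUnit> routeUnits) {
        List<String> results = new ArrayList<>();
        for (RouteUnit unit : routeUnits) {
            String actualSql = logicalSql.replace("t_order", unit.actualTable);
            results.add(unit.dataSourceName + ": " + actualSql);
        }
        return results;
    }

    public static void main(String[] args) {
        List<String> results = executePerRoute(
                "CREATE TABLE IF NOT EXISTS t_order (order_id BIGINT)",
                List.of(new RouteUnit("ds_0", "t_order_0"), new RouteUnit("ds_1", "t_order_1")));
        System.out.println(results.size());  // 2: one result per route unit
    }
}
```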

    Original article: https://www.haomeiwen.com/subject/rtmailtx.html