美文网首页ShardingSphere
Sharding-JDBC 执行整体流程

Sharding-JDBC 执行整体流程

作者: 晴天哥_王志 | 来源:发表于2020-06-07 21:53 被阅读0次

    Sharding-JDBC系列

    开篇

    • 这篇文章主要是通过例子来讲解Sharding-JDBC的执行流程,但是因为每个流程都过于复杂,所以这里只能针对宏观的流程进行分析,后面等个人理解深入后再针对各个过程单独分析。
    • JDBC执行的流程需要参考具体的demo,案例可以参考shardingsphere-example
    • 执行前需要本机安装个mysql,步骤可以参考sharding-sphere学习

    数据源配置

    public final class ShardingTablesConfigurationPrecise implements ExampleConfiguration {
        
        @Override
        public DataSource getDataSource() throws SQLException {
            ShardingRuleConfiguration shardingRuleConfig = new ShardingRuleConfiguration();
            shardingRuleConfig.getTableRuleConfigs().add(getOrderTableRuleConfiguration());
            shardingRuleConfig.getTableRuleConfigs().add(getOrderItemTableRuleConfiguration());
            shardingRuleConfig.getBindingTableGroups().add("t_order, t_order_item");
            shardingRuleConfig.getBroadcastTables().add("t_address");
            shardingRuleConfig.setDefaultTableShardingStrategyConfig(new StandardShardingStrategyConfiguration("order_id", new PreciseModuloShardingTableAlgorithm()));
            return ShardingDataSourceFactory.createDataSource(createDataSourceMap(), shardingRuleConfig, new Properties());
        }
    
        
        private static TableRuleConfiguration getOrderTableRuleConfiguration() {
            TableRuleConfiguration result = new TableRuleConfiguration("t_order", "demo_ds.t_order_${[0, 1]}");
            result.setKeyGeneratorConfig(new KeyGeneratorConfiguration("SNOWFLAKE", "order_id", getProperties()));
            return result;
        }
        
        private static TableRuleConfiguration getOrderItemTableRuleConfiguration() {
            TableRuleConfiguration result = new TableRuleConfiguration("t_order_item", "demo_ds.t_order_item_${[0, 1]}");
            result.setKeyGeneratorConfig(new KeyGeneratorConfiguration("SNOWFLAKE", "order_item_id", getProperties()));
            return result;
        }
        
        private static Map<String, DataSource> createDataSourceMap() {
            Map<String, DataSource> result = new HashMap<>();
            result.put("demo_ds", DataSourceUtil.createDataSource("demo_ds"));
            return result;
        }
        
        private static Properties getProperties() {
            Properties result = new Properties();
            result.setProperty("worker.id", "123");
            return result;
        }
    }
    
    • t_order和t_order_item表按照分表的逻辑进行保存,在同一个database当中保存2个table。
    • 我们这里暂时分析t_order表的插入过程,核心应该在于选择t_order_0还是t_order_1。

    SQL执行流程

    public class OrderRepositoryImpl implements OrderRepository {
    
        private final DataSource dataSource;
        // OrderRepositoryImpl的构造函数参数为ShardingDataSource
        public OrderRepositoryImpl(final DataSource dataSource) {
            this.dataSource = dataSource;
        }
    
        @Override
        public Long insert(final Order order) throws SQLException {
            String sql = "INSERT INTO t_order (user_id, address_id, status) VALUES (?, ?, ?)";
    
            // 1、从dataSource中获取Connection对象,dataSource是ShardingDataSource对象
            try (Connection connection = dataSource.getConnection();
                // 2、通过connection准备PreparedStatement对象
                PreparedStatement preparedStatement = connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
                // 3、设置preparedStatement的参数
                preparedStatement.setInt(1, order.getUserId());
                preparedStatement.setLong(2, order.getAddressId());
                preparedStatement.setString(3, order.getStatus());
                // 4、执行preparedStatement的executeUpdate
                preparedStatement.executeUpdate();
                // 5、处理ResultSet对象
                try (ResultSet resultSet = preparedStatement.getGeneratedKeys()) {
                    if (resultSet.next()) {
                        order.setOrderId(resultSet.getLong(1));
                    }
                }
            }
            return order.getOrderId();
        }
    }
    
    • 1、从dataSource中获取Connection对象,dataSource是ShardingDataSource对象。
    • 2、通过connection准备PreparedStatement对象。
    • 3、设置preparedStatement的参数。
    • 4、执行preparedStatement的executeUpdate。
    • 5、处理ResultSet对象。
    • mysql的核心执行流程就是按照上面的步骤执行。

    核心变量介绍

    ShardingRuntimeContext

    public final class ShardingRuntimeContext extends AbstractRuntimeContext<ShardingRule> {
        
        private final DatabaseMetaData cachedDatabaseMetaData;
        // 核心变量ShardingSphereMetaData
        private final ShardingSphereMetaData metaData;
        private final ShardingTransactionManagerEngine shardingTransactionManagerEngine;
        
        public ShardingRuntimeContext(final Map<String, DataSource> dataSourceMap, final ShardingRule rule, final Properties props, final DatabaseType databaseType) throws SQLException {
            super(rule, props, databaseType);
            cachedDatabaseMetaData = createCachedDatabaseMetaData(dataSourceMap, rule);
            metaData = createMetaData(dataSourceMap, rule, databaseType);
            shardingTransactionManagerEngine = new ShardingTransactionManagerEngine();
            shardingTransactionManagerEngine.init(databaseType, dataSourceMap);
        }
    }
    
    public abstract class AbstractRuntimeContext<T extends BaseRule> implements RuntimeContext<T> {
        
        private final T rule;
        private final ShardingProperties props;
        private final DatabaseType databaseType;
        private final ShardingExecuteEngine executeEngine;
        private final SQLParseEngine parseEngine;
        
        protected AbstractRuntimeContext(final T rule, final Properties props, final DatabaseType databaseType) {
            this.rule = rule;
            this.props = new ShardingProperties(null == props ? new Properties() : props);
            this.databaseType = databaseType;
            executeEngine = new ShardingExecuteEngine(this.props.<Integer>getValue(ShardingPropertiesConstant.EXECUTOR_SIZE));
            parseEngine = SQLParseEngineFactory.getSQLParseEngine(DatabaseTypes.getTrunkDatabaseTypeName(databaseType));
            ConfigurationLogger.log(rule.getRuleConfiguration());
            ConfigurationLogger.log(props);
        }
        
        @Override
        public void close() throws Exception {
            executeEngine.close();
        }
    }
    
    • ShardingRuntimeContext作为分片上下文包含各类核心变量。
    • ShardingSphereMetaData包含分片元数据。
    • executeEngine为执行引擎。
    • parseEngine为解析引擎。

    ShardingDataSource

    @Getter
    public class ShardingDataSource extends AbstractDataSourceAdapter {
        
        private final ShardingRuntimeContext runtimeContext;
        
        public ShardingDataSource(final Map<String, DataSource> dataSourceMap, final ShardingRule shardingRule, final Properties props) throws SQLException {
            super(dataSourceMap);
            checkDataSourceType(dataSourceMap);
            runtimeContext = new ShardingRuntimeContext(dataSourceMap, shardingRule, props, getDatabaseType());
        }
        
        @Override
        public final ShardingConnection getConnection() {
            return new ShardingConnection(getDataSourceMap(), runtimeContext, TransactionTypeHolder.get());
        }
    }
    
    • ShardingDataSource的getConnection返回ShardingConnection对象。

    ShardingConnection

    public final class ShardingConnection extends AbstractConnectionAdapter {
        
        private final Map<String, DataSource> dataSourceMap;
        private final ShardingRuntimeContext runtimeContext;
        private final TransactionType transactionType;
        private final ShardingTransactionManager shardingTransactionManager;
        
        public ShardingConnection(final Map<String, DataSource> dataSourceMap, final ShardingRuntimeContext runtimeContext, final TransactionType transactionType) {
            this.dataSourceMap = dataSourceMap;
            this.runtimeContext = runtimeContext;
            this.transactionType = transactionType;
            shardingTransactionManager = runtimeContext.getShardingTransactionManagerEngine().getTransactionManager(transactionType);
        }
    
        @Override
        public PreparedStatement prepareStatement(final String sql, final int autoGeneratedKeys) throws SQLException {
            return new ShardingPreparedStatement(this, sql, autoGeneratedKeys);
        }
    }
    
    • ShardingConnection的prepareStatement返回ShardingPreparedStatement对象。

    ShardingPreparedStatement

    public final class ShardingPreparedStatement extends AbstractShardingPreparedStatementAdapter {
        
        @Getter
        private final ShardingConnection connection;
        // 待执行的SQL
        private final String sql;
        // 路由引擎
        private final PreparedQueryShardingEngine shardingEngine;
        // 执行引擎
        private final PreparedStatementExecutor preparedStatementExecutor;
        private final BatchPreparedStatementExecutor batchPreparedStatementExecutor;
        private SQLRouteResult sqlRouteResult;
        private ResultSet currentResultSet;
    
        public ShardingPreparedStatement(final ShardingConnection connection, final String sql, final int autoGeneratedKeys) throws SQLException {
            this(connection, sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT, Statement.RETURN_GENERATED_KEYS == autoGeneratedKeys);
        }
    
        private ShardingPreparedStatement(
                final ShardingConnection connection, final String sql, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final boolean returnGeneratedKeys)
                throws SQLException {
            if (Strings.isNullOrEmpty(sql)) {
                throw new SQLException(SQLExceptionConstant.SQL_STRING_NULL_OR_EMPTY);
            }
    
            this.connection = connection;
            this.sql = sql;
    
            ShardingRuntimeContext runtimeContext = connection.getRuntimeContext();
            // 核心的是PreparedQueryShardingEngine对象
            shardingEngine = new PreparedQueryShardingEngine(sql, runtimeContext.getRule(), runtimeContext.getProps(), runtimeContext.getMetaData(), runtimeContext.getParseEngine());
            preparedStatementExecutor = new PreparedStatementExecutor(resultSetType, resultSetConcurrency, resultSetHoldability, returnGeneratedKeys, connection);
            batchPreparedStatementExecutor = new BatchPreparedStatementExecutor(resultSetType, resultSetConcurrency, resultSetHoldability, returnGeneratedKeys, connection);
        }
    }
    
    • ShardingPreparedStatement对象的包含传入的sql和connection对象。
    • ShardingPreparedStatement的核心变量包含shardingEngine、preparedStatementExecutor、batchPreparedStatementExecutor。
    • shardingEngine负责执行分片选择对应的库表。
    • preparedStatementExecutor负责执行具体的SQL。

    PreparedQueryShardingEngine

    public final class PreparedQueryShardingEngine extends BaseShardingEngine {
        
        private final PreparedStatementRoutingEngine routingEngine;
        
        public PreparedQueryShardingEngine(final String sql, 
                                           final ShardingRule shardingRule, final ShardingProperties shardingProperties, final ShardingSphereMetaData metaData, final SQLParseEngine sqlParseEngine) {
            super(shardingRule, shardingProperties, metaData);
            routingEngine = new PreparedStatementRoutingEngine(sql, shardingRule, metaData, sqlParseEngine);
        }
        
        @Override
        protected List<Object> cloneParameters(final List<Object> parameters) {
            return new ArrayList<>(parameters);
        }
        
        @Override
        protected SQLRouteResult route(final String sql, final List<Object> parameters) {
            return routingEngine.route(parameters);
        }
    }
    
    • PreparedQueryShardingEngine包含PreparedStatementRoutingEngine对象。
    • PreparedQueryShardingEngine负责执行路由功能,核心通过routingEngine来执行。

    PreparedStatementRoutingEngine

    public final class PreparedStatementRoutingEngine {
        
        private final String logicSQL;
        private final ShardingRouter shardingRouter;
        private final ShardingMasterSlaveRouter masterSlaveRouter;
        private SQLStatement sqlStatement;
        
        public PreparedStatementRoutingEngine(final String logicSQL, final ShardingRule shardingRule, final ShardingSphereMetaData metaData, final SQLParseEngine sqlParseEngine) {
            this.logicSQL = logicSQL;
            shardingRouter = new ShardingRouter(shardingRule, metaData, sqlParseEngine);
            masterSlaveRouter = new ShardingMasterSlaveRouter(shardingRule.getMasterSlaveRules());
        }
    
        public SQLRouteResult route(final List<Object> parameters) {
            if (null == sqlStatement) {
                sqlStatement = shardingRouter.parse(logicSQL, true);
            }
            return masterSlaveRouter.route(shardingRouter.route(logicSQL, parameters, sqlStatement));
        }
    }
    
    • PreparedStatementRoutingEngine包含shardingRouter和masterSlaveRouter。
    • shardingRouter负责执行路由功能。

    变量之间关系

    • ShardingDataSource负责提供ShardingConnection。
    • ShardingConnection负责提供ShardingPreparedStatement。
    • ShardingPreparedStatement负责提供PreparedQueryShardingEngine和PreparedStatementExecutor。
    • PreparedQueryShardingEngine负责提供PreparedStatementRoutingEngine。
    • PreparedStatementRoutingEngine负责提供ShardingRouter。

    Statement执行过程

    public final class ShardingPreparedStatement extends AbstractShardingPreparedStatementAdapter {
        
        @Getter
        private final ShardingConnection connection;
        private final String sql;
        private final PreparedQueryShardingEngine shardingEngine;    
        private final PreparedStatementExecutor preparedStatementExecutor;
        private final BatchPreparedStatementExecutor batchPreparedStatementExecutor;
        private SQLRouteResult sqlRouteResult;
        private ResultSet currentResultSet;
    
        @Override
        public int executeUpdate() throws SQLException {
            try {
                // 1、清空之前的查询
                clearPrevious();
                // 2、执行shard操作
                shard();
                // 3、初始化PreparedStatementExecutor
                initPreparedStatementExecutor();
                // 4、执行preparedStatementExecutor的executeUpdate方法
                return preparedStatementExecutor.executeUpdate();
            } finally {
                clearBatch();
            }
        }
    
        private void clearPrevious() throws SQLException {
            preparedStatementExecutor.clear();
        }
    
        private void shard() {
            // shardingEngine为PreparedQueryShardingEngine
            sqlRouteResult = shardingEngine.shard(sql, getParameters());
        }
    
        private void initPreparedStatementExecutor() throws SQLException {
            preparedStatementExecutor.init(sqlRouteResult);
            setParametersForStatements();
            replayMethodForStatements();
        }
    }
    
    public final class PreparedStatementExecutor extends AbstractStatementExecutor {
        public int executeUpdate() throws SQLException {
            final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
            SQLExecuteCallback<Integer> executeCallback = SQLExecuteCallbackFactory.getPreparedUpdateSQLExecuteCallback(getDatabaseType(), isExceptionThrown);
            List<Integer> results = executeCallback(executeCallback);
            if (isAccumulate()) {
                return accumulate(results);
            } else {
                return results.get(0);
            }
        }
    }
    
    • clearPrevious清空之前的查询。
    • shard通过shardingEngine执行分片操作,shardingEngine为PreparedQueryShardingEngine对象。
    • initPreparedStatementExecutor负责初始化PreparedStatementExecutor。
    • preparedStatementExecutor.executeUpdate执行preparedStatementExecutor的executeUpdate动作。
    • 核心关注路由过程和executeUpdate过程。

    路由过程

    • 整体路由顺序按照PreparedQueryShardingEngine
      => PreparedStatementRoutingEngine => ShardingRouter => StandardRoutingEngine 进行路由。

    BaseShardingEngine

    public abstract class BaseShardingEngine {
    
        public SQLRouteResult shard(final String sql, final List<Object> parameters) {
            List<Object> clonedParameters = cloneParameters(parameters);
            // 执行executeRoute
            SQLRouteResult result = executeRoute(sql, clonedParameters);
            result.getRouteUnits().addAll(HintManager.isDatabaseShardingOnly() ? convert(sql, clonedParameters, result) : rewriteAndConvert(sql, clonedParameters, result));
            boolean showSQL = shardingProperties.getValue(ShardingPropertiesConstant.SQL_SHOW);
            if (showSQL) {
                boolean showSimple = shardingProperties.getValue(ShardingPropertiesConstant.SQL_SIMPLE);
                SQLLogger.logSQL(sql, showSimple, result.getSqlStatementContext(), result.getRouteUnits());
            }
            return result;
        }
    
        private SQLRouteResult executeRoute(final String sql, final List<Object> clonedParameters) {
            routingHook.start(sql);
            try {
                SQLRouteResult result = route(sql, clonedParameters);
                routingHook.finishSuccess(result, metaData.getTables());
                return result;
                // CHECKSTYLE:OFF
            } catch (final Exception ex) {
                // CHECKSTYLE:ON
                routingHook.finishFailure(ex);
                throw ex;
            }
        }
    }
    
    • BaseShardingEngine是PreparedQueryShardingEngine的基类。
    • BaseShardingEngine代表的是PreparedQueryShardingEngine对象。
    • executeRoute执行PreparedQueryShardingEngine的route方法。

    PreparedQueryShardingEngine

    public final class PreparedQueryShardingEngine extends BaseShardingEngine {
        
        private final PreparedStatementRoutingEngine routingEngine;
        
        public PreparedQueryShardingEngine(final String sql, 
                                           final ShardingRule shardingRule, final ShardingProperties shardingProperties, final ShardingSphereMetaData metaData, final SQLParseEngine sqlParseEngine) {
            super(shardingRule, shardingProperties, metaData);
            routingEngine = new PreparedStatementRoutingEngine(sql, shardingRule, metaData, sqlParseEngine);
        }
        
        @Override
        protected SQLRouteResult route(final String sql, final List<Object> parameters) {
            return routingEngine.route(parameters);
        }
    }
    
    • PreparedQueryShardingEngine的routingEngine为PreparedStatementRoutingEngine对象。
    • 执行PreparedStatementRoutingEngine的route方法。

    PreparedStatementRoutingEngine

    public final class PreparedStatementRoutingEngine {
        
        private final String logicSQL;
        private final ShardingRouter shardingRouter;
        private final ShardingMasterSlaveRouter masterSlaveRouter;
        private SQLStatement sqlStatement;
        
        public PreparedStatementRoutingEngine(final String logicSQL, final ShardingRule shardingRule, final ShardingSphereMetaData metaData, final SQLParseEngine sqlParseEngine) {
            this.logicSQL = logicSQL;
            shardingRouter = new ShardingRouter(shardingRule, metaData, sqlParseEngine);
            masterSlaveRouter = new ShardingMasterSlaveRouter(shardingRule.getMasterSlaveRules());
        }
    
        public SQLRouteResult route(final List<Object> parameters) {
            if (null == sqlStatement) {
                sqlStatement = shardingRouter.parse(logicSQL, true);
            }
            return masterSlaveRouter.route(shardingRouter.route(logicSQL, parameters, sqlStatement));
        }
    }
    
    • PreparedStatementRoutingEngine的shardingRouter为ShardingRouter对象。
    • 执行ShardingRouter的route方法。

    ShardingRouter

    public final class ShardingRouter {
    
        public SQLRouteResult route(final String logicSQL, final List<Object> parameters, final SQLStatement sqlStatement) {
            Optional<ShardingStatementValidator> shardingStatementValidator = ShardingStatementValidatorFactory.newInstance(sqlStatement);
            if (shardingStatementValidator.isPresent()) {
                shardingStatementValidator.get().validate(shardingRule, sqlStatement, parameters);
            }
            // 1、创建SQLStatementContext
            SQLStatementContext sqlStatementContext = SQLStatementContextFactory.newInstance(metaData.getRelationMetas(), logicSQL, parameters, sqlStatement);
            Optional<GeneratedKey> generatedKey = sqlStatement instanceof InsertStatement
                    ? GeneratedKey.getGenerateKey(shardingRule, metaData.getTables(), parameters, (InsertStatement) sqlStatement) : Optional.<GeneratedKey>absent();
            //2、创建ShardingConditions
            ShardingConditions shardingConditions = getShardingConditions(parameters, sqlStatementContext, generatedKey.orNull(), metaData.getRelationMetas());
            boolean needMergeShardingValues = isNeedMergeShardingValues(sqlStatementContext);
            if (sqlStatementContext.getSqlStatement() instanceof DMLStatement && needMergeShardingValues) {
                checkSubqueryShardingValues(sqlStatementContext, shardingConditions);
                mergeShardingConditions(shardingConditions);
            }
            //3、创建RoutingEngine对象,为StandardRoutingEngine对象。
            RoutingEngine routingEngine = RoutingEngineFactory.newInstance(shardingRule, metaData, sqlStatementContext, shardingConditions);
            //4、 执行流程,核心走的StandardRoutingEngine路由规则
            RoutingResult routingResult = routingEngine.route();
            if (needMergeShardingValues) {
                Preconditions.checkState(1 == routingResult.getRoutingUnits().size(), "Must have one sharding with subquery.");
            }
            //5、创建SQLRouteResult对象
            SQLRouteResult result = new SQLRouteResult(sqlStatementContext, shardingConditions, generatedKey.orNull());
            result.setRoutingResult(routingResult);
    
            if (sqlStatementContext instanceof InsertSQLStatementContext) {
                setGeneratedValues(result);
            }
            return result;
        }
    }
    
    • 1、创建SQLStatementContext对象sqlStatementContext。
    • 2、创建ShardingConditions对象shardingConditions。
    • 3、创建RoutingEngine对象routingEngine,为StandardRoutingEngine对象。
    • 4、执行routingEngine.route()并生成RoutingResult对象routingResult。
    • 5、生成SQLRouteResult对象并返回。
    • 重点关注RoutingEngine.route()的方法。

    StandardRoutingEngine

    public final class StandardRoutingEngine implements RoutingEngine {
    
        private final ShardingRule shardingRule;
        private final String logicTableName;
        private final SQLStatementContext sqlStatementContext;
        private final ShardingConditions shardingConditions;
    
        @Override
        public RoutingResult route() {
            if (isDMLForModify(sqlStatementContext.getSqlStatement()) && !sqlStatementContext.getTablesContext().isSingleTable()) {
                throw new ShardingException("Cannot support Multiple-Table for '%s'.", sqlStatementContext.getSqlStatement());
            }
            return generateRoutingResult(getDataNodes(shardingRule.getTableRule(logicTableName)));
        }
    
        private Collection<DataNode> getDataNodes(final TableRule tableRule) {
            // 根据Hint去路由
            if (isRoutingByHint(tableRule)) {
                return routeByHint(tableRule);
            }
            // 根据条件去路由
            if (isRoutingByShardingConditions(tableRule)) {
                return routeByShardingConditions(tableRule);
            }
            // 混个条件去路由
            return routeByMixedConditions(tableRule);
        }
    
        private Collection<DataNode> routeByShardingConditions(final TableRule tableRule) {
            // 执行routeByShardingConditionsWithCondition方法
            return shardingConditions.getConditions().isEmpty()
                    ? route0(tableRule, Collections.<RouteValue>emptyList(), Collections.<RouteValue>emptyList()) : routeByShardingConditionsWithCondition(tableRule);
        }
    
        private Collection<DataNode> routeByShardingConditionsWithCondition(final TableRule tableRule) {
            Collection<DataNode> result = new LinkedList<>();
            for (ShardingCondition each : shardingConditions.getConditions()) {
                // Database维度的分片值,table维度的分片值
                Collection<DataNode> dataNodes = route0(tableRule, getShardingValuesFromShardingConditions(shardingRule.getDatabaseShardingStrategy(tableRule).getShardingColumns(), each),
                        getShardingValuesFromShardingConditions(shardingRule.getTableShardingStrategy(tableRule).getShardingColumns(), each));
                each.getDataNodes().addAll(dataNodes);
                result.addAll(dataNodes);
            }
            return result;
        }
    
        private Collection<DataNode> route0(final TableRule tableRule, final List<RouteValue> databaseShardingValues, final List<RouteValue> tableShardingValues) {
            // 1、先routeDataSources,tableRule是表规则,databaseShardingValues是分库的数据值
            Collection<String> routedDataSources = routeDataSources(tableRule, databaseShardingValues);
            Collection<DataNode> result = new LinkedList<>();
            // 2、再执行routeTables,根据tableRule、dataSource、tableShardingValues是分表的数据值
            for (String each : routedDataSources) {
                result.addAll(routeTables(tableRule, each, tableShardingValues));
            }
            return result;
        }
    
        private Collection<String> routeDataSources(final TableRule tableRule, final List<RouteValue> databaseShardingValues) {
            if (databaseShardingValues.isEmpty()) {
                return tableRule.getActualDatasourceNames();
            }
            // 执行DatabaseShardingStrategy的doSharding
            Collection<String> result = new LinkedHashSet<>(shardingRule.getDatabaseShardingStrategy(tableRule).doSharding(tableRule.getActualDatasourceNames(), databaseShardingValues));
            Preconditions.checkState(!result.isEmpty(), "no database route info");
            Preconditions.checkState(tableRule.getActualDatasourceNames().containsAll(result), 
                    "Some routed data sources do not belong to configured data sources. routed data sources: `%s`, configured data sources: `%s`", result, tableRule.getActualDatasourceNames());
            return result;
        }
    
        private Collection<DataNode> routeTables(final TableRule tableRule, final String routedDataSource, final List<RouteValue> tableShardingValues) {
            Collection<String> availableTargetTables = tableRule.getActualTableNames(routedDataSource);
            // 执行TableShardingStrategy的doSharding
            Collection<String> routedTables = new LinkedHashSet<>(tableShardingValues.isEmpty() ? availableTargetTables
                    : shardingRule.getTableShardingStrategy(tableRule).doSharding(availableTargetTables, tableShardingValues));
            Preconditions.checkState(!routedTables.isEmpty(), "no table route info");
            Collection<DataNode> result = new LinkedList<>();
            for (String each : routedTables) {
                result.add(new DataNode(routedDataSource, each));
            }
            return result;
        }
    
        private RoutingResult generateRoutingResult(final Collection<DataNode> routedDataNodes) {
            RoutingResult result = new RoutingResult();
            for (DataNode each : routedDataNodes) {
                RoutingUnit routingUnit = new RoutingUnit(each.getDataSourceName());
                routingUnit.getTableUnits().add(new TableUnit(logicTableName, each.getTableName()));
                result.getRoutingUnits().add(routingUnit);
            }
            return result;
        }
    }
    
    • StandardRoutingEngine的route的核心逻辑在route0方法当中。
    • route0的核心逻辑:1、先根据database的分片值和分片规则确定database,在database确定的前提下根据table的分片值和分片规则确定table。
    • 结合database和table的分片结果组装返回结果。
    • routeDataSources负责实现dataSource的路由。
    • routeTables负责实现table的路由。

    executeUpdate过程

    public final class PreparedStatementExecutor extends AbstractStatementExecutor {
    
        public void init(final SQLRouteResult routeResult) throws SQLException {
            setSqlStatementContext(routeResult.getSqlStatementContext());
            // 初始化PreparedStatementExecutor的executeGroups对象
            getExecuteGroups().addAll(obtainExecuteGroups(routeResult.getRouteUnits()));
            cacheStatements();
        }
        
        private Collection<ShardingExecuteGroup<StatementExecuteUnit>> obtainExecuteGroups(final Collection<RouteUnit> routeUnits) throws SQLException {
            return getSqlExecutePrepareTemplate().getExecuteUnitGroups(routeUnits, new SQLExecutePrepareCallback() {
                
                @Override
                public List<Connection> getConnections(final ConnectionMode connectionMode, final String dataSourceName, final int connectionSize) throws SQLException {
                    return PreparedStatementExecutor.super.getConnection().getConnections(connectionMode, dataSourceName, connectionSize);
                }
                
                @Override
                public StatementExecuteUnit createStatementExecuteUnit(final Connection connection, final RouteUnit routeUnit, final ConnectionMode connectionMode) throws SQLException {
                    return new StatementExecuteUnit(routeUnit, createPreparedStatement(connection, routeUnit.getSqlUnit().getSql()), connectionMode);
                }
            });
        }
    
        public int executeUpdate() throws SQLException {
            final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
            // 生成SQLExecuteCallback对象
            SQLExecuteCallback<Integer> executeCallback = SQLExecuteCallbackFactory.getPreparedUpdateSQLExecuteCallback(getDatabaseType(), isExceptionThrown);
            List<Integer> results = executeCallback(executeCallback);
            if (isAccumulate()) {
                return accumulate(results);
            } else {
                return results.get(0);
            }
        }
    }
    
    
    public abstract class AbstractStatementExecutor {
    
        private final SQLExecuteTemplate sqlExecuteTemplate;
    
        private final Collection<ShardingExecuteGroup<StatementExecuteUnit>> executeGroups = new LinkedList<>();
    
        protected final <T> List<T> executeCallback(final SQLExecuteCallback<T> executeCallback) throws SQLException {
            // 通过sqlExecuteTemplate的executeGroup继续执行
            List<T> result = sqlExecuteTemplate.executeGroup((Collection) executeGroups, executeCallback);
            refreshMetaDataIfNeeded(connection.getRuntimeContext(), sqlStatementContext);
            return result;
        }
    }
    
    public final class SQLExecuteCallbackFactory {
        
        public static SQLExecuteCallback<Integer> getPreparedUpdateSQLExecuteCallback(final DatabaseType databaseType, final boolean isExceptionThrown) {
            return new SQLExecuteCallback<Integer>(databaseType, isExceptionThrown) {
                
                @Override
                protected Integer executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
                    return ((PreparedStatement) statement).executeUpdate();
                }
            };
        }
    
        public static SQLExecuteCallback<Boolean> getPreparedSQLExecuteCallback(final DatabaseType databaseType, final boolean isExceptionThrown) {
            return new SQLExecuteCallback<Boolean>(databaseType, isExceptionThrown) {
                
                @Override
                protected Boolean executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
                    return ((PreparedStatement) statement).execute();
                }
            };
        }
    }
    
    • PreparedStatementExecutor通过sqlExecuteTemplate.executeGroup执行操作。
    public final class SQLExecuteTemplate {
    
        private final ShardingExecuteEngine executeEngine;    
        private final boolean serial;
    
        public <T> List<T> executeGroup(final Collection<ShardingExecuteGroup<? extends StatementExecuteUnit>> sqlExecuteGroups, final SQLExecuteCallback<T> callback) throws SQLException {
            return executeGroup(sqlExecuteGroups, null, callback);
        }
    
        public <T> List<T> executeGroup(final Collection<ShardingExecuteGroup<? extends StatementExecuteUnit>> sqlExecuteGroups,
                                        final SQLExecuteCallback<T> firstCallback, final SQLExecuteCallback<T> callback) throws SQLException {
            try {
                return executeEngine.groupExecute((Collection) sqlExecuteGroups, firstCallback, callback, serial);
            } catch (final SQLException ex) {
                ExecutorExceptionHandler.handleException(ex);
                return Collections.emptyList();
            }
        }
    }
    
    • SQLExecuteTemplate通过ShardingExecuteEngine的groupExecute继续执行。
    public final class ShardingExecuteEngine implements AutoCloseable {
    
        public <I, O> List<O> groupExecute(
            final Collection<ShardingExecuteGroup<I>> inputGroups, final ShardingGroupExecuteCallback<I, O> firstCallback, final ShardingGroupExecuteCallback<I, O> callback, final boolean serial)
            throws SQLException {
            if (inputGroups.isEmpty()) {
                return Collections.emptyList();
            }
            return serial ? serialExecute(inputGroups, firstCallback, callback) : parallelExecute(inputGroups, firstCallback, callback);
        }
    
        private <I, O> List<O> serialExecute(final Collection<ShardingExecuteGroup<I>> inputGroups, final ShardingGroupExecuteCallback<I, O> firstCallback,
                                             final ShardingGroupExecuteCallback<I, O> callback) throws SQLException {
            Iterator<ShardingExecuteGroup<I>> inputGroupsIterator = inputGroups.iterator();
            ShardingExecuteGroup<I> firstInputs = inputGroupsIterator.next();
            List<O> result = new LinkedList<>(syncGroupExecute(firstInputs, null == firstCallback ? callback : firstCallback));
            for (ShardingExecuteGroup<I> each : Lists.newArrayList(inputGroupsIterator)) {
                result.addAll(syncGroupExecute(each, callback));
            }
            return result;
        }
    
        private <I, O> Collection<O> syncGroupExecute(final ShardingExecuteGroup<I> executeGroup, final ShardingGroupExecuteCallback<I, O> callback) throws SQLException {
            return callback.execute(executeGroup.getInputs(), true, ShardingExecuteDataMap.getDataMap());
        }
    
        private <I, O> List<O> parallelExecute(final Collection<ShardingExecuteGroup<I>> inputGroups, final ShardingGroupExecuteCallback<I, O> firstCallback,
                                               final ShardingGroupExecuteCallback<I, O> callback) throws SQLException {
            Iterator<ShardingExecuteGroup<I>> inputGroupsIterator = inputGroups.iterator();
            ShardingExecuteGroup<I> firstInputs = inputGroupsIterator.next();
            Collection<ListenableFuture<Collection<O>>> restResultFutures = asyncGroupExecute(Lists.newArrayList(inputGroupsIterator), callback);
            return getGroupResults(syncGroupExecute(firstInputs, null == firstCallback ? callback : firstCallback), restResultFutures);
        }
    
        private <I, O> Collection<ListenableFuture<Collection<O>>> asyncGroupExecute(final List<ShardingExecuteGroup<I>> inputGroups, final ShardingGroupExecuteCallback<I, O> callback) {
            Collection<ListenableFuture<Collection<O>>> result = new LinkedList<>();
            for (ShardingExecuteGroup<I> each : inputGroups) {
                result.add(asyncGroupExecute(each, callback));
            }
            return result;
        }
    
        private <I, O> ListenableFuture<Collection<O>> asyncGroupExecute(final ShardingExecuteGroup<I> inputGroup, final ShardingGroupExecuteCallback<I, O> callback) {
            final Map<String, Object> dataMap = ShardingExecuteDataMap.getDataMap();
            return executorService.submit(new Callable<Collection<O>>() {
                
                @Override
                public Collection<O> call() throws SQLException {
                    return callback.execute(inputGroup.getInputs(), false, dataMap);
                }
            });
        }
    }
    

    分库分表中间结果

    t_address t_order t_order_item

    相关文章

      网友评论

        本文标题:Sharding-JDBC 执行整体流程

        本文链接:https://www.haomeiwen.com/subject/rhxsahtx.html