美文网首页ShardingSphere
Sharding-JDBC 执行整体流程

Sharding-JDBC 执行整体流程

作者: 晴天哥_王志 | 来源:发表于2020-06-07 21:53 被阅读0次

Sharding-JDBC系列

开篇

  • 这篇文章主要是通过例子来讲解Sharding-JDBC的执行流程,但是因为每个流程都过于复杂,所以这里只能针对宏观的流程进行分析,后面等个人理解深入后再针对各个过程单独分析。
  • JDBC执行的流程需要参考具体的demo,案例可以参考shardingsphere-example
  • 执行前需要本机安装个mysql,步骤可以参考sharding-sphere学习

数据源配置

public final class ShardingTablesConfigurationPrecise implements ExampleConfiguration {
    
    @Override
    public DataSource getDataSource() throws SQLException {
        ShardingRuleConfiguration shardingRuleConfig = new ShardingRuleConfiguration();
        shardingRuleConfig.getTableRuleConfigs().add(getOrderTableRuleConfiguration());
        shardingRuleConfig.getTableRuleConfigs().add(getOrderItemTableRuleConfiguration());
        shardingRuleConfig.getBindingTableGroups().add("t_order, t_order_item");
        shardingRuleConfig.getBroadcastTables().add("t_address");
        shardingRuleConfig.setDefaultTableShardingStrategyConfig(new StandardShardingStrategyConfiguration("order_id", new PreciseModuloShardingTableAlgorithm()));
        return ShardingDataSourceFactory.createDataSource(createDataSourceMap(), shardingRuleConfig, new Properties());
    }

    
    private static TableRuleConfiguration getOrderTableRuleConfiguration() {
        TableRuleConfiguration result = new TableRuleConfiguration("t_order", "demo_ds.t_order_${[0, 1]}");
        result.setKeyGeneratorConfig(new KeyGeneratorConfiguration("SNOWFLAKE", "order_id", getProperties()));
        return result;
    }
    
    private static TableRuleConfiguration getOrderItemTableRuleConfiguration() {
        TableRuleConfiguration result = new TableRuleConfiguration("t_order_item", "demo_ds.t_order_item_${[0, 1]}");
        result.setKeyGeneratorConfig(new KeyGeneratorConfiguration("SNOWFLAKE", "order_item_id", getProperties()));
        return result;
    }
    
    private static Map<String, DataSource> createDataSourceMap() {
        Map<String, DataSource> result = new HashMap<>();
        result.put("demo_ds", DataSourceUtil.createDataSource("demo_ds"));
        return result;
    }
    
    private static Properties getProperties() {
        Properties result = new Properties();
        result.setProperty("worker.id", "123");
        return result;
    }
}
  • t_order和t_order_item表按照分表的逻辑进行保存,在同一个database当中保存2个table。
  • 我们这里暂时分析t_order表的插入过程,核心应该在于选择t_order_0还是t_order_1。

SQL执行流程

public class OrderRepositoryImpl implements OrderRepository {

    private final DataSource dataSource;
    // OrderRepositoryImpl的构造函数参数为ShardingDataSource
    public OrderRepositoryImpl(final DataSource dataSource) {
        this.dataSource = dataSource;
    }

    @Override
    public Long insert(final Order order) throws SQLException {
        String sql = "INSERT INTO t_order (user_id, address_id, status) VALUES (?, ?, ?)";

        // 1、从dataSource中获取Connection对象,dataSource是ShardingDataSource对象
        try (Connection connection = dataSource.getConnection();
            // 2、通过connection准备PreparedStatement对象
            PreparedStatement preparedStatement = connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
            // 3、设置preparedStatement的参数
            preparedStatement.setInt(1, order.getUserId());
            preparedStatement.setLong(2, order.getAddressId());
            preparedStatement.setString(3, order.getStatus());
            // 4、执行preparedStatement的executeUpdate
            preparedStatement.executeUpdate();
            // 5、处理ResultSet对象
            try (ResultSet resultSet = preparedStatement.getGeneratedKeys()) {
                if (resultSet.next()) {
                    order.setOrderId(resultSet.getLong(1));
                }
            }
        }
        return order.getOrderId();
    }
}
  • 1、从dataSource中获取Connection对象,dataSource是ShardingDataSource对象。
  • 2、通过connection准备PreparedStatement对象。
  • 3、设置preparedStatement的参数。
  • 4、执行preparedStatement的executeUpdate。
  • 5、处理ResultSet对象。
  • mysql的核心执行流程就是按照上面的步骤执行。

核心变量介绍

ShardingRuntimeContext

public final class ShardingRuntimeContext extends AbstractRuntimeContext<ShardingRule> {
    
    private final DatabaseMetaData cachedDatabaseMetaData;
    // 核心变量ShardingSphereMetaData
    private final ShardingSphereMetaData metaData;
    private final ShardingTransactionManagerEngine shardingTransactionManagerEngine;
    
    public ShardingRuntimeContext(final Map<String, DataSource> dataSourceMap, final ShardingRule rule, final Properties props, final DatabaseType databaseType) throws SQLException {
        super(rule, props, databaseType);
        cachedDatabaseMetaData = createCachedDatabaseMetaData(dataSourceMap, rule);
        metaData = createMetaData(dataSourceMap, rule, databaseType);
        shardingTransactionManagerEngine = new ShardingTransactionManagerEngine();
        shardingTransactionManagerEngine.init(databaseType, dataSourceMap);
    }
}

public abstract class AbstractRuntimeContext<T extends BaseRule> implements RuntimeContext<T> {
    
    private final T rule;
    private final ShardingProperties props;
    private final DatabaseType databaseType;
    private final ShardingExecuteEngine executeEngine;
    private final SQLParseEngine parseEngine;
    
    protected AbstractRuntimeContext(final T rule, final Properties props, final DatabaseType databaseType) {
        this.rule = rule;
        this.props = new ShardingProperties(null == props ? new Properties() : props);
        this.databaseType = databaseType;
        executeEngine = new ShardingExecuteEngine(this.props.<Integer>getValue(ShardingPropertiesConstant.EXECUTOR_SIZE));
        parseEngine = SQLParseEngineFactory.getSQLParseEngine(DatabaseTypes.getTrunkDatabaseTypeName(databaseType));
        ConfigurationLogger.log(rule.getRuleConfiguration());
        ConfigurationLogger.log(props);
    }
    
    @Override
    public void close() throws Exception {
        executeEngine.close();
    }
}
  • ShardingRuntimeContext作为分片上下文包含各类核心变量。
  • ShardingSphereMetaData包含分片元数据。
  • executeEngine为执行引擎。
  • parseEngine为解析引擎。

ShardingDataSource

@Getter
public class ShardingDataSource extends AbstractDataSourceAdapter {
    
    private final ShardingRuntimeContext runtimeContext;
    
    public ShardingDataSource(final Map<String, DataSource> dataSourceMap, final ShardingRule shardingRule, final Properties props) throws SQLException {
        super(dataSourceMap);
        checkDataSourceType(dataSourceMap);
        runtimeContext = new ShardingRuntimeContext(dataSourceMap, shardingRule, props, getDatabaseType());
    }
    
    @Override
    public final ShardingConnection getConnection() {
        return new ShardingConnection(getDataSourceMap(), runtimeContext, TransactionTypeHolder.get());
    }
}
  • ShardingDataSource的getConnection返回ShardingConnection对象。

ShardingConnection

public final class ShardingConnection extends AbstractConnectionAdapter {
    
    private final Map<String, DataSource> dataSourceMap;
    private final ShardingRuntimeContext runtimeContext;
    private final TransactionType transactionType;
    private final ShardingTransactionManager shardingTransactionManager;
    
    public ShardingConnection(final Map<String, DataSource> dataSourceMap, final ShardingRuntimeContext runtimeContext, final TransactionType transactionType) {
        this.dataSourceMap = dataSourceMap;
        this.runtimeContext = runtimeContext;
        this.transactionType = transactionType;
        shardingTransactionManager = runtimeContext.getShardingTransactionManagerEngine().getTransactionManager(transactionType);
    }

    @Override
    public PreparedStatement prepareStatement(final String sql, final int autoGeneratedKeys) throws SQLException {
        return new ShardingPreparedStatement(this, sql, autoGeneratedKeys);
    }
}
  • ShardingConnection的prepareStatement返回ShardingPreparedStatement对象。

ShardingPreparedStatement

public final class ShardingPreparedStatement extends AbstractShardingPreparedStatementAdapter {
    
    @Getter
    private final ShardingConnection connection;
    // 待执行的SQL
    private final String sql;
    // 路由引擎
    private final PreparedQueryShardingEngine shardingEngine;
    // 执行引擎
    private final PreparedStatementExecutor preparedStatementExecutor;
    private final BatchPreparedStatementExecutor batchPreparedStatementExecutor;
    private SQLRouteResult sqlRouteResult;
    private ResultSet currentResultSet;

    public ShardingPreparedStatement(final ShardingConnection connection, final String sql, final int autoGeneratedKeys) throws SQLException {
        this(connection, sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT, Statement.RETURN_GENERATED_KEYS == autoGeneratedKeys);
    }

    private ShardingPreparedStatement(
            final ShardingConnection connection, final String sql, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final boolean returnGeneratedKeys)
            throws SQLException {
        if (Strings.isNullOrEmpty(sql)) {
            throw new SQLException(SQLExceptionConstant.SQL_STRING_NULL_OR_EMPTY);
        }

        this.connection = connection;
        this.sql = sql;

        ShardingRuntimeContext runtimeContext = connection.getRuntimeContext();
        // 核心的是PreparedQueryShardingEngine对象
        shardingEngine = new PreparedQueryShardingEngine(sql, runtimeContext.getRule(), runtimeContext.getProps(), runtimeContext.getMetaData(), runtimeContext.getParseEngine());
        preparedStatementExecutor = new PreparedStatementExecutor(resultSetType, resultSetConcurrency, resultSetHoldability, returnGeneratedKeys, connection);
        batchPreparedStatementExecutor = new BatchPreparedStatementExecutor(resultSetType, resultSetConcurrency, resultSetHoldability, returnGeneratedKeys, connection);
    }
}
  • ShardingPreparedStatement对象的包含传入的sql和connection对象。
  • ShardingPreparedStatement的核心变量包含shardingEngine、preparedStatementExecutor、batchPreparedStatementExecutor。
  • shardingEngine负责执行分片选择对应的库表。
  • preparedStatementExecutor负责执行具体的SQL。

PreparedQueryShardingEngine

public final class PreparedQueryShardingEngine extends BaseShardingEngine {
    
    private final PreparedStatementRoutingEngine routingEngine;
    
    public PreparedQueryShardingEngine(final String sql, 
                                       final ShardingRule shardingRule, final ShardingProperties shardingProperties, final ShardingSphereMetaData metaData, final SQLParseEngine sqlParseEngine) {
        super(shardingRule, shardingProperties, metaData);
        routingEngine = new PreparedStatementRoutingEngine(sql, shardingRule, metaData, sqlParseEngine);
    }
    
    @Override
    protected List<Object> cloneParameters(final List<Object> parameters) {
        return new ArrayList<>(parameters);
    }
    
    @Override
    protected SQLRouteResult route(final String sql, final List<Object> parameters) {
        return routingEngine.route(parameters);
    }
}
  • PreparedQueryShardingEngine包含PreparedStatementRoutingEngine对象。
  • PreparedQueryShardingEngine负责执行路由功能,核心通过routingEngine来执行。

PreparedStatementRoutingEngine

public final class PreparedStatementRoutingEngine {
    
    private final String logicSQL;
    private final ShardingRouter shardingRouter;
    private final ShardingMasterSlaveRouter masterSlaveRouter;
    private SQLStatement sqlStatement;
    
    public PreparedStatementRoutingEngine(final String logicSQL, final ShardingRule shardingRule, final ShardingSphereMetaData metaData, final SQLParseEngine sqlParseEngine) {
        this.logicSQL = logicSQL;
        shardingRouter = new ShardingRouter(shardingRule, metaData, sqlParseEngine);
        masterSlaveRouter = new ShardingMasterSlaveRouter(shardingRule.getMasterSlaveRules());
    }

    public SQLRouteResult route(final List<Object> parameters) {
        if (null == sqlStatement) {
            sqlStatement = shardingRouter.parse(logicSQL, true);
        }
        return masterSlaveRouter.route(shardingRouter.route(logicSQL, parameters, sqlStatement));
    }
}
  • PreparedStatementRoutingEngine包含shardingRouter和masterSlaveRouter。
  • shardingRouter负责执行路由功能。

变量之间关系

  • ShardingDataSource负责提供ShardingConnection。
  • ShardingConnection负责提供ShardingPreparedStatement。
  • ShardingPreparedStatement负责提供PreparedQueryShardingEngine和PreparedStatementExecutor。
  • PreparedQueryShardingEngine负责提供PreparedStatementRoutingEngine。
  • PreparedStatementRoutingEngine负责提供ShardingRouter。

Statement执行过程

public final class ShardingPreparedStatement extends AbstractShardingPreparedStatementAdapter {
    
    @Getter
    private final ShardingConnection connection;
    private final String sql;
    private final PreparedQueryShardingEngine shardingEngine;    
    private final PreparedStatementExecutor preparedStatementExecutor;
    private final BatchPreparedStatementExecutor batchPreparedStatementExecutor;
    private SQLRouteResult sqlRouteResult;
    private ResultSet currentResultSet;

    @Override
    public int executeUpdate() throws SQLException {
        try {
            // 1、清空之前的查询
            clearPrevious();
            // 2、执行shard操作
            shard();
            // 3、初始化PreparedStatementExecutor
            initPreparedStatementExecutor();
            // 4、执行preparedStatementExecutor的executeUpdate方法
            return preparedStatementExecutor.executeUpdate();
        } finally {
            clearBatch();
        }
    }

    private void clearPrevious() throws SQLException {
        preparedStatementExecutor.clear();
    }

    private void shard() {
        // shardingEngine为PreparedQueryShardingEngine
        sqlRouteResult = shardingEngine.shard(sql, getParameters());
    }

    private void initPreparedStatementExecutor() throws SQLException {
        preparedStatementExecutor.init(sqlRouteResult);
        setParametersForStatements();
        replayMethodForStatements();
    }
}

public final class PreparedStatementExecutor extends AbstractStatementExecutor {
    public int executeUpdate() throws SQLException {
        final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
        SQLExecuteCallback<Integer> executeCallback = SQLExecuteCallbackFactory.getPreparedUpdateSQLExecuteCallback(getDatabaseType(), isExceptionThrown);
        List<Integer> results = executeCallback(executeCallback);
        if (isAccumulate()) {
            return accumulate(results);
        } else {
            return results.get(0);
        }
    }
}
  • clearPrevious清空之前的查询。
  • shard通过shardingEngine执行分片操作,shardingEngine为PreparedQueryShardingEngine对象。
  • initPreparedStatementExecutor负责初始化PreparedStatementExecutor。
  • preparedStatementExecutor.executeUpdate执行preparedStatementExecutor的executeUpdate动作。
  • 核心关注路由过程和executeUpdate过程。

路由过程

  • 整体路由顺序按照PreparedQueryShardingEngine
    => PreparedStatementRoutingEngine => ShardingRouter => StandardRoutingEngine 进行路由。

BaseShardingEngine

public abstract class BaseShardingEngine {

    public SQLRouteResult shard(final String sql, final List<Object> parameters) {
        List<Object> clonedParameters = cloneParameters(parameters);
        // 执行executeRoute
        SQLRouteResult result = executeRoute(sql, clonedParameters);
        result.getRouteUnits().addAll(HintManager.isDatabaseShardingOnly() ? convert(sql, clonedParameters, result) : rewriteAndConvert(sql, clonedParameters, result));
        boolean showSQL = shardingProperties.getValue(ShardingPropertiesConstant.SQL_SHOW);
        if (showSQL) {
            boolean showSimple = shardingProperties.getValue(ShardingPropertiesConstant.SQL_SIMPLE);
            SQLLogger.logSQL(sql, showSimple, result.getSqlStatementContext(), result.getRouteUnits());
        }
        return result;
    }

    private SQLRouteResult executeRoute(final String sql, final List<Object> clonedParameters) {
        routingHook.start(sql);
        try {
            SQLRouteResult result = route(sql, clonedParameters);
            routingHook.finishSuccess(result, metaData.getTables());
            return result;
            // CHECKSTYLE:OFF
        } catch (final Exception ex) {
            // CHECKSTYLE:ON
            routingHook.finishFailure(ex);
            throw ex;
        }
    }
}
  • BaseShardingEngine是PreparedQueryShardingEngine的基类。
  • BaseShardingEngine代表的是PreparedQueryShardingEngine对象。
  • executeRoute执行PreparedQueryShardingEngine的route方法。

PreparedQueryShardingEngine

public final class PreparedQueryShardingEngine extends BaseShardingEngine {
    
    private final PreparedStatementRoutingEngine routingEngine;
    
    public PreparedQueryShardingEngine(final String sql, 
                                       final ShardingRule shardingRule, final ShardingProperties shardingProperties, final ShardingSphereMetaData metaData, final SQLParseEngine sqlParseEngine) {
        super(shardingRule, shardingProperties, metaData);
        routingEngine = new PreparedStatementRoutingEngine(sql, shardingRule, metaData, sqlParseEngine);
    }
    
    @Override
    protected SQLRouteResult route(final String sql, final List<Object> parameters) {
        return routingEngine.route(parameters);
    }
}
  • PreparedQueryShardingEngine的routingEngine为PreparedStatementRoutingEngine对象。
  • 执行PreparedStatementRoutingEngine的route方法。

PreparedStatementRoutingEngine

public final class PreparedStatementRoutingEngine {
    
    private final String logicSQL;
    private final ShardingRouter shardingRouter;
    private final ShardingMasterSlaveRouter masterSlaveRouter;
    private SQLStatement sqlStatement;
    
    public PreparedStatementRoutingEngine(final String logicSQL, final ShardingRule shardingRule, final ShardingSphereMetaData metaData, final SQLParseEngine sqlParseEngine) {
        this.logicSQL = logicSQL;
        shardingRouter = new ShardingRouter(shardingRule, metaData, sqlParseEngine);
        masterSlaveRouter = new ShardingMasterSlaveRouter(shardingRule.getMasterSlaveRules());
    }

    public SQLRouteResult route(final List<Object> parameters) {
        if (null == sqlStatement) {
            sqlStatement = shardingRouter.parse(logicSQL, true);
        }
        return masterSlaveRouter.route(shardingRouter.route(logicSQL, parameters, sqlStatement));
    }
}
  • PreparedStatementRoutingEngine的shardingRouter为ShardingRouter对象。
  • 执行ShardingRouter的route方法。

ShardingRouter

public final class ShardingRouter {

    public SQLRouteResult route(final String logicSQL, final List<Object> parameters, final SQLStatement sqlStatement) {
        Optional<ShardingStatementValidator> shardingStatementValidator = ShardingStatementValidatorFactory.newInstance(sqlStatement);
        if (shardingStatementValidator.isPresent()) {
            shardingStatementValidator.get().validate(shardingRule, sqlStatement, parameters);
        }
        // 1、创建SQLStatementContext
        SQLStatementContext sqlStatementContext = SQLStatementContextFactory.newInstance(metaData.getRelationMetas(), logicSQL, parameters, sqlStatement);
        Optional<GeneratedKey> generatedKey = sqlStatement instanceof InsertStatement
                ? GeneratedKey.getGenerateKey(shardingRule, metaData.getTables(), parameters, (InsertStatement) sqlStatement) : Optional.<GeneratedKey>absent();
        //2、创建ShardingConditions
        ShardingConditions shardingConditions = getShardingConditions(parameters, sqlStatementContext, generatedKey.orNull(), metaData.getRelationMetas());
        boolean needMergeShardingValues = isNeedMergeShardingValues(sqlStatementContext);
        if (sqlStatementContext.getSqlStatement() instanceof DMLStatement && needMergeShardingValues) {
            checkSubqueryShardingValues(sqlStatementContext, shardingConditions);
            mergeShardingConditions(shardingConditions);
        }
        //3、创建RoutingEngine对象,为StandardRoutingEngine对象。
        RoutingEngine routingEngine = RoutingEngineFactory.newInstance(shardingRule, metaData, sqlStatementContext, shardingConditions);
        //4、 执行流程,核心走的StandardRoutingEngine路由规则
        RoutingResult routingResult = routingEngine.route();
        if (needMergeShardingValues) {
            Preconditions.checkState(1 == routingResult.getRoutingUnits().size(), "Must have one sharding with subquery.");
        }
        //5、创建SQLRouteResult对象
        SQLRouteResult result = new SQLRouteResult(sqlStatementContext, shardingConditions, generatedKey.orNull());
        result.setRoutingResult(routingResult);

        if (sqlStatementContext instanceof InsertSQLStatementContext) {
            setGeneratedValues(result);
        }
        return result;
    }
}
  • 1、创建SQLStatementContext对象sqlStatementContext。
  • 2、创建ShardingConditions对象shardingConditions。
  • 3、创建RoutingEngine对象routingEngine,为StandardRoutingEngine对象。
  • 4、执行routingEngine.route()并生成RoutingResult对象routingResult。
  • 5、生成SQLRouteResult对象并返回。
  • 重点关注RoutingEngine.route()的方法。

StandardRoutingEngine

public final class StandardRoutingEngine implements RoutingEngine {

    private final ShardingRule shardingRule;
    private final String logicTableName;
    private final SQLStatementContext sqlStatementContext;
    private final ShardingConditions shardingConditions;

    @Override
    public RoutingResult route() {
        if (isDMLForModify(sqlStatementContext.getSqlStatement()) && !sqlStatementContext.getTablesContext().isSingleTable()) {
            throw new ShardingException("Cannot support Multiple-Table for '%s'.", sqlStatementContext.getSqlStatement());
        }
        return generateRoutingResult(getDataNodes(shardingRule.getTableRule(logicTableName)));
    }

    private Collection<DataNode> getDataNodes(final TableRule tableRule) {
        // 根据Hint去路由
        if (isRoutingByHint(tableRule)) {
            return routeByHint(tableRule);
        }
        // 根据条件去路由
        if (isRoutingByShardingConditions(tableRule)) {
            return routeByShardingConditions(tableRule);
        }
        // 混个条件去路由
        return routeByMixedConditions(tableRule);
    }

    private Collection<DataNode> routeByShardingConditions(final TableRule tableRule) {
        // 执行routeByShardingConditionsWithCondition方法
        return shardingConditions.getConditions().isEmpty()
                ? route0(tableRule, Collections.<RouteValue>emptyList(), Collections.<RouteValue>emptyList()) : routeByShardingConditionsWithCondition(tableRule);
    }

    private Collection<DataNode> routeByShardingConditionsWithCondition(final TableRule tableRule) {
        Collection<DataNode> result = new LinkedList<>();
        for (ShardingCondition each : shardingConditions.getConditions()) {
            // Database维度的分片值,table维度的分片值
            Collection<DataNode> dataNodes = route0(tableRule, getShardingValuesFromShardingConditions(shardingRule.getDatabaseShardingStrategy(tableRule).getShardingColumns(), each),
                    getShardingValuesFromShardingConditions(shardingRule.getTableShardingStrategy(tableRule).getShardingColumns(), each));
            each.getDataNodes().addAll(dataNodes);
            result.addAll(dataNodes);
        }
        return result;
    }

    private Collection<DataNode> route0(final TableRule tableRule, final List<RouteValue> databaseShardingValues, final List<RouteValue> tableShardingValues) {
        // 1、先routeDataSources,tableRule是表规则,databaseShardingValues是分库的数据值
        Collection<String> routedDataSources = routeDataSources(tableRule, databaseShardingValues);
        Collection<DataNode> result = new LinkedList<>();
        // 2、再执行routeTables,根据tableRule、dataSource、tableShardingValues是分表的数据值
        for (String each : routedDataSources) {
            result.addAll(routeTables(tableRule, each, tableShardingValues));
        }
        return result;
    }

    private Collection<String> routeDataSources(final TableRule tableRule, final List<RouteValue> databaseShardingValues) {
        if (databaseShardingValues.isEmpty()) {
            return tableRule.getActualDatasourceNames();
        }
        // 执行DatabaseShardingStrategy的doSharding
        Collection<String> result = new LinkedHashSet<>(shardingRule.getDatabaseShardingStrategy(tableRule).doSharding(tableRule.getActualDatasourceNames(), databaseShardingValues));
        Preconditions.checkState(!result.isEmpty(), "no database route info");
        Preconditions.checkState(tableRule.getActualDatasourceNames().containsAll(result), 
                "Some routed data sources do not belong to configured data sources. routed data sources: `%s`, configured data sources: `%s`", result, tableRule.getActualDatasourceNames());
        return result;
    }

    private Collection<DataNode> routeTables(final TableRule tableRule, final String routedDataSource, final List<RouteValue> tableShardingValues) {
        Collection<String> availableTargetTables = tableRule.getActualTableNames(routedDataSource);
        // 执行TableShardingStrategy的doSharding
        Collection<String> routedTables = new LinkedHashSet<>(tableShardingValues.isEmpty() ? availableTargetTables
                : shardingRule.getTableShardingStrategy(tableRule).doSharding(availableTargetTables, tableShardingValues));
        Preconditions.checkState(!routedTables.isEmpty(), "no table route info");
        Collection<DataNode> result = new LinkedList<>();
        for (String each : routedTables) {
            result.add(new DataNode(routedDataSource, each));
        }
        return result;
    }

    private RoutingResult generateRoutingResult(final Collection<DataNode> routedDataNodes) {
        RoutingResult result = new RoutingResult();
        for (DataNode each : routedDataNodes) {
            RoutingUnit routingUnit = new RoutingUnit(each.getDataSourceName());
            routingUnit.getTableUnits().add(new TableUnit(logicTableName, each.getTableName()));
            result.getRoutingUnits().add(routingUnit);
        }
        return result;
    }
}
  • StandardRoutingEngine的route的核心逻辑在route0方法当中。
  • route0的核心逻辑:1、先根据database的分片值和分片规则确定database,在database确定的前提下根据table的分片值和分片规则确定table。
  • 结合database和table的分片结果组装返回结果。
  • routeDataSources负责实现dataSource的路由。
  • routeTables负责实现table的路由。

executeUpdate过程

public final class PreparedStatementExecutor extends AbstractStatementExecutor {

    public void init(final SQLRouteResult routeResult) throws SQLException {
        setSqlStatementContext(routeResult.getSqlStatementContext());
        // 初始化PreparedStatementExecutor的executeGroups对象
        getExecuteGroups().addAll(obtainExecuteGroups(routeResult.getRouteUnits()));
        cacheStatements();
    }
    
    private Collection<ShardingExecuteGroup<StatementExecuteUnit>> obtainExecuteGroups(final Collection<RouteUnit> routeUnits) throws SQLException {
        return getSqlExecutePrepareTemplate().getExecuteUnitGroups(routeUnits, new SQLExecutePrepareCallback() {
            
            @Override
            public List<Connection> getConnections(final ConnectionMode connectionMode, final String dataSourceName, final int connectionSize) throws SQLException {
                return PreparedStatementExecutor.super.getConnection().getConnections(connectionMode, dataSourceName, connectionSize);
            }
            
            @Override
            public StatementExecuteUnit createStatementExecuteUnit(final Connection connection, final RouteUnit routeUnit, final ConnectionMode connectionMode) throws SQLException {
                return new StatementExecuteUnit(routeUnit, createPreparedStatement(connection, routeUnit.getSqlUnit().getSql()), connectionMode);
            }
        });
    }

    public int executeUpdate() throws SQLException {
        final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
        // 生成SQLExecuteCallback对象
        SQLExecuteCallback<Integer> executeCallback = SQLExecuteCallbackFactory.getPreparedUpdateSQLExecuteCallback(getDatabaseType(), isExceptionThrown);
        List<Integer> results = executeCallback(executeCallback);
        if (isAccumulate()) {
            return accumulate(results);
        } else {
            return results.get(0);
        }
    }
}


public abstract class AbstractStatementExecutor {

    private final SQLExecuteTemplate sqlExecuteTemplate;

    private final Collection<ShardingExecuteGroup<StatementExecuteUnit>> executeGroups = new LinkedList<>();

    protected final <T> List<T> executeCallback(final SQLExecuteCallback<T> executeCallback) throws SQLException {
        // 通过sqlExecuteTemplate的executeGroup继续执行
        List<T> result = sqlExecuteTemplate.executeGroup((Collection) executeGroups, executeCallback);
        refreshMetaDataIfNeeded(connection.getRuntimeContext(), sqlStatementContext);
        return result;
    }
}

public final class SQLExecuteCallbackFactory {
    
    public static SQLExecuteCallback<Integer> getPreparedUpdateSQLExecuteCallback(final DatabaseType databaseType, final boolean isExceptionThrown) {
        return new SQLExecuteCallback<Integer>(databaseType, isExceptionThrown) {
            
            @Override
            protected Integer executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
                return ((PreparedStatement) statement).executeUpdate();
            }
        };
    }

    public static SQLExecuteCallback<Boolean> getPreparedSQLExecuteCallback(final DatabaseType databaseType, final boolean isExceptionThrown) {
        return new SQLExecuteCallback<Boolean>(databaseType, isExceptionThrown) {
            
            @Override
            protected Boolean executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
                return ((PreparedStatement) statement).execute();
            }
        };
    }
}
  • PreparedStatementExecutor通过sqlExecuteTemplate.executeGroup执行操作。
public final class SQLExecuteTemplate {

    private final ShardingExecuteEngine executeEngine;    
    private final boolean serial;

    public <T> List<T> executeGroup(final Collection<ShardingExecuteGroup<? extends StatementExecuteUnit>> sqlExecuteGroups, final SQLExecuteCallback<T> callback) throws SQLException {
        return executeGroup(sqlExecuteGroups, null, callback);
    }

    public <T> List<T> executeGroup(final Collection<ShardingExecuteGroup<? extends StatementExecuteUnit>> sqlExecuteGroups,
                                    final SQLExecuteCallback<T> firstCallback, final SQLExecuteCallback<T> callback) throws SQLException {
        try {
            return executeEngine.groupExecute((Collection) sqlExecuteGroups, firstCallback, callback, serial);
        } catch (final SQLException ex) {
            ExecutorExceptionHandler.handleException(ex);
            return Collections.emptyList();
        }
    }
}
  • SQLExecuteTemplate通过ShardingExecuteEngine的groupExecute继续执行。
public final class ShardingExecuteEngine implements AutoCloseable {

    public <I, O> List<O> groupExecute(
        final Collection<ShardingExecuteGroup<I>> inputGroups, final ShardingGroupExecuteCallback<I, O> firstCallback, final ShardingGroupExecuteCallback<I, O> callback, final boolean serial)
        throws SQLException {
        if (inputGroups.isEmpty()) {
            return Collections.emptyList();
        }
        return serial ? serialExecute(inputGroups, firstCallback, callback) : parallelExecute(inputGroups, firstCallback, callback);
    }

    private <I, O> List<O> serialExecute(final Collection<ShardingExecuteGroup<I>> inputGroups, final ShardingGroupExecuteCallback<I, O> firstCallback,
                                         final ShardingGroupExecuteCallback<I, O> callback) throws SQLException {
        Iterator<ShardingExecuteGroup<I>> inputGroupsIterator = inputGroups.iterator();
        ShardingExecuteGroup<I> firstInputs = inputGroupsIterator.next();
        List<O> result = new LinkedList<>(syncGroupExecute(firstInputs, null == firstCallback ? callback : firstCallback));
        for (ShardingExecuteGroup<I> each : Lists.newArrayList(inputGroupsIterator)) {
            result.addAll(syncGroupExecute(each, callback));
        }
        return result;
    }

    private <I, O> Collection<O> syncGroupExecute(final ShardingExecuteGroup<I> executeGroup, final ShardingGroupExecuteCallback<I, O> callback) throws SQLException {
        return callback.execute(executeGroup.getInputs(), true, ShardingExecuteDataMap.getDataMap());
    }

    private <I, O> List<O> parallelExecute(final Collection<ShardingExecuteGroup<I>> inputGroups, final ShardingGroupExecuteCallback<I, O> firstCallback,
                                           final ShardingGroupExecuteCallback<I, O> callback) throws SQLException {
        Iterator<ShardingExecuteGroup<I>> inputGroupsIterator = inputGroups.iterator();
        ShardingExecuteGroup<I> firstInputs = inputGroupsIterator.next();
        Collection<ListenableFuture<Collection<O>>> restResultFutures = asyncGroupExecute(Lists.newArrayList(inputGroupsIterator), callback);
        return getGroupResults(syncGroupExecute(firstInputs, null == firstCallback ? callback : firstCallback), restResultFutures);
    }

    private <I, O> Collection<ListenableFuture<Collection<O>>> asyncGroupExecute(final List<ShardingExecuteGroup<I>> inputGroups, final ShardingGroupExecuteCallback<I, O> callback) {
        Collection<ListenableFuture<Collection<O>>> result = new LinkedList<>();
        for (ShardingExecuteGroup<I> each : inputGroups) {
            result.add(asyncGroupExecute(each, callback));
        }
        return result;
    }

    private <I, O> ListenableFuture<Collection<O>> asyncGroupExecute(final ShardingExecuteGroup<I> inputGroup, final ShardingGroupExecuteCallback<I, O> callback) {
        final Map<String, Object> dataMap = ShardingExecuteDataMap.getDataMap();
        return executorService.submit(new Callable<Collection<O>>() {
            
            @Override
            public Collection<O> call() throws SQLException {
                return callback.execute(inputGroup.getInputs(), false, dataMap);
            }
        });
    }
}

分库分表中间结果

t_address t_order t_order_item

相关文章

网友评论

    本文标题:Sharding-JDBC 执行整体流程

    本文链接:https://www.haomeiwen.com/subject/rhxsahtx.html