Sharding-JDBC Source Code Analysis


Author: aiwen2017 | Published 2019-08-06 00:29

    1. Sharding-JDBC initialization

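    Sharding-JDBC initialization consists mainly of two parts:

    1. Collecting the metadata of the data sources and of the tables.
    2. Collecting the configuration of each table's database and table sharding strategies and algorithms.

    The factory method ShardingDataSourceFactory.createDataSource() creates ShardingDataSource, Sharding-JDBC's DataSource implementation, and along the way also creates objects of two core classes, ShardingRule and ShardingContext. The relevant source code is as follows: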

    public final class ShardingDataSourceFactory {
        
        public static DataSource createDataSource(
                final Map<String, DataSource> dataSourceMap, final ShardingRuleConfiguration shardingRuleConfig, final Map<String, Object> configMap, final Properties props) throws SQLException {
            return new ShardingDataSource(dataSourceMap, new ShardingRule(shardingRuleConfig, dataSourceMap.keySet()), configMap, props);
        }
    }
    
    public class ShardingDataSource extends AbstractDataSourceAdapter {
        
        private final ShardingContext shardingContext;
        
        public ShardingDataSource(final Map<String, DataSource> dataSourceMap, final ShardingRule shardingRule, final Map<String, Object> configMap, final Properties props) throws SQLException {
            super(dataSourceMap);
            checkDataSourceType(dataSourceMap);
            if (!configMap.isEmpty()) {
                ConfigMapContext.getInstance().getConfigMap().putAll(configMap);
            }
            shardingContext = new ShardingContext(getDataSourceMap(), shardingRule, getDatabaseType(), props);
        }
    }
    

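    ShardingContext holds two properties, ShardingRule and ShardingMetaData. ShardingRule stores the database and table sharding configuration of each table, including the database sharding strategy and algorithm and the table sharding strategy and algorithm. In other words, given a table and its columns, the sharding strategies and algorithms for that table can be looked up from ShardingRule. ShardingMetaData maintains the metadata of the data sources and tables. It has two properties, ShardingDataSourceMetaData and ShardingTableMetaData, which represent the data source metadata and the table metadata respectively. Both are created in the ShardingMetaData constructor, as the source shows: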

    public final class ShardingMetaData {
        
        private final ShardingDataSourceMetaData dataSource;
        
        private final ShardingTableMetaData table;
        
        public ShardingMetaData(final Map<String, String> dataSourceURLs, final ShardingRule shardingRule, final DatabaseType databaseType, final ShardingExecuteEngine executeEngine, 
                                final TableMetaDataConnectionManager connectionManager, final int maxConnectionsSizePerQuery, final boolean isCheckingMetaData) {
            dataSource = new ShardingDataSourceMetaData(dataSourceURLs, shardingRule, databaseType);
            table = new ShardingTableMetaData(new TableMetaDataInitializer(dataSource, executeEngine, connectionManager, maxConnectionsSizePerQuery, isCheckingMetaData).load(shardingRule));
        }
    }
    

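    ShardingRuleConfiguration is the core of the sharding configuration and its entry point. It can contain multiple TableRuleConfiguration and MasterSlaveRuleConfiguration objects. Each group of tables sharded by the same rule is configured with one TableRuleConfiguration. A TableRuleConfiguration represents the sharding strategy configuration of one table and holds two properties of type ShardingStrategyConfiguration, databaseShardingStrategyConfig and tableShardingStrategyConfig, which represent the database sharding strategy configuration and the table sharding strategy configuration respectively. ShardingStrategyConfiguration has the following four implementations: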

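    • StandardShardingStrategyConfiguration: supports precise sharding and range sharding
    • ComplexShardingStrategyConfiguration: supports complex sharding
    • HintShardingStrategyConfiguration: forces sharding by a specified strategy
    • InlineShardingStrategyConfiguration: supports sharding by expression

    Each of these sharding strategy configurations is associated with one or two sharding algorithms. Sharding algorithms are represented by the interface ShardingAlgorithm, whose sub-interfaces are:

    • PreciseShardingAlgorithm: precise sharding algorithm
    • RangeShardingAlgorithm: range sharding algorithm
    • HintShardingAlgorithm: hint (forced) sharding algorithm
    • ComplexKeysShardingAlgorithm: complex sharding algorithm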
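
    To make the precise-sharding case concrete, here is a minimal sketch of a modulo-based algorithm. The SimplePreciseAlgorithm interface is a simplified, hypothetical stand-in for PreciseShardingAlgorithm and does not reproduce the library's actual signature; the table names and the modulo rule are illustrative assumptions as well.

```java
import java.util.Arrays;
import java.util.Collection;

// Hypothetical stand-in for a precise sharding algorithm: one sharding value in, one target name out.
interface SimplePreciseAlgorithm<T> {
    String doSharding(Collection<String> availableTargetNames, T shardingValue);
}

// Picks the target whose name ends with (value mod number-of-targets).
// Assumes fewer than 10 targets, so that a one-digit suffix is unambiguous.
class ModuloPreciseAlgorithm implements SimplePreciseAlgorithm<Long> {

    @Override
    public String doSharding(final Collection<String> availableTargetNames, final Long shardingValue) {
        String suffix = String.valueOf(Math.floorMod(shardingValue, (long) availableTargetNames.size()));
        for (String each : availableTargetNames) {
            if (each.endsWith(suffix)) {
                return each;
            }
        }
        throw new IllegalStateException("no target for value " + shardingValue);
    }
}

class ModuloPreciseAlgorithmDemo {

    public static void main(final String[] args) {
        Collection<String> tables = Arrays.asList("t_order0", "t_order1", "t_order2");
        // 7 mod 3 is 1, so the value is assigned to the table with suffix 1.
        System.out.println(new ModuloPreciseAlgorithm().doSharding(tables, 7L));
    }
}
```

    A precise algorithm returns exactly one target per value, which is why it fits equality conditions; a condition that spans several values needs an algorithm that can return several targets.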

    Sharding-JDBC uses ShardingRuleConfiguration to instantiate TableRule objects, as the source shows:

    public class ShardingRule {
        
        private final ShardingRuleConfiguration shardingRuleConfig;
        
        private final ShardingDataSourceNames shardingDataSourceNames;
        
        private final Collection<TableRule> tableRules = new LinkedList<>();
        
        private final ShardingStrategy defaultDatabaseShardingStrategy;
        
        private final ShardingStrategy defaultTableShardingStrategy;
        
        private final KeyGenerator defaultKeyGenerator;
        
        private final Collection<MasterSlaveRule> masterSlaveRules = new LinkedList<>();
        
        public ShardingRule(final ShardingRuleConfiguration shardingRuleConfig, final Collection<String> dataSourceNames) {
            Preconditions.checkNotNull(dataSourceNames, "Data sources cannot be null.");
            Preconditions.checkArgument(!dataSourceNames.isEmpty(), "Data sources cannot be empty.");
            this.shardingRuleConfig = shardingRuleConfig;
            shardingDataSourceNames = new ShardingDataSourceNames(shardingRuleConfig, dataSourceNames);
            for (TableRuleConfiguration each : shardingRuleConfig.getTableRuleConfigs()) {
                // Create one TableRule per TableRuleConfiguration; the TableRule constructor combines tables and data sources to instantiate the DataNode collection
                tableRules.add(new TableRule(each, shardingDataSourceNames));
            }
            for (String group : shardingRuleConfig.getBindingTableGroups()) {
                List<TableRule> tableRulesForBinding = new LinkedList<>();
                for (String logicTableNameForBindingTable : StringUtil.splitWithComma(group)) {
                    tableRulesForBinding.add(getTableRuleByLogicTableName(logicTableNameForBindingTable));
                }
                bindingTableRules.add(new BindingTableRule(tableRulesForBinding));
            }
            broadcastTables.addAll(shardingRuleConfig.getBroadcastTables());
            defaultDatabaseShardingStrategy = null == shardingRuleConfig.getDefaultDatabaseShardingStrategyConfig()
                    ? new NoneShardingStrategy() : ShardingStrategyFactory.newInstance(shardingRuleConfig.getDefaultDatabaseShardingStrategyConfig());
            defaultTableShardingStrategy = null == shardingRuleConfig.getDefaultTableShardingStrategyConfig()
                    ? new NoneShardingStrategy() : ShardingStrategyFactory.newInstance(shardingRuleConfig.getDefaultTableShardingStrategyConfig());
            defaultKeyGenerator = null == shardingRuleConfig.getDefaultKeyGenerator() ? new DefaultKeyGenerator() : shardingRuleConfig.getDefaultKeyGenerator();
            for (MasterSlaveRuleConfiguration each : shardingRuleConfig.getMasterSlaveRuleConfigs()) {
                masterSlaveRules.add(new MasterSlaveRule(each));
            }
        }
        // omitted
    }
    

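    A TableRule object represents the database and table resources of one logical table. It maintains a collection property of type DataNode named actualDataNodes, which represents the actual databases and tables behind the logical table. For example, with two databases db0 and db1, three tables in each, and the logical table name t_order, the actualDataNodes property of the TableRule object has 6 elements: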

    db0 t_order0
    db0 t_order1
    db0 t_order2
    db1 t_order0
    db1 t_order1
    db1 t_order2
    

    When routing, Sharding-JDBC applies the corresponding algorithm to this collection to select the actual databases and tables.
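
    The six-element example can be reproduced with a short sketch that enumerates every data source and table combination. The class and method names are hypothetical, and plain strings stand in for DataNode objects; this is not the TableRule constructor itself.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class DataNodeExpansionDemo {

    // Builds one "dataSource.table" entry for every data source and table-suffix combination.
    static List<String> expand(final List<String> dataSources, final String logicTable, final int tablesPerDataSource) {
        List<String> result = new ArrayList<>();
        for (String dataSource : dataSources) {
            for (int i = 0; i < tablesPerDataSource; i++) {
                result.add(dataSource + "." + logicTable + i);
            }
        }
        return result;
    }

    public static void main(final String[] args) {
        // Two data sources with three tables each give six data nodes.
        for (String each : expand(Arrays.asList("db0", "db1"), "t_order", 3)) {
            System.out.println(each);
        }
    }
}
```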

    [Figure: relationship diagram of the related classes]

    2. SQL parsing and routing

    ShardingPreparedStatement implements the three core methods of PreparedStatement: executeQuery(), executeUpdate(), and execute(). The relevant source code is as follows:

    public final class ShardingPreparedStatement extends AbstractShardingPreparedStatementAdapter {
        
        @Getter
        private final ShardingConnection connection;
        
        private final PreparedStatementRoutingEngine routingEngine;
        
        private final PreparedStatementExecutor preparedStatementExecutor;
        
        private SQLRouteResult routeResult;
        
        private ResultSet currentResultSet;
        
        // omitted
        
        @Override
        public ResultSet executeQuery() throws SQLException {
            ResultSet result;
            try {
                clearPrevious();
                sqlRoute();
                initPreparedStatementExecutor();
                MergeEngine mergeEngine = MergeEngineFactory.newInstance(connection.getShardingContext().getDatabaseType(), connection.getShardingContext().getShardingRule(), 
                        routeResult.getSqlStatement(), connection.getShardingContext().getMetaData().getTable(), preparedStatementExecutor.executeQuery());
                result = new ShardingResultSet(preparedStatementExecutor.getResultSets(), mergeEngine.merge(), this);
            } finally {
                clearBatch();
            }
            currentResultSet = result;
            return result;
        }
        
        @Override
        public int executeUpdate() throws SQLException {
            try {
                clearPrevious();
                sqlRoute();
                initPreparedStatementExecutor();
                return preparedStatementExecutor.executeUpdate();
            } finally {
                refreshTableMetaData(connection.getShardingContext(), routeResult.getSqlStatement());
                clearBatch();
            }
        }
        
        @Override
        public boolean execute() throws SQLException {
            try {
                clearPrevious();
                sqlRoute();
                initPreparedStatementExecutor();
                return preparedStatementExecutor.execute();
            } finally {
                refreshTableMetaData(connection.getShardingContext(), routeResult.getSqlStatement());
                clearBatch();
            }
        }
        
        private void sqlRoute() {
            routeResult = routingEngine.route(new ArrayList<>(getParameters()));
        }
        
        // omitted
    }
    

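    As the source above shows, all three core methods call sqlRoute(), which is the entry point for SQL parsing and routing. sqlRoute() directly calls the route() method of PreparedStatementRoutingEngine. That method first calls shardingRouter's parse() method to parse the SQL (only on the first call, since the resulting SQLStatement is kept in a field) and then calls its route() method to route it. The relevant source code is as follows: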

    public final class PreparedStatementRoutingEngine {
        
        private final String logicSQL;
        
        private final ShardingRouter shardingRouter;
        
        private final ShardingMasterSlaveRouter masterSlaveRouter;
        
        private SQLStatement sqlStatement;
        
        public PreparedStatementRoutingEngine(final String logicSQL, final ShardingRule shardingRule, final ShardingMetaData shardingMetaData, final DatabaseType databaseType, final boolean showSQL) {
            this.logicSQL = logicSQL;
            shardingRouter = ShardingRouterFactory.newInstance(shardingRule, shardingMetaData, databaseType, showSQL);
            masterSlaveRouter = new ShardingMasterSlaveRouter(shardingRule.getMasterSlaveRules());
        }
        
        public SQLRouteResult route(final List<Object> parameters) {
            if (null == sqlStatement) {
                // Call parse() to parse the SQL
                sqlStatement = shardingRouter.parse(logicSQL, true);
            }
            // Call route() to route
            return masterSlaveRouter.route(shardingRouter.route(logicSQL, parameters, sqlStatement));
        }
    }
    

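    As the source above shows, ShardingRouter is the core interface for parsing and routing, and its implementation class is ParsingSQLRouter. ParsingSQLRouter uses four engines to parse, optimize, route, and rewrite the SQL: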

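    • SQLParsingEngine
      Parses the SQL and returns a SQLStatement as the parse result.
    • OptimizeEngine
      Optimizes the SQLStatement and returns a ShardingConditions object.
    • RoutingEngine
      Finds the target databases and tables from the sharding configuration and the ShardingConditions, and returns a RoutingResult object.
    • SQLRewriteEngine
      Rewrites the SQL according to the routing result.

    The source code is as follows: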

    public final class ParsingSQLRouter implements ShardingRouter {
        
        private final ShardingRule shardingRule;
        
        private final ShardingMetaData shardingMetaData;
        
        private final DatabaseType databaseType;
        
        @Override
        public SQLStatement parse(final String logicSQL, final boolean useCache) {
            parsingHook.start(logicSQL);
            try {
                // 1. Call SQLParsingEngine to parse the SQL.
                SQLStatement result = new SQLParsingEngine(databaseType, logicSQL, shardingRule, shardingMetaData.getTable()).parse(useCache);
                parsingHook.finishSuccess();
                return result;
            } catch (final Exception ex) {
                parsingHook.finishFailure(ex);
                throw ex;
            }
        }
        
        @Override
        public SQLRouteResult route(final String logicSQL, final List<Object> parameters, final SQLStatement sqlStatement) {
            Optional<GeneratedKey> generatedKey = sqlStatement instanceof InsertStatement ? getGenerateKey(shardingRule, (InsertStatement) sqlStatement, parameters) : Optional.<GeneratedKey>absent();
            SQLRouteResult result = new SQLRouteResult(sqlStatement, generatedKey.orNull());
            // 2. Call OptimizeEngine to optimize the SQLStatement.
            ShardingConditions shardingConditions = OptimizeEngineFactory.newInstance(shardingRule, sqlStatement, parameters, generatedKey.orNull()).optimize();
            if (generatedKey.isPresent()) {
                setGeneratedKeys(result, generatedKey.get());
            }
            if (sqlStatement instanceof SelectStatement && !sqlStatement.getTables().isEmpty() && !((SelectStatement) sqlStatement).getSubQueryConditions().isEmpty()) {
                mergeShardingValueForSubQuery(sqlStatement.getConditions(), shardingConditions);
            }
            // 3. Call RoutingEngine to find the target databases and tables.
            RoutingResult routingResult = RoutingEngineFactory.newInstance(shardingRule, shardingMetaData.getDataSource(), sqlStatement, shardingConditions).route();
            SQLRewriteEngine rewriteEngine = new SQLRewriteEngine(shardingRule, logicSQL, databaseType, sqlStatement, shardingConditions, parameters);
            if (sqlStatement instanceof SelectStatement && null != ((SelectStatement) sqlStatement).getLimit()) {
                processLimit(parameters, (SelectStatement) sqlStatement);
            }
            // 4. Call SQLRewriteEngine to rewrite the SQL.
            SQLBuilder sqlBuilder = rewriteEngine.rewrite(routingResult.isSingleRouting());
            for (TableUnit each : routingResult.getTableUnits().getTableUnits()) {
                result.getRouteUnits().add(new RouteUnit(each.getDataSourceName(), rewriteEngine.generateSQL(each, sqlBuilder, shardingMetaData.getDataSource())));
            }
            if (showSQL) {
                SQLLogger.logSQL(logicSQL, sqlStatement, result.getRouteUnits());
            }
            return result;
        }
        
        // omitted
    }
    

    [Figure: relationship diagram of the related classes]

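    The thing to focus on here is the result of parsing and optimization, the ShardingConditions object. It matters for custom sharding algorithms, which need to know the condition columns and their values. That information lives in the ShardingValue collection held by each ShardingCondition that ShardingConditions aggregates. As the diagram above shows, ShardingValue has three subclasses: ListShardingValue, PreciseShardingValue, and RangeShardingValue. Combined with the four sharding algorithms PreciseShardingAlgorithm, RangeShardingAlgorithm, HintShardingAlgorithm, and ComplexKeysShardingAlgorithm, these three cover the vast majority of sharding scenarios.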
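
    To illustrate why the kind of sharding value matters, the sketch below handles a range of values rather than a single one. Unlike a single value, a range can hit several targets, so the result is a collection. The class, the closed range [lower, upper], and the modulo rule are all hypothetical; the code does not use the library's RangeShardingValue or RangeShardingAlgorithm types.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashSet;

class RangeShardingSketch {

    // Returns every target that can hold a value in [lower, upper],
    // assuming (value mod number-of-targets) selects the one-digit name suffix.
    static Collection<String> doSharding(final Collection<String> targets, final long lower, final long upper) {
        Collection<String> result = new LinkedHashSet<>();
        int count = targets.size();
        // Stop early once every target has been selected.
        for (long value = lower; value <= upper && result.size() < count; value++) {
            String suffix = String.valueOf(Math.floorMod(value, (long) count));
            for (String each : targets) {
                if (each.endsWith(suffix)) {
                    result.add(each);
                }
            }
        }
        return result;
    }

    public static void main(final String[] args) {
        Collection<String> tables = Arrays.asList("t_order0", "t_order1", "t_order2");
        System.out.println(doSharding(tables, 3, 4));
        System.out.println(doSharding(tables, 0, 100));
    }
}
```

    A narrow range touches only some of the tables, while a wide one degenerates into a scan of all of them.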

    The RoutingEngine interface was mentioned above. The implementation used here is StandardRoutingEngine. The following StandardRoutingEngine source shows how it routes:

    public final class StandardRoutingEngine implements RoutingEngine {
        
        private final ShardingRule shardingRule;
        
        private final String logicTableName;
        
        private final ShardingConditions shardingConditions;
       
        @Override
        public RoutingResult route() {
            return generateRoutingResult(getDataNodes(shardingRule.getTableRuleByLogicTableName(logicTableName)));
        }
        
        private Collection<DataNode> getDataNodes(final TableRule tableRule) {
            // Dispatch to the matching method according to the type of ShardingStrategy
            if (isRoutingByHint(tableRule)) {
                return routeByHint(tableRule);
            }
            if (isRoutingByShardingConditions(tableRule)) {
                return routeByShardingConditions(tableRule);
            }
            return routeByMixedConditions(tableRule);
        }
        
        private Collection<DataNode> routeByHint(final TableRule tableRule) {
            return route(tableRule, getDatabaseShardingValuesFromHint(), getTableShardingValuesFromHint());
        }
        
        private Collection<DataNode> routeByShardingConditions(final TableRule tableRule) {
            return shardingConditions.getShardingConditions().isEmpty() ? route(tableRule, Collections.<ShardingValue>emptyList(), Collections.<ShardingValue>emptyList())
                    : routeByShardingConditionsWithCondition(tableRule);
        }
        
        private Collection<DataNode> route(final TableRule tableRule, final List<ShardingValue> databaseShardingValues, final List<ShardingValue> tableShardingValues) {
            // 1. Route the data sources (databases) first
            Collection<String> routedDataSources = routeDataSources(tableRule, databaseShardingValues);
            Collection<DataNode> result = new LinkedList<>();
            for (String each : routedDataSources) {
                result.addAll(routeTables(tableRule, each, tableShardingValues));
            }
            return result;
        }
        
        private Collection<DataNode> routeByMixedConditions(final TableRule tableRule) {
            return shardingConditions.getShardingConditions().isEmpty() ? routeByMixedConditionsWithHint(tableRule) : routeByMixedConditionsWithCondition(tableRule);
        }
        
        private Collection<DataNode> routeByMixedConditionsWithCondition(final TableRule tableRule) {
            Collection<DataNode> result = new LinkedList<>();
            for (ShardingCondition each : shardingConditions.getShardingConditions()) {
                Collection<DataNode> dataNodes = route(tableRule, getDatabaseShardingValues(tableRule, each), getTableShardingValues(tableRule, each));
                reviseShardingConditions(each, dataNodes);
                result.addAll(dataNodes);
            }
            return result;
        }
        
        private Collection<DataNode> routeByMixedConditionsWithHint(final TableRule tableRule) {
            if (shardingRule.getDatabaseShardingStrategy(tableRule) instanceof HintShardingStrategy) {
                return route(tableRule, getDatabaseShardingValuesFromHint(), Collections.<ShardingValue>emptyList());
            }
            return route(tableRule, Collections.<ShardingValue>emptyList(), getTableShardingValuesFromHint());
        }
        
        private Collection<String> routeDataSources(final TableRule tableRule, final List<ShardingValue> databaseShardingValues) {
            Collection<String> availableTargetDatabases = tableRule.getActualDatasourceNames();
            if (databaseShardingValues.isEmpty()) {
                return availableTargetDatabases;
            }
            Collection<String> result = new LinkedHashSet<>(shardingRule.getDatabaseShardingStrategy(tableRule).doSharding(availableTargetDatabases, databaseShardingValues));
            Preconditions.checkState(!result.isEmpty(), "no database route info");
            return result;
        }
        
        private Collection<DataNode> routeTables(final TableRule tableRule, final String routedDataSource, final List<ShardingValue> tableShardingValues) {
            Collection<String> availableTargetTables = tableRule.getActualTableNames(routedDataSource);
            Collection<String> routedTables = new LinkedHashSet<>(tableShardingValues.isEmpty() ? availableTargetTables
                    : shardingRule.getTableShardingStrategy(tableRule).doSharding(availableTargetTables, tableShardingValues));
            Preconditions.checkState(!routedTables.isEmpty(), "no table route info");
            Collection<DataNode> result = new LinkedList<>();
            for (String each : routedTables) {
                result.add(new DataNode(routedDataSource, each));
            }
            return result;
        }
        
        // omitted
    }
    

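    As you can see, route() is the entry point. It first obtains the TableRule for the logical table from ShardingRule; as noted above, TableRule stores the actual database and table mappings of the logical table. Then, depending on the ShardingStrategy types of the database and the table, it takes one of three methods: routeByHint(), routeByShardingConditions(), or routeByMixedConditions(). Whichever is taken, execution ends up in the three-parameter route() method, which first calls routeDataSources() to route the data sources (databases) and then calls routeTables() to route the tables. Routing databases and tables is straightforward: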

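    1. Get the available databases and tables from the TableRule.
    2. Get the ShardingStrategy object of the database or table from the TableRule.
    3. Call doSharding() on the ShardingAlgorithm held by the ShardingStrategy, which returns the databases and tables routed to.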
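
    The database-first, table-second order can be sketched as follows. The sketch mirrors the shape of the three-parameter route() method: when no sharding value is available for a level, all available targets at that level are kept. The class name, the modulo rule, and the use of a nullable Long in place of a ShardingValue list are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class TwoStepRoutingSketch {

    // Step 1 narrows the data sources, step 2 narrows the tables inside each routed data source.
    // A null sharding value means "no condition", so every available target is kept.
    static List<String> route(final List<String> dataSources, final List<String> tables, final Long databaseValue, final Long tableValue) {
        List<String> routedDataSources = null == databaseValue ? dataSources : Collections.singletonList(pick(dataSources, databaseValue));
        List<String> result = new ArrayList<>();
        for (String dataSource : routedDataSources) {
            List<String> routedTables = null == tableValue ? tables : Collections.singletonList(pick(tables, tableValue));
            for (String table : routedTables) {
                result.add(dataSource + "." + table);
            }
        }
        return result;
    }

    // Hypothetical sharding rule: value mod number-of-targets is the index of the target.
    private static String pick(final List<String> targets, final long value) {
        return targets.get((int) Math.floorMod(value, (long) targets.size()));
    }

    public static void main(final String[] args) {
        List<String> dataSources = Arrays.asList("db0", "db1");
        List<String> tables = Arrays.asList("t_order0", "t_order1", "t_order2");
        System.out.println(route(dataSources, tables, 1L, 5L));
        System.out.println(route(dataSources, tables, null, 5L));
    }
}
```

    With both values present the statement goes to a single data node; with the database value missing, the same table is addressed in every database.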

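    The routing result is returned as a RoutingResult. SQLRewriteEngine is then called to rewrite the SQL, because at this point the table names in the SQL are still logical table names rather than actual tables. Next a SQLUnit is generated for each routed target, and the final routing result is returned as a SQLRouteResult.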
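
    The need for rewriting can be seen in a deliberately naive sketch that swaps the logical table name for an actual one using a whole-word regular expression. This is only an illustration of the idea: the class is hypothetical, and plain text substitution would also touch string literals or comments that happen to contain the table name, which a rewrite based on the parsed statement can avoid.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class NaiveTableRewriteSketch {

    // Replaces whole-word occurrences of the logical table name with the actual table name.
    static String rewrite(final String logicSQL, final String logicTable, final String actualTable) {
        return logicSQL.replaceAll("\\b" + Pattern.quote(logicTable) + "\\b", Matcher.quoteReplacement(actualTable));
    }

    public static void main(final String[] args) {
        String logicSQL = "SELECT * FROM t_order WHERE order_id = ?";
        // One rewritten statement per routed table.
        for (String actualTable : new String[] {"t_order0", "t_order1"}) {
            System.out.println(rewrite(logicSQL, "t_order", actualTable));
        }
    }
}
```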

    3. SQL execution and result merging

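    PreparedStatementExecutor is responsible for executing the SQL. It defines executeQuery(), executeUpdate(), and execute(), matching the corresponding methods in ShardingPreparedStatement. Before any of them runs, init() must be called. init() builds the collection of StatementExecuteUnit objects from the RouteUnit collection in the routing result SQLRouteResult and stores it in the executeGroups property of AbstractStatementExecutor. Along the way it looks up the real DataSource by logical data source name, obtains Connections from the data source, and then obtains Statements from the connections. The relevant source code is as follows: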

    public void init(final SQLRouteResult routeResult) throws SQLException {
        // Store the created ShardingExecuteGroup<StatementExecuteUnit> objects in the executeGroups property of AbstractStatementExecutor
        getExecuteGroups().addAll(obtainExecuteGroups(routeResult.getRouteUnits()));
        cacheStatements();
    }
    
    private Collection<ShardingExecuteGroup<StatementExecuteUnit>> obtainExecuteGroups(final Collection<RouteUnit> routeUnits) throws SQLException {
        return getSqlExecutePrepareTemplate().getExecuteUnitGroups(routeUnits, new SQLExecutePrepareCallback() {
            
            @Override
            public List<Connection> getConnections(final ConnectionMode connectionMode, final String dataSourceName, final int connectionSize) throws SQLException {
                // Get the requested number of connections from ShardingConnection, which maintains a dataSourceMap property of type Map<String, DataSource> holding the data source information.
                return PreparedStatementExecutor.super.getConnection().getConnections(connectionMode, dataSourceName, connectionSize);
            }
            
            @Override
            public StatementExecuteUnit createStatementExecuteUnit(final Connection connection, final RouteUnit routeUnit, final ConnectionMode connectionMode) throws SQLException {
                // Get a PreparedStatement from the Connection and create the StatementExecuteUnit.
                return new StatementExecuteUnit(routeUnit, createPreparedStatement(connection, routeUnit.getSqlUnit().getSql()), connectionMode);
            }
        });
    }
    
    @SuppressWarnings("MagicConstant")
    private PreparedStatement createPreparedStatement(final Connection connection, final String sql) throws SQLException {
        return returnGeneratedKeys ? connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)
                : connection.prepareStatement(sql, getResultSetType(), getResultSetConcurrency(), getResultSetHoldability());
    }
    

    In short, init() converts the RouteUnit objects in SQLRouteResult into a collection of ShardingExecuteGroup<StatementExecuteUnit> objects, obtaining connections and PreparedStatements from the data sources as it goes.
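
    As a rough picture of this conversion, the sketch below groups route units by data source name, so that each group can later be served by connections from one data source. Grouping per data source is an assumption about what getExecuteUnitGroups() does, since that method's body is not shown above; RouteUnitStub and ExecuteGroupSketch are hypothetical stand-ins, and no real connection is opened.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for a route unit: a data source name plus the rewritten SQL.
class RouteUnitStub {

    final String dataSourceName;

    final String sql;

    RouteUnitStub(final String dataSourceName, final String sql) {
        this.dataSourceName = dataSourceName;
        this.sql = sql;
    }
}

class ExecuteGroupSketch {

    // Collects the SQL of all route units that target the same data source, keeping first-seen order.
    static Map<String, List<String>> groupByDataSource(final List<RouteUnitStub> routeUnits) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (RouteUnitStub each : routeUnits) {
            result.computeIfAbsent(each.dataSourceName, key -> new ArrayList<>()).add(each.sql);
        }
        return result;
    }

    public static void main(final String[] args) {
        List<RouteUnitStub> routeUnits = Arrays.asList(
                new RouteUnitStub("db0", "SELECT * FROM t_order0"),
                new RouteUnitStub("db0", "SELECT * FROM t_order1"),
                new RouteUnitStub("db1", "SELECT * FROM t_order0"));
        System.out.println(groupByDataSource(routeUnits));
    }
}
```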

    Next, executeQuery(), executeUpdate(), or execute() of PreparedStatementExecutor is called. Whichever it is, execution reaches the executeCallback(SQLExecuteCallback<T> executeCallback) method in AbstractStatementExecutor, the parent class of PreparedStatementExecutor. The relevant source code is as follows:

    public List<QueryResult> executeQuery() throws SQLException {
        final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
        SQLExecuteCallback<QueryResult> executeCallback = new SQLExecuteCallback<QueryResult>(getDatabaseType(), isExceptionThrown) {
            
            @Override
            protected QueryResult executeSQL(final StatementExecuteUnit statementExecuteUnit) throws SQLException {
                return getQueryResult(statementExecuteUnit);
            }
        };
        return executeCallback(executeCallback);
    }
    
    public int executeUpdate() throws SQLException {
        final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
        SQLExecuteCallback<Integer> executeCallback = SQLExecuteCallbackFactory.getPreparedUpdateSQLExecuteCallback(getDatabaseType(), isExceptionThrown);
        List<Integer> results = executeCallback(executeCallback);
        return accumulate(results);
    }
    
    public boolean execute() throws SQLException {
        boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
        SQLExecuteCallback<Boolean> executeCallback = SQLExecuteCallbackFactory.getPreparedSQLExecuteCallback(getDatabaseType(), isExceptionThrown);
        List<Boolean> result = executeCallback(executeCallback);
        if (null == result || result.isEmpty() || null == result.get(0)) {
            return false;
        }
        return result.get(0);
    }
    

    The relevant source in AbstractStatementExecutor:

    public class AbstractStatementExecutor {
    
        private final SQLExecutePrepareTemplate sqlExecutePrepareTemplate;
        
        private final Collection<ShardingExecuteGroup<StatementExecuteUnit>> executeGroups = new LinkedList<>();
        
        protected final <T> List<T> executeCallback(final SQLExecuteCallback<T> executeCallback) throws SQLException {
            return sqlExecuteTemplate.executeGroup((Collection) executeGroups, executeCallback);
        }
        // omitted (including the declaration of the sqlExecuteTemplate field used above)
    }
    

    As the source above shows, executeCallback() calls executeGroup() on sqlExecuteTemplate, which in turn calls ShardingExecuteEngine's groupExecute() to execute the ShardingExecuteGroups. The source of groupExecute() and its related methods is as follows:

    public final class ShardingExecuteEngine implements AutoCloseable {
        
        private final ShardingExecutorService shardingExecutorService;
        
        private ListeningExecutorService executorService;
        
        public ShardingExecuteEngine(final int executorSize) {
            shardingExecutorService = new ShardingExecutorService(executorSize);
            executorService = shardingExecutorService.getExecutorService();
        }
        public <I, O> List<O> groupExecute(
                final Collection<ShardingExecuteGroup<I>> inputGroups, final ShardingGroupExecuteCallback<I, O> firstCallback, final ShardingGroupExecuteCallback<I, O> callback) throws SQLException {
            if (inputGroups.isEmpty()) {
                return Collections.emptyList();
            }
            Iterator<ShardingExecuteGroup<I>> inputGroupsIterator = inputGroups.iterator();
            ShardingExecuteGroup<I> firstInputs = inputGroupsIterator.next();
            // Execute the remaining ShardingExecuteGroups, from the second one on, asynchronously (if inputGroups has more than one element).
            Collection<ListenableFuture<Collection<O>>> restResultFutures = asyncGroupExecute(Lists.newArrayList(inputGroupsIterator), callback);
            // Execute the first ShardingExecuteGroup synchronously.
            return getGroupResults(syncGroupExecute(firstInputs, null == firstCallback ? callback : firstCallback), restResultFutures);
        }
        
        private <I, O> Collection<ListenableFuture<Collection<O>>> asyncGroupExecute(final List<ShardingExecuteGroup<I>> inputGroups, final ShardingGroupExecuteCallback<I, O> callback) {
            Collection<ListenableFuture<Collection<O>>> result = new LinkedList<>();
            for (ShardingExecuteGroup<I> each : inputGroups) {
                result.add(asyncGroupExecute(each, callback));
            }
            return result;
        }
        
        private <I, O> ListenableFuture<Collection<O>> asyncGroupExecute(final ShardingExecuteGroup<I> inputGroup, final ShardingGroupExecuteCallback<I, O> callback) {
            final Map<String, Object> dataMap = ShardingExecuteDataMap.getDataMap();
            return executorService.submit(new Callable<Collection<O>>() {
                
                @Override
                public Collection<O> call() throws SQLException {
                    return callback.execute(inputGroup.getInputs(), false, dataMap);
                }
            });
        }
        // omitted
    }
    

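    As the source above shows, the first group in the Collection<ShardingExecuteGroup<I>> is executed synchronously, and the remaining groups, if the inputGroups collection has more than one element, are submitted to the ListeningExecutorService and executed asynchronously.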
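
    The same "first one on the calling thread, the rest on a pool" pattern can be sketched with the JDK alone. This version uses CompletableFuture where the source uses Guava's ListeningExecutorService, and the class name and the Function-based callback are hypothetical simplifications of ShardingGroupExecuteCallback.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

class FirstSyncRestAsyncSketch {

    // Runs the callback for the first input on the calling thread and for all other inputs on the executor.
    // Results are returned in input order.
    static <I, O> List<O> groupExecute(final List<I> inputs, final Function<I, O> callback, final ExecutorService executor) {
        List<O> result = new ArrayList<>();
        if (inputs.isEmpty()) {
            return result;
        }
        Iterator<I> iterator = inputs.iterator();
        I first = iterator.next();
        // Submit the rest before doing any work here, so they overlap with the synchronous call.
        List<CompletableFuture<O>> restFutures = new ArrayList<>();
        while (iterator.hasNext()) {
            final I each = iterator.next();
            restFutures.add(CompletableFuture.supplyAsync(() -> callback.apply(each), executor));
        }
        result.add(callback.apply(first));
        for (CompletableFuture<O> each : restFutures) {
            result.add(each.join());
        }
        return result;
    }

    public static void main(final String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            System.out.println(groupExecute(Arrays.asList("db0", "db1", "db2"), name -> name + ":done", executor));
        } finally {
            executor.shutdown();
        }
    }
}
```

    Running the first group on the calling thread means a statement routed to a single group never pays for a thread hand-off.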

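    If executeQuery() was called, the result is a List<QueryResult>, and MergeEngine's merge() method must then be called to merge the QueryResult collection. The MergeEngine interface has two implementations, DQLMergeEngine and DALMergeEngine, which merge the results of data query SQL and database administration SQL respectively: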

    public final class MergeEngineFactory {
        
        public static MergeEngine newInstance(final DatabaseType databaseType, final ShardingRule shardingRule, 
                                              final SQLStatement sqlStatement, final ShardingTableMetaData shardingTableMetaData, final List<QueryResult> queryResults) throws SQLException {
            if (sqlStatement instanceof SelectStatement) {
                return new DQLMergeEngine(databaseType, (SelectStatement) sqlStatement, queryResults);
            } 
            if (sqlStatement instanceof DALStatement) {
                return new DALMergeEngine(shardingRule, queryResults, (DALStatement) sqlStatement, shardingTableMetaData);
            }
            throw new UnsupportedOperationException(String.format("Cannot support type '%s'", sqlStatement.getType()));
        }
    }
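
    The factory above only shows how a merge engine is selected. As an illustration of why merging is needed at all, assume a query with an ORDER BY clause where every shard returns its rows already sorted: the per-shard results then have to be interleaved into one sorted sequence. The sketch does this with a priority queue over plain integer lists. It is a hypothetical example of one kind of merge and is not taken from DQLMergeEngine.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

class OrderedMergeSketch {

    // Merges per-shard results, each sorted ascending, into a single ascending list.
    static List<Integer> merge(final List<List<Integer>> sortedShardResults) {
        // Each queue entry is {current value, index of the shard it came from}, ordered by value.
        PriorityQueue<int[]> queue = new PriorityQueue<>((a, b) -> Integer.compare(a[0], b[0]));
        List<Iterator<Integer>> iterators = new ArrayList<>();
        for (int i = 0; i < sortedShardResults.size(); i++) {
            Iterator<Integer> iterator = sortedShardResults.get(i).iterator();
            iterators.add(iterator);
            if (iterator.hasNext()) {
                queue.add(new int[] {iterator.next(), i});
            }
        }
        List<Integer> result = new ArrayList<>();
        while (!queue.isEmpty()) {
            int[] head = queue.poll();
            result.add(head[0]);
            // Refill the queue from the shard that just supplied the smallest value.
            Iterator<Integer> iterator = iterators.get(head[1]);
            if (iterator.hasNext()) {
                queue.add(new int[] {iterator.next(), head[1]});
            }
        }
        return result;
    }

    public static void main(final String[] args) {
        System.out.println(merge(Arrays.asList(Arrays.asList(1, 4, 7), Arrays.asList(2, 5), Arrays.asList(3, 6, 8))));
    }
}
```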
    

    [Figure: structure and relationship diagram of the related classes]


        Article link: https://www.haomeiwen.com/subject/opdbdctx.html