A Source-Code View of ShardingSphere's Design: Overview

Author: 蚊子squirrel | Published: 2020-08-28 21:51

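Developers who used ShardingSphere in its early days mostly know that it grew out of sharding-jdbc. The main features of sharding-jdbc 1.x were database and table sharding, read/write splitting, and distributed primary keys. Version 2.x added orchestration; 3.x added sharding-proxy, OpenTracing support, and more; with 4.x the project entered the Apache Software Foundation and added encryption/decryption, shadow tables, scaling, and so on. Beyond these features, the code itself was heavily refactored in every version. For example, SQL parsing originally used Alibaba's Druid, was later replaced by a simplified in-house SQL parser, and was finally rewritten on top of ANTLR; the early kernel also had an optimize engine, which was later folded into the rewrite engine. The 5.x line under development aims to be pluggable; it changes the code structure and the package and class names considerably and is friendlier to developers. As a developer, I see ShardingSphere as more than a database middleware: it is a development platform and toolset built around SQL and databases. Its code quality is also high, so analyzing its source is a good way to learn about software design and development.

This article does not cover ShardingSphere's features in detail; the official site has a more complete introduction: https://shardingsphere.apache.org/document/current/en/overview

This series is based on the latest release at the time of writing, 4.1.1, and compares it with 5.0.0-RC1-SNAPSHOT where the two differ significantly. Version 4.1.1 has 18 subprojects, named and grouped as follows: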


Figure: sharding-sphere 4.1.1 code directory

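The series is divided by feature into the following articles:

  1. Overview
  2. Parse engine
  3. Route engine
  4. Rewrite engine
  5. Execute engine
  6. Merge engine
  7. Transactions
  8. JDBC
  9. Proxy
  10. Orchestration
  11. Scaling
  12. Code changes in 5.x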

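This article is the overview. It walks quickly through the execution flow to show the responsibilities and roles of the classes and methods involved, so as to build an overall picture of the ShardingSphere kernel. It also summarizes the design patterns, code structure, and code style used, as well as the problems that currently exist in 4.1.1.

Call-flow analysis

To read the source we need an entry point. The most mature and widely used part of ShardingSphere is sharding-jdbc, so that is where we start. As the name suggests, sharding-jdbc supports JDBC. Developers familiar with the JDBC specification know that its core consists of interfaces such as DataSource, Connection, Statement, and PreparedStatement; in sharding-jdbc these are implemented by ShardingDataSource, ShardingConnection, ShardingStatement, and ShardingPreparedStatement respectively. Starting from a query, let's follow the call chain through the code of these classes:

To make it easier to tell where we are in the call chain, headings are added to indicate which functional area the following code belongs to.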

JDBC

org.apache.shardingsphere.shardingjdbc.jdbc.core.datasource.ShardingDataSource

public class ShardingDataSource extends AbstractDataSourceAdapter {
        private final ShardingRuntimeContext runtimeContext;
…
    @Override
    public final ShardingConnection getConnection() {
        return new ShardingConnection(getDataSourceMap(), runtimeContext, TransactionTypeHolder.get());
    }
}

org.apache.shardingsphere.shardingjdbc.jdbc.core.connection.ShardingConnection

public final class ShardingConnection extends AbstractConnectionAdapter {
       …
    @Override
    public PreparedStatement prepareStatement(final String sql) throws SQLException {
        return new ShardingPreparedStatement(this, sql);
}
…
    @Override
    public Statement createStatement(final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) {
        return new ShardingStatement(this, resultSetType, resultSetConcurrency, resultSetHoldability);
    }
…
}

org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingStatement

/**
 * Statement that support sharding.
 */
public final class ShardingStatement extends AbstractStatementAdapter {
…
@Override
    public ResultSet executeQuery(final String sql) throws SQLException {
        if (Strings.isNullOrEmpty(sql)) {
            throw new SQLException(SQLExceptionConstant.SQL_STRING_NULL_OR_EMPTY);
        }
        ResultSet result;
        try {
            executionContext = prepare(sql);
            List<QueryResult> queryResults = statementExecutor.executeQuery();
            MergedResult mergedResult = mergeQuery(queryResults);
            result = new ShardingResultSet(statementExecutor.getResultSets(), mergedResult, this, executionContext);
        } finally {
            currentResultSet = null;
        }
        currentResultSet = result;
        return result;
}
…
}

org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement

 /**
 * PreparedStatement that support sharding.
 */
public final class ShardingPreparedStatement extends AbstractShardingPreparedStatementAdapter {
    …
 public ResultSet executeQuery() throws SQLException {
        ResultSet result;
        try {
            clearPrevious();
            prepare();
            initPreparedStatementExecutor();
            MergedResult mergedResult = mergeQuery(preparedStatementExecutor.executeQuery());
            result = new ShardingResultSet(preparedStatementExecutor.getResultSets(), mergedResult, this, executionContext);
        } finally {
            clearBatch();
        }
        currentResultSet = result;
        return result;
    }

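As you can see, SQL parsing and routing actually happen in the Statement implementation classes.
Let's take ShardingPreparedStatement.executeQuery() as the example and walk through the whole flow.
clearPrevious() resets the PreparedStatementExecutor. A Statement can be executed many times, and after each execution the PreparedStatementExecutor holds the real Statements and Connections it used; this method closes those statements and clears the recorded parameters, connections, and so on.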

public void clear() throws SQLException {
        clearStatements();
        statements.clear();
        parameterSets.clear();
        connections.clear();
        resultSets.clear();
        inputGroups.clear();
    }

The rest of executeQuery performs SQL parsing, routing, rewriting, execution, and merging.
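To make these stages concrete, here is a minimal toy sketch rather than ShardingSphere code. It assumes a hypothetical logic table t_order split into two in-memory tables, t_order_0 and t_order_1. Every class and method name in it is invented for illustration, parsing is skipped entirely, and each remaining stage is reduced to a few lines.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model only; every name here is hypothetical and none of it is ShardingSphere API.
final class QueryPipelineSketch {

    // Actual table name -> ids stored in that table (stand-in for real databases).
    static final Map<String, List<Integer>> SHARDS = new TreeMap<>();

    static {
        SHARDS.put("t_order_0", Arrays.asList(4, 2));
        SHARDS.put("t_order_1", Arrays.asList(3, 1));
    }

    // Route: with no sharding condition in the SQL, every actual table is a target.
    static List<String> route() {
        return new ArrayList<>(SHARDS.keySet());
    }

    // Rewrite: swap the logic table name for one actual table name.
    static String rewrite(final String logicSql, final String actualTable) {
        return logicSql.replace("t_order", actualTable);
    }

    // Execute: pretend to run the rewritten SQL by returning the rows of the table it names.
    static List<Integer> execute(final String actualSql) {
        for (Map.Entry<String, List<Integer>> each : SHARDS.entrySet()) {
            if (actualSql.contains(each.getKey())) {
                return each.getValue();
            }
        }
        return Collections.emptyList();
    }

    // Merge: combine the per-table results and sort them, as a global ORDER BY would require.
    static List<Integer> merge(final List<List<Integer>> partialResults) {
        List<Integer> result = new ArrayList<>();
        for (List<Integer> each : partialResults) {
            result.addAll(each);
        }
        Collections.sort(result);
        return result;
    }

    static List<Integer> executeQuery(final String logicSql) {
        List<List<Integer>> partialResults = new ArrayList<>();
        for (String actualTable : route()) {
            partialResults.add(execute(rewrite(logicSql, actualTable)));
        }
        return merge(partialResults);
    }

    public static void main(final String[] args) {
        System.out.println(executeQuery("SELECT id FROM t_order ORDER BY id"));
    }
}
```

The point of the sketch is the shape of the flow: one logic SQL fans out into several actual SQLs, and the partial results have to be combined before the caller sees a single ResultSet. The in-memory sort is only the simplest possible stand-in for that last step.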

SQL processing flow
Prepare engine

Next, look at prepare. This method runs the prepare engine and then records the generated auto-increment key.

private void prepare() {
        executionContext = prepareEngine.prepare(sql, getParameters());
        findGeneratedKey().ifPresent(generatedKey -> generatedValues.add(generatedKey.getGeneratedValues().getLast()));
    }

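The prepare engine is not one of the official kernel engines, because all it does is call the parse, route, and rewrite engines to parse, route, and rewrite the SQL; in effect it orchestrates those kernel engines. Its class hierarchy is as follows: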


Here is the prepare method of the prepare engine:
org.apache.shardingsphere.underlying.pluggble.prepare.BasePrepareEngine

public ExecutionContext prepare(final String sql, final List<Object> parameters) {
        List<Object> clonedParameters = cloneParameters(parameters);
        RouteContext routeContext = executeRoute(sql, clonedParameters);// SQL routing
        ExecutionContext result = new ExecutionContext(routeContext.getSqlStatementContext());
        result.getExecutionUnits().addAll(executeRewrite(sql, clonedParameters, routeContext));// SQL rewriting
        if (properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SHOW)) {
            SQLLogger.logSQL(sql, properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SIMPLE), result.getSqlStatementContext(), result.getExecutionUnits());
        }
        return result;
}
…
private RouteContext executeRoute(final String sql, final List<Object> clonedParameters) {
        registerRouteDecorator();
        return route(router, sql, clonedParameters);
    }
    
    private void registerRouteDecorator() {
        for (Class<? extends RouteDecorator> each : OrderedRegistry.getRegisteredClasses(RouteDecorator.class)) {
            RouteDecorator routeDecorator = createRouteDecorator(each);
            Class<?> ruleClass = (Class<?>) routeDecorator.getType();
            rules.stream().filter(rule -> rule.getClass() == ruleClass || rule.getClass().getSuperclass() == ruleClass).collect(Collectors.toList())
                    .forEach(rule -> router.registerDecorator(rule, routeDecorator));
        }
    }
    …
    protected abstract RouteContext route(DataNodeRouter dataNodeRouter, String sql, List<Object> parameters);
…
}

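As you can see, the core of prepare is routing (executeRoute) and rewriting (executeRewrite). Let's look at each in turn.

executeRoute implements SQL routing. It calls registerRouteDecorator to register the route decorators (custom extensions loaded through SPI) and then route to do the routing itself, that is, to work out which databases the SQL should run on and which tables it should touch. Here is how route is implemented:

Routing (route)

route is implemented by subclasses, which call DataNodeRouter.route. Here is PreparedQueryPrepareEngine, a subclass of BasePrepareEngine.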

org.apache.shardingsphere.underlying.pluggble.prepare.PreparedQueryPrepareEngine

   @Override
    protected RouteContext route(final DataNodeRouter dataNodeRouter, final String sql, final List<Object> parameters) {
        return dataNodeRouter.route(sql, parameters, true);
    }

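Next is org.apache.shardingsphere.underlying.route.DataNodeRouter, which produces the RouteContext. Its executeRoute method does two things:

  1. It calls createRouteContext, which invokes the parse engine to parse the SQL and creates an initial RouteContext instance;
  2. It calls the decorate method of each registered RouteDecorator in turn, and each decorator updates the RouteContext according to the routing result.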
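The two steps above amount to a chain in which each decorator receives the result of the previous one. The following is a minimal sketch of that pattern under simplifying assumptions: the route context is reduced to a list of target data source names, a rule is reduced to a String, and the Decorator interface and both lambdas are hypothetical stand-ins, not the real RouteDecorator API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins: a "route context" is just a list of target names, a "rule" is a String.
final class RouteDecoratorChainSketch {

    interface Decorator {
        List<String> decorate(List<String> previousTargets, String rule);
    }

    // Start from an empty result and let every registered decorator transform it in order.
    static List<String> executeRoute(final Map<String, Decorator> decorators) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Decorator> entry : decorators.entrySet()) {
            result = entry.getValue().decorate(result, entry.getKey());
        }
        return result;
    }

    static List<String> demo() {
        // LinkedHashMap keeps registration order, which decides the order of decoration.
        Map<String, Decorator> decorators = new LinkedHashMap<>();
        // Sharding-like step: adds the data source named by its rule.
        decorators.put("ds_1", (previous, rule) -> {
            List<String> targets = new ArrayList<>(previous);
            targets.add(rule);
            return targets;
        });
        // Master-slave-like step: redirects every target to a replica by appending its rule.
        decorators.put("_slave_0", (previous, rule) -> {
            List<String> targets = new ArrayList<>();
            for (String each : previous) {
                targets.add(each + rule);
            }
            return targets;
        });
        return executeRoute(decorators);
    }

    public static void main(final String[] args) {
        System.out.println(demo());
    }
}
```

Because each step consumes the previous step's output, the order of registration matters: in this toy setup the replica redirection only makes sense after the sharding-like step has chosen a data source.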

org.apache.shardingsphere.underlying.route.DataNodeRouter

public final class DataNodeRouter {
…
 public RouteContext route(final String sql, final List<Object> parameters, final boolean useCache) {
        routingHook.start(sql);
        try {
            RouteContext result = executeRoute(sql, parameters, useCache);// compute the route and produce the route result
            routingHook.finishSuccess(result, metaData.getSchema());
            return result;
        } catch (final Exception ex) {
            routingHook.finishFailure(ex);
            throw ex;
        }
    }
    
    @SuppressWarnings("unchecked")
    private RouteContext executeRoute(final String sql, final List<Object> parameters, final boolean useCache) {
        RouteContext result = createRouteContext(sql, parameters, useCache);
        for (Entry<BaseRule, RouteDecorator> entry : decorators.entrySet()) {
            result = entry.getValue().decorate(result, metaData, entry.getKey(), properties);
        }
        return result;
    }
    
    private RouteContext createRouteContext(final String sql, final List<Object> parameters, final boolean useCache) {
        SQLStatement sqlStatement = parserEngine.parse(sql, useCache);// parse the SQL into its AST
        try {
            SQLStatementContext sqlStatementContext = SQLStatementContextFactory.newInstance(metaData.getSchema(), sql, parameters, sqlStatement);// build the SQL statement context, which amounts to part of semantic analysis
            return new RouteContext(sqlStatementContext, parameters, new RouteResult());
        } catch (final IndexOutOfBoundsException ex) {
            return new RouteContext(new CommonSQLStatementContext(sqlStatement), parameters, new RouteResult());
        }
    }
}
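SQL parsing

createRouteContext does two things:

  1. It calls parserEngine.parse, that is, the SQL parse engine parses the SQL into an AST, represented by an implementation of the SQLStatement interface;
  2. It uses SQLStatementContextFactory to turn the SQLStatement instance into an SQLStatementContext instance.

The class hierarchy of SQLStatement is as follows:


ShardingSphere 4.1.1 currently has 83 SQLStatement implementation classes; for readability the figure above includes only the main ones.

This brings in one of ShardingSphere's kernel engines, the SQL parse engine. Let's open SQLParserEngine to see how it is implemented.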
org.apache.shardingsphere.sql.parser.SQLParserEngine

public final class SQLParserEngine {
    private final String databaseTypeName;
private final SQLParseResultCache cache = new SQLParseResultCache(); 
…   
    /**
     * Parse SQL.
     *
     * @param sql SQL
     * @param useCache use cache or not
     * @return SQL statement
     */
       public SQLStatement parse(final String sql, final boolean useCache) {
        ParsingHook parsingHook = new SPIParsingHook();// SQL parsing hook
        parsingHook.start(sql);
        try {
            SQLStatement result = parse0(sql, useCache);
            parsingHook.finishSuccess(result);
            return result;
            // CHECKSTYLE:OFF
        } catch (final Exception ex) {
            // CHECKSTYLE:ON
            parsingHook.finishFailure(ex);
            throw ex;
        }
}
    
 private SQLStatement parse0(final String sql, final boolean useCache) {
        if (useCache) {
            Optional<SQLStatement> cachedSQLStatement = cache.getSQLStatement(sql);
            if (cachedSQLStatement.isPresent()) {// reuse the cached parse result for this SQL if there is one
                return cachedSQLStatement.get();
            }
        }
        ParseTree parseTree = new SQLParserExecutor(databaseTypeName, sql).execute().getRootNode();// 1. parse the SQL into a parse tree; ParseTree is ANTLR's parse-tree interface
        SQLStatement result = (SQLStatement) ParseTreeVisitorFactory.newInstance(databaseTypeName, VisitorRule.valueOf(parseTree.getClass())).visit(parseTree);// 2. use the visitor pattern to convert the ANTLR parse tree into an SQLStatement
        if (useCache) {
            cache.put(sql, result);
        }
        return result;
    }
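The cache-first lookup in parse0 is worth isolating, because it is why the useCache flag pays off for parameterized SQL: PreparedQueryPrepareEngine passes true, and the SQL text of a prepared statement is identical on every execution. Below is a minimal sketch of the same pattern. ParseCacheSketch and its fields are hypothetical, and the "parse result" is simply the first keyword of the SQL, so this only illustrates the caching logic, not real parsing.

```java
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a parser with a result cache keyed by the SQL text.
final class ParseCacheSketch {

    private final Map<String, String> cache = new ConcurrentHashMap<>();

    // Counts how often the expensive parse step actually runs.
    final AtomicInteger parseCount = new AtomicInteger();

    String parse(final String sql, final boolean useCache) {
        if (useCache) {
            String cached = cache.get(sql);
            if (null != cached) {
                return cached;
            }
        }
        String result = doParse(sql);
        if (useCache) {
            cache.put(sql, result);
        }
        return result;
    }

    // Stand-in for real parsing: returns the upper-cased first keyword of the SQL.
    private String doParse(final String sql) {
        parseCount.incrementAndGet();
        return sql.trim().split("\\s+")[0].toUpperCase(Locale.ROOT);
    }

    public static void main(final String[] args) {
        ParseCacheSketch engine = new ParseCacheSketch();
        engine.parse("select * from t_order where id = ?", true);
        engine.parse("select * from t_order where id = ?", true);
        System.out.println(engine.parseCount.get());
    }
}
```

With the cache enabled, the second call for the same SQL text never reaches doParse. With useCache set to false, every call parses again and nothing is stored.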

Internally it calls SQLParserExecutor.execute() to produce the parse tree (ParseASTNode), then walks that tree with a ParseTreeVisitor to produce the SQLStatement. Here are SQLParserExecutor and ParseTreeVisitorFactory:

org.apache.shardingsphere.sql.parser.core.parser.SQLParserExecutor

public final class SQLParserExecutor {
    
    private final String databaseTypeName;
    
    private final String sql;
    
    /**
     * Execute to parse SQL.
     *
     * @return AST node
     */
    public ParseASTNode execute() {
        ParseASTNode result = towPhaseParse();
        if (result.getRootNode() instanceof ErrorNode) {
            throw new SQLParsingException(String.format("Unsupported SQL of `%s`", sql));
        }
        return result;
    }
    
    private ParseASTNode towPhaseParse() {// the name looks misspelled; twoPhaseParse was probably intended
        SQLParser sqlParser = SQLParserFactory.newInstance(databaseTypeName, sql);// create the SQL parser for this database type
        try {
            ((Parser) sqlParser).setErrorHandler(new BailErrorStrategy());
            ((Parser) sqlParser).getInterpreter().setPredictionMode(PredictionMode.SLL);
            return (ParseASTNode) sqlParser.parse();
        } catch (final ParseCancellationException ex) {
            ((Parser) sqlParser).reset();
            ((Parser) sqlParser).setErrorHandler(new DefaultErrorStrategy());
            ((Parser) sqlParser).getInterpreter().setPredictionMode(PredictionMode.LL);
            return (ParseASTNode) sqlParser.parse();
        }
    }

org.apache.shardingsphere.sql.parser.core.visitor.ParseTreeVisitorFactory

/**
 * Parse tree visitor factory.
 */
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class ParseTreeVisitorFactory {
    
    /** 
     * New instance of SQL visitor.
     * 
     * @param databaseTypeName name of database type
     * @param visitorRule visitor rule
     * @return parse tree visitor
     */
    public static ParseTreeVisitor newInstance(final String databaseTypeName, final VisitorRule visitorRule) {
        for (SQLParserConfiguration each : NewInstanceServiceLoader.newServiceInstances(SQLParserConfiguration.class)) {
            if (each.getDatabaseTypeName().equals(databaseTypeName)) {
                return createParseTreeVisitor(each, visitorRule.getType());
            }
        }
        throw new UnsupportedOperationException(String.format("Cannot support database type '%s'", databaseTypeName));
    }
    
    @SneakyThrows
    private static ParseTreeVisitor createParseTreeVisitor(final SQLParserConfiguration configuration, final SQLStatementType type) {
        SQLVisitorFacade visitorFacade = configuration.getVisitorFacadeClass().getConstructor().newInstance();
        switch (type) {
            case DML:
                return (ParseTreeVisitor) visitorFacade.getDMLVisitorClass().getConstructor().newInstance();
            case DDL:
                return (ParseTreeVisitor) visitorFacade.getDDLVisitorClass().getConstructor().newInstance();
            case TCL:
                return (ParseTreeVisitor) visitorFacade.getTCLVisitorClass().getConstructor().newInstance();
            case DCL:
                return (ParseTreeVisitor) visitorFacade.getDCLVisitorClass().getConstructor().newInstance();
            case DAL:
                return (ParseTreeVisitor) visitorFacade.getDALVisitorClass().getConstructor().newInstance();
            case RL:
                return (ParseTreeVisitor) visitorFacade.getRLVisitorClass().getConstructor().newInstance();
            default:
                throw new SQLParsingException("Can not support SQL statement type: `%s`", type);
        }
    }
}

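SQLParserExecutor and ParseTreeVisitorFactory already use ANTLR classes. We skip them here and analyze them in detail in the SQL parsing article later in the series.

Back in DataNodeRouter.createRouteContext: once the AST (SQLStatement) for the SQL has been obtained, SQLStatementContextFactory converts the SQLStatement into an SQLStatementContext object.
SQLStatementContext has different implementation classes for different kinds of SQL. Taking SELECT as an example, the class is org.apache.shardingsphere.sql.parser.binder.statement.dml.SelectStatementContext, and its structure is as follows:


As you can see, it holds the context information of a SELECT statement: tables, projections, GROUP BY, ORDER BY, pagination, and so on.
Routing (route)

So far we have not seen the actual routing logic for sharding or master-slave setups. That logic lives in the RouteDecorator implementations; here are the implementation classes of the RouteDecorator interface.


Let's look at ShardingRouteDecorator, the route decorator for the sharding feature.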
org.apache.shardingsphere.sharding.route.engine.ShardingRouteDecorator

 public final class ShardingRouteDecorator implements RouteDecorator<ShardingRule> {

 @Override
    public RouteContext decorate(final RouteContext routeContext, final ShardingSphereMetaData metaData, final ShardingRule shardingRule, final ConfigurationProperties properties) {
        SQLStatementContext sqlStatementContext = routeContext.getSqlStatementContext();
        List<Object> parameters = routeContext.getParameters();
        // validate the SQL, mainly to reject unsupported statements; sharding does not support INSERT INTO .... ON DUPLICATE KEY or updating the sharding key
        ShardingStatementValidatorFactory.newInstance(
                sqlStatementContext.getSqlStatement()).ifPresent(validator -> validator.validate(shardingRule, sqlStatementContext.getSqlStatement(), parameters));
        // extract the sharding conditions from the SQL
        ShardingConditions shardingConditions = getShardingConditions(parameters, sqlStatementContext, metaData.getSchema(), shardingRule);
        boolean needMergeShardingValues = isNeedMergeShardingValues(sqlStatementContext, shardingRule);
        if (sqlStatementContext.getSqlStatement() instanceof DMLStatement && needMergeShardingValues) {
            // check that all sharding values (table, column, value) are the same; throw an exception if they are not
            checkSubqueryShardingValues(sqlStatementContext, shardingRule, shardingConditions);
            // remove duplicate sharding conditions
            mergeShardingConditions(shardingConditions);
        }
        // create the sharding route engine
        ShardingRouteEngine shardingRouteEngine = ShardingRouteEngineFactory.newInstance(shardingRule, metaData, sqlStatementContext, shardingConditions, properties);
        // route and produce the route result
        RouteResult routeResult = shardingRouteEngine.route(shardingRule);
        if (needMergeShardingValues) {
            Preconditions.checkState(1 == routeResult.getRouteUnits().size(), "Must have one sharding with subquery.");
        }
        return new RouteContext(sqlStatementContext, parameters, routeResult);
}
…
}

The key part is the following code:

 ShardingRouteEngine shardingRouteEngine = ShardingRouteEngineFactory.newInstance(shardingRule, metaData, sqlStatementContext, shardingConditions, properties);
        RouteResult routeResult = shardingRouteEngine.route(shardingRule);
…
  return new RouteContext(sqlStatementContext, parameters, routeResult);

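ShardingRouteEngineFactory creates the sharding route engine, route is called on it, and a new RouteContext built from the resulting RouteResult is returned.

ShardingRouteEngineFactory is the factory for ShardingRouteEngine. It creates a different ShardingRouteEngine depending on the type of SQL, because different types of SQL call for different routing strategies, such as all-database routing, all-database-and-table routing, single-database routing, and standard routing.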
org.apache.shardingsphere.sharding.route.engine.type.ShardingRouteEngineFactory

/**
 * Sharding routing engine factory.
 */
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class ShardingRouteEngineFactory {

        /**
         * Create new instance of routing engine.
         *
         * @param shardingRule sharding rule
         * @param metaData meta data of ShardingSphere
         * @param sqlStatementContext SQL statement context
         * @param shardingConditions shardingConditions
         * @param properties sharding sphere properties
         * @return new instance of routing engine
         */
public static ShardingRouteEngine newInstance(final ShardingRule shardingRule,
                                                  final ShardingSphereMetaData metaData, final SQLStatementContext sqlStatementContext,
                                                  final ShardingConditions shardingConditions, final ConfigurationProperties properties) {
        SQLStatement sqlStatement = sqlStatementContext.getSqlStatement();
        Collection<String> tableNames = sqlStatementContext.getTablesContext().getTableNames();
        if (sqlStatement instanceof TCLStatement) {
            return new ShardingDatabaseBroadcastRoutingEngine();
        }
        if (sqlStatement instanceof DDLStatement) {
            return new ShardingTableBroadcastRoutingEngine(metaData.getSchema(), sqlStatementContext);
        }
        if (sqlStatement instanceof DALStatement) {
            return getDALRoutingEngine(shardingRule, sqlStatement, tableNames);
        }
        if (sqlStatement instanceof DCLStatement) {
            return getDCLRoutingEngine(sqlStatementContext, metaData);
        }
        if (shardingRule.isAllInDefaultDataSource(tableNames)) {
            return new ShardingDefaultDatabaseRoutingEngine(tableNames);
        }
        if (shardingRule.isAllBroadcastTables(tableNames)) {
            return sqlStatement instanceof SelectStatement ? new ShardingUnicastRoutingEngine(tableNames) : new ShardingDatabaseBroadcastRoutingEngine();
        }
        if (sqlStatementContext.getSqlStatement() instanceof DMLStatement && tableNames.isEmpty() && shardingRule.hasDefaultDataSourceName()) {
            return new ShardingDefaultDatabaseRoutingEngine(tableNames);
        }
        if (sqlStatementContext.getSqlStatement() instanceof DMLStatement && shardingConditions.isAlwaysFalse() || tableNames.isEmpty() || !shardingRule.tableRuleExists(tableNames)) {
            return new ShardingUnicastRoutingEngine(tableNames);
        }
        return getShardingRoutingEngine(shardingRule, sqlStatementContext, shardingConditions, tableNames, properties);
    }
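The decision order in newInstance is easier to see in a reduced form. The sketch below keeps only four of the branches from the listing above (TCL, DDL, all-broadcast tables, and the standard fallback) and replaces the real classes with two hypothetical enums; it is a simplification for reading purposes, not the factory's full logic.

```java
// Hypothetical reduction of the factory's branching; the enum names are invented for this sketch.
final class RouteEngineChoiceSketch {

    enum StatementType { TCL, DDL, SELECT, UPDATE }

    enum Engine { DATABASE_BROADCAST, TABLE_BROADCAST, UNICAST, STANDARD }

    static Engine choose(final StatementType type, final boolean allBroadcastTables) {
        if (StatementType.TCL == type) {
            // Transaction control statements go to every database.
            return Engine.DATABASE_BROADCAST;
        }
        if (StatementType.DDL == type) {
            // Schema changes go to every actual table.
            return Engine.TABLE_BROADCAST;
        }
        if (allBroadcastTables) {
            // Reading a broadcast table needs one database; writing it needs all of them.
            return StatementType.SELECT == type ? Engine.UNICAST : Engine.DATABASE_BROADCAST;
        }
        return Engine.STANDARD;
    }

    public static void main(final String[] args) {
        System.out.println(choose(StatementType.SELECT, true));
        System.out.println(choose(StatementType.UPDATE, false));
    }
}
```

The checks run from the most specific statement kinds down to the general case, so the standard engine is reached only when none of the earlier conditions applies.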

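Now the route engine for the most common case, standard SQL:
org.apache.shardingsphere.sharding.route.engine.type.standard.ShardingStandardRoutingEngine. This class has quite a few methods. Starting from route, the call chain is getDataNodes -> routeByShardingConditions -> route0, and the result is then returned through generateRouteResult.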

   /**
 * Sharding standard routing engine.
 */
@RequiredArgsConstructor
public final class ShardingStandardRoutingEngine implements ShardingRouteEngine {
…
public RouteResult route(final ShardingRule shardingRule) {
        if (isDMLForModify(sqlStatementContext) && 1 != ((TableAvailable) sqlStatementContext).getAllTables().size()) {// check the tables in the SQL: INSERT, UPDATE, and DELETE do not support multiple tables
            throw new ShardingSphereException("Cannot support Multiple-Table for '%s'.", sqlStatementContext.getSqlStatement());
        }
        return generateRouteResult(getDataNodes(shardingRule, shardingRule.getTableRule(logicTableName)));
    }
    
    private boolean isDMLForModify(final SQLStatementContext sqlStatementContext) {
        return sqlStatementContext instanceof InsertStatementContext || sqlStatementContext instanceof UpdateStatementContext || sqlStatementContext instanceof DeleteStatementContext;
    }

    // build the route result from the data nodes
    private RouteResult generateRouteResult(final Collection<DataNode> routedDataNodes) {
        RouteResult result = new RouteResult();
        result.getOriginalDataNodes().addAll(originalDataNodes);
        for (DataNode each : routedDataNodes) {
            result.getRouteUnits().add(
                    new RouteUnit(new RouteMapper(each.getDataSourceName(), each.getDataSourceName()), Collections.singletonList(new RouteMapper(logicTableName, each.getTableName()))));
        }
        return result;
    }

    // compute the data nodes; this is the core of route calculation
    private Collection<DataNode> getDataNodes(final ShardingRule shardingRule, final TableRule tableRule) {
        if (isRoutingByHint(shardingRule, tableRule)) {// both database and table strategies are hint-based: route by hint
            return routeByHint(shardingRule, tableRule);
        }
        if (isRoutingByShardingConditions(shardingRule, tableRule)) {// neither strategy is hint-based: route by sharding conditions
            return routeByShardingConditions(shardingRule, tableRule);
        }
        return routeByMixedConditions(shardingRule, tableRule);// a mix of hint and sharding conditions: use mixed routing
    }
   …
}

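In the call chain above, the key logic is in route0, which calls routeDataSources and routeTables. They are implemented as follows:

    // run the configured database sharding strategy to get the target data source names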
    private Collection<String> routeDataSources(final ShardingRule shardingRule, final TableRule tableRule, final List<RouteValue> databaseShardingValues) {
        if (databaseShardingValues.isEmpty()) {
            return tableRule.getActualDatasourceNames();
        }
        Collection<String> result = new LinkedHashSet<>(shardingRule.getDatabaseShardingStrategy(tableRule).doSharding(tableRule.getActualDatasourceNames(), databaseShardingValues, this.properties));
        Preconditions.checkState(!result.isEmpty(), "no database route info");
        Preconditions.checkState(tableRule.getActualDatasourceNames().containsAll(result), 
                "Some routed data sources do not belong to configured data sources. routed data sources: `%s`, configured data sources: `%s`", result, tableRule.getActualDatasourceNames());
        return result;
    }
    // run the configured table sharding strategy to get the actual table names
    private Collection<DataNode> routeTables(final ShardingRule shardingRule, final TableRule tableRule, final String routedDataSource, final List<RouteValue> tableShardingValues) {
        Collection<String> availableTargetTables = tableRule.getActualTableNames(routedDataSource);
        Collection<String> routedTables = new LinkedHashSet<>(tableShardingValues.isEmpty() ? availableTargetTables
                : shardingRule.getTableShardingStrategy(tableRule).doSharding(availableTargetTables, tableShardingValues, this.properties));
        Preconditions.checkState(!routedTables.isEmpty(), "no table route info");
        Collection<DataNode> result = new LinkedList<>();
        for (String each : routedTables) {
            result.add(new DataNode(routedDataSource, each));
        }
        return result;
    }

In these two methods we see the calls
shardingRule.getDatabaseShardingStrategy(tableRule).doSharding(tableRule.getActualDatasourceNames(), databaseShardingValues, this.properties) and
shardingRule.getTableShardingStrategy(tableRule).doSharding(availableTargetTables, tableShardingValues, this.properties);

shardingRule.getDatabaseShardingStrategy returns a ShardingStrategy, an interface with several implementation classes depending on how sharding is configured.
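The way the two methods combine can be sketched as follows. The signature of routeTables, which takes one routedDataSource at a time, suggests that tables are routed separately for each routed data source; the sketch assumes exactly that. The layout (ds_0 and ds_1, each holding t_order_0 and t_order_1), the modulo strategies on user_id and order_id, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical layout and strategies; a data node is written as "dataSource.table".
final class DataNodeRoutingSketch {

    static final Map<String, List<String>> ACTUAL_TABLES = new LinkedHashMap<>();

    static {
        ACTUAL_TABLES.put("ds_0", Arrays.asList("t_order_0", "t_order_1"));
        ACTUAL_TABLES.put("ds_1", Arrays.asList("t_order_0", "t_order_1"));
    }

    // No database sharding value (null) means every data source is a target.
    static List<String> routeDataSources(final Long userId) {
        if (null == userId) {
            return new ArrayList<>(ACTUAL_TABLES.keySet());
        }
        return Collections.singletonList("ds_" + Math.floorMod(userId, 2L));
    }

    // No table sharding value (null) means every table in the routed data source is a target.
    static List<String> routeTables(final String routedDataSource, final Long orderId) {
        List<String> routedTables = null == orderId
                ? ACTUAL_TABLES.get(routedDataSource)
                : Collections.singletonList("t_order_" + Math.floorMod(orderId, 2L));
        List<String> result = new ArrayList<>();
        for (String each : routedTables) {
            result.add(routedDataSource + "." + each);
        }
        return result;
    }

    // Route data sources first, then route tables inside each routed data source.
    static List<String> route(final Long userId, final Long orderId) {
        List<String> result = new ArrayList<>();
        for (String each : routeDataSources(userId)) {
            result.addAll(routeTables(each, orderId));
        }
        return result;
    }

    public static void main(final String[] args) {
        System.out.println(route(3L, 10L));
        System.out.println(route(null, 10L));
    }
}
```

With both values present the query reaches a single data node. Each missing value widens the result along one dimension, up to all four nodes when neither value is available.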

Let's look at the commonly used org.apache.shardingsphere.core.strategy.route.standard.StandardShardingStrategy

    
    @Override
    public Collection<String> doSharding(final Collection<String> availableTargetNames, final Collection<RouteValue> shardingValues, final ConfigurationProperties properties) {
        RouteValue shardingValue = shardingValues.iterator().next();
        Collection<String> shardingResult = shardingValue instanceof ListRouteValue
                ? doSharding(availableTargetNames, (ListRouteValue) shardingValue) : doSharding(availableTargetNames, (RangeRouteValue) shardingValue);
        Collection<String> result = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
        result.addAll(shardingResult);
        return result;
    }
    
    @SuppressWarnings("unchecked")
    private Collection<String> doSharding(final Collection<String> availableTargetNames, final RangeRouteValue<?> shardingValue) {
        if (null == rangeShardingAlgorithm) {
            throw new UnsupportedOperationException("Cannot find range sharding strategy in sharding rule.");
        }
        return rangeShardingAlgorithm.doSharding(availableTargetNames, 
                new RangeShardingValue(shardingValue.getTableName(), shardingValue.getColumnName(), shardingValue.getValueRange()));
    }
    
    @SuppressWarnings("unchecked")
    private Collection<String> doSharding(final Collection<String> availableTargetNames, final ListRouteValue<?> shardingValue) {
        Collection<String> result = new LinkedList<>();
        for (Comparable<?> each : shardingValue.getValues()) {
            String target = preciseShardingAlgorithm.doSharding(availableTargetNames, new PreciseShardingValue(shardingValue.getTableName(), shardingValue.getColumnName(), each));
            if (null != target) {
                result.add(target);
            }
        }
        return result;
    }

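The first doSharding method shows that, depending on the actual sharding value in the SQL, either PreciseShardingAlgorithm.doSharding or RangeShardingAlgorithm.doSharding is called. These two algorithms are interfaces that the application has to implement itself.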

/**
 * Precise sharding algorithm.
 * 
 * @param <T> class type of sharding value
 */
public interface PreciseShardingAlgorithm<T extends Comparable<?>> extends ShardingAlgorithm {
    
    /**
     * Sharding.
     * 
     * @param availableTargetNames available data sources or tables's names
     * @param shardingValue sharding value
     * @return sharding result for data source or table's name
     */
    String doSharding(Collection<String> availableTargetNames, PreciseShardingValue<T> shardingValue);
}
/**
 * Range sharding algorithm.
 * 
 * @param <T> class type of sharding value
 */
public interface RangeShardingAlgorithm<T extends Comparable<?>> extends ShardingAlgorithm {
    
    /**
     * Sharding.
     * 
     * @param availableTargetNames available data sources or tables's names
     * @param shardingValue sharding value
     * @return sharding results for data sources or tables's names
     */
    Collection<String> doSharding(Collection<String> availableTargetNames, RangeShardingValue<T> shardingValue);
}
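As an illustration of what an application-side implementation might compute, here is a modulo-based sketch of both cases. To stay self-contained it uses plain static methods with simplified parameters (a long value, and a closed range given by two longs) instead of implementing the two interfaces above; in a real project the same logic would sit inside doSharding of a PreciseShardingAlgorithm<Long> and a RangeShardingAlgorithm<Long>. The class name and the suffix naming convention (t_order_0, t_order_1, ...) are assumptions.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;

// Hypothetical modulo sharding logic; assumes target names end in _0, _1, ... with no gaps.
final class ModuloShardingSketch {

    // Precise case (= or IN): pick the single target whose suffix equals value mod targetCount.
    static String doPreciseSharding(final Collection<String> availableTargetNames, final long value) {
        String suffix = "_" + Math.floorMod(value, (long) availableTargetNames.size());
        for (String each : availableTargetNames) {
            if (each.endsWith(suffix)) {
                return each;
            }
        }
        throw new UnsupportedOperationException("No target for sharding value " + value);
    }

    // Range case (BETWEEN): collect the targets hit by the values in the closed range [lower, upper].
    // The loop stops early once every target has been hit.
    static Collection<String> doRangeSharding(final Collection<String> availableTargetNames, final long lower, final long upper) {
        Collection<String> result = new LinkedHashSet<>();
        for (long value = lower; value <= upper && result.size() < availableTargetNames.size(); value++) {
            result.add(doPreciseSharding(availableTargetNames, value));
        }
        return result;
    }

    public static void main(final String[] args) {
        List<String> tables = Arrays.asList("t_order_0", "t_order_1");
        System.out.println(doPreciseSharding(tables, 7L));
        System.out.println(doRangeSharding(tables, 1L, 10L));
    }
}
```

The precise case returns exactly one target per value, which is why StandardShardingStrategy loops over the values of an IN list. The range case returns a collection, because a range can span several targets.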
Rewriting (rewrite)

With routing covered, go back to BasePrepareEngine#prepare and look at the rewrite part:

    public ExecutionContext prepare(final String sql, final List<Object> parameters) {
        List<Object> clonedParameters = cloneParameters(parameters);
        RouteContext routeContext = executeRoute(sql, clonedParameters);// SQL routing
        ExecutionContext result = new ExecutionContext(routeContext.getSqlStatementContext());
        result.getExecutionUnits().addAll(executeRewrite(sql, clonedParameters, routeContext));// SQL rewriting
        if (properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SHOW)) {
            SQLLogger.logSQL(sql, properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SIMPLE), result.getSqlStatementContext(), result.getExecutionUnits());
        }
        return result;
}
…
private Collection<ExecutionUnit> executeRewrite(final String sql, final List<Object> parameters, final RouteContext routeContext) {
        registerRewriteDecorator();
        SQLRewriteContext sqlRewriteContext = rewriter.createSQLRewriteContext(sql, parameters, routeContext.getSqlStatementContext(), routeContext);
        return routeContext.getRouteResult().getRouteUnits().isEmpty() ? rewrite(sqlRewriteContext) : rewrite(routeContext, sqlRewriteContext);
    }

In executeRewrite, rewriter is an SQLRewriteEntry object. Here is its createSQLRewriteContext method:

org.apache.shardingsphere.underlying.rewrite.SQLRewriteEntry

 public SQLRewriteContext createSQLRewriteContext(final String sql, final List<Object> parameters, final SQLStatementContext sqlStatementContext, final RouteContext routeContext) {
        SQLRewriteContext result = new SQLRewriteContext(schemaMetaData, sqlStatementContext, sql, parameters);
        decorate(decorators, result, routeContext);
        result.generateSQLTokens();
        return result;
    }

    @SuppressWarnings("unchecked")
    private void decorate(final Map<BaseRule, SQLRewriteContextDecorator> decorators, final SQLRewriteContext sqlRewriteContext, final RouteContext routeContext) {
        for (Entry<BaseRule, SQLRewriteContextDecorator> entry : decorators.entrySet()) {
            BaseRule rule = entry.getKey();
            SQLRewriteContextDecorator decorator = entry.getValue();
            if (decorator instanceof RouteContextAware) {
                ((RouteContextAware) decorator).setRouteContext(routeContext);
            }
            decorator.decorate(rule, properties, sqlRewriteContext);
        }
}

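This method does three things:

  1. It creates an initial SQLRewriteContext;
  2. It calls the decorate method of each registered SQLRewriteContextDecorator in turn, which adds the appropriate SQLToken generators according to the configured features and the type of SQL;
  3. It calls generateSQLTokens on the SQLRewriteContext to generate the tokens for this SQL, then returns the SQLRewriteContext.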
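The idea behind token-based rewriting can be sketched in a few lines: a token marks a span of the original SQL together with its replacement, and the final SQL is assembled by copying the untouched text between tokens. The sketch below is a simplified illustration under stated assumptions. The Token class and both methods are hypothetical, and the generator finds table names with a naive text search, whereas a real generator would take positions from the parsed statement.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-ins for SQL tokens and a table-name token generator.
final class TokenRewriteSketch {

    // A span of the original SQL (both indexes inclusive) and the text that replaces it.
    static final class Token {

        final int startIndex;

        final int stopIndex;

        final String replacement;

        Token(final int startIndex, final int stopIndex, final String replacement) {
            this.startIndex = startIndex;
            this.stopIndex = stopIndex;
            this.replacement = replacement;
        }
    }

    // Generate one token per occurrence of the logic table name (naive text search).
    static List<Token> generateTableTokens(final String sql, final String logicTable, final String actualTable) {
        List<Token> result = new ArrayList<>();
        int from = sql.indexOf(logicTable);
        while (from >= 0) {
            result.add(new Token(from, from + logicTable.length() - 1, actualTable));
            from = sql.indexOf(logicTable, from + logicTable.length());
        }
        return result;
    }

    // Apply tokens left to right, copying the untouched SQL between them.
    static String rewrite(final String sql, final List<Token> tokens) {
        List<Token> sorted = new ArrayList<>(tokens);
        sorted.sort(Comparator.comparingInt(token -> token.startIndex));
        StringBuilder result = new StringBuilder();
        int cursor = 0;
        for (Token each : sorted) {
            result.append(sql, cursor, each.startIndex).append(each.replacement);
            cursor = each.stopIndex + 1;
        }
        return result.append(sql.substring(cursor)).toString();
    }

    public static void main(final String[] args) {
        String sql = "SELECT * FROM t_order WHERE t_order.id = ?";
        System.out.println(rewrite(sql, generateTableTokens(sql, "t_order", "t_order_1")));
    }
}
```

Separating token generation from token application is what lets several features contribute to one rewrite: each decorator only has to add generators, and the final SQL is assembled once from all the tokens.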

First, the SQLRewriteContext class:

    public SQLRewriteContext(final SchemaMetaData schemaMetaData, final SQLStatementContext sqlStatementContext, final String sql, final List<Object> parameters) {
        this.schemaMetaData = schemaMetaData;
        this.sqlStatementContext = sqlStatementContext;
        this.sql = sql;
        this.parameters = parameters;
        addSQLTokenGenerators(new DefaultTokenGeneratorBuilder().getSQLTokenGenerators());
        parameterBuilder = sqlStatementContext instanceof InsertStatementContext
                ? new GroupedParameterBuilder(((InsertStatementContext) sqlStatementContext).getGroupedParameters()) : new StandardParameterBuilder(parameters);
    }
    
    /**
     * Add SQL token generators.
     * 
     * @param sqlTokenGenerators SQL token generators
     */
    public void addSQLTokenGenerators(final Collection<SQLTokenGenerator> sqlTokenGenerators) {
        this.sqlTokenGenerators.addAll(sqlTokenGenerators);
    }
    
    /**
     * Generate SQL tokens.
     */
    public void generateSQLTokens() {
        sqlTokens.addAll(sqlTokenGenerators.generateSQLTokens(sqlStatementContext, parameters, schemaMetaData));
    }

addSQLTokenGenerators方法负责添加Token生成器,generateSQLTokens方法根据添加的Token生成器生成Token,在SQLRewriteEntry类的decorate方法中,会根据注册的SQLRewriteContextDecorator,对于RouteContext进行处理,目前项目中已实现的SQLRewriteContextDecorate如图下所示:


分别对应数据分片SQL重写装饰器、影子表SQL重写装饰器、加密SQL重写装饰器。

Let's look at the most commonly used one, the sharding SQL rewrite decorator:
org.apache.shardingsphere.sharding.rewrite.context.ShardingSQLRewriteContextDecorator

/**
 * SQL rewrite context decorator for sharding.
 */
@Setter
public final class ShardingSQLRewriteContextDecorator implements SQLRewriteContextDecorator<ShardingRule>, RouteContextAware {
    
    private RouteContext routeContext;
    
    @SuppressWarnings("unchecked")
    @Override
    public void decorate(final ShardingRule shardingRule, final ConfigurationProperties properties, final SQLRewriteContext sqlRewriteContext) {
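        // Get the parameter rewriters (only needed for parameterized SQL) and apply each one to the parameterBuilder of the SQL rewrite context; for sharding these mainly handle generated keys and pagination parameters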
        for (ParameterRewriter each : new ShardingParameterRewriterBuilder(shardingRule, routeContext).getParameterRewriters(sqlRewriteContext.getSchemaMetaData())) {
            if (!sqlRewriteContext.getParameters().isEmpty() && each.isNeedRewrite(sqlRewriteContext.getSqlStatementContext())) {
                each.rewrite(sqlRewriteContext.getParameterBuilder(), sqlRewriteContext.getSqlStatementContext(), sqlRewriteContext.getParameters());
            }
        }
        // Add the token generators required by the sharding feature
        sqlRewriteContext.addSQLTokenGenerators(new ShardingTokenGenerateBuilder(shardingRule, routeContext).getSQLTokenGenerators());
    }…
}

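In decorate, ShardingParameterRewriterBuilder first produces the collection of ParameterRewriter objects. The rewrite method of each one is then called to adjust the parameterBuilder of sqlRewriteContext. The ParameterRewriter implementations supplied by ShardingParameterRewriterBuilder are ShardingGeneratedKeyInsertValueParameterRewriter and ShardingPaginationParameterRewriter, which rewrite the distributed generated-key parameters of INSERT statements and the pagination parameters, respectively. The code is as follows: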

org.apache.shardingsphere.sharding.rewrite.parameter.ShardingParameterRewriterBuilder

    public Collection<ParameterRewriter> getParameterRewriters(final SchemaMetaData schemaMetaData) {
        Collection<ParameterRewriter> result = getParameterRewriters();
        for (ParameterRewriter each : result) {
            setUpParameterRewriters(each, schemaMetaData);
        }
        return result;
    }
    
    private static Collection<ParameterRewriter> getParameterRewriters() {
        Collection<ParameterRewriter> result = new LinkedList<>();
        result.add(new ShardingGeneratedKeyInsertValueParameterRewriter());
        result.add(new ShardingPaginationParameterRewriter());
        return result;
    }

The decorate method of ShardingSQLRewriteContextDecorator then adds the TokenGenerators returned by ShardingTokenGenerateBuilder.getSQLTokenGenerators(). Let's look at that class next:

org.apache.shardingsphere.sharding.rewrite.token.pojo.ShardingTokenGenerateBuilder

/**
 * SQL token generator builder for sharding.
 */
@RequiredArgsConstructor
public final class ShardingTokenGenerateBuilder implements SQLTokenGeneratorBuilder {
    
    private final ShardingRule shardingRule;
    
    private final RouteContext routeContext;
    
    @Override
    public Collection<SQLTokenGenerator> getSQLTokenGenerators() {
        Collection<SQLTokenGenerator> result = buildSQLTokenGenerators();
        for (SQLTokenGenerator each : result) {
            if (each instanceof ShardingRuleAware) {
                ((ShardingRuleAware) each).setShardingRule(shardingRule);
            }
            if (each instanceof RouteContextAware) {
                ((RouteContextAware) each).setRouteContext(routeContext);
            }
        }
        return result;
    }
    
    private Collection<SQLTokenGenerator> buildSQLTokenGenerators() {
        Collection<SQLTokenGenerator> result = new LinkedList<>();
        addSQLTokenGenerator(result, new TableTokenGenerator());// table name token, used to replace logical table names with actual table names
        addSQLTokenGenerator(result, new DistinctProjectionPrefixTokenGenerator());// handles the SELECT DISTINCT keyword
        addSQLTokenGenerator(result, new ProjectionsTokenGenerator());// handles SELECT projections, mainly the derived columns needed for AVG
        addSQLTokenGenerator(result, new OrderByTokenGenerator());// handles ORDER BY tokens
        addSQLTokenGenerator(result, new AggregationDistinctTokenGenerator());// handles the DISTINCT keyword inside aggregate functions
        addSQLTokenGenerator(result, new IndexTokenGenerator());// renames indexes
        addSQLTokenGenerator(result, new OffsetTokenGenerator());// rewrites offset
        addSQLTokenGenerator(result, new RowCountTokenGenerator());// rewrites rowCount
        addSQLTokenGenerator(result, new GeneratedKeyInsertColumnTokenGenerator());// adds the distributed primary key column at the end of the INSERT column list
        addSQLTokenGenerator(result, new GeneratedKeyForUseDefaultInsertColumnsTokenGenerator());// when an INSERT uses default column names, fills in the actual column names, including the generated key column
        addSQLTokenGenerator(result, new GeneratedKeyAssignmentTokenGenerator());// generates the generated key for INSERT ... SET
        addSQLTokenGenerator(result, new ShardingInsertValuesTokenGenerator());// parses the VALUES tokens of an INSERT, in preparation for adding generated key values later
        addSQLTokenGenerator(result, new GeneratedKeyInsertValuesTokenGenerator());// adds the generated key value to INSERT VALUES
        return result;
    }
    
    private void addSQLTokenGenerator(final Collection<SQLTokenGenerator> sqlTokenGenerators, final SQLTokenGenerator toBeAddedSQLTokenGenerator) {
        if (toBeAddedSQLTokenGenerator instanceof IgnoreForSingleRoute && routeContext.getRouteResult().isSingleRouting()) {
            return;
        }
        sqlTokenGenerators.add(toBeAddedSQLTokenGenerator);
    }
}

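ShardingTokenGenerateBuilder (the name looks like a misspelling of ShardingTokenGeneratorBuilder) is the builder class for the sharding token generators. It adds many TokenGenerators, such as TableTokenGenerator, DistinctProjectionPrefixTokenGenerator and ProjectionsTokenGenerator, and these generators produce the corresponding collections of tokens.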
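
The builder combines two small conventions: a generator is skipped when it implements IgnoreForSingleRoute and the SQL routes to a single node, and a generator receives its dependencies through the "Aware" interfaces it implements. A minimal sketch of both follows. All types in it are hypothetical miniatures, not the real ShardingSphere classes, and the route is reduced to a string.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of the builder; none of these types are ShardingSphere classes.
final class GeneratorBuilderSketch {

    /** Implementors ask to have the route injected (compare RouteContextAware). */
    interface RouteAware {
        void setRoute(String route);
    }

    /** Marker: implementors are unnecessary when the SQL goes to a single node. */
    interface IgnoreForSingleRoute {
    }

    static final class TableGenerator implements RouteAware {

        String route;

        @Override
        public void setRoute(final String route) {
            this.route = route;
        }
    }

    static final class OrderByGenerator implements IgnoreForSingleRoute {
    }

    static List<Object> build(final List<Object> candidates, final boolean singleRouting, final String route) {
        List<Object> result = new ArrayList<>();
        for (Object each : candidates) {
            if (each instanceof IgnoreForSingleRoute && singleRouting) {
                // Skipped, in the same spirit as addSQLTokenGenerator.
                continue;
            }
            if (each instanceof RouteAware) {
                // Inject only what the generator declared it needs.
                ((RouteAware) each).setRoute(route);
            }
            result.add(each);
        }
        return result;
    }
}
```

With these conventions, a new generator opts in to a dependency or to the single-route shortcut just by implementing an interface, and the builder loop stays unchanged.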

Back in the executeRewrite method of BasePrepareEngine: once the SQLRewriteContext has been created, rewrite is called, which in turn calls SQLRouteRewriteEngine#rewrite:

    private Collection<ExecutionUnit> executeRewrite(final String sql, final List<Object> parameters, final RouteContext routeContext) {
        registerRewriteDecorator();
        SQLRewriteContext sqlRewriteContext = rewriter.createSQLRewriteContext(sql, parameters, routeContext.getSqlStatementContext(), routeContext);
        return routeContext.getRouteResult().getRouteUnits().isEmpty() ? rewrite(sqlRewriteContext) : rewrite(routeContext, sqlRewriteContext);
    }

    private Collection<ExecutionUnit> rewrite(final RouteContext routeContext, final SQLRewriteContext sqlRewriteContext) {
        Collection<ExecutionUnit> result = new LinkedHashSet<>();
        for (Entry<RouteUnit, SQLRewriteResult> entry : new SQLRouteRewriteEngine().rewrite(sqlRewriteContext, routeContext.getRouteResult()).entrySet()) {
            result.add(new ExecutionUnit(entry.getKey().getDataSourceMapper().getActualName(), new SQLUnit(entry.getValue().getSql(), entry.getValue().getParameters())));
        }
        return result;
    }

Next, let's look at the SQL route rewrite engine, org.apache.shardingsphere.underlying.rewrite.engine.SQLRouteRewriteEngine:

public final class SQLRouteRewriteEngine {
    
    /**
     * Rewrite SQL and parameters.
     *
     * @param sqlRewriteContext SQL rewrite context
     * @param routeResult route result
     * @return SQL map of route unit and rewrite result
     */
    public Map<RouteUnit, SQLRewriteResult> rewrite(final SQLRewriteContext sqlRewriteContext, final RouteResult routeResult) {
        Map<RouteUnit, SQLRewriteResult> result = new LinkedHashMap<>(routeResult.getRouteUnits().size(), 1);
        for (RouteUnit each : routeResult.getRouteUnits()) {
            result.put(each, new SQLRewriteResult(new RouteSQLBuilder(sqlRewriteContext, each).toSQL(), getParameters(sqlRewriteContext.getParameterBuilder(), routeResult, each)));
        }
        return result;
}
…
}

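When the SQLRewriteResult is created, the toSQL method of RouteSQLBuilder is called. This class implements the SQLBuilder interface and is responsible for generating the rewritten SQL.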


Its toSQL method calls getSqlTokens on the SQLRewriteContext to obtain all SQL tokens bound to it, calls toString() on each SQLToken, and concatenates the pieces into the rewritten SQL.

public abstract class AbstractSQLBuilder implements SQLBuilder {
    
    private final SQLRewriteContext context;
    
    @Override
    public final String toSQL() {
        if (context.getSqlTokens().isEmpty()) {
            return context.getSql();
        }
        Collections.sort(context.getSqlTokens());// sort the tokens by start index
        StringBuilder result = new StringBuilder();
        result.append(context.getSql().substring(0, context.getSqlTokens().get(0).getStartIndex()));// append the original SQL that precedes the first token
        for (SQLToken each : context.getSqlTokens()) {
            result.append(getSQLTokenText(each));// append the SQL fragment for this token
            result.append(getConjunctionText(each));// append the original text that connects this token to the next
        }
        return result.toString();
    }
…
}

org.apache.shardingsphere.underlying.rewrite.sql.impl.RouteSQLBuilder

 protected String getSQLTokenText(final SQLToken sqlToken) {
        if (sqlToken instanceof RouteUnitAware) {
            return ((RouteUnitAware) sqlToken).toString(routeUnit);
        }
        return sqlToken.toString();
    }
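
The stitching done by toSQL can be tried in isolation. The sketch below is a simplified, self-contained restatement rather than the real AbstractSQLBuilder: the Token class is hypothetical, each token carries its replacement text directly, and tokens are assumed not to overlap.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Simplified token stitching; Token is a hypothetical stand-in for SQLToken.
final class TokenStitchSketch {

    static final class Token {

        final int startIndex;

        final int stopIndex;

        final String text;

        Token(final int startIndex, final int stopIndex, final String text) {
            this.startIndex = startIndex;
            this.stopIndex = stopIndex;
            this.text = text;
        }
    }

    static String toSQL(final String sql, final List<Token> tokens) {
        if (tokens.isEmpty()) {
            return sql;
        }
        // Sort by start position so the fragments are emitted left to right.
        List<Token> sorted = new ArrayList<>(tokens);
        sorted.sort(Comparator.comparingInt(token -> token.startIndex));
        // Original SQL that precedes the first token.
        StringBuilder result = new StringBuilder(sql.substring(0, sorted.get(0).startIndex));
        for (int i = 0; i < sorted.size(); i++) {
            Token each = sorted.get(i);
            // Replacement text for the token itself.
            result.append(each.text);
            // Original text between this token and the next one (or the end of the SQL).
            int conjunctionEnd = i + 1 < sorted.size() ? sorted.get(i + 1).startIndex : sql.length();
            result.append(sql, each.stopIndex + 1, conjunctionEnd);
        }
        return result.toString();
    }
}
```

For example, a single token that covers t_order in "SELECT * FROM t_order WHERE order_id = 1" and carries the text t_order_0 yields "SELECT * FROM t_order_0 WHERE order_id = 1". Everything outside the token is copied unchanged from the original SQL.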

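Back in BasePrepareEngine: once the SQL has been rewritten, a SQLUnit can be built for each result, and from it an ExecutionUnit. The ExecutionUnits are added to the ExecutionContext, which is then returned, completing the prepare operation of the PrepareEngine: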

private Collection<ExecutionUnit> rewrite(final RouteContext routeContext, final SQLRewriteContext sqlRewriteContext) {
        Collection<ExecutionUnit> result = new LinkedHashSet<>();
        for (Entry<RouteUnit, SQLRewriteResult> entry : new SQLRouteRewriteEngine().rewrite(sqlRewriteContext, routeContext.getRouteResult()).entrySet()) {
            result.add(new ExecutionUnit(entry.getKey().getDataSourceMapper().getActualName(), new SQLUnit(entry.getValue().getSql(), entry.getValue().getParameters())));
        }
        return result;
    }

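After the prepare method has produced the executionContext, it records the distributed generated key. The generated value held by the generatedKeyContext is produced by InsertClauseShardingConditionEngine, which is used in the getShardingConditions method of ShardingRouteDecorator.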

    private void prepare() {
        executionContext = prepareEngine.prepare(sql, getParameters());
        findGeneratedKey().ifPresent(generatedKey -> generatedValues.add(generatedKey.getGeneratedValues().getLast()));
    }

    private Optional<GeneratedKeyContext> findGeneratedKey() {
        return executionContext.getSqlStatementContext() instanceof InsertStatementContext
                ? ((InsertStatementContext) executionContext.getSqlStatementContext()).getGeneratedKeyContext() : Optional.empty();
    }

Execution engine (executor)

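Back in the executeQuery() method of ShardingPreparedStatement: after prepare has run, the next steps are executing the SQL and merging the result sets. SQL execution is mainly handled by PreparedStatementExecutor.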

public ResultSet executeQuery() throws SQLException {
        ResultSet result;
        try {
            clearPrevious();
            prepare();
            initPreparedStatementExecutor();
            MergedResult mergedResult = mergeQuery(preparedStatementExecutor.executeQuery());
            result = new ShardingResultSet(preparedStatementExecutor.getResultSets(), mergedResult, this, executionContext);
        } finally {
            clearBatch();
        }
        currentResultSet = result;
        return result;
    }

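Next comes initPreparedStatementExecutor(). It first initializes the preparedStatementExecutor, then sets the Statement parameters, and finally replays the recorded method calls on the statements. We look at each of these in turn.

Replay is a design used on the JDBC access side. The WrapperAdapter class has two methods, recordMethodInvocation and replayMethodsInvocation. The former records a method invocation, for example setAutoCommit, setReadOnly, setFetchSize or setMaxFieldSize. It is called when the application invokes setAutoCommit or setReadOnly on ShardingConnection, or setFetchSize or setMaxFieldSize on ShardingPreparedStatement. The latter performs the real calls, replaying the recorded invocations on the underlying real JDBC objects (those of the DB driver, the connection pool, and so on).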

 private void initPreparedStatementExecutor() throws SQLException {
        preparedStatementExecutor.init(executionContext);
        setParametersForStatements();
        replayMethodForStatements();
    }
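
The record-and-replay idea can be sketched in a few lines. This is an illustration only, under stated assumptions: ReplaySketch and FakeStatement are hypothetical, and invocations are stored as lambdas, which is a simplification chosen for the sketch and not a description of how WrapperAdapter stores them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical record/replay adapter; T plays the role of the real JDBC object.
final class ReplaySketch<T> {

    /** Stand-in for a real driver statement, with two settings worth replaying. */
    static final class FakeStatement {

        int fetchSize;

        int maxFieldSize;

        void setFetchSize(final int fetchSize) {
            this.fetchSize = fetchSize;
        }

        void setMaxFieldSize(final int maxFieldSize) {
            this.maxFieldSize = maxFieldSize;
        }
    }

    private final List<Consumer<T>> recorded = new ArrayList<>();

    /** Called while no real object exists yet: remember the call for later. */
    void record(final Consumer<T> invocation) {
        recorded.add(invocation);
    }

    /** Called once a real object has been created: apply every remembered call in order. */
    void replayOn(final T target) {
        for (Consumer<T> each : recorded) {
            each.accept(target);
        }
    }
}
```

The reason for the indirection is timing. The application configures the logical statement before routing has decided which physical statements exist, so the settings have to be remembered and applied once the real statements are created.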

The init method calls obtainExecuteGroups to group the execution units. The actual work is delegated to SQLExecutePrepareTemplate.getExecuteUnitGroups:
org.apache.shardingsphere.shardingjdbc.executor.PreparedStatementExecutor

    /**
     * Initialize executor.
     *
     * @param executionContext execution context
     * @throws SQLException SQL exception
     */
    public void init(final ExecutionContext executionContext) throws SQLException {
        setSqlStatementContext(executionContext.getSqlStatementContext());
        getInputGroups().addAll(obtainExecuteGroups(executionContext.getExecutionUnits()));
        cacheStatements();
}
…
      private Collection<InputGroup<StatementExecuteUnit>> obtainExecuteGroups(final Collection<ExecutionUnit> executionUnits) throws SQLException {
        return getSqlExecutePrepareTemplate().getExecuteUnitGroups(executionUnits, new SQLExecutePrepareCallback() {

            @Override
            // Obtain the requested number of database connections from the given data source
            public List<Connection> getConnections(final ConnectionMode connectionMode, final String dataSourceName, final int connectionSize) throws SQLException {
                return PreparedStatementExecutor.super.getConnection().getConnections(connectionMode, dataSourceName, connectionSize);
            }

            @Override
            // Create a StatementExecuteUnit from the execution unit
            public StatementExecuteUnit createStatementExecuteUnit(final Connection connection, final ExecutionUnit executionUnit, final ConnectionMode connectionMode) throws SQLException {
                return new StatementExecuteUnit(executionUnit, createPreparedStatement(connection, executionUnit.getSqlUnit().getSql()), connectionMode);
            }
        });
    }…    
protected final void cacheStatements() {
        for (InputGroup<StatementExecuteUnit> each : inputGroups) {
            statements.addAll(each.getInputs().stream().map(StatementExecuteUnit::getStatement).collect(Collectors.toList()));
            parameterSets.addAll(each.getInputs().stream().map(input -> input.getExecutionUnit().getSqlUnit().getParameters()).collect(Collectors.toList()));
        }
    }

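getExecuteUnitGroups first groups the SQL units by data source. It then uses the number of SQL units and maxConnectionsSizePerQuery to choose between the connection-restricted mode (CONNECTION_STRICTLY) and the memory-restricted mode (MEMORY_STRICTLY), and finally produces the execution groups.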
org.apache.shardingsphere.sharding.execute.sql.prepare.SQLExecutePrepareTemplate

/**
     * Get execute unit groups.
     *
     * @param executionUnits execution units
     * @param callback SQL execute prepare callback
     * @return statement execute unit groups
     * @throws SQLException SQL exception
     */
    public Collection<InputGroup<StatementExecuteUnit>> getExecuteUnitGroups(final Collection<ExecutionUnit> executionUnits, final SQLExecutePrepareCallback callback) throws SQLException {
        return getSynchronizedExecuteUnitGroups(executionUnits, callback);
    }

    // Build the execute unit groups for synchronous execution
    private Collection<InputGroup<StatementExecuteUnit>> getSynchronizedExecuteUnitGroups(
            final Collection<ExecutionUnit> executionUnits, final SQLExecutePrepareCallback callback) throws SQLException {
        Map<String, List<SQLUnit>> sqlUnitGroups = getSQLUnitGroups(executionUnits);// map each data source to its SQLUnits
        Collection<InputGroup<StatementExecuteUnit>> result = new LinkedList<>();
        for (Entry<String, List<SQLUnit>> entry : sqlUnitGroups.entrySet()) {
            result.addAll(getSQLExecuteGroups(entry.getKey(), entry.getValue(), callback));// convert the SQLUnits into InputGroup<StatementExecuteUnit>, in a 1:1 mapping
        }
        return result;
    }
…
    // Build the SQL execute groups
    private List<InputGroup<StatementExecuteUnit>> getSQLExecuteGroups(final String dataSourceName,
                                                                       final List<SQLUnit> sqlUnits, final SQLExecutePrepareCallback callback) throws SQLException {
        List<InputGroup<StatementExecuteUnit>> result = new LinkedList<>();
        int desiredPartitionSize = Math.max(0 == sqlUnits.size() % maxConnectionsSizePerQuery ? sqlUnits.size() / maxConnectionsSizePerQuery : sqlUnits.size() / maxConnectionsSizePerQuery + 1, 1);
        List<List<SQLUnit>> sqlUnitPartitions = Lists.partition(sqlUnits, desiredPartitionSize);
        ConnectionMode connectionMode = maxConnectionsSizePerQuery < sqlUnits.size() ? ConnectionMode.CONNECTION_STRICTLY : ConnectionMode.MEMORY_STRICTLY;
        List<Connection> connections = callback.getConnections(connectionMode, dataSourceName, sqlUnitPartitions.size()); // obtain connections; the count is the number of partitions, derived from the number of SQL units and maxConnectionsSizePerQuery
        int count = 0;
        for (List<SQLUnit> each : sqlUnitPartitions) {
            result.add(getSQLExecuteGroup(connectionMode, connections.get(count++), dataSourceName, each, callback));// build the StatementExecuteUnit objects for these SQLUnits and add the group to the result
        }
        return result;
    }
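
The arithmetic in getSQLExecuteGroups is easier to follow with concrete numbers. The sketch below restates it in plain Java without Guava. ConnectionModeSketch and its method names are illustrative, and partition is a simple substitute for Lists.partition that returns copies rather than views.

```java
import java.util.ArrayList;
import java.util.List;

// Standalone restatement of the grouping arithmetic; names are illustrative only.
final class ConnectionModeSketch {

    enum ConnectionMode { MEMORY_STRICTLY, CONNECTION_STRICTLY }

    /** SQL units per connection: ceil(sqlUnitCount / maxConnectionsSizePerQuery), at least 1. */
    static int partitionSize(final int sqlUnitCount, final int maxConnectionsSizePerQuery) {
        int ceil = (sqlUnitCount + maxConnectionsSizePerQuery - 1) / maxConnectionsSizePerQuery;
        return Math.max(ceil, 1);
    }

    /** More SQL units than allowed connections means connections must be shared. */
    static ConnectionMode connectionMode(final int sqlUnitCount, final int maxConnectionsSizePerQuery) {
        return maxConnectionsSizePerQuery < sqlUnitCount ? ConnectionMode.CONNECTION_STRICTLY : ConnectionMode.MEMORY_STRICTLY;
    }

    /** Splits items into consecutive chunks of at most the given size. */
    static <T> List<List<T>> partition(final List<T> items, final int size) {
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < items.size(); i += size) {
            result.add(new ArrayList<>(items.subList(i, Math.min(i + size, items.size()))));
        }
        return result;
    }
}
```

With 5 SQL units on one data source and maxConnectionsSizePerQuery set to 2, the partition size is 3. That gives two partitions, so two connections are requested and the mode is CONNECTION_STRICTLY. With 2 SQL units and a limit of 4, the partition size is 1, every SQL unit gets its own connection, and the mode is MEMORY_STRICTLY.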

Back in executeQuery() of ShardingPreparedStatement, the next statement executed is:

 MergedResult mergedResult = mergeQuery(preparedStatementExecutor.executeQuery());

Looking at executeQuery in PreparedStatementExecutor, the method wraps the execution in a callback and ultimately runs it through SQLExecuteTemplate.execute:
org.apache.shardingsphere.shardingjdbc.executor.PreparedStatementExecutor

    /**
     * Execute query.
     *
     * @return result set list
     * @throws SQLException SQL exception
     */
    public List<QueryResult> executeQuery() throws SQLException {
        final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
        SQLExecuteCallback<QueryResult> executeCallback = new SQLExecuteCallback<QueryResult>(getDatabaseType(), isExceptionThrown) {
            
            @Override
            // Execute the SQL on the given Statement and wrap the JDBC result set in a QueryResult (either stream-based or memory-based)
            protected QueryResult executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
                return getQueryResult(statement, connectionMode);
            }
        };
        return executeCallback(executeCallback);// run through executeCallback
    }
    // Execute the SQL, then wrap the result set in a QueryResult
    private QueryResult getQueryResult(final Statement statement, final ConnectionMode connectionMode) throws SQLException {
        PreparedStatement preparedStatement = (PreparedStatement) statement;
        ResultSet resultSet = preparedStatement.executeQuery();
        getResultSets().add(resultSet);
        return ConnectionMode.MEMORY_STRICTLY == connectionMode ? new StreamQueryResult(resultSet) : new MemoryQueryResult(resultSet);
    } 

org.apache.shardingsphere.shardingjdbc.executor.AbstractStatementExecutor

protected final <T> List<T> executeCallback(final SQLExecuteCallback<T> executeCallback) throws SQLException {
        List<T> result = sqlExecuteTemplate.execute((Collection) inputGroups, executeCallback);
        refreshMetaDataIfNeeded(connection.getRuntimeContext(), sqlStatementContext);
        return result;
    }

org.apache.shardingsphere.sharding.execute.sql.execute.SQLExecuteTemplate

    public <T> List<T> execute(final Collection<InputGroup<? extends StatementExecuteUnit>> inputGroups, final SQLExecuteCallback<T> callback) throws SQLException {
        return execute(inputGroups, null, callback);
    }
    
    @SuppressWarnings("unchecked")
    public <T> List<T> execute(final Collection<InputGroup<? extends StatementExecuteUnit>> inputGroups,
                               final SQLExecuteCallback<T> firstCallback, final SQLExecuteCallback<T> callback) throws SQLException {
        try {
            return executorEngine.execute((Collection) inputGroups, firstCallback, callback, serial);
        } catch (final SQLException ex) {
            ExecutorExceptionHandler.handleException(ex);
            return Collections.emptyList();
        }
    }

As shown, SQLExecuteTemplate actually delegates to executorEngine.execute.
org.apache.shardingsphere.underlying.executor.engine.ExecutorEngine

   /**
     * Execute.
     *
     * @param inputGroups input groups
     * @param firstCallback first grouped callback
     * @param callback other grouped callback
     * @param serial whether using multi thread execute or not
     * @param <I> type of input value
     * @param <O> type of return value
     * @return execute result
     * @throws SQLException throw if execute failure
     */
    public <I, O> List<O> execute(final Collection<InputGroup<I>> inputGroups, 
                                  final GroupedCallback<I, O> firstCallback, final GroupedCallback<I, O> callback, final boolean serial) throws SQLException {
        if (inputGroups.isEmpty()) {
            return Collections.emptyList();
        }
        return serial ? serialExecute(inputGroups, firstCallback, callback) : parallelExecute(inputGroups, firstCallback, callback);
    }

    // Serial execution
    private <I, O> List<O> serialExecute(final Collection<InputGroup<I>> inputGroups, final GroupedCallback<I, O> firstCallback, final GroupedCallback<I, O> callback) throws SQLException {
        Iterator<InputGroup<I>> inputGroupsIterator = inputGroups.iterator();
        InputGroup<I> firstInputs = inputGroupsIterator.next();
        List<O> result = new LinkedList<>(syncExecute(firstInputs, null == firstCallback ? callback : firstCallback));
        for (InputGroup<I> each : Lists.newArrayList(inputGroupsIterator)) {
            result.addAll(syncExecute(each, callback));
        }
        return result;
    }

    // Parallel execution; supports two callbacks: the first input group runs the first callback, the remaining groups run the second
    private <I, O> List<O> parallelExecute(final Collection<InputGroup<I>> inputGroups, final GroupedCallback<I, O> firstCallback, final GroupedCallback<I, O> callback) throws SQLException {
        Iterator<InputGroup<I>> inputGroupsIterator = inputGroups.iterator();
        InputGroup<I> firstInputs = inputGroupsIterator.next();
        Collection<ListenableFuture<Collection<O>>> restResultFutures = asyncExecute(Lists.newArrayList(inputGroupsIterator), callback);
        return getGroupResults(syncExecute(firstInputs, null == firstCallback ? callback : firstCallback), restResultFutures);
    }
…
}

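Execution is either serial or parallel. In serial mode every group runs the callback synchronously on the current thread. In parallel mode the first group runs on the current thread while the remaining groups are submitted to a thread pool. The callback itself is the one defined in executeQuery() of PreparedStatementExecutor: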

public List<QueryResult> executeQuery() throws SQLException {
        final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
        SQLExecuteCallback<QueryResult> executeCallback = new SQLExecuteCallback<QueryResult>(getDatabaseType(), isExceptionThrown) {
            
            @Override
            protected QueryResult executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
                return getQueryResult(statement, connectionMode);
            }
        };
        return executeCallback(executeCallback);
    }
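
The parallel branch, where the first group runs on the calling thread and the rest run on a pool, can be reproduced with the JDK alone. The sketch below is an illustration, not the real ExecutorEngine. It assumes a plain fixed thread pool with java.util.concurrent.Future in place of Guava's ListenableFuture, uses one callback for all groups, and GroupedExecuteSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical miniature of "first group on the caller thread, the rest on a pool".
final class GroupedExecuteSketch {

    static <I, O> List<O> execute(final List<List<I>> groups, final Function<I, O> callback, final int threads) {
        if (groups.isEmpty()) {
            return Collections.emptyList();
        }
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            Iterator<List<I>> iterator = groups.iterator();
            List<I> firstGroup = iterator.next();
            // Submit every group except the first to the pool.
            List<Future<List<O>>> restFutures = new ArrayList<>();
            while (iterator.hasNext()) {
                List<I> group = iterator.next();
                Callable<List<O>> task = () -> runGroup(group, callback);
                restFutures.add(pool.submit(task));
            }
            // Run the first group on the calling thread while the others are in flight.
            List<O> result = new ArrayList<>(runGroup(firstGroup, callback));
            // Collect the remaining results in submission order.
            for (Future<List<O>> each : restFutures) {
                result.addAll(each.get());
            }
            return result;
        } catch (final InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } catch (final ExecutionException ex) {
            throw new IllegalStateException(ex.getCause());
        } finally {
            pool.shutdown();
        }
    }

    private static <I, O> List<O> runGroup(final List<I> group, final Function<I, O> callback) {
        List<O> result = new ArrayList<>();
        for (I each : group) {
            result.add(callback.apply(each));
        }
        return result;
    }
}
```

Within one group the inputs still run one after another. In the real engine a group corresponds to the statements that share one connection.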

Merge engine (merge)

Back in ShardingPreparedStatement, the mergeQuery method mainly creates a MergeEngine object and calls its merge method.

public ResultSet executeQuery() throws SQLException {
        ResultSet result;
        try {
…
            MergedResult mergedResult = mergeQuery(preparedStatementExecutor.executeQuery());
…
        } 
…
    }
private MergedResult mergeQuery(final List<QueryResult> queryResults) throws SQLException {
        ShardingRuntimeContext runtimeContext = connection.getRuntimeContext();
        MergeEngine mergeEngine = new MergeEngine(runtimeContext.getRule().toRules(), runtimeContext.getProperties(), runtimeContext.getDatabaseType(), runtimeContext.getMetaData().getSchema());
        return mergeEngine.merge(queryResults, executionContext.getSqlStatementContext());
    }

As shown, mergeQuery creates a new MergeEngine instance and calls its merge method. Here is the merge method of MergeEngine:

org.apache.shardingsphere.underlying.pluggble.merge.MergeEngine

public MergedResult merge(final List<QueryResult> queryResults, final SQLStatementContext sqlStatementContext) throws SQLException {
        registerMergeDecorator();// register the merge decorators
        return merger.process(queryResults, sqlStatementContext);
}

    private void registerMergeDecorator() {
        for (Class<? extends ResultProcessEngine> each : OrderedRegistry.getRegisteredClasses(ResultProcessEngine.class)) {
            ResultProcessEngine processEngine = createProcessEngine(each);
            Class<?> ruleClass = (Class<?>) processEngine.getType();
           
            rules.stream().filter(rule -> rule.getClass() == ruleClass || rule.getClass().getSuperclass() == ruleClass).collect(Collectors.toList())
                    .forEach(rule -> merger.registerProcessEngine(rule, processEngine));
        }
    }

Once the ResultProcessEngine implementations have been registered, merger.process is called directly, that is:

org.apache.shardingsphere.underlying.merge.MergeEntry#process

/**
 * Merge entry.
 */
public final class MergeEntry {
…    
    /**
     * Process query results.
     *
     * @param queryResults query results
     * @param sqlStatementContext SQL statement context
     * @return merged result
     * @throws SQLException SQL exception
     */
    public MergedResult process(final List<QueryResult> queryResults, final SQLStatementContext sqlStatementContext) throws SQLException {
        Optional<MergedResult> mergedResult = merge(queryResults, sqlStatementContext);
        Optional<MergedResult> result = mergedResult.isPresent() ? Optional.of(decorate(mergedResult.get(), sqlStatementContext)) : decorate(queryResults.get(0), sqlStatementContext);
        return result.orElseGet(() -> new TransparentMergedResult(queryResults.get(0)));
    }
    
    @SuppressWarnings("unchecked")
    private Optional<MergedResult> merge(final List<QueryResult> queryResults, final SQLStatementContext sqlStatementContext) throws SQLException {
        for (Entry<BaseRule, ResultProcessEngine> entry : engines.entrySet()) {
            if (entry.getValue() instanceof ResultMergerEngine) {
                ResultMerger resultMerger = ((ResultMergerEngine) entry.getValue()).newInstance(databaseType, entry.getKey(), properties, sqlStatementContext);
                return Optional.of(resultMerger.merge(queryResults, sqlStatementContext, schemaMetaData));
            }
        }
        return Optional.empty();
    }
    
    @SuppressWarnings("unchecked")
    private MergedResult decorate(final MergedResult mergedResult, final SQLStatementContext sqlStatementContext) throws SQLException {
        MergedResult result = null;
        for (Entry<BaseRule, ResultProcessEngine> entry : engines.entrySet()) {
            if (entry.getValue() instanceof ResultDecoratorEngine) {
                ResultDecorator resultDecorator = ((ResultDecoratorEngine) entry.getValue()).newInstance(databaseType, schemaMetaData, entry.getKey(), properties, sqlStatementContext);
                result = null == result ? resultDecorator.decorate(mergedResult, sqlStatementContext, schemaMetaData) : resultDecorator.decorate(result, sqlStatementContext, schemaMetaData);
            }
        }
        return null == result ? mergedResult : result;
    }
    
    @SuppressWarnings("unchecked")
    private Optional<MergedResult> decorate(final QueryResult queryResult, final SQLStatementContext sqlStatementContext) throws SQLException {
        MergedResult result = null;
        for (Entry<BaseRule, ResultProcessEngine> entry : engines.entrySet()) {
            if (entry.getValue() instanceof ResultDecoratorEngine) {
                ResultDecorator resultDecorator = ((ResultDecoratorEngine) entry.getValue()).newInstance(databaseType, schemaMetaData, entry.getKey(), properties, sqlStatementContext);
                result = null == result ? resultDecorator.decorate(queryResult, sqlStatementContext, schemaMetaData) : resultDecorator.decorate(result, sqlStatementContext, schemaMetaData);
            }
        }
        return Optional.ofNullable(result);
    }
}

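MergeEntry#process first calls the private merge method. That method goes through the registered engines in order and, for the first ResultMergerEngine it finds, calls newInstance to create a ResultMerger and then calls the merger's merge method.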

The ResultMergerEngine interface currently has only one implementation, ShardingResultMergerEngine. Here is its newInstance method:

org.apache.shardingsphere.sharding.merge.ShardingResultMergerEngine

public final class ShardingResultMergerEngine implements ResultMergerEngine<ShardingRule> {
    
    @Override
    public ResultMerger newInstance(final DatabaseType databaseType, final ShardingRule shardingRule, final ConfigurationProperties properties, final SQLStatementContext sqlStatementContext) {
        if (sqlStatementContext instanceof SelectStatementContext) {
            return new ShardingDQLResultMerger(databaseType);
        } 
        if (sqlStatementContext.getSqlStatement() instanceof DALStatement) {
            return new ShardingDALResultMerger(shardingRule);
        }
        return new TransparentResultMerger();
    }
}

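As shown, newInstance creates the ResultMerger that matches the type of the current SQL. Taking the most commonly used one, the sharding query result merger ShardingDQLResultMerger, as an example, here is its processing logic: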

org.apache.shardingsphere.sharding.merge.dql.ShardingDQLResultMerger

/**
 * DQL result merger for Sharding.
 */
public final class ShardingDQLResultMerger implements ResultMerger {
    
    private final DatabaseType databaseType;
    
    public MergedResult merge(final List<QueryResult> queryResults, final SQLStatementContext sqlStatementContext, final SchemaMetaData schemaMetaData) throws SQLException {
        if (1 == queryResults.size()) {
            return new IteratorStreamMergedResult(queryResults);
        }
        Map<String, Integer> columnLabelIndexMap = getColumnLabelIndexMap(queryResults.get(0));
        SelectStatementContext selectStatementContext = (SelectStatementContext) sqlStatementContext;
        selectStatementContext.setIndexes(columnLabelIndexMap);
        MergedResult mergedResult = build(queryResults, selectStatementContext, columnLabelIndexMap, schemaMetaData);
        return decorate(queryResults, selectStatementContext, mergedResult);
}
…
private MergedResult build(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext,
                               final Map<String, Integer> columnLabelIndexMap, final SchemaMetaData schemaMetaData) throws SQLException {
        if (isNeedProcessGroupBy(selectStatementContext)) {
            return getGroupByMergedResult(queryResults, selectStatementContext, columnLabelIndexMap, schemaMetaData);
        }
        if (isNeedProcessDistinctRow(selectStatementContext)) {
            setGroupByForDistinctRow(selectStatementContext);
            return getGroupByMergedResult(queryResults, selectStatementContext, columnLabelIndexMap, schemaMetaData);
        }
        if (isNeedProcessOrderBy(selectStatementContext)) {
            return new OrderByStreamMergedResult(queryResults, selectStatementContext, schemaMetaData);
        }
        return new IteratorStreamMergedResult(queryResults);
    }
…

    private MergedResult getGroupByMergedResult(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext,
                                                final Map<String, Integer> columnLabelIndexMap, final SchemaMetaData schemaMetaData) throws SQLException {
        return selectStatementContext.isSameGroupByAndOrderByItems()
                ? new GroupByStreamMergedResult(columnLabelIndexMap, queryResults, selectStatementContext, schemaMetaData)
                : new GroupByMemoryMergedResult(queryResults, selectStatementContext, schemaMetaData);
    }
 ...
}

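The merge method checks whether the SQL contains GROUP BY, DISTINCT, or ORDER BY, and builds a GroupByStreamMergedResult or GroupByMemoryMergedResult, or an OrderByStreamMergedResult, accordingly. If none of these apply it falls back to IteratorStreamMergedResult. All of these classes implement the MergedResult interface, which provides methods such as next and getValue. At the end of executeQuery() in ShardingPreparedStatement, this MergedResult is used to construct the ShardingResultSet that is returned.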

org.apache.shardingsphere.underlying.merge.result.MergedResult

/**
 * Merged result after merge engine.
 */
public interface MergedResult {
    
    /**
     * Iterate next data.
     * 
     * @return has next data
     * @throws SQLException SQL Exception
     */
    boolean next() throws SQLException;
    
    /**
     * Get data value.
     *
     * @param columnIndex column index
     * @param type class type of data value
     * @return data value
     * @throws SQLException SQL Exception
     */
    Object getValue(int columnIndex, Class<?> type) throws SQLException;

…
}
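
To make the next/getValue contract concrete, here is a minimal sketch of a streaming ORDER BY merge. It shows the general k-way merge idea with a priority queue and is not the code of OrderByStreamMergedResult: OrderByMergeSketch is hypothetical, each shard result is assumed to be already sorted ascending, and rows are reduced to single Integer values.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical k-way merge exposing a next()/getValue() style cursor.
final class OrderByMergeSketch {

    /** Cursor over one shard's rows, which are assumed to be sorted ascending. */
    private static final class Cursor implements Comparable<Cursor> {

        private final Iterator<Integer> rows;

        private Integer current;

        Cursor(final List<Integer> sortedRows) {
            rows = sortedRows.iterator();
        }

        boolean advance() {
            if (!rows.hasNext()) {
                return false;
            }
            current = rows.next();
            return true;
        }

        @Override
        public int compareTo(final Cursor other) {
            return current.compareTo(other.current);
        }
    }

    private final PriorityQueue<Cursor> queue = new PriorityQueue<>();

    private Integer currentValue;

    OrderByMergeSketch(final List<List<Integer>> shardResults) {
        for (List<Integer> each : shardResults) {
            Cursor cursor = new Cursor(each);
            if (cursor.advance()) {
                queue.offer(cursor);
            }
        }
    }

    /** Moves to the globally smallest remaining row; returns false when all shards are exhausted. */
    boolean next() {
        Cursor head = queue.poll();
        if (null == head) {
            return false;
        }
        currentValue = head.current;
        // Re-queue the shard only if it still has rows, now ordered by its next row.
        if (head.advance()) {
            queue.offer(head);
        }
        return true;
    }

    Integer getValue() {
        return currentValue;
    }

    /** Convenience helper: reads the merged stream to the end. */
    static List<Integer> drain(final List<List<Integer>> shardResults) {
        OrderByMergeSketch merged = new OrderByMergeSketch(shardResults);
        List<Integer> result = new ArrayList<>();
        while (merged.next()) {
            result.add(merged.getValue());
        }
        return result;
    }
}
```

Only one row per shard is held at a time, which is what makes this kind of merge "streaming": the caller pulls rows one by one through next(), and nothing is materialized beyond the queue heads.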

JDBC

org.apache.shardingsphere.shardingjdbc.jdbc.core.resultset.ShardingResultSet

/**
 * Result that support sharding.
 */
public final class ShardingResultSet extends AbstractResultSetAdapter {
    
    private final MergedResult mergeResultSet;
    
    private final Map<String, Integer> columnLabelAndIndexMap;
    
    …
    @Override
    public boolean next() throws SQLException {
        return mergeResultSet.next();
    }
    …
}

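ShardingResultSet implements the JDBC ResultSet interface. When the application obtains the ResultSet and calls its next() method, what actually runs is the next() method of the underlying MergedResult implementation. The class hierarchy of ShardingResultSet is as follows:

[Figure: ShardingResultSet class hierarchy]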



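At this point we have roughly walked through the processing flow of sharding-jdbc. We have not gone into the details of each class, but the responsibilities and boundaries of each kernel engine are already fairly clear. Following the direction in which the SQL flows and the inputs and outputs of each engine, the overall processing flow of the kernel engines can be drawn as follows:

[Figure: overall processing flow of the kernel engines]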
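
The same flow can also be sketched in code. The toy pipeline below only illustrates the order of the stages (parse, route, rewrite, execute, merge) and what each one consumes and produces. Every name in it is hypothetical, the "parser" just looks for the word after FROM, and the "execution" returns canned rows, so it should be read as a mental model rather than as ShardingSphere code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical toy pipeline: every type and method here is illustrative,
// none of them is a ShardingSphere class.
final class KernelPipelineSketch {

    // Parse: extract the logic table name that follows FROM (toy parser).
    static String parseTable(final String sql) {
        String[] words = sql.trim().split("\\s+");
        for (int i = 0; i < words.length - 1; i++) {
            if ("FROM".equalsIgnoreCase(words[i])) {
                return words[i + 1];
            }
        }
        throw new IllegalArgumentException("No table found in: " + sql);
    }

    // Route: decide which actual tables the logic table maps to.
    static List<String> route(final String logicTable, final int shardCount) {
        List<String> result = new ArrayList<>();
        for (int i = 0; i < shardCount; i++) {
            result.add(logicTable + "_" + i);
        }
        return result;
    }

    // Rewrite: replace the logic table name with one actual table name.
    static String rewrite(final String sql, final String logicTable, final String actualTable) {
        return sql.replace(logicTable, actualTable);
    }

    // Execute: pretend to run the SQL; each shard returns a small canned row list.
    static List<Integer> execute(final String actualSql) {
        return actualSql.endsWith("_0") ? Arrays.asList(1, 3) : Arrays.asList(2, 4);
    }

    // Merge: combine the per-shard rows into a single ordered result.
    static List<Integer> merge(final List<List<Integer>> shardRows) {
        List<Integer> result = new ArrayList<>();
        for (List<Integer> each : shardRows) {
            result.addAll(each);
        }
        Collections.sort(result);
        return result;
    }

    // A single caller drives the stages one after another.
    static List<Integer> query(final String sql, final int shardCount) {
        String logicTable = parseTable(sql);
        List<List<Integer>> shardRows = new ArrayList<>();
        for (String actualTable : route(logicTable, shardCount)) {
            shardRows.add(execute(rewrite(sql, logicTable, actualTable)));
        }
        return merge(shardRows);
    }
}
```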


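Design Patterns

Across the code we have browsed, a few design patterns show up very frequently in ShardingSphere. Being familiar with them makes it quicker to understand the authors' design intent, and they are also ShardingSphere's main extension points.

1. Factory pattern

Factory classes in ShardingSphere are mostly simple factories and cover most modules. Their main job is to create the appropriate interface implementation according to the configuration type, database type, SQL type, and so on. The factory pattern encapsulates the details of creating these implementations, which keeps the processing flow generic. For example, when using sharding-jdbc, ShardingSphere provides a DataSource factory class for each feature, which creates the corresponding DataSource implementation from the supplied configuration and connection pools.

Sharding DataSource factory class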

/**
 * Sharding data source factory.
 */
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class ShardingDataSourceFactory {
    
    /**
     * Create sharding data source.
     *
     * @param dataSourceMap data source map
     * @param shardingRuleConfig rule configuration for databases and tables sharding
     * @param props properties for data source
     * @return sharding data source
     * @throws SQLException SQL exception
     */
    public static DataSource createDataSource(
            final Map<String, DataSource> dataSourceMap, final ShardingRuleConfiguration shardingRuleConfig, final Properties props) throws SQLException {
        return new ShardingDataSource(dataSourceMap, new ShardingRule(shardingRuleConfig, dataSourceMap.keySet()), props);
    }
}

Read-write splitting DataSource factory class

/**
 * Master-slave data source factory.
 */
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class MasterSlaveDataSourceFactory {
    
    /**
     * Create master-slave data source.
     *
     * @param dataSourceMap data source map
     * @param masterSlaveRuleConfig master-slave rule configuration
     * @param props props
     * @return master-slave data source
     * @throws SQLException SQL exception
     */
    public static DataSource createDataSource(final Map<String, DataSource> dataSourceMap, final MasterSlaveRuleConfiguration masterSlaveRuleConfig, final Properties props) throws SQLException {
        return new MasterSlaveDataSource(dataSourceMap, new MasterSlaveRule(masterSlaveRuleConfig), props);
    }
}
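
The two factories above each create one fixed type. The "create an implementation according to the SQL type" flavor mentioned earlier can be sketched as below. This is a made-up example: ResultMergerSketch, its two implementations, and ResultMergerFactorySketch are illustrative names, and picking the type from the first keyword of the SQL is an assumption made to keep the sketch short.

```java
import java.util.Locale;

// Hypothetical simple factory: picks a merger implementation from the SQL type.
// ResultMergerSketch and its implementations are illustrative names only.
interface ResultMergerSketch {

    String describe();
}

final class QueryMergerSketch implements ResultMergerSketch {

    @Override
    public String describe() {
        return "merge rows from all shards";
    }
}

final class UpdateMergerSketch implements ResultMergerSketch {

    @Override
    public String describe() {
        return "sum affected row counts";
    }
}

final class ResultMergerFactorySketch {

    private ResultMergerFactorySketch() {
    }

    // Callers only see the interface; the choice of class is hidden here.
    static ResultMergerSketch newInstance(final String sql) {
        String keyword = sql.trim().split("\\s+")[0].toUpperCase(Locale.ROOT);
        switch (keyword) {
            case "SELECT":
                return new QueryMergerSketch();
            case "INSERT":
            case "UPDATE":
            case "DELETE":
                return new UpdateMergerSketch();
            default:
                throw new UnsupportedOperationException("Unsupported SQL: " + keyword);
        }
    }
}
```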
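
2. Builder pattern

Like the factory, the Builder pattern is a creational design pattern. A typical builder is fluent in style: it offers set-like and add-like methods to set or append properties and finally creates the instance through a build method. It mainly addresses the creation of classes that need several related properties to be set and then validated together when the instance is created.
The builder classes in ShardingSphere are not designed this way, however. They are implementations of various *Builder interfaces whose methods return the object to be created. Unlike the factory classes above, what they create is usually an instance of a fixed, known class.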
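
For contrast, this is roughly what the classic fluent builder described above looks like. TableRuleSketch and its fields are hypothetical and exist only to show the set/add/build shape with validation in build(); nothing here is taken from ShardingSphere.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical example of the classic fluent builder.
// TableRuleSketch is an illustrative class, not part of ShardingSphere.
final class TableRuleSketch {

    private final String logicTable;

    private final List<String> actualTables;

    private TableRuleSketch(final String logicTable, final List<String> actualTables) {
        this.logicTable = logicTable;
        this.actualTables = Collections.unmodifiableList(new ArrayList<>(actualTables));
    }

    String getLogicTable() {
        return logicTable;
    }

    List<String> getActualTables() {
        return actualTables;
    }

    static Builder builder() {
        return new Builder();
    }

    static final class Builder {

        private String logicTable;

        private final List<String> actualTables = new ArrayList<>();

        Builder logicTable(final String logicTable) {
            this.logicTable = logicTable;
            return this;
        }

        Builder addActualTable(final String actualTable) {
            actualTables.add(actualTable);
            return this;
        }

        // All related properties are validated together before the instance exists.
        TableRuleSketch build() {
            if (null == logicTable || logicTable.isEmpty()) {
                throw new IllegalStateException("logicTable is required");
            }
            if (actualTables.isEmpty()) {
                throw new IllegalStateException("at least one actual table is required");
            }
            return new TableRuleSketch(logicTable, actualTables);
        }
    }
}
```

A call such as TableRuleSketch.builder().logicTable("t_order").addActualTable("t_order_0").build() reads as one chained expression, and an incomplete configuration fails in build() instead of producing a half-initialized object.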

Default SQL builder class

/**
 * Default SQL builder.
 */
public final class DefaultSQLBuilder extends AbstractSQLBuilder {
    
    public DefaultSQLBuilder(final SQLRewriteContext context) {
        super(context);
    }
    
    @Override
    protected String getSQLTokenText(final SQLToken sqlToken) {
        return sqlToken.toString(); // Return the text of the token
    }
}
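
3. Callback pattern

The execution engine uses this pattern heavily. As with Spring's JdbcTemplate and TransactionTemplate, ShardingSphere's SQLExecuteTemplate and ExecutorEngine are designed so that the caller of the engine supplies the callback implementation. The pattern is used because SQL execution has to support many kinds of SQL (DQL, DML, DDL, SQL without parameters, parameterized SQL, and so on) whose handling logic differs, while the execution engine still has to offer one generic execution strategy.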

public final class SQLExecuteTemplate {
    …
    public <T> List<T> execute(final Collection<InputGroup<? extends StatementExecuteUnit>> inputGroups,
                               final SQLExecuteCallback<T> firstCallback, final SQLExecuteCallback<T> callback) throws SQLException {
        try {
            return executorEngine.execute((Collection) inputGroups, firstCallback, callback, serial);
        } catch (final SQLException ex) {
            ExecutorExceptionHandler.handleException(ex);
            return Collections.emptyList();
        }
    }
    …
}

public final class ExecutorEngine implements AutoCloseable {
    …
    public <I, O> List<O> execute(final Collection<InputGroup<I>> inputGroups,
                                  final GroupedCallback<I, O> firstCallback, final GroupedCallback<I, O> callback, final boolean serial) throws SQLException {
        if (inputGroups.isEmpty()) {
            return Collections.emptyList();
        }
        return serial ? serialExecute(inputGroups, firstCallback, callback) : parallelExecute(inputGroups, firstCallback, callback);
    }
    …
}

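Note that although these classes are called Template, they are not the same thing as the Template Method pattern from the classic design patterns.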
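
Stripped of the SQL specifics, the callback idea comes down to this: the engine decides how to run things (serially or in parallel), and the caller decides what to run. The sketch below is a simplified, hypothetical version. CallbackExecutorSketch and its Callback interface are illustrative, the real SQLExecuteCallback and GroupedCallback have different signatures, and creating a fresh thread pool per call is done only to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical, simplified executor: the engine owns the execution strategy,
// the caller supplies what to do with each input through a callback.
final class CallbackExecutorSketch {

    // Illustrative callback interface; the real SQLExecuteCallback differs.
    interface Callback<I, O> {

        O execute(I input);
    }

    static <I, O> List<O> execute(final List<I> inputs, final Callback<I, O> callback, final boolean serial) {
        if (inputs.isEmpty()) {
            return new ArrayList<>();
        }
        return serial ? serialExecute(inputs, callback) : parallelExecute(inputs, callback);
    }

    private static <I, O> List<O> serialExecute(final List<I> inputs, final Callback<I, O> callback) {
        List<O> result = new ArrayList<>();
        for (I each : inputs) {
            result.add(callback.execute(each));
        }
        return result;
    }

    private static <I, O> List<O> parallelExecute(final List<I> inputs, final Callback<I, O> callback) {
        ExecutorService executorService = Executors.newFixedThreadPool(inputs.size());
        try {
            List<Future<O>> futures = new ArrayList<>();
            for (I each : inputs) {
                Callable<O> task = () -> callback.execute(each);
                futures.add(executorService.submit(task));
            }
            // Collect in submission order so the result order matches the input order.
            List<O> result = new ArrayList<>();
            for (Future<O> each : futures) {
                try {
                    result.add(each.get());
                } catch (final InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(ex);
                } catch (final ExecutionException ex) {
                    throw new IllegalStateException(ex.getCause());
                }
            }
            return result;
        } finally {
            executorService.shutdown();
        }
    }
}
```

The engine never needs to know whether the callback runs a query or an update, which is why a single execute method can serve every statement type.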

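4. Decorator pattern

The decorator pattern is used in the routing, rewrite, and merge engines, and it is the main design pattern behind pluggability in 5.x. In 4.1.1 you can already see that data sharding, read-write splitting, encryption, and shadow tables have each been refactored into a feature implemented as a decorator. The decorator pattern makes it possible to combine features, for example using data sharding together with encryption and shadow tables, all of which can be regarded as enhancements to rewriting and routing.

In the DataNodeRouter class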

    private RouteContext executeRoute(final String sql, final List<Object> parameters, final boolean useCache) {
        RouteContext result = createRouteContext(sql, parameters, useCache);
        for (Entry<BaseRule, RouteDecorator> entry : decorators.entrySet()) {
            result = entry.getValue().decorate(result, metaData, entry.getKey(), properties);
        }
        return result;
    }

In the SQLRewriteEntry class

    private void decorate(final Map<BaseRule, SQLRewriteContextDecorator> decorators, final SQLRewriteContext sqlRewriteContext, final RouteContext routeContext) {
        for (Entry<BaseRule, SQLRewriteContextDecorator> entry : decorators.entrySet()) {
            BaseRule rule = entry.getKey();
            SQLRewriteContextDecorator decorator = entry.getValue();
            if (decorator instanceof RouteContextAware) {
                ((RouteContextAware) decorator).setRouteContext(routeContext);
            }
            decorator.decorate(rule, properties, sqlRewriteContext);
        }
    }

In the MergeEntry class

    private Optional<MergedResult> decorate(final QueryResult queryResult, final SQLStatementContext sqlStatementContext) throws SQLException {
        MergedResult result = null;
        for (Entry<BaseRule, ResultProcessEngine> entry : engines.entrySet()) {
            if (entry.getValue() instanceof ResultDecoratorEngine) {
                ResultDecorator resultDecorator = ((ResultDecoratorEngine) entry.getValue()).newInstance(databaseType, schemaMetaData, entry.getKey(), properties, sqlStatementContext);
                result = null == result ? resultDecorator.decorate(queryResult, sqlStatementContext, schemaMetaData) : resultDecorator.decorate(result, sqlStatementContext, schemaMetaData);
            }
        }
        return Optional.ofNullable(result);
    }
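
What the three loops have in common is easier to see in a reduced form. The sketch below chains two made-up decorators over a list of route targets, keyed by a rule the way the excerpts above key them by BaseRule. All types are hypothetical stand-ins, and using a plain String as the "rule" (a shard count for sharding, a prefix for shadow) is an assumption chosen for brevity.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of composing features as decorators over a route result.
// The types below are illustrative stand-ins for RouteContext and RouteDecorator.
final class DecoratorChainSketch {

    interface RouteDecoratorSketch {

        List<String> decorate(List<String> routedTargets, String rule);
    }

    // Sharding feature: the rule is the shard count; expand each target into its shards.
    static final class ShardingDecorator implements RouteDecoratorSketch {

        @Override
        public List<String> decorate(final List<String> routedTargets, final String rule) {
            int shardCount = Integer.parseInt(rule);
            List<String> result = new ArrayList<>();
            for (String each : routedTargets) {
                for (int i = 0; i < shardCount; i++) {
                    result.add(each + "_" + i);
                }
            }
            return result;
        }
    }

    // Shadow feature: the rule is a prefix; redirect every target to its shadow copy.
    static final class ShadowDecorator implements RouteDecoratorSketch {

        @Override
        public List<String> decorate(final List<String> routedTargets, final String rule) {
            List<String> result = new ArrayList<>();
            for (String each : routedTargets) {
                result.add(rule + each);
            }
            return result;
        }
    }

    // Each registered decorator enhances the result of the previous one, in map order.
    static List<String> route(final String logicTable, final Map<String, RouteDecoratorSketch> decorators) {
        List<String> result = Collections.singletonList(logicTable);
        for (Map.Entry<String, RouteDecoratorSketch> entry : decorators.entrySet()) {
            result = entry.getValue().decorate(result, entry.getKey());
        }
        return result;
    }
}
```

Adding or removing a feature then means adding or removing one map entry; the routing loop itself does not change.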
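
5. Visitor pattern

ShardingSphere makes heavy use of the visitor pattern in SQL parsing. We rarely use this pattern in ordinary projects, but it is almost standard in compiler technology: both antlr and druid use it, and developers who build features on top of them access the AST through it. When antlr4 generates Java classes from a g4 file, besides the Lexer and Parser classes it also generates a visitor base class. The visitors that ShardingSphere provides for each database all extend this class; the subclasses pick up the various elements of the SQL and then build the corresponding SQLStatement (for a query, a SelectStatement).

For example, org.apache.shardingsphere.sql.parser.mysql.visitor.MySQLVisitor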

/**
 * MySQL visitor.
 */
@Getter(AccessLevel.PROTECTED)
public abstract class MySQLVisitor extends MySQLStatementBaseVisitor<ASTNode> {
    
    private int currentParameterIndex;
    
    @Override
    public final ASTNode visitParameterMarker(final ParameterMarkerContext ctx) {
        return new ParameterMarkerValue(currentParameterIndex++);
    }

    @Override
    public final ASTNode visitLiterals(final LiteralsContext ctx) {
        if (null != ctx.stringLiterals()) {
            return visit(ctx.stringLiterals());
        }
        if (null != ctx.numberLiterals()) {
            return visit(ctx.numberLiterals());
        }
        if (null != ctx.dateTimeLiterals()) {
            return visit(ctx.dateTimeLiterals());
        }
        if (null != ctx.hexadecimalLiterals()) {
            return visit(ctx.hexadecimalLiterals());
        }
        if (null != ctx.bitValueLiterals()) {
            return visit(ctx.bitValueLiterals());
        }
        if (null != ctx.booleanLiterals()) {
            return visit(ctx.booleanLiterals());
        }
        if (null != ctx.nullValueLiterals()) {
            return visit(ctx.nullValueLiterals());
        }
        throw new IllegalStateException("Literals must have string, number, dateTime, hex, bit, boolean or null.");
    }
    …
}
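
For readers who have not used the pattern before, a miniature version may help. The sketch below defines a two-node AST and two visitors over it. It is hypothetical throughout: antlr generates a comparable visitor base class from the grammar, but none of the types here come from antlr or ShardingSphere.

```java
// Hypothetical miniature AST and visitors, for illustration only.
final class VisitorSketch {

    interface Node {

        <T> T accept(Visitor<T> visitor);
    }

    // One visit method per node type, like the generated base visitor.
    interface Visitor<T> {

        T visitNumber(NumberNode node);

        T visitAdd(AddNode node);
    }

    static final class NumberNode implements Node {

        final int value;

        NumberNode(final int value) {
            this.value = value;
        }

        @Override
        public <T> T accept(final Visitor<T> visitor) {
            return visitor.visitNumber(this);
        }
    }

    static final class AddNode implements Node {

        final Node left;

        final Node right;

        AddNode(final Node left, final Node right) {
            this.left = left;
            this.right = right;
        }

        @Override
        public <T> T accept(final Visitor<T> visitor) {
            return visitor.visitAdd(this);
        }
    }

    // One visitor evaluates the tree.
    static final class EvalVisitor implements Visitor<Integer> {

        @Override
        public Integer visitNumber(final NumberNode node) {
            return node.value;
        }

        @Override
        public Integer visitAdd(final AddNode node) {
            return node.left.accept(this) + node.right.accept(this);
        }
    }

    // Another renders it as text, without touching the node classes.
    static final class PrintVisitor implements Visitor<String> {

        @Override
        public String visitNumber(final NumberNode node) {
            return String.valueOf(node.value);
        }

        @Override
        public String visitAdd(final AddNode node) {
            return "(" + node.left.accept(this) + " + " + node.right.accept(this) + ")";
        }
    }
}
```

New operations over the tree are added as new visitor classes, which is why a single grammar can serve several purposes without the generated node classes being edited.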

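Besides the patterns above, the code also uses the interpreter, singleton, proxy, and strategy patterns, among others. For reasons of space they are not covered one by one here.

Code Directory Structure

Our code analysis so far has centered on the core modules of SQL routing, rewriting, execution, and merging. These are also what the official ShardingSphere documentation calls the kernel engines, and in 4.1.1 their code lives under the sharding-core project. It is worth asking, though, what the core of ShardingSphere really is. The project's creators surely have a deeper understanding of this question; here I will only give my own view. The answer mainly depends on how ShardingSphere is defined. In the early days, when there was only sharding-jdbc, the notion of a core hardly mattered, because the whole feature set was well defined. Once the sharding-proxy project appeared, the natural question became which features and code could be shared by jdbc and proxy. Apart from the access method (one is JDBC, the other is the MySQL protocol), SQL parsing, routing, rewriting, and execution can all be reused, so these became the core. Later, after the SQL parser had evolved from druid to a hand-written simplified parser and then to antlr, SQL parsing became an independent, highly extensible module that supports multiple database dialects, so it was split out as sql-parser. Similarly, the MySQL wire protocol part of the Proxy was later joined by PostgreSQL, so it was split out as db-protocol.

At the feature level, the early versions offered only sharding and read-write splitting. Encryption, shadow tables, and other features were added later, and they became entangled with the original core code. In 5.x we can therefore see that the ShardingSphere authors have refactored further. The original core code has become the infra module, and the feature-specific logic in the routing, rewrite, merge, and execution modules has been stripped out. What remains in infra for routing, rewriting, merging, and execution is just the engines and the decorator interfaces, that is, the execution algorithm and flow. Data sharding (sharding), read-write splitting (master-slave), encryption (encrypt), and shadow tables (shadow) have been split into the feature project as features, each implementing its own routing, rewrite, merge, and execution decorator interfaces. This change gives developers a single entry point for integration, enhancement, and customization. ShardingSphere's own original core features (sharding and read-write splitting) are integrated in the same way as third-party extensions, and this consistent development model also makes it easier for users to trim ShardingSphere's components to their own needs.

Although 5.x has not been officially released yet, the new architecture is visible in the directory layout of the master branch on GitHub. The feature project contains sharding, encryption, master-slave, replication, and shadow tables. The infra project contains the original kernel engines (routing, rewriting, execution, merging). A new control-panel project has been added, and the original orchestration, metric, and opentracing modules have been moved into it. SQL parsing and the database wire protocols have each become an independent project, which makes it easier for developers to pick the parts of the ShardingSphere toolset they need. There is also a new kernel project, but so far it contains only some context classes, and its exact role is not yet clear to me.

ShardingSphere 5.x directory structure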

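Code Style and Quality

After several versions of iteration and refactoring, ShardingSphere's code goes beyond merely meeting functional requirements: its coding style, directory structure, and extension mechanisms have all been carefully designed. Compared with many current open-source projects, its code feels clean to read, with very little spaghetti code.
The project uses many lombok annotations, such as @Getter, @Setter, @RequiredArgsConstructor, and @Data, which removes a lot of getter, setter, constructor, and hashCode code and keeps the code concise. In addition, after 4.x raised the JDK version to 8, collection traversal was changed from for-each loops to streams, and Optional replaced many null checks of the form if (xx == null), which also makes the code more elegant and readable.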
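
The difference between the two styles is easiest to see side by side. The snippet below is a generic illustration using only the JDK, with made-up method names; it is not code from ShardingSphere.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

// Generic illustration of loop-and-null-check style versus stream-and-Optional style.
final class StreamStyleSketch {

    // Pre-Java 8 style: explicit loop and explicit null check.
    static List<String> upperCaseNamesLoop(final List<String> names) {
        List<String> result = new ArrayList<>();
        for (String each : names) {
            if (each != null) {
                result.add(each.toUpperCase());
            }
        }
        return result;
    }

    // Java 8 style: the same logic as a stream pipeline.
    static List<String> upperCaseNamesStream(final List<String> names) {
        return names.stream().filter(Objects::nonNull).map(String::toUpperCase).collect(Collectors.toList());
    }

    // Optional makes "may be absent" part of the return type instead of returning null.
    static Optional<String> findFirstWithPrefix(final List<String> names, final String prefix) {
        return names.stream().filter(each -> null != each && each.startsWith(prefix)).findFirst();
    }
}
```

A caller of findFirstWithPrefix can write findFirstWithPrefix(names, "ds_").orElse("default") and never needs an if (xx == null) branch.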

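However, for the sake of brevity some of the code has overly long call chains and too many method parameters, which hurts readability. In such cases I think introducing a few intermediate variables would be better. This is more common in the sharding-proxy project.

For example, the org.apache.shardingsphere.shardingproxy.frontend.netty.FrontendChannelInboundHandler#channelRead method: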

    public void channelRead(final ChannelHandlerContext context, final Object message) {
        …
        CommandExecutorSelector.getExecutor(databaseProtocolFrontendEngine.getFrontendContext().isOccupyThreadForPerConnection(), backendConnection.isSupportHint(),
                backendConnection.getTransactionType(), context.channel().id()).execute(new CommandExecutorTask(databaseProtocolFrontendEngine, backendConnection, context, message));
    }
…
}

The org.apache.shardingsphere.shardingproxy.frontend.mysql.command.query.binary.execute.MySQLComStmtExecuteExecutor#createQueryPacket method:

    private Collection<DatabasePacket> createQueryPacket(final QueryResponse backendResponse) {
        …
        for (QueryHeader each : queryHeader) {
            result.add(new MySQLColumnDefinition41Packet(++currentSequenceId, each.getSchema(), each.getTable(), each.getTable(),
                    each.getColumnLabel(), each.getColumnName(), each.getColumnLength(), MySQLColumnType.valueOfJDBCType(each.getColumnType()), each.getDecimals()));
        }
        result.add(new MySQLEofPacket(++currentSequenceId));
        return result;
    }

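The official development guidelines describe the development philosophy and coding conventions in more detail: https://shardingsphere.apache.org/community/cn/contribute/code-conduct/

Some Design Issues in the 4.1.1 Code

Engine boundaries are not clear enough, and calls are nested

In 4.1.1 the JDBC layer calls the prepare engine; the prepare engine (PrepareEngine.prepare) calls the routing engine (DataNodeRouter.route); the routing engine, inside its route method, in turn calls the parse engine (ParseEngine.parse) to parse the SQL; the prepare engine (PrepareEngine.prepare) then calls the rewrite engine (RewriteEngine.rewrite) to rewrite the SQL; the JDBC layer next calls the execution engine to run the SQL; and finally the JDBC layer calls the merge engine to merge the result sets. This call structure couples the kernel engines to one another. A better approach would be for the JDBC layer or the Proxy layer to orchestrate the engines itself, calling the SQL parse engine, routing engine, rewrite engine, execution engine, and merge engine in turn, rather than nesting the SQL parsing call inside the routing engine.

This problem has already been refactored in the current 5.0.0-RC1-SNAPSHOT. For example, in the ShardingSpherePreparedStatement class (renamed from ShardingPreparedStatement), SQL parsing has moved into the constructor, routing and rewriting have been placed together in the createExecutionContext() method, and the generation of execution input groups has also moved from PreparedStatementExecutor to the JDBC layer.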

/**
 * ShardingSphere prepared statement.
 */
public final class ShardingSpherePreparedStatement extends AbstractPreparedStatementAdapter {
    …
    private ShardingSpherePreparedStatement(final ShardingSphereConnection connection, final String sql,
                                            final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final boolean returnGeneratedKeys) throws SQLException {
        if (Strings.isNullOrEmpty(sql)) {
            throw new SQLException(SQLExceptionConstant.SQL_STRING_NULL_OR_EMPTY);
        }
        this.connection = connection;
        schemaContexts = connection.getSchemaContexts();
        this.sql = sql;
        statements = new ArrayList<>();
        parameterSets = new ArrayList<>();
        sqlStatement = schemaContexts.getDefaultSchemaContext().getRuntimeContext().getSqlParserEngine().parse(sql, true); // Parse the SQL
…
    }
    
    @Override
    public ResultSet executeQuery() throws SQLException {
        ResultSet result;
        try {
            clearPrevious();
            executionContext = createExecutionContext(); // Create the execution context, which routes and rewrites the SQL
            List<QueryResult> queryResults;
            if (ExecutorConstant.MANAGED_RESOURCE) {
                Collection<InputGroup<StatementExecuteUnit>> inputGroups = getInputGroups();
                cacheStatements(inputGroups);
                reply();
                queryResults = preparedStatementExecutor.executeQuery(inputGroups); // Execute the SQL
            } else {
                queryResults = rawExecutor.executeQuery(getRawInputGroups(), new RawSQLExecutorCallback());
            }
            MergedResult mergedResult = mergeQuery(queryResults); // Merge the result sets
            result = new ShardingSphereResultSet(statements.stream().map(this::getResultSet).collect(Collectors.toList()), mergedResult, this, executionContext);
        } finally {
            clearBatch();
        }
        currentResultSet = result;
        return result;
    }
…
}

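Kernel engines are named inconsistently and used in different ways

Parse engine:
  1. SQLParserEngineFactory.getSQLParserEngine returns the SQL parser engine for the given database;
  2. SQLParserEngine.parse is then called to parse the SQL into a SQLStatement.
    Internally, SQLParserExecutor.execute returns the parsed abstract syntax tree, a ParseASTNode. Inside execute, SQLParserFactory.newInstance creates the SQL parser, an SQLParser implementation, and each database has its own implementation class.
Routing engine:
  1. The routing engine is not used by creating a RouteEngine instance directly. Instead, a DataNodeRouter instance is created first and its route method is called, which returns a RouteContext object containing the routing result.
  2. The route method calls each RouteDecorator implementation in turn, and the concrete implementation creates a concrete RouteEngine to perform the route operation. For example, ShardingRouteDecorator creates a ShardingRouteEngine through ShardingRouteEngineFactory.newInstance to carry out the actual routing logic.
Rewrite engine:
  1. The createSQLRewriteContext method of SQLRewriteEntry returns a SQLRewriteContext;
  2. SQLRewriteEngine or SQLRouteRewriteEngine is then called to complete the rewrite;
  3. The main logic in the RewriteEngine is really just to use an SQLBuilder implementation to turn the tokens in the SQLRewriteContext into SQL.
Execution engine:
  1. A PreparedStatementExecutor instance is created (JDBCExecuteEngine in Proxy mode);
  2. An ExecutePrepareTemplate instance then generates the execution unit groups, InputGroup<StatementExecuteUnit> (in 5.x this is done by implementations of the ExecuteGroupEngine interface);
  3. Finally, a SQLExecuteTemplate instance is created, which internally uses ExecutorEngine to execute the grouped StatementExecuteUnit objects.
Merge engine:
  1. A MergeEngine is created, which internally creates a MergeEntry. Its merge method mainly calls the ResultMergerEngine implementations in order; currently that is only ShardingResultMergerEngine. For a SELECT statement, for example, it creates a ShardingDQLResultMerger, whose merge method creates the appropriate stream or memory MergedResult according to the SelectStatementContext;
  2. Finally, the ResultDecoratorEngine implementations post-process the MergedResult.

As you can see, these engines are all used differently: some start from an Entry class, some from an Engine, and some nest an Entry class inside an Engine. This inconsistency increases the cognitive load on developers. These design problems are being addressed in 5.x.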

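Some features still lack extension points

Some features are still not flexible enough to extend. Take transactions: ShardingSphere's transaction types currently support only LOCAL, BASE, and XA, and new ones cannot be added through SPI. In an earlier project, when I added a strict single-database transaction manager on top of sharding-jdbc, I had no choice but to modify the source code of the TransactionType class.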
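
To show what an SPI-friendly alternative to a closed enum could look like, here is a small sketch built on the JDK's ServiceLoader. It is purely hypothetical: TransactionManagerSketch and TransactionManagerRegistrySketch are invented for this example and do not correspond to any ShardingSphere class, and the manual register method exists only so the idea can be tried without a META-INF/services file.

```java
import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of an SPI-friendly alternative to a closed enum.
// These types are illustrative, not ShardingSphere APIs.
interface TransactionManagerSketch {

    // The type name plays the role that an enum constant plays today.
    String getType();
}

final class TransactionManagerRegistrySketch {

    private static final Map<String, TransactionManagerSketch> MANAGERS = new ConcurrentHashMap<>();

    static {
        // Providers listed under META-INF/services are discovered without changing this class.
        for (TransactionManagerSketch each : ServiceLoader.load(TransactionManagerSketch.class)) {
            register(each);
        }
    }

    private TransactionManagerRegistrySketch() {
    }

    // Manual registration, kept so the sketch works without a provider configuration file.
    static void register(final TransactionManagerSketch manager) {
        MANAGERS.put(manager.getType(), manager);
    }

    static TransactionManagerSketch get(final String type) {
        TransactionManagerSketch result = MANAGERS.get(type);
        if (null == result) {
            throw new IllegalArgumentException("Unknown transaction type: " + type);
        }
        return result;
    }
}
```

With a string key instead of an enum constant, a new transaction type becomes a new provider class plus a registration entry, and no existing source file has to be edited.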

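A good open-source project should keep evolving. ShardingSphere 5.x is now approaching release with pluggability as its goal, and I believe the issues above will be resolved through better design. ShardingSphere is also no longer positioned as just a sharding middleware but as a distributed database. In the future its back end will include storage other than relational databases, so the kernel may gain concepts such as query optimization, execution plans, and data statistics (as in a database), and the access side may support more ways of connecting (REST, GraphQL, Redis, and so on at the front end; Redis, MongoDB, HBase, and so on at the back end).

This concludes the overview. The following installments will analyze the source code of each kernel engine in more detail.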

Link to this article: https://www.haomeiwen.com/subject/onfpjktx.html