Understanding ShardingSphere's Design from the Source Code: Overview

Author: 蚊子squirrel | Published 2020-08-28 21:51

    Developers who used ShardingSphere in its early days mostly know that it grew out of sharding-jdbc. sharding-jdbc 1.x focused on database/table sharding, read-write splitting, and distributed primary keys; 2.x added orchestration; 3.x added sharding-proxy and OpenTracing support; with 4.x the project entered the Apache Foundation and added encryption/decryption, shadow tables, and scaling. Beyond the feature level, every version also refactored the code substantially: SQL parsing first relied on Alibaba's Druid, was then replaced by a simplified home-grown SQL parser, and was later rewritten on top of ANTLR; the early kernel also had an optimization engine, which was later reworked into the rewrite engine. The 5.x line under development aims at pluggability, with significant changes to the code structure and package/class names that make it friendlier to developers. As a developer, I see ShardingSphere as more than a database middleware: it is a development platform and toolset built around SQL and databases. Its code quality is also high, and analyzing its source code teaches a great deal about software design and development.

    This article does not cover ShardingSphere's individual features; the official site has a complete introduction: https://shardingsphere.apache.org/document/current/en/overview

    This series is based on the latest release at the time of writing, 4.1.1, though where 5.0.0-RC1-SNAPSHOT differs significantly I will compare against it as well. Version 4.1.1 consists of 18 sub-projects, organized as follows:


    (Figure: sharding-sphere 4.1.1 code directory)

    Organized by feature, this series will consist of the following installments:

    1. Overview
    2. Parse engine
    3. Route engine
    4. Rewrite engine
    5. Execute engine
    6. Merge engine
    7. Transactions
    8. JDBC
    9. Proxy
    10. Orchestration
    11. Scaling
    12. 5.x code changes

    This is the overview installment. We will walk quickly through the execution flow to understand the responsibilities and positioning of the classes and methods involved, forming an overall picture of the ShardingSphere kernel. Along the way I will also summarize the design patterns, code structure, and code style used, as well as some issues that currently exist in the 4.1.1 release.

    Code Call Analysis

    To read the source we need an entry point, and the most mature and most widely used part of ShardingSphere is sharding-jdbc, so that is where we start. As the name suggests, sharding-jdbc implements JDBC. Developers familiar with the JDBC spec know its core interfaces are DataSource, Connection, Statement, PreparedStatement, and so on; in sharding-jdbc their implementations are ShardingDataSource, ShardingConnection, ShardingStatement, and ShardingPreparedStatement respectively. Starting from a single query SQL, let's follow the method call chain through these classes:
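
    Before following the call chain, here is a minimal usage sketch showing how an application reaches these classes through plain JDBC. It assumes the 4.x API; the data source names, table names, and inline expressions are illustrative, and createPhysicalDataSource is a placeholder you must supply:

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    import javax.sql.DataSource;

    import org.apache.shardingsphere.api.config.sharding.ShardingRuleConfiguration;
    import org.apache.shardingsphere.api.config.sharding.TableRuleConfiguration;
    import org.apache.shardingsphere.api.config.sharding.strategy.InlineShardingStrategyConfiguration;
    import org.apache.shardingsphere.shardingjdbc.api.ShardingDataSourceFactory;

    public final class ShardingJdbcQuickStart {

        public static void main(final String[] args) throws Exception {
            // Two physical data sources; createPhysicalDataSource() stands in for any pooled DataSource.
            Map<String, DataSource> dataSourceMap = new HashMap<>();
            dataSourceMap.put("ds0", createPhysicalDataSource("ds0"));
            dataSourceMap.put("ds1", createPhysicalDataSource("ds1"));
            // Logic table t_order maps to ds${0..1}.t_order_${0..1}.
            TableRuleConfiguration tableRule = new TableRuleConfiguration("t_order", "ds${0..1}.t_order_${0..1}");
            tableRule.setDatabaseShardingStrategyConfig(new InlineShardingStrategyConfiguration("user_id", "ds${user_id % 2}"));
            tableRule.setTableShardingStrategyConfig(new InlineShardingStrategyConfiguration("order_id", "t_order_${order_id % 2}"));
            ShardingRuleConfiguration shardingRuleConfig = new ShardingRuleConfiguration();
            shardingRuleConfig.getTableRuleConfigs().add(tableRule);
            // Returns a ShardingDataSource; from here on everything is ordinary JDBC.
            DataSource dataSource = ShardingDataSourceFactory.createDataSource(dataSourceMap, shardingRuleConfig, new Properties());
            try (Connection connection = dataSource.getConnection();                // ShardingConnection
                 PreparedStatement statement = connection.prepareStatement("SELECT * FROM t_order WHERE order_id = ?")) { // ShardingPreparedStatement
                statement.setLong(1, 10L);
                try (ResultSet resultSet = statement.executeQuery()) {              // parse -> route -> rewrite -> execute -> merge
                    while (resultSet.next()) {
                        System.out.println(resultSet.getLong("order_id"));
                    }
                }
            }
        }

        private static DataSource createPhysicalDataSource(final String name) {
            // Placeholder: build a real pooled DataSource (HikariCP, DBCP, ...) for the physical database.
            throw new UnsupportedOperationException("wire up a real DataSource for " + name);
        }
    }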

    To make it easier to tell where we are in the call chain during the analysis, headings will mark the functional area that the following code belongs to.

    JDBC

    org.apache.shardingsphere.shardingjdbc.jdbc.core.datasource.ShardingDataSource

    public class ShardingDataSource extends AbstractDataSourceAdapter {
            private final ShardingRuntimeContext runtimeContext;
    …
        @Override
        public final ShardingConnection getConnection() {
            return new ShardingConnection(getDataSourceMap(), runtimeContext, TransactionTypeHolder.get());
        }
    }
    

    org.apache.shardingsphere.shardingjdbc.jdbc.core.connection.ShardingConnection

    public final class ShardingConnection extends AbstractConnectionAdapter {
           …
        @Override
        public PreparedStatement prepareStatement(final String sql) throws SQLException {
            return new ShardingPreparedStatement(this, sql);
        }
    …
        @Override
        public Statement createStatement(final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) {
            return new ShardingStatement(this, resultSetType, resultSetConcurrency, resultSetHoldability);
        }
    …
    }
    

    org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingStatement

    /**
     * Statement that support sharding.
     */
    public final class ShardingStatement extends AbstractStatementAdapter {
    …
        @Override
        public ResultSet executeQuery(final String sql) throws SQLException {
            if (Strings.isNullOrEmpty(sql)) {
                throw new SQLException(SQLExceptionConstant.SQL_STRING_NULL_OR_EMPTY);
            }
            ResultSet result;
            try {
                executionContext = prepare(sql);
                List<QueryResult> queryResults = statementExecutor.executeQuery();
                MergedResult mergedResult = mergeQuery(queryResults);
                result = new ShardingResultSet(statementExecutor.getResultSets(), mergedResult, this, executionContext);
            } finally {
                currentResultSet = null;
            }
            currentResultSet = result;
            return result;
        }
    …
    }
    

    org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement

     /**
     * PreparedStatement that support sharding.
     */
    public final class ShardingPreparedStatement extends AbstractShardingPreparedStatementAdapter {
        …
     public ResultSet executeQuery() throws SQLException {
            ResultSet result;
            try {
                clearPrevious();
                prepare();
                initPreparedStatementExecutor();
                MergedResult mergedResult = mergeQuery(preparedStatementExecutor.executeQuery());
                result = new ShardingResultSet(preparedStatementExecutor.getResultSets(), mergedResult, this, executionContext);
            } finally {
                clearBatch();
            }
            currentResultSet = result;
            return result;
        }
    

    As you can see, the actual SQL parsing and routing happen in the Statement implementation classes.
    Let's take ShardingPreparedStatement.executeQuery() as an example and walk through the whole flow.
    clearPrevious() is responsible for resetting the PreparedStatementExecutor. A Statement may execute several SQLs over its lifetime, and after each execution the PreparedStatementExecutor keeps the real Statements and Connections; this method closes those statements and clears the recorded parameters, connections, and so on.

    public void clear() throws SQLException {
            clearStatements();
            statements.clear();
            parameterSets.clear();
            connections.clear();
            resultSets.clear();
            inputGroups.clear();
        }
    

    The rest of executeQuery then performs SQL parsing, routing, rewriting, execution, and merging.

    (Figure: SQL processing flow)
    Prepare Engine

    Next, look at prepare. This method runs the prepare engine and then collects the generated auto-increment key values.

    private void prepare() {
            executionContext = prepareEngine.prepare(sql, getParameters());
            findGeneratedKey().ifPresent(generatedKey -> generatedValues.add(generatedKey.getGeneratedValues().getLast()));
        }
    

    The prepare engine is not one of the official kernel engines: what it does is call the parse, route, and rewrite engines to parse and route the SQL, effectively orchestrating those kernel engines. Its class hierarchy is shown below:


    Let's look at the prepare engine's prepare method.
    org.apache.shardingsphere.underlying.pluggble.prepare.BasePrepareEngine

    public ExecutionContext prepare(final String sql, final List<Object> parameters) {
            List<Object> clonedParameters = cloneParameters(parameters);
        RouteContext routeContext = executeRoute(sql, clonedParameters);// SQL routing
            ExecutionContext result = new ExecutionContext(routeContext.getSqlStatementContext());
        result.getExecutionUnits().addAll(executeRewrite(sql, clonedParameters, routeContext));// SQL rewriting
            if (properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SHOW)) {
                SQLLogger.logSQL(sql, properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SIMPLE), result.getSqlStatementContext(), result.getExecutionUnits());
            }
            return result;
    }
    …
    private RouteContext executeRoute(final String sql, final List<Object> clonedParameters) {
            registerRouteDecorator();
            return route(router, sql, clonedParameters);
        }
        
        private void registerRouteDecorator() {
            for (Class<? extends RouteDecorator> each : OrderedRegistry.getRegisteredClasses(RouteDecorator.class)) {
                RouteDecorator routeDecorator = createRouteDecorator(each);
                Class<?> ruleClass = (Class<?>) routeDecorator.getType();
                rules.stream().filter(rule -> rule.getClass() == ruleClass || rule.getClass().getSuperclass() == ruleClass).collect(Collectors.toList())
                        .forEach(rule -> router.registerDecorator(rule, routeDecorator));
            }
        }
        …
        protected abstract RouteContext route(DataNodeRouter dataNodeRouter, String sql, List<Object> parameters);
    …
    }
    

    As you can see, the core of the prepare method is the routing (executeRoute) and rewriting (executeRewrite) operations. Let's look at these two methods in turn.

    executeRoute implements SQL routing: registerRouteDecorator registers the route decorators (custom extensions based on SPI), and route performs the routing itself, i.e. computes which databases and tables the SQL should run against. The decorator discovery uses Java's standard SPI mechanism, sketched below; after that, let's expand route and see how it is implemented.
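
    As an aside, here is a minimal sketch of the Java SPI pattern that OrderedRegistry builds on (the Decorator interface below is a stand-in for RouteDecorator, and the real registry presumably also sorts implementations by their declared order):

    import java.util.ServiceLoader;

    public final class SpiDiscoveryDemo {

        // Stand-in extension point; ShardingSphere's real one is RouteDecorator.
        public interface Decorator {
            void decorate();
        }

        public static void main(final String[] args) {
            // Implementations are listed, one fully qualified class name per line, in a
            // classpath resource named META-INF/services/<package>.SpiDiscoveryDemo$Decorator.
            // ServiceLoader discovers and instantiates each of them at runtime.
            for (Decorator each : ServiceLoader.load(Decorator.class)) {
                System.out.println("discovered: " + each.getClass().getName());
            }
        }
    }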

    Routing (route)

    The route method is implemented by subclasses and delegates to DataNodeRouter's route method. Here is BasePrepareEngine's subclass PreparedQueryPrepareEngine:

    org.apache.shardingsphere.underlying.pluggble.prepare.PreparedQueryPrepareEngine

       @Override
        protected RouteContext route(final DataNodeRouter dataNodeRouter, final String sql, final List<Object> parameters) {
            return dataNodeRouter.route(sql, parameters, true);
        }
    

    Continuing into org.apache.shardingsphere.underlying.route.DataNodeRouter: this class produces the routing context, RouteContext. Its executeRoute method does two things:

    1. Calls createRouteContext, which invokes the parse engine to parse the SQL and creates an initial RouteContext instance;
    2. Calls the decorate method of each registered RouteDecorator implementation in turn, updating the RouteContext according to the routing result.

    org.apache.shardingsphere.underlying.route.DataNodeRouter

    public final class DataNodeRouter {
    …
     public RouteContext route(final String sql, final List<Object> parameters, final boolean useCache) {
            routingHook.start(sql);
            try {
            RouteContext result = executeRoute(sql, parameters, useCache);// perform the routing computation and produce the route result
                routingHook.finishSuccess(result, metaData.getSchema());
                return result;
            } catch (final Exception ex) {
                routingHook.finishFailure(ex);
                throw ex;
            }
        }
        
        @SuppressWarnings("unchecked")
        private RouteContext executeRoute(final String sql, final List<Object> parameters, final boolean useCache) {
            RouteContext result = createRouteContext(sql, parameters, useCache);
            for (Entry<BaseRule, RouteDecorator> entry : decorators.entrySet()) {
                result = entry.getValue().decorate(result, metaData, entry.getKey(), properties);
            }
            return result;
        }
        
        private RouteContext createRouteContext(final String sql, final List<Object> parameters, final boolean useCache) {
        SQLStatement sqlStatement = parserEngine.parse(sql, useCache);// parse the SQL into its AST
            try {
            SQLStatementContext sqlStatementContext = SQLStatementContextFactory.newInstance(metaData.getSchema(), sql, parameters, sqlStatement);// build the SQL statement context, effectively a partial semantic analysis
                return new RouteContext(sqlStatementContext, parameters, new RouteResult());
            } catch (final IndexOutOfBoundsException ex) {
                return new RouteContext(new CommonSQLStatementContext(sqlStatement), parameters, new RouteResult());
            }
        }
    }
    
    SQL Parsing

    As shown, createRouteContext performs two operations:

    1. Calls parserEngine.parse, i.e. the SQL parse engine, which parses the SQL into an AST represented by an implementation of the SQLStatement interface;
    2. Converts the SQLStatement instance into a SQLStatementContext instance via SQLStatementContextFactory.

    The SQLStatement class hierarchy is as follows:


    ShardingSphere 4.1.1 currently has 83 SQLStatement implementation classes; for readability, the figure above includes only the main ones.

    This brings us to one of ShardingSphere's kernel engines: the SQL parse engine. Let's step into the SQLParserEngine class to see how it is implemented.
    org.apache.shardingsphere.sql.parser.SQLParserEngine

    public final class SQLParserEngine {
        private final String databaseTypeName;
    private final SQLParseResultCache cache = new SQLParseResultCache(); 
    …   
        /**
         * Parse SQL.
         *
         * @param sql SQL
         * @param useCache use cache or not
         * @return SQL statement
         */
           public SQLStatement parse(final String sql, final boolean useCache) {
        ParsingHook parsingHook = new SPIParsingHook();// SQL parsing hook
            parsingHook.start(sql);
            try {
                SQLStatement result = parse0(sql, useCache);
                parsingHook.finishSuccess(result);
                return result;
                // CHECKSTYLE:OFF
            } catch (final Exception ex) {
                // CHECKSTYLE:ON
                parsingHook.finishFailure(ex);
                throw ex;
            }
    }
        
     private SQLStatement parse0(final String sql, final boolean useCache) {
            if (useCache) {
                Optional<SQLStatement> cachedSQLStatement = cache.getSQLStatement(sql);
            if (cachedSQLStatement.isPresent()) {// reuse the cached parse result for this SQL if present
                    return cachedSQLStatement.get();
                }
            }
        ParseTree parseTree = new SQLParserExecutor(databaseTypeName, sql).execute().getRootNode();// 1. parse the SQL into an AST; ParseTree is ANTLR's parse tree interface
        SQLStatement result = (SQLStatement) ParseTreeVisitorFactory.newInstance(databaseTypeName, VisitorRule.valueOf(parseTree.getClass())).visit(parseTree);// 2. convert the ANTLR parse tree into a SQLStatement using the visitor pattern
            if (useCache) {
                cache.put(sql, result);
            }
            return result;
        }
    

    Internally it calls SQLParserExecutor.execute() to produce the parse tree (a ParseASTNode), then visits that tree with a ParseTreeVisitor to produce the SQLStatement. Let's look at the SQLParserExecutor and ParseTreeVisitorFactory classes.

    org.apache.shardingsphere.sql.parser.core.parser.SQLParserExecutor

    public final class SQLParserExecutor {
        
        private final String databaseTypeName;
        
        private final String sql;
        
        /**
         * Execute to parse SQL.
         *
         * @return AST node
         */
        public ParseASTNode execute() {
            ParseASTNode result = towPhaseParse();
            if (result.getRootNode() instanceof ErrorNode) {
                throw new SQLParsingException(String.format("Unsupported SQL of `%s`", sql));
            }
            return result;
        }
        
    private ParseASTNode towPhaseParse() {// the name looks like an upstream typo; presumably twoPhaseParse was intended
        SQLParser sqlParser = SQLParserFactory.newInstance(databaseTypeName, sql);// create the SQL parser for this database type
            try {
                ((Parser) sqlParser).setErrorHandler(new BailErrorStrategy());
                ((Parser) sqlParser).getInterpreter().setPredictionMode(PredictionMode.SLL);
                return (ParseASTNode) sqlParser.parse();
            } catch (final ParseCancellationException ex) {
                ((Parser) sqlParser).reset();
                ((Parser) sqlParser).setErrorHandler(new DefaultErrorStrategy());
                ((Parser) sqlParser).getInterpreter().setPredictionMode(PredictionMode.LL);
                return (ParseASTNode) sqlParser.parse();
            }
        }
    

    org.apache.shardingsphere.sql.parser.core.visitor.ParseTreeVisitorFactory

    /**
     * Parse tree visitor factory.
     */
    @NoArgsConstructor(access = AccessLevel.PRIVATE)
    public final class ParseTreeVisitorFactory {
        
        /** 
         * New instance of SQL visitor.
         * 
         * @param databaseTypeName name of database type
         * @param visitorRule visitor rule
         * @return parse tree visitor
         */
        public static ParseTreeVisitor newInstance(final String databaseTypeName, final VisitorRule visitorRule) {
            for (SQLParserConfiguration each : NewInstanceServiceLoader.newServiceInstances(SQLParserConfiguration.class)) {
                if (each.getDatabaseTypeName().equals(databaseTypeName)) {
                    return createParseTreeVisitor(each, visitorRule.getType());
                }
            }
            throw new UnsupportedOperationException(String.format("Cannot support database type '%s'", databaseTypeName));
        }
        
        @SneakyThrows
        private static ParseTreeVisitor createParseTreeVisitor(final SQLParserConfiguration configuration, final SQLStatementType type) {
            SQLVisitorFacade visitorFacade = configuration.getVisitorFacadeClass().getConstructor().newInstance();
            switch (type) {
                case DML:
                    return (ParseTreeVisitor) visitorFacade.getDMLVisitorClass().getConstructor().newInstance();
                case DDL:
                    return (ParseTreeVisitor) visitorFacade.getDDLVisitorClass().getConstructor().newInstance();
                case TCL:
                    return (ParseTreeVisitor) visitorFacade.getTCLVisitorClass().getConstructor().newInstance();
                case DCL:
                    return (ParseTreeVisitor) visitorFacade.getDCLVisitorClass().getConstructor().newInstance();
                case DAL:
                    return (ParseTreeVisitor) visitorFacade.getDALVisitorClass().getConstructor().newInstance();
                case RL:
                    return (ParseTreeVisitor) visitorFacade.getRLVisitorClass().getConstructor().newInstance();
                default:
                    throw new SQLParsingException("Can not support SQL statement type: `%s`", type);
            }
        }
    }
    

    SQLParserExecutor and ParseTreeVisitorFactory are where ANTLR classes start to appear. Note the two-phase parse: it first tries ANTLR's fast SLL prediction mode with a bail-out error strategy, and only falls back to the slower but fully general LL mode if that attempt fails. We will skip the ANTLR details here and analyze them in the SQL parsing installment.

    Back to DataNodeRouter's createRouteContext method: having obtained the SQL's AST (the SQLStatement object), it converts the SQLStatement into a SQLStatementContext object via SQLStatementContextFactory.
    SQLStatementContext has several implementations depending on the SQL type. Taking SELECT as an example, org.apache.shardingsphere.sql.parser.binder.statement.dml.SelectStatementContext is structured as follows:


    As you can see, it contains the contextual information of a SELECT statement: tables, projections, GROUP BY, ORDER BY, pagination, and so on.
    Routing (route), continued

    So far we still have not seen the actual routing logic for sharding or master-slave setups; that work all happens in the RouteDecorator implementations. The implementations of the RouteDecorator interface are:


    Let's look at ShardingRouteDecorator, the route decorator for the sharding feature.
    org.apache.shardingsphere.sharding.route.engine.ShardingRouteDecorator

     public final class ShardingRouteDecorator implements RouteDecorator<ShardingRule> {
    
     @Override
        public RouteContext decorate(final RouteContext routeContext, final ShardingSphereMetaData metaData, final ShardingRule shardingRule, final ConfigurationProperties properties) {
            SQLStatementContext sqlStatementContext = routeContext.getSqlStatementContext();
            List<Object> parameters = routeContext.getParameters();
        // validate the SQL, mainly to reject unsupported statements; sharding does not support INSERT INTO ... ON DUPLICATE KEY or updating the sharding key
            ShardingStatementValidatorFactory.newInstance(
                    sqlStatementContext.getSqlStatement()).ifPresent(validator -> validator.validate(shardingRule, sqlStatementContext.getSqlStatement(), parameters));
        // extract the sharding condition information from the SQL
            ShardingConditions shardingConditions = getShardingConditions(parameters, sqlStatementContext, metaData.getSchema(), shardingRule);
            boolean needMergeShardingValues = isNeedMergeShardingValues(sqlStatementContext, shardingRule);
            if (sqlStatementContext.getSqlStatement() instanceof DMLStatement && needMergeShardingValues) {
            // check whether all sharding values (table, column, value) are the same; if not, throw an exception
                checkSubqueryShardingValues(sqlStatementContext, shardingRule, shardingConditions);
            // deduplicate the sharding condition information
                mergeShardingConditions(shardingConditions);
            }
        // create the sharding route engine
            ShardingRouteEngine shardingRouteEngine = ShardingRouteEngineFactory.newInstance(shardingRule, metaData, sqlStatementContext, shardingConditions, properties);
        // perform the routing and produce the route result
            RouteResult routeResult = shardingRouteEngine.route(shardingRule);
            if (needMergeShardingValues) {
                Preconditions.checkState(1 == routeResult.getRouteUnits().size(), "Must have one sharding with subquery.");
            }
            return new RouteContext(sqlStatementContext, parameters, routeResult);
    }
    …
    }
    

    The most important lines are the following:

     ShardingRouteEngine shardingRouteEngine = ShardingRouteEngineFactory.newInstance(shardingRule, metaData, sqlStatementContext, shardingConditions, properties);
            RouteResult routeResult = shardingRouteEngine.route(shardingRule);
    …
      return new RouteContext(sqlStatementContext, parameters, routeResult);
    

    ShardingRouteEngineFactory creates the sharding route engine, whose route method is called; a new RouteContext is then created from the resulting RouteResult and returned.

    ShardingRouteEngineFactory is the factory class for ShardingRouteEngine. It creates a different ShardingRouteEngine depending on the SQL type, because different kinds of SQL call for different routing strategies: database broadcast routing, table broadcast routing, unicast routing, standard routing, and so on.
    org.apache.shardingsphere.sharding.route.engine.type.ShardingRouteEngineFactory

    /**
     * Sharding routing engine factory.
     */
    @NoArgsConstructor(access = AccessLevel.PRIVATE)
    public final class ShardingRouteEngineFactory {
    
            /**
             * Create new instance of routing engine.
             *
             * @param shardingRule sharding rule
             * @param metaData meta data of ShardingSphere
             * @param sqlStatementContext SQL statement context
             * @param shardingConditions shardingConditions
             * @param properties sharding sphere properties
             * @return new instance of routing engine
             */
    public static ShardingRouteEngine newInstance(final ShardingRule shardingRule,
                                                      final ShardingSphereMetaData metaData, final SQLStatementContext sqlStatementContext,
                                                      final ShardingConditions shardingConditions, final ConfigurationProperties properties) {
            SQLStatement sqlStatement = sqlStatementContext.getSqlStatement();
            Collection<String> tableNames = sqlStatementContext.getTablesContext().getTableNames();
            if (sqlStatement instanceof TCLStatement) {
                return new ShardingDatabaseBroadcastRoutingEngine();
            }
            if (sqlStatement instanceof DDLStatement) {
                return new ShardingTableBroadcastRoutingEngine(metaData.getSchema(), sqlStatementContext);
            }
            if (sqlStatement instanceof DALStatement) {
                return getDALRoutingEngine(shardingRule, sqlStatement, tableNames);
            }
            if (sqlStatement instanceof DCLStatement) {
                return getDCLRoutingEngine(sqlStatementContext, metaData);
            }
            if (shardingRule.isAllInDefaultDataSource(tableNames)) {
                return new ShardingDefaultDatabaseRoutingEngine(tableNames);
            }
            if (shardingRule.isAllBroadcastTables(tableNames)) {
                return sqlStatement instanceof SelectStatement ? new ShardingUnicastRoutingEngine(tableNames) : new ShardingDatabaseBroadcastRoutingEngine();
            }
            if (sqlStatementContext.getSqlStatement() instanceof DMLStatement && tableNames.isEmpty() && shardingRule.hasDefaultDataSourceName()) {
                return new ShardingDefaultDatabaseRoutingEngine(tableNames);
            }
            if (sqlStatementContext.getSqlStatement() instanceof DMLStatement && shardingConditions.isAlwaysFalse() || tableNames.isEmpty() || !shardingRule.tableRuleExists(tableNames)) {
                return new ShardingUnicastRoutingEngine(tableNames);
            }
            return getShardingRoutingEngine(shardingRule, sqlStatementContext, shardingConditions, tableNames, properties);
        }
    

    Now let's look at the routing engine for the most common case, standard SQL:
    org.apache.shardingsphere.sharding.route.engine.type.standard.ShardingStandardRoutingEngine. This class has quite a few methods; starting from route, the call chain runs getDataNodes -> routeByShardingConditions -> route0 -> generateRouteResult.

       /**
     * Sharding standard routing engine.
     */
    @RequiredArgsConstructor
    public final class ShardingStandardRoutingEngine implements ShardingRouteEngine {
    …
    public RouteResult route(final ShardingRule shardingRule) {
        if (isDMLForModify(sqlStatementContext) && 1 != ((TableAvailable) sqlStatementContext).getAllTables().size()) {// check the tables involved: INSERT, UPDATE and DELETE do not support multiple tables
                throw new ShardingSphereException("Cannot support Multiple-Table for '%s'.", sqlStatementContext.getSqlStatement());
            }
            return generateRouteResult(getDataNodes(shardingRule, shardingRule.getTableRule(logicTableName)));
        }
        
        private boolean isDMLForModify(final SQLStatementContext sqlStatementContext) {
            return sqlStatementContext instanceof InsertStatementContext || sqlStatementContext instanceof UpdateStatementContext || sqlStatementContext instanceof DeleteStatementContext;
        }
    
    // build the route result from the routed data nodes
        private RouteResult generateRouteResult(final Collection<DataNode> routedDataNodes) {
            RouteResult result = new RouteResult();
            result.getOriginalDataNodes().addAll(originalDataNodes);
            for (DataNode each : routedDataNodes) {
                result.getRouteUnits().add(
                        new RouteUnit(new RouteMapper(each.getDataSourceName(), each.getDataSourceName()), Collections.singletonList(new RouteMapper(logicTableName, each.getTableName()))));
            }
            return result;
        }
    
    // compute the data nodes: the core method of route calculation
        private Collection<DataNode> getDataNodes(final ShardingRule shardingRule, final TableRule tableRule) {
        if (isRoutingByHint(shardingRule, tableRule)) {// both database and table routing are hint-based: route by hint
                return routeByHint(shardingRule, tableRule);
            }
        if (isRoutingByShardingConditions(shardingRule, tableRule)) {// neither database nor table routing is hint-based: route by the sharding conditions
                return routeByShardingConditions(shardingRule, tableRule);
            }
        return routeByMixedConditions(shardingRule, tableRule);// a mix of hints and sharding conditions: mixed routing
        }
       …
    }
    

    In this call chain, the key logic is in route0, which calls routeDataSources and routeTables. These two methods are implemented as follows:

    // run the configured database sharding strategy to obtain the routed database names
        private Collection<String> routeDataSources(final ShardingRule shardingRule, final TableRule tableRule, final List<RouteValue> databaseShardingValues) {
            if (databaseShardingValues.isEmpty()) {
                return tableRule.getActualDatasourceNames();
            }
            Collection<String> result = new LinkedHashSet<>(shardingRule.getDatabaseShardingStrategy(tableRule).doSharding(tableRule.getActualDatasourceNames(), databaseShardingValues, this.properties));
            Preconditions.checkState(!result.isEmpty(), "no database route info");
            Preconditions.checkState(tableRule.getActualDatasourceNames().containsAll(result), 
                    "Some routed data sources do not belong to configured data sources. routed data sources: `%s`, configured data sources: `%s`", result, tableRule.getActualDatasourceNames());
            return result;
        }
    // run the configured table sharding strategy to obtain the actual table names
        private Collection<DataNode> routeTables(final ShardingRule shardingRule, final TableRule tableRule, final String routedDataSource, final List<RouteValue> tableShardingValues) {
            Collection<String> availableTargetTables = tableRule.getActualTableNames(routedDataSource);
            Collection<String> routedTables = new LinkedHashSet<>(tableShardingValues.isEmpty() ? availableTargetTables
                    : shardingRule.getTableShardingStrategy(tableRule).doSharding(availableTargetTables, tableShardingValues, this.properties));
            Preconditions.checkState(!routedTables.isEmpty(), "no table route info");
            Collection<DataNode> result = new LinkedList<>();
            for (String each : routedTables) {
                result.add(new DataNode(routedDataSource, each));
            }
            return result;
        }
    

    In these two methods, the key calls are
    shardingRule.getDatabaseShardingStrategy(tableRule).doSharding(tableRule.getActualDatasourceNames(), databaseShardingValues, this.properties) and
    shardingRule.getTableShardingStrategy(tableRule).doSharding(availableTargetTables, tableShardingValues, this.properties).

    shardingRule.getDatabaseShardingStrategy returns the ShardingStrategy interface, which has several implementations depending on how sharding is configured.

    Let's look at the commonly used org.apache.shardingsphere.core.strategy.route.standard.StandardShardingStrategy:

        
        @Override
        public Collection<String> doSharding(final Collection<String> availableTargetNames, final Collection<RouteValue> shardingValues, final ConfigurationProperties properties) {
            RouteValue shardingValue = shardingValues.iterator().next();
            Collection<String> shardingResult = shardingValue instanceof ListRouteValue
                    ? doSharding(availableTargetNames, (ListRouteValue) shardingValue) : doSharding(availableTargetNames, (RangeRouteValue) shardingValue);
            Collection<String> result = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
            result.addAll(shardingResult);
            return result;
        }
        
        @SuppressWarnings("unchecked")
        private Collection<String> doSharding(final Collection<String> availableTargetNames, final RangeRouteValue<?> shardingValue) {
            if (null == rangeShardingAlgorithm) {
                throw new UnsupportedOperationException("Cannot find range sharding strategy in sharding rule.");
            }
            return rangeShardingAlgorithm.doSharding(availableTargetNames, 
                    new RangeShardingValue(shardingValue.getTableName(), shardingValue.getColumnName(), shardingValue.getValueRange()));
        }
        
        @SuppressWarnings("unchecked")
        private Collection<String> doSharding(final Collection<String> availableTargetNames, final ListRouteValue<?> shardingValue) {
            Collection<String> result = new LinkedList<>();
            for (Comparable<?> each : shardingValue.getValues()) {
                String target = preciseShardingAlgorithm.doSharding(availableTargetNames, new PreciseShardingValue(shardingValue.getTableName(), shardingValue.getColumnName(), each));
                if (null != target) {
                    result.add(target);
                }
            }
            return result;
        }
    

    In the first doSharding method you can see that, depending on the actual sharding values in the SQL, it dispatches to the doSharding of either PreciseShardingAlgorithm or RangeShardingAlgorithm; these two algorithm interfaces are the ones the application must implement itself according to its own sharding scheme (an illustrative implementation follows the interface definitions below).

    /**
     * Precise sharding algorithm.
     * 
     * @param <T> class type of sharding value
     */
    public interface PreciseShardingAlgorithm<T extends Comparable<?>> extends ShardingAlgorithm {
        
        /**
         * Sharding.
         * 
         * @param availableTargetNames available data sources or tables's names
         * @param shardingValue sharding value
         * @return sharding result for data source or table's name
         */
        String doSharding(Collection<String> availableTargetNames, PreciseShardingValue<T> shardingValue);
    }
    
    /**
     * Range sharding algorithm.
     * 
     * @param <T> class type of sharding value
     */
    public interface RangeShardingAlgorithm<T extends Comparable<?>> extends ShardingAlgorithm {
        
        /**
         * Sharding.
         * 
         * @param availableTargetNames available data sources or tables's names
         * @param shardingValue sharding value
         * @return sharding results for data sources or tables's names
         */
        Collection<String> doSharding(Collection<String> availableTargetNames, RangeShardingValue<T> shardingValue);
    }
    
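    As an illustration, here is a minimal modulo-based PreciseShardingAlgorithm of the kind an application might supply (a sketch: the t_order_0/t_order_1 naming convention is an assumption, and it would be wired in through the rule configuration, e.g. a StandardShardingStrategyConfiguration):

    import java.util.Collection;

    import org.apache.shardingsphere.api.sharding.standard.PreciseShardingAlgorithm;
    import org.apache.shardingsphere.api.sharding.standard.PreciseShardingValue;

    // Routes by order_id % target-count, matching targets named like t_order_0, t_order_1, ...
    public final class ModuloShardingTableAlgorithm implements PreciseShardingAlgorithm<Long> {

        @Override
        public String doSharding(final Collection<String> availableTargetNames, final PreciseShardingValue<Long> shardingValue) {
            long suffix = shardingValue.getValue() % availableTargetNames.size();
            for (String each : availableTargetNames) {
                if (each.endsWith("_" + suffix)) {
                    return each;
                }
            }
            throw new UnsupportedOperationException("no target matches sharding value: " + shardingValue.getValue());
        }
    }
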
    Rewriting (rewrite)

    With routing covered, let's go back to BasePrepareEngine#prepare and look at the rewrite part:

        public ExecutionContext prepare(final String sql, final List<Object> parameters) {
            List<Object> clonedParameters = cloneParameters(parameters);
        RouteContext routeContext = executeRoute(sql, clonedParameters);// SQL routing
            ExecutionContext result = new ExecutionContext(routeContext.getSqlStatementContext());
        result.getExecutionUnits().addAll(executeRewrite(sql, clonedParameters, routeContext));// SQL rewriting
            if (properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SHOW)) {
                SQLLogger.logSQL(sql, properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SIMPLE), result.getSqlStatementContext(), result.getExecutionUnits());
            }
            return result;
    }
    …
    private Collection<ExecutionUnit> executeRewrite(final String sql, final List<Object> parameters, final RouteContext routeContext) {
            registerRewriteDecorator();
            SQLRewriteContext sqlRewriteContext = rewriter.createSQLRewriteContext(sql, parameters, routeContext.getSqlStatementContext(), routeContext);
            return routeContext.getRouteResult().getRouteUnits().isEmpty() ? rewrite(sqlRewriteContext) : rewrite(routeContext, sqlRewriteContext);
        }
    

    In executeRewrite, rewriter is a SQLRewriteEntry object; let's look at its createSQLRewriteContext method:

    org.apache.shardingsphere.underlying.rewrite.SQLRewriteEntry

     public SQLRewriteContext createSQLRewriteContext(final String sql, final List<Object> parameters, final SQLStatementContext sqlStatementContext, final RouteContext routeContext) {
            SQLRewriteContext result = new SQLRewriteContext(schemaMetaData, sqlStatementContext, sql, parameters);
            decorate(decorators, result, routeContext);
            result.generateSQLTokens();
            return result;
        }
    
        @SuppressWarnings("unchecked")
        private void decorate(final Map<BaseRule, SQLRewriteContextDecorator> decorators, final SQLRewriteContext sqlRewriteContext, final RouteContext routeContext) {
            for (Entry<BaseRule, SQLRewriteContextDecorator> entry : decorators.entrySet()) {
                BaseRule rule = entry.getKey();
                SQLRewriteContextDecorator decorator = entry.getValue();
                if (decorator instanceof RouteContextAware) {
                    ((RouteContextAware) decorator).setRouteContext(routeContext);
                }
                decorator.decorate(rule, properties, sqlRewriteContext);
            }
    }
    

    This method performs three operations:

    1. Creates an initial SQL rewrite context, a SQLRewriteContext object;
    2. Calls the decorate method of each registered SQLRewriteContextDecorator implementation in turn, adding the SQLToken generators that match the configured features and the SQL type;
    3. Calls SQLRewriteContext's generateSQLTokens method to generate this SQL's tokens, then returns the SQLRewriteContext object.

    First, the SQLRewriteContext class:

        public SQLRewriteContext(final SchemaMetaData schemaMetaData, final SQLStatementContext sqlStatementContext, final String sql, final List<Object> parameters) {
            this.schemaMetaData = schemaMetaData;
            this.sqlStatementContext = sqlStatementContext;
            this.sql = sql;
            this.parameters = parameters;
            addSQLTokenGenerators(new DefaultTokenGeneratorBuilder().getSQLTokenGenerators());
            parameterBuilder = sqlStatementContext instanceof InsertStatementContext
                    ? new GroupedParameterBuilder(((InsertStatementContext) sqlStatementContext).getGroupedParameters()) : new StandardParameterBuilder(parameters);
        }
        
        /**
         * Add SQL token generators.
         * 
         * @param sqlTokenGenerators SQL token generators
         */
        public void addSQLTokenGenerators(final Collection<SQLTokenGenerator> sqlTokenGenerators) {
            this.sqlTokenGenerators.addAll(sqlTokenGenerators);
        }
        
        /**
         * Generate SQL tokens.
         */
        public void generateSQLTokens() {
            sqlTokens.addAll(sqlTokenGenerators.generateSQLTokens(sqlStatementContext, parameters, schemaMetaData));
        }
    

    addSQLTokenGenerators adds token generators, and generateSQLTokens generates tokens from whatever generators were added. In SQLRewriteEntry's decorate method, each registered SQLRewriteContextDecorator processes the RouteContext. The SQLRewriteContextDecorator implementations currently in the project are shown below:


    They are, respectively, the SQL rewrite decorators for data sharding, shadow tables, and encryption.

    Let's look at the most commonly used one, the data sharding SQL rewrite decorator:
    org.apache.shardingsphere.sharding.rewrite.context.ShardingSQLRewriteContextDecorator

    /**
     * SQL rewrite context decorator for sharding.
     */
    @Setter
    public final class ShardingSQLRewriteContextDecorator implements SQLRewriteContextDecorator<ShardingRule>, RouteContextAware {
        
        private RouteContext routeContext;
        
        @SuppressWarnings("unchecked")
        @Override
        public void decorate(final ShardingRule shardingRule, final ConfigurationProperties properties, final SQLRewriteContext sqlRewriteContext) {
        // obtain the parameter rewriters (needed only for parameterized SQL) and apply each in turn to the rewrite context's parameterBuilder; under sharding this mainly covers generated keys and pagination parameters
            for (ParameterRewriter each : new ShardingParameterRewriterBuilder(shardingRule, routeContext).getParameterRewriters(sqlRewriteContext.getSchemaMetaData())) {
                if (!sqlRewriteContext.getParameters().isEmpty() && each.isNeedRewrite(sqlRewriteContext.getSqlStatementContext())) {
                    each.rewrite(sqlRewriteContext.getParameterBuilder(), sqlRewriteContext.getSqlStatementContext(), sqlRewriteContext.getParameters());
                }
            }
        // add the token generators used by the sharding feature
            sqlRewriteContext.addSQLTokenGenerators(new ShardingTokenGenerateBuilder(shardingRule, routeContext).getSQLTokenGenerators());
        }…
    }
    

    In decorate, a ShardingParameterRewriterBuilder first produces the corresponding collection of ParameterRewriter objects, and their rewrite methods are called in turn to configure the sqlRewriteContext's parameterBuilder. The ParameterRewriters built by ShardingParameterRewriterBuilder are ShardingGeneratedKeyInsertValueParameterRewriter and ShardingPaginationParameterRewriter, covering distributed generated-key parameters for INSERT and pagination parameters respectively (pagination needs rewriting because, for example, LIMIT 10, 5 must be executed as LIMIT 0, 15 on every shard so that the merger can pick the correct rows across shards). The code:

    org.apache.shardingsphere.sharding.rewrite.parameter.ShardingParameterRewriterBuilder

        public Collection<ParameterRewriter> getParameterRewriters(final SchemaMetaData schemaMetaData) {
            Collection<ParameterRewriter> result = getParameterRewriters();
            for (ParameterRewriter each : result) {
                setUpParameterRewriters(each, schemaMetaData);
            }
            return result;
        }
        
        private static Collection<ParameterRewriter> getParameterRewriters() {
            Collection<ParameterRewriter> result = new LinkedList<>();
            result.add(new ShardingGeneratedKeyInsertValueParameterRewriter());
            result.add(new ShardingPaginationParameterRewriter());
            return result;
        }
    

    After the parameter rewriting, ShardingSQLRewriteContextDecorator's decorate method adds the series of TokenGenerators returned by ShardingTokenGenerateBuilder.getSQLTokenGenerators(). Let's look at that class next:

    org.apache.shardingsphere.sharding.rewrite.token.pojo.ShardingTokenGenerateBuilder

    /**
     * SQL token generator builder for sharding.
     */
    @RequiredArgsConstructor
    public final class ShardingTokenGenerateBuilder implements SQLTokenGeneratorBuilder {
        
        private final ShardingRule shardingRule;
        
        private final RouteContext routeContext;
        
        @Override
        public Collection<SQLTokenGenerator> getSQLTokenGenerators() {
            Collection<SQLTokenGenerator> result = buildSQLTokenGenerators();
            for (SQLTokenGenerator each : result) {
                if (each instanceof ShardingRuleAware) {
                    ((ShardingRuleAware) each).setShardingRule(shardingRule);
                }
                if (each instanceof RouteContextAware) {
                    ((RouteContextAware) each).setRouteContext(routeContext);
                }
            }
            return result;
        }
        
        private Collection<SQLTokenGenerator> buildSQLTokenGenerators() {
            Collection<SQLTokenGenerator> result = new LinkedList<>();
            addSQLTokenGenerator(result, new TableTokenGenerator());// table-name tokens, used to substitute actual table names
            addSQLTokenGenerator(result, new DistinctProjectionPrefixTokenGenerator());// SELECT DISTINCT keyword handling
            addSQLTokenGenerator(result, new ProjectionsTokenGenerator());// select-list handling, mainly derived columns such as AVG
            addSQLTokenGenerator(result, new OrderByTokenGenerator());// ORDER BY token handling
            addSQLTokenGenerator(result, new AggregationDistinctTokenGenerator());// DISTINCT inside aggregate functions
            addSQLTokenGenerator(result, new IndexTokenGenerator());// index renaming
            addSQLTokenGenerator(result, new OffsetTokenGenerator());// OFFSET rewriting
            addSQLTokenGenerator(result, new RowCountTokenGenerator());// row-count rewriting
            addSQLTokenGenerator(result, new GeneratedKeyInsertColumnTokenGenerator());// append the generated-key column at the end of the INSERT column list
            addSQLTokenGenerator(result, new GeneratedKeyForUseDefaultInsertColumnsTokenGenerator());// when INSERT uses default columns, fill in the real column names, including the generated-key column
            addSQLTokenGenerator(result, new GeneratedKeyAssignmentTokenGenerator());// generated key in SET assignments
            addSQLTokenGenerator(result, new ShardingInsertValuesTokenGenerator());// parse the INSERT VALUES tokens, preparing for generated-key values
            addSQLTokenGenerator(result, new GeneratedKeyInsertValuesTokenGenerator());// append generated-key values to INSERT VALUES
            return result;
        }
        
        private void addSQLTokenGenerator(final Collection<SQLTokenGenerator> sqlTokenGenerators, final SQLTokenGenerator toBeAddedSQLTokenGenerator) {
            if (toBeAddedSQLTokenGenerator instanceof IgnoreForSingleRoute && routeContext.getRouteResult().isSingleRouting()) {
                return;
            }
            sqlTokenGenerators.add(toBeAddedSQLTokenGenerator);
        }
    }
    

    ShardingTokenGenerateBuilder (the name looks like an upstream typo; presumably ShardingTokenGeneratorBuilder was intended) is the builder for the sharding token generators. As shown, it adds many TokenGenerators, such as TableTokenGenerator, DistinctProjectionPrefixTokenGenerator, and ProjectionsTokenGenerator, and these generators produce the corresponding token collections.

    Back in BasePrepareEngine's executeRewrite method: after the SQLRewriteContext object is created, rewrite is called, which delegates to SQLRouteRewriteEngine#rewrite:

            private Collection<ExecutionUnit> executeRewrite(final String sql, final List<Object> parameters, final RouteContext routeContext) {
            registerRewriteDecorator();
            SQLRewriteContext sqlRewriteContext = rewriter.createSQLRewriteContext(sql, parameters, routeContext.getSqlStatementContext(), routeContext);
            return routeContext.getRouteResult().getRouteUnits().isEmpty() ? rewrite(sqlRewriteContext) : rewrite(routeContext, sqlRewriteContext);
        }
    private Collection<ExecutionUnit> rewrite(final RouteContext routeContext, final SQLRewriteContext sqlRewriteContext) {
            Collection<ExecutionUnit> result = new LinkedHashSet<>();
            for (Entry<RouteUnit, SQLRewriteResult> entry : new SQLRouteRewriteEngine().rewrite(sqlRewriteContext, routeContext.getRouteResult()).entrySet()) {
                result.add(new ExecutionUnit(entry.getKey().getDataSourceMapper().getActualName(), new SQLUnit(entry.getValue().getSql(), entry.getValue().getParameters())));
            }
            return result;
        }
    

    Next, the SQL route rewrite engine, org.apache.shardingsphere.underlying.rewrite.engine.SQLRouteRewriteEngine:

    public final class SQLRouteRewriteEngine {
        
        /**
         * Rewrite SQL and parameters.
         *
         * @param sqlRewriteContext SQL rewrite context
         * @param routeResult route result
         * @return SQL map of route unit and rewrite result
         */
        public Map<RouteUnit, SQLRewriteResult> rewrite(final SQLRewriteContext sqlRewriteContext, final RouteResult routeResult) {
            Map<RouteUnit, SQLRewriteResult> result = new LinkedHashMap<>(routeResult.getRouteUnits().size(), 1);
            for (RouteUnit each : routeResult.getRouteUnits()) {
                result.put(each, new SQLRewriteResult(new RouteSQLBuilder(sqlRewriteContext, each).toSQL(), getParameters(sqlRewriteContext.getParameterBuilder(), routeResult, each)));
            }
            return result;
    }
    …
    }
    

    When creating each SQLRewriteResult, RouteSQLBuilder's toSQL method is called. This class implements the SQLBuilder interface and is responsible for producing the rewritten SQL.


    Looking at toSQL: it calls SQLRewriteContext's getSqlTokens to obtain all the SQL tokens bound to the context, calls each SQLToken's toString(), and splices the pieces together into the rewritten SQL.

    public abstract class AbstractSQLBuilder implements SQLBuilder {
        
        private final SQLRewriteContext context;
        
        @Override
        public final String toSQL() {
            if (context.getSqlTokens().isEmpty()) {
                return context.getSql();
            }
            Collections.sort(context.getSqlTokens());// sort the tokens by start position
            StringBuilder result = new StringBuilder();
            result.append(context.getSql().substring(0, context.getSqlTokens().get(0).getStartIndex()));// append the original SQL that precedes the first token
            for (SQLToken each : context.getSqlTokens()) {
                result.append(getSQLTokenText(each));// append the SQL fragment for this token
                result.append(getConjunctionText(each));// append the connecting text between this token and the next
            }
            return result.toString();
        }
    …
    }
    

    org.apache.shardingsphere.underlying.rewrite.sql.impl.RouteSQLBuilder

     protected String getSQLTokenText(final SQLToken sqlToken) {
            if (sqlToken instanceof RouteUnitAware) {
                return ((RouteUnitAware) sqlToken).toString(routeUnit);
            }
            return sqlToken.toString();
        }
    
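    A small worked example of this splicing (the indexes are illustrative; real tokens carry start/stop indexes produced by the parser):

    public final class TokenSplicingDemo {

        public static void main(final String[] args) {
            // Logical SQL with a single table token covering "t_order" (chars 14..20).
            String sql = "SELECT * FROM t_order WHERE order_id = ?";
            int startIndex = 14;              // first char of "t_order"
            int stopIndex = 20;               // last char of "t_order"
            String actualTable = "t_order_1"; // actual table for this route unit
            String rewritten = sql.substring(0, startIndex) + actualTable + sql.substring(stopIndex + 1);
            System.out.println(rewritten);    // SELECT * FROM t_order_1 WHERE order_id = ?
        }
    }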

    Back in BasePrepareEngine: once the SQL has been rewritten, the SQLUnits and from them the ExecutionUnits can be built, added to the ExecutionContext, and returned, which completes the prepare engine's prepare operation:

    private Collection<ExecutionUnit> rewrite(final RouteContext routeContext, final SQLRewriteContext sqlRewriteContext) {
            Collection<ExecutionUnit> result = new LinkedHashSet<>();
            for (Entry<RouteUnit, SQLRewriteResult> entry : new SQLRouteRewriteEngine().rewrite(sqlRewriteContext, routeContext.getRouteResult()).entrySet()) {
                result.add(new ExecutionUnit(entry.getKey().getDataSourceMapper().getActualName(), new SQLUnit(entry.getValue().getSql(), entry.getValue().getParameters())));
            }
            return result;
        }
    

    In ShardingPreparedStatement's prepare method, after the executionContext is produced, the distributed generated key values are collected. The generated values in the generatedKeyContext are produced by the InsertClauseShardingConditionEngine invoked from the getShardingConditions method of the ShardingRouteDecorator class.

        private void prepare() {
            executionContext = prepareEngine.prepare(sql, getParameters());
            findGeneratedKey().ifPresent(generatedKey -> generatedValues.add(generatedKey.getGeneratedValues().getLast()));
    }
       private Optional<GeneratedKeyContext> findGeneratedKey() {
            return executionContext.getSqlStatementContext() instanceof InsertStatementContext
                    ? ((InsertStatementContext) executionContext.getSqlStatementContext()).getGeneratedKeyContext() : Optional.empty();
        }
    
    Execute Engine (executor)

    Back in ShardingPreparedStatement's executeQuery(): with prepare done, what remains is SQL execution and result-set merging. SQL execution is handled mainly by PreparedStatementExecutor.

    public ResultSet executeQuery() throws SQLException {
            ResultSet result;
            try {
                clearPrevious();
                prepare();
                initPreparedStatementExecutor();
                MergedResult mergedResult = mergeQuery(preparedStatementExecutor.executeQuery());
                result = new ShardingResultSet(preparedStatementExecutor.getResultSets(), mergedResult, this, executionContext);
            } finally {
                clearBatch();
            }
            currentResultSet = result;
            return result;
        }
    

    Next, initPreparedStatementExecutor(): it first initializes the preparedStatementExecutor, then sets the Statement parameters, and finally replays certain recorded method calls onto the statements. Let's look at each of these methods.

    Replay is a design of the JDBC adapter layer. The WrapperAdapter class has two methods, recordMethodInvocation and replayMethodsInvocation. The former records the invoked method names, such as setAutoCommit, setReadOnly, setFetchSize, and setMaxFieldSize; it runs when the application calls ShardingConnection's setAutoCommit/setReadOnly or ShardingPreparedStatement's setFetchSize/setMaxFieldSize. The latter actually invokes the recorded methods, replaying them onto the real underlying JDBC objects (the DB driver, connection pool, and so on) once they exist. The sketch below illustrates the idea.
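
    A minimal sketch of that record/replay idea (simplified; the real WrapperAdapter also handles errors and stores the invocations somewhat differently):

    import java.lang.reflect.Method;
    import java.util.ArrayList;
    import java.util.Collection;

    public abstract class RecordingAdapter {

        private final Collection<Invocation> recorded = new ArrayList<>();

        // Remember a setter call (e.g. setAutoCommit) until the real JDBC objects exist.
        protected final void recordMethodInvocation(final Class<?> targetClass, final String methodName,
                                                    final Class<?>[] argumentTypes, final Object[] arguments) {
            try {
                recorded.add(new Invocation(targetClass.getMethod(methodName, argumentTypes), arguments));
            } catch (final NoSuchMethodException ex) {
                throw new IllegalStateException(ex);
            }
        }

        // Replay all remembered calls on a real JDBC object once it has been created.
        protected final void replayMethodsInvocation(final Object target) {
            for (Invocation each : recorded) {
                try {
                    each.method.invoke(target, each.arguments);
                } catch (final ReflectiveOperationException ex) {
                    throw new IllegalStateException(ex);
                }
            }
        }

        private static final class Invocation {

            private final Method method;

            private final Object[] arguments;

            Invocation(final Method method, final Object[] arguments) {
                this.method = method;
                this.arguments = arguments;
            }
        }
    }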

     private void initPreparedStatementExecutor() throws SQLException {
            preparedStatementExecutor.init(executionContext);
            setParametersForStatements();
            replayMethodForStatements();
        }
    

    obtainExecuteGroups groups the execution units; the real work is done by SQLExecutePrepareTemplate.getExecuteUnitGroups:
    org.apache.shardingsphere.shardingjdbc.executor.PreparedStatementExecutor

        /**
         * Initialize executor.
         *
         * @param executionContext execution context
         * @throws SQLException SQL exception
         */
        public void init(final ExecutionContext executionContext) throws SQLException {
            setSqlStatementContext(executionContext.getSqlStatementContext());
            getInputGroups().addAll(obtainExecuteGroups(executionContext.getExecutionUnits()));
            cacheStatements();
    }
    …
          private Collection<InputGroup<StatementExecuteUnit>> obtainExecuteGroups(final Collection<ExecutionUnit> executionUnits) throws SQLException {
            return getSqlExecutePrepareTemplate().getExecuteUnitGroups(executionUnits, new SQLExecutePrepareCallback() {
    
                @Override
                // create the requested number of database connections on the given data source
                public List<Connection> getConnections(final ConnectionMode connectionMode, final String dataSourceName, final int connectionSize) throws SQLException {
                    return PreparedStatementExecutor.super.getConnection().getConnections(connectionMode, dataSourceName, connectionSize);
                }
    
                @Override
                // create a Statement execute unit from the execution unit information
                public StatementExecuteUnit createStatementExecuteUnit(final Connection connection, final ExecutionUnit executionUnit, final ConnectionMode connectionMode) throws SQLException {
                    return new StatementExecuteUnit(executionUnit, createPreparedStatement(connection, executionUnit.getSqlUnit().getSql()), connectionMode);
                }
            });
        }…    
    protected final void cacheStatements() {
            for (InputGroup<StatementExecuteUnit> each : inputGroups) {
                statements.addAll(each.getInputs().stream().map(StatementExecuteUnit::getStatement).collect(Collectors.toList()));
                parameterSets.addAll(each.getInputs().stream().map(input -> input.getExecutionUnit().getSqlUnit().getParameters()).collect(Collectors.toList()));
            }
        }
    

    getExecuteUnitGroups first groups the SQL units by data source, then uses the number of SQL units together with the maxConnectionsSizePerQuery setting to decide between the connection-strict and memory-strict connection modes, and finally produces the final grouping of SQL execution units (a worked example of the math follows the code).
    org.apache.shardingsphere.sharding.execute.sql.prepare.SQLExecutePrepareTemplate

    /**
         * Get execute unit groups.
         *
         * @param executionUnits execution units
         * @param callback SQL execute prepare callback
         * @return statement execute unit groups
         * @throws SQLException SQL exception
         */
        public Collection<InputGroup<StatementExecuteUnit>> getExecuteUnitGroups(final Collection<ExecutionUnit> executionUnits, final SQLExecutePrepareCallback callback) throws SQLException {
            return getSynchronizedExecuteUnitGroups(executionUnits, callback);
        }
    
    // build the synchronized execution unit groups
        private Collection<InputGroup<StatementExecuteUnit>> getSynchronizedExecuteUnitGroups(
                final Collection<ExecutionUnit> executionUnits, final SQLExecutePrepareCallback callback) throws SQLException {
        Map<String, List<SQLUnit>> sqlUnitGroups = getSQLUnitGroups(executionUnits);// map each data source to its list of SQLUnits
            Collection<InputGroup<StatementExecuteUnit>> result = new LinkedList<>();
            for (Entry<String, List<SQLUnit>> entry : sqlUnitGroups.entrySet()) {
            result.addAll(getSQLExecuteGroups(entry.getKey(), entry.getValue(), callback));// convert the SQLUnits into InputGroup<StatementExecuteUnit>; the mapping is 1:1
            }
            return result;
        }
    …
    // build the SQL execution groups
        private List<InputGroup<StatementExecuteUnit>> getSQLExecuteGroups(final String dataSourceName,
                                                                           final List<SQLUnit> sqlUnits, final SQLExecutePrepareCallback callback) throws SQLException {
            List<InputGroup<StatementExecuteUnit>> result = new LinkedList<>();
            int desiredPartitionSize = Math.max(0 == sqlUnits.size() % maxConnectionsSizePerQuery ? sqlUnits.size() / maxConnectionsSizePerQuery : sqlUnits.size() / maxConnectionsSizePerQuery + 1, 1);
            List<List<SQLUnit>> sqlUnitPartitions = Lists.partition(sqlUnits, desiredPartitionSize);
            ConnectionMode connectionMode = maxConnectionsSizePerQuery < sqlUnits.size() ? ConnectionMode.CONNECTION_STRICTLY : ConnectionMode.MEMORY_STRICTLY;
        List<Connection> connections = callback.getConnections(connectionMode, dataSourceName, sqlUnitPartitions.size()); // obtain one connection per partition, as computed from the number of SQLs and the maxConnectionsSizePerQuery setting
            int count = 0;
            for (List<SQLUnit> each : sqlUnitPartitions) {
            result.add(getSQLExecuteGroup(connectionMode, connections.get(count++), dataSourceName, each, callback));// wrap each SQLUnit partition into StatementExecuteUnit objects and add the group to the result
            }
            return result;
        }
    
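    A worked example of the grouping math above, assuming maxConnectionsSizePerQuery = 3 and 10 SQL units routed to one data source:

    public final class ExecuteGroupMathDemo {

        public static void main(final String[] args) {
            int sqlUnitCount = 10;
            int maxConnectionsSizePerQuery = 3;
            // Same formula as getSQLExecuteGroups: ceil(10 / 3) = 4 SQL units per connection.
            int desiredPartitionSize = Math.max(sqlUnitCount % maxConnectionsSizePerQuery == 0
                    ? sqlUnitCount / maxConnectionsSizePerQuery
                    : sqlUnitCount / maxConnectionsSizePerQuery + 1, 1);
            System.out.println(desiredPartitionSize); // 4 -> partitions of 4, 4 and 2, i.e. 3 connections
            // maxConnectionsSizePerQuery (3) < sqlUnitCount (10), so the mode is CONNECTION_STRICTLY:
            // each connection serially executes its partition, and results are loaded into memory.
            System.out.println(maxConnectionsSizePerQuery < sqlUnitCount ? "CONNECTION_STRICTLY" : "MEMORY_STRICTLY");
        }
    }
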

    Returning to ShardingPreparedStatement's executeQuery(), the next statement executed is:

     MergedResult mergedResult = mergeQuery(preparedStatementExecutor.executeQuery());
    

    Looking at PreparedStatementExecutor's executeQuery method, you can see that it packages the work as a callback and finally executes it through SQLExecuteTemplate.execute:
    org.apache.shardingsphere.shardingjdbc.executor.PreparedStatementExecutor

        /**
         * Execute query.
         *
         * @return result set list
         * @throws SQLException SQL exception
         */
        public List<QueryResult> executeQuery() throws SQLException {
            final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
            SQLExecuteCallback<QueryResult> executeCallback = new SQLExecuteCallback<QueryResult>(getDatabaseType(), isExceptionThrown) {
                
                @Override
            // execute the SQL on the given Statement and wrap the JDBC result set into a QueryResult (stream-based or memory-based)
                protected QueryResult executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
                    return getQueryResult(statement, connectionMode);
                }
            };
            return executeCallback(executeCallback);// 通过executeCallback操作
        }
    // execute the SQL, then convert the result set into a QueryResult object
        private QueryResult getQueryResult(final Statement statement, final ConnectionMode connectionMode) throws SQLException {
            PreparedStatement preparedStatement = (PreparedStatement) statement;
            ResultSet resultSet = preparedStatement.executeQuery();
            getResultSets().add(resultSet);
            return ConnectionMode.MEMORY_STRICTLY == connectionMode ? new StreamQueryResult(resultSet) : new MemoryQueryResult(resultSet);
        } 
    

    org.apache.shardingsphere.shardingjdbc.executor.AbstractStatementExecutor

    protected final <T> List<T> executeCallback(final SQLExecuteCallback<T> executeCallback) throws SQLException {
            List<T> result = sqlExecuteTemplate.execute((Collection) inputGroups, executeCallback);
            refreshMetaDataIfNeeded(connection.getRuntimeContext(), sqlStatementContext);
            return result;
    }
        public <T> List<T> execute(final Collection<InputGroup<? extends StatementExecuteUnit>> inputGroups, final SQLExecuteCallback<T> callback) throws SQLException {
            return execute(inputGroups, null, callback);
        }
        
    // org.apache.shardingsphere.sharding.execute.sql.execute.SQLExecuteTemplate
        @SuppressWarnings("unchecked")
        public <T> List<T> execute(final Collection<InputGroup<? extends StatementExecuteUnit>> inputGroups,
                                   final SQLExecuteCallback<T> firstCallback, final SQLExecuteCallback<T> callback) throws SQLException {
            try {
                return executorEngine.execute((Collection) inputGroups, firstCallback, callback, serial);
            } catch (final SQLException ex) {
                ExecutorExceptionHandler.handleException(ex);
                return Collections.emptyList();
            }
        }
    

    As you can see, SQLExecuteTemplate ultimately delegates to executorEngine.execute.
    org.apache.shardingsphere.underlying.executor.engine.ExecutorEngine

       /**
         * Execute.
         *
         * @param inputGroups input groups
         * @param firstCallback first grouped callback
         * @param callback other grouped callback
         * @param serial whether using multi thread execute or not
         * @param <I> type of input value
         * @param <O> type of return value
         * @return execute result
         * @throws SQLException throw if execute failure
         */
        public <I, O> List<O> execute(final Collection<InputGroup<I>> inputGroups, 
                                      final GroupedCallback<I, O> firstCallback, final GroupedCallback<I, O> callback, final boolean serial) throws SQLException {
            if (inputGroups.isEmpty()) {
                return Collections.emptyList();
            }
            return serial ? serialExecute(inputGroups, firstCallback, callback) : parallelExecute(inputGroups, firstCallback, callback);
    }
    
    // serial execution
        private <I, O> List<O> serialExecute(final Collection<InputGroup<I>> inputGroups, final GroupedCallback<I, O> firstCallback, final GroupedCallback<I, O> callback) throws SQLException {
            Iterator<InputGroup<I>> inputGroupsIterator = inputGroups.iterator();
            InputGroup<I> firstInputs = inputGroupsIterator.next();
            List<O> result = new LinkedList<>(syncExecute(firstInputs, null == firstCallback ? callback : firstCallback));
            for (InputGroup<I> each : Lists.newArrayList(inputGroupsIterator)) {
                result.addAll(syncExecute(each, callback));
            }
            return result;
        }
    
    // parallel execution; two callbacks are supported: the first input group runs the first callback, the remaining groups run the second
        private <I, O> List<O> parallelExecute(final Collection<InputGroup<I>> inputGroups, final GroupedCallback<I, O> firstCallback, final GroupedCallback<I, O> callback) throws SQLException {
            Iterator<InputGroup<I>> inputGroupsIterator = inputGroups.iterator();
            InputGroup<I> firstInputs = inputGroupsIterator.next();
            Collection<ListenableFuture<Collection<O>>> restResultFutures = asyncExecute(Lists.newArrayList(inputGroupsIterator), callback);
            return getGroupResults(syncExecute(firstInputs, null == firstCallback ? callback : firstCallback), restResultFutures);
        }
    …
    }
    

    其中分为串行执行和并行执行两种方式:串行就是在当前线程中依次对每个分组执行callback逻辑;并行则是第一个分组在当前线程同步执行,其余分组提交到线程池异步执行。而callback即为PreparedStatementExecutor类中executeQuery()所定义:

    public List<QueryResult> executeQuery() throws SQLException {
            final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
            SQLExecuteCallback<QueryResult> executeCallback = new SQLExecuteCallback<QueryResult>(getDatabaseType(), isExceptionThrown) {
                
                @Override
                protected QueryResult executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
                    return getQueryResult(statement, connectionMode);
                }
            };
            return executeCallback(executeCallback);
        }
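
    为更直观地理解这一回调执行模型,下面给出一个脱离JDBC细节的最小示意(ExecutorEngine、InputGroup、GroupedCallback均为4.1.1中的真实类/接口,示例省略import;构造参数与输入数据为假设,仅用于演示):

    // 用ExecutorEngine并行处理两个输入分组,把每个输入转成字符串
    ExecutorEngine engine = new ExecutorEngine(4); // 假设线程池大小为4
    Collection<InputGroup<Integer>> inputGroups = Arrays.asList(
            new InputGroup<>(Arrays.asList(1, 2)), new InputGroup<>(Arrays.asList(3, 4)));
    GroupedCallback<Integer, String> callback = (inputs, isTrunkThread, dataMap) -> {
        Collection<String> result = new LinkedList<>();
        for (Integer each : inputs) {
            // 第一个分组在调用线程中执行,其余分组在线程池中执行
            result.add(Thread.currentThread().getName() + " -> " + each);
        }
        return result;
    };
    List<String> results = engine.execute(inputGroups, null, callback, false); // serial=false即并行执行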
    
    归并Merge

    回到ShardingPreparedStatement类中mergeQuery方法,主要逻辑即为创建MergeEngine对象,执行merge方法。

    public ResultSet executeQuery() throws SQLException {
            ResultSet result;
            try {
    …
                MergedResult mergedResult = mergeQuery(preparedStatementExecutor.executeQuery());
    …
            } 
    …
        }
    
    private MergedResult mergeQuery(final List<QueryResult> queryResults) throws SQLException {
            ShardingRuntimeContext runtimeContext = connection.getRuntimeContext();
            MergeEngine mergeEngine = new MergeEngine(runtimeContext.getRule().toRules(), runtimeContext.getProperties(), runtimeContext.getDatabaseType(), runtimeContext.getMetaData().getSchema());
            return mergeEngine.merge(queryResults, executionContext.getSqlStatementContext());
        }
    

    可以看到mergeQuery方法中新建了MergeEngine实例,然后调用了其merge方法。展开看下MergeEngine中merge方法:

    org.apache.shardingsphere.underlying.pluggble.merge.MergeEngine

    public MergedResult merge(final List<QueryResult> queryResults, final SQLStatementContext sqlStatementContext) throws SQLException {
            registerMergeDecorator();// 注册归并装饰器
            return merger.process(queryResults, sqlStatementContext);
    }
    
        private void registerMergeDecorator() {
            for (Class<? extends ResultProcessEngine> each : OrderedRegistry.getRegisteredClasses(ResultProcessEngine.class)) {
                ResultProcessEngine processEngine = createProcessEngine(each);
                Class<?> ruleClass = (Class<?>) processEngine.getType();
               
                rules.stream().filter(rule -> rule.getClass() == ruleClass || rule.getClass().getSuperclass() == ruleClass).collect(Collectors.toList())
                        .forEach(rule -> merger.registerProcessEngine(rule, processEngine));
            }
        }
    

    注册完ResultProcessEngine实现类后,merge方法则直接调用merger.process方法,即

    org.apache.shardingsphere.underlying.merge.MergeEntry#process

    /**
     * Merge entry.
     */
    public final class MergeEntry {
    …    
        /**
         * Process query results.
         *
         * @param queryResults query results
         * @param sqlStatementContext SQL statement context
         * @return merged result
         * @throws SQLException SQL exception
         */
        public MergedResult process(final List<QueryResult> queryResults, final SQLStatementContext sqlStatementContext) throws SQLException {
            Optional<MergedResult> mergedResult = merge(queryResults, sqlStatementContext);
            Optional<MergedResult> result = mergedResult.isPresent() ? Optional.of(decorate(mergedResult.get(), sqlStatementContext)) : decorate(queryResults.get(0), sqlStatementContext);
            return result.orElseGet(() -> new TransparentMergedResult(queryResults.get(0)));
        }
        
        @SuppressWarnings("unchecked")
        private Optional<MergedResult> merge(final List<QueryResult> queryResults, final SQLStatementContext sqlStatementContext) throws SQLException {
            for (Entry<BaseRule, ResultProcessEngine> entry : engines.entrySet()) {
                if (entry.getValue() instanceof ResultMergerEngine) {
                    ResultMerger resultMerger = ((ResultMergerEngine) entry.getValue()).newInstance(databaseType, entry.getKey(), properties, sqlStatementContext);
                    return Optional.of(resultMerger.merge(queryResults, sqlStatementContext, schemaMetaData));
                }
            }
            return Optional.empty();
        }
        
        @SuppressWarnings("unchecked")
        private MergedResult decorate(final MergedResult mergedResult, final SQLStatementContext sqlStatementContext) throws SQLException {
            MergedResult result = null;
            for (Entry<BaseRule, ResultProcessEngine> entry : engines.entrySet()) {
                if (entry.getValue() instanceof ResultDecoratorEngine) {
                    ResultDecorator resultDecorator = ((ResultDecoratorEngine) entry.getValue()).newInstance(databaseType, schemaMetaData, entry.getKey(), properties, sqlStatementContext);
                    result = null == result ? resultDecorator.decorate(mergedResult, sqlStatementContext, schemaMetaData) : resultDecorator.decorate(result, sqlStatementContext, schemaMetaData);
                }
            }
            return null == result ? mergedResult : result;
        }
        
        @SuppressWarnings("unchecked")
        private Optional<MergedResult> decorate(final QueryResult queryResult, final SQLStatementContext sqlStatementContext) throws SQLException {
            MergedResult result = null;
            for (Entry<BaseRule, ResultProcessEngine> entry : engines.entrySet()) {
                if (entry.getValue() instanceof ResultDecoratorEngine) {
                    ResultDecorator resultDecorator = ((ResultDecoratorEngine) entry.getValue()).newInstance(databaseType, schemaMetaData, entry.getKey(), properties, sqlStatementContext);
                    result = null == result ? resultDecorator.decorate(queryResult, sqlStatementContext, schemaMetaData) : resultDecorator.decorate(result, sqlStatementContext, schemaMetaData);
                }
            }
            return Optional.ofNullable(result);
        }
    }
    

    在MergeEntry#process方法中,先调用了私有方法merge,该方法顺序执行各ResultMergerEngine实现类的newInstance方法创建ResultMerger实例,然后执行其merge方法;若merge产生了结果则继续对其进行装饰(decorate),否则直接对原始QueryResult进行装饰,两者都没有结果时返回TransparentMergedResult。

    ResultMergerEngine接口实现类目前只有一个ShardingResultMergerEngine,看下该类的newInstance方法代码:

    org.apache.shardingsphere.sharding.merge.ShardingResultMergerEngine

    public final class ShardingResultMergerEngine implements ResultMergerEngine<ShardingRule> {
        
        @Override
        public ResultMerger newInstance(final DatabaseType databaseType, final ShardingRule shardingRule, final ConfigurationProperties properties, final SQLStatementContext sqlStatementContext) {
            if (sqlStatementContext instanceof SelectStatementContext) {
                return new ShardingDQLResultMerger(databaseType);
            } 
            if (sqlStatementContext.getSqlStatement() instanceof DALStatement) {
                return new ShardingDALResultMerger(shardingRule);
            }
            return new TransparentResultMerger();
        }
    

    可以看到newInstance方法中会根据当前SQL的类型创建对应的ResultMerger实例。以最常用的数据分片结果归并器ShardingDQLResultMerger为例,看下其处理逻辑:

    org.apache.shardingsphere.sharding.merge.dql.ShardingDQLResultMerger

    /**
     * DQL result merger for Sharding.
     */
    public final class ShardingDQLResultMerger implements ResultMerger {
        
        private final DatabaseType databaseType;
        
    public MergedResult merge(final List<QueryResult> queryResults, final SQLStatementContext sqlStatementContext, final SchemaMetaData schemaMetaData) throws SQLException {
            if (1 == queryResults.size()) {
                return new IteratorStreamMergedResult(queryResults);
            }
            Map<String, Integer> columnLabelIndexMap = getColumnLabelIndexMap(queryResults.get(0));
            SelectStatementContext selectStatementContext = (SelectStatementContext) sqlStatementContext;
            selectStatementContext.setIndexes(columnLabelIndexMap);
            MergedResult mergedResult = build(queryResults, selectStatementContext, columnLabelIndexMap, schemaMetaData);
            return decorate(queryResults, selectStatementContext, mergedResult);
    }
    …
    private MergedResult build(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext,
                                   final Map<String, Integer> columnLabelIndexMap, final SchemaMetaData schemaMetaData) throws SQLException {
            if (isNeedProcessGroupBy(selectStatementContext)) {
                return getGroupByMergedResult(queryResults, selectStatementContext, columnLabelIndexMap, schemaMetaData);
            }
            if (isNeedProcessDistinctRow(selectStatementContext)) {
                setGroupByForDistinctRow(selectStatementContext);
                return getGroupByMergedResult(queryResults, selectStatementContext, columnLabelIndexMap, schemaMetaData);
            }
            if (isNeedProcessOrderBy(selectStatementContext)) {
                return new OrderByStreamMergedResult(queryResults, selectStatementContext, schemaMetaData);
            }
            return new IteratorStreamMergedResult(queryResults);
        }
    …
    
        private MergedResult getGroupByMergedResult(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext,
                                                    final Map<String, Integer> columnLabelIndexMap, final SchemaMetaData schemaMetaData) throws SQLException {
            return selectStatementContext.isSameGroupByAndOrderByItems()
                    ? new GroupByStreamMergedResult(columnLabelIndexMap, queryResults, selectStatementContext, schemaMetaData)
                    : new GroupByMemoryMergedResult(queryResults, selectStatementContext, schemaMetaData);
        }
     ...
    }
    

    在merge方法中会判断SQL中是否包含group by、distinct和order by,据此构建GroupByStreamMergedResult、GroupByMemoryMergedResult或OrderByStreamMergedResult。这些对象都实现了MergedResult接口,支持next、getValue等方法;在ShardingPreparedStatement的executeQuery()方法中,最后就是使用此MergedResult构建ShardingResultSet返回。

    org.apache.shardingsphere.underlying.merge.result.MergedResult

    /**
     * Merged result after merge engine.
     */
    public interface MergedResult {
        
        /**
         * Iterate next data.
         * 
         * @return has next data
         * @throws SQLException SQL Exception
         */
        boolean next() throws SQLException;
        
        /**
         * Get data value.
         *
         * @param columnIndex column index
         * @param type class type of data value
         * @return data value
         * @throws SQLException SQL Exception
         */
    Object getValue(int columnIndex, Class<?> type) throws SQLException;
    
    …
    }
    
    JDBC

    org.apache.shardingsphere.shardingjdbc.jdbc.core.resultset.ShardingResultSet

    /**
     * Result that support sharding.
     */
    public final class ShardingResultSet extends AbstractResultSetAdapter {
        
        private final MergedResult mergeResultSet;
        
        private final Map<String, Integer> columnLabelAndIndexMap;
        
        …
        @Override
        public boolean next() throws SQLException {
            return mergeResultSet.next();
        }
        …
    }
    

    ShardingResultSet实现了JDBC的ResultSet接口,应用拿到ResultSet后,调用其next()方法时,实际调用的是这些MergedResult实现类的next()方法。ShardingResultSet的类层次关系如下图:

    ShardingResultSet类层次图
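
    站在应用视角补一个最小使用示意:对业务代码而言这就是标准JDBC,rs.next()背后实际驱动的是MergedResult(dataSource假设为sharding-jdbc创建的数据源,t_order为假设的分片表):

    try (Connection conn = dataSource.getConnection();
         PreparedStatement ps = conn.prepareStatement("SELECT order_id, user_id FROM t_order WHERE user_id = ?")) {
        ps.setLong(1, 10L);
        try (ResultSet rs = ps.executeQuery()) { // 实际返回的是ShardingResultSet
            while (rs.next()) {                  // 委托给MergedResult.next()
                System.out.println(rs.getLong("order_id"));
            }
        }
    }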
    至此,我们粗略地将sharding-jdbc的处理流程浏览了一遍。虽然并没有深入各类的细节,但可以比较清晰地看到各个内核引擎的职责与边界;按照SQL处理的流向以及各引擎的输入输出,可以画出一个整体的内核引擎处理流程图:

    ShardingSphere内核引擎处理流程图

    设计模式

    在整个代码浏览中,我们可以看到ShardingSphere使用率很高的几个设计模式。熟悉这几个设计模式,可以更快地理解作者的设计意图,它们也是ShardingSphere主要的扩展点。

    1. 工厂模式

    在ShardingSphere中工厂类主要为简单工厂模式,覆盖了大部分模块,主要职责就是根据配置类型、数据库类型、SQL类型等创建对应的接口实现类。工厂模式封装了创建各种用途接口实现类的细节,从而使处理流程更加通用。例如在使用sharding-jdbc时,ShardingSphere提供了不同功能对应的DataSource工厂类,根据提供的配置与连接池创建对应的DataSource实现类。

    分库分表DataSource工厂类

    /**
     * Sharding data source factory.
     */
    @NoArgsConstructor(access = AccessLevel.PRIVATE)
    public final class ShardingDataSourceFactory {
        
        /**
         * Create sharding data source.
         *
         * @param dataSourceMap data source map
         * @param shardingRuleConfig rule configuration for databases and tables sharding
         * @param props properties for data source
         * @return sharding data source
         * @throws SQLException SQL exception
         */
        public static DataSource createDataSource(
                final Map<String, DataSource> dataSourceMap, final ShardingRuleConfiguration shardingRuleConfig, final Properties props) throws SQLException {
            return new ShardingDataSource(dataSourceMap, new ShardingRule(shardingRuleConfig, dataSourceMap.keySet()), props);
        }
    }
    

    读写分离DataSource工厂类

    /**
     * Master-slave data source factory.
     */
    @NoArgsConstructor(access = AccessLevel.PRIVATE)
    public final class MasterSlaveDataSourceFactory {
        
        /**
         * Create master-slave data source.
         *
         * @param dataSourceMap data source map
         * @param masterSlaveRuleConfig master-slave rule configuration
         * @param props props
         * @return master-slave data source
         * @throws SQLException SQL exception
         */
        public static DataSource createDataSource(final Map<String, DataSource> dataSourceMap, final MasterSlaveRuleConfiguration masterSlaveRuleConfig, final Properties props) throws SQLException {
            return new MasterSlaveDataSource(dataSourceMap, new MasterSlaveRule(masterSlaveRuleConfig), props);
        }
    }
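
    例如,使用ShardingDataSourceFactory创建分库分表数据源的最小示意如下(基于4.x API,省略import;buildDataSource为假设的连接池构建方法,分片规则仅作演示):

    Map<String, DataSource> dataSourceMap = new HashMap<>();
    dataSourceMap.put("ds0", buildDataSource("jdbc:mysql://localhost:3306/ds0"));
    dataSourceMap.put("ds1", buildDataSource("jdbc:mysql://localhost:3306/ds1"));
    
    ShardingRuleConfiguration shardingRuleConfig = new ShardingRuleConfiguration();
    // t_order按order_id取模分到两库两表
    TableRuleConfiguration orderTableRule = new TableRuleConfiguration("t_order", "ds${0..1}.t_order_${0..1}");
    orderTableRule.setTableShardingStrategyConfig(new InlineShardingStrategyConfiguration("order_id", "t_order_${order_id % 2}"));
    shardingRuleConfig.getTableRuleConfigs().add(orderTableRule);
    
    // 返回的即为ShardingDataSource,之后按标准JDBC方式使用
    DataSource dataSource = ShardingDataSourceFactory.createDataSource(dataSourceMap, shardingRuleConfig, new Properties());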
    
    2. 建造者模式

    与工厂类类似,建造者Builder模式也是创建型设计模式。常见的builder设计多为fluent风格:提供类似set、add的方法设置或添加属性,最后通过build方法完成实例的创建(见下方通用示意),主要用于解决需要设置多个关联属性、且创建实例时需要统一校验的场景。
    不过ShardingSphere中的建造者类并没有这么设计,它们是一些*Builder接口的实现,其中的方法直接返回需要创建的对象;与上面工厂类的区别是,其创建的往往是一些确定的类实例。
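
    作为对比,常见的fluent风格builder形如下面的通用示意(与ShardingSphere源码无关):

    public final class QueryConfig {
        
        private final String table;
        
        private final int limit;
        
        private QueryConfig(final Builder builder) {
            table = builder.table;
            limit = builder.limit;
        }
        
        public static final class Builder {
            
            private String table;
            
            private int limit = 10;
            
            public Builder table(final String table) {
                this.table = table;
                return this;
            }
            
            public Builder limit(final int limit) {
                this.limit = limit;
                return this;
            }
            
            public QueryConfig build() {
                // 多个关联属性在build时统一校验
                if (null == table) {
                    throw new IllegalStateException("table is required");
                }
                return new QueryConfig(this);
            }
        }
    }
    
    // 使用:new QueryConfig.Builder().table("t_order").limit(20).build()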

    默认SQL构建器类

    /**
     * Default SQL builder.
     */
    public final class DefaultSQLBuilder extends AbstractSQLBuilder {
        
        public DefaultSQLBuilder(final SQLRewriteContext context) {
            super(context);
        }
        
        @Override
        protected String getSQLTokenText(final SQLToken sqlToken) {
            return sqlToken.toString();// 返回Token对应的文本字符
        }
    }
    
    3. Callback回调模式

    执行引擎中大量使用了该设计模式。与Spring的JdbcTemplate、TransactionTemplate类似,ShardingSphere中的SQLExecuteTemplate、ExecutorEngine也是如此设计,由引擎使用者提供Callback实现类。使用该模式是因为SQL执行时需要支持多种类型的SQL,如DQL、DML、DDL、不带参数的SQL、参数化SQL等,不同SQL的操作逻辑并不一样,但执行引擎需要提供一个通用的执行策略。

      public final class SQLExecuteTemplate {
     …
        public <T> List<T> execute(final Collection<InputGroup<? extends StatementExecuteUnit>> inputGroups,
                                   final SQLExecuteCallback<T> firstCallback, final SQLExecuteCallback<T> callback) throws SQLException {
            try {
                return executorEngine.execute((Collection) inputGroups, firstCallback, callback, serial);
            } catch (final SQLException ex) {
                ExecutorExceptionHandler.handleException(ex);
                return Collections.emptyList();
            }
        }
    
    public final class ExecutorEngine implements AutoCloseable {
    …
    public <I, O> List<O> execute(final Collection<InputGroup<I>> inputGroups, 
                                      final GroupedCallback<I, O> firstCallback, final GroupedCallback<I, O> callback, final boolean serial) throws SQLException {
            if (inputGroups.isEmpty()) {
                return Collections.emptyList();
            }
            return serial ? serialExecute(inputGroups, firstCallback, callback) : parallelExecute(inputGroups, firstCallback, callback);
    }
    …
    }
    

    注意:虽然这些类名叫Template,但与经典设计模式中的模板方法(Template Method)模式并不一样。

    4. 装饰器模式

    装饰器(Decorator)模式在路由引擎、改写引擎、归并引擎中都有使用,也是5.x版本中实现pluggable的主要设计模式。在4.1.1中已经可以看到,数据分片、读写分离、加密、影子表等都作为一个功能实现,以装饰器方式进行了重构。装饰器模式可以实现功能组合,例如既要使用数据分片,又要使用加密和影子表功能,这些功能都可以看做对路由、改写的增强。

    在DataNodeRouter类中

        private RouteContext executeRoute(final String sql, final List<Object> parameters, final boolean useCache) {
            RouteContext result = createRouteContext(sql, parameters, useCache);
            for (Entry<BaseRule, RouteDecorator> entry : decorators.entrySet()) {
                result = entry.getValue().decorate(result, metaData, entry.getKey(), properties);
            }
            return result;
        }
    

    在SQLRewriteEntry类中

     private void decorate(final Map<BaseRule, SQLRewriteContextDecorator> decorators, final SQLRewriteContext sqlRewriteContext, final RouteContext routeContext) {
            for (Entry<BaseRule, SQLRewriteContextDecorator> entry : decorators.entrySet()) {
                BaseRule rule = entry.getKey();
                SQLRewriteContextDecorator decorator = entry.getValue();
                if (decorator instanceof RouteContextAware) {
                    ((RouteContextAware) decorator).setRouteContext(routeContext);
                }
                decorator.decorate(rule, properties, sqlRewriteContext);
            }
        }
    

    在MergeEntry类中

     private Optional<MergedResult> decorate(final QueryResult queryResult, final SQLStatementContext sqlStatementContext) throws SQLException {
            MergedResult result = null;
            for (Entry<BaseRule, ResultProcessEngine> entry : engines.entrySet()) {
                if (entry.getValue() instanceof ResultDecoratorEngine) {
                    ResultDecorator resultDecorator = ((ResultDecoratorEngine) entry.getValue()).newInstance(databaseType, schemaMetaData, entry.getKey(), properties, sqlStatementContext);
                    result = null == result ? resultDecorator.decorate(queryResult, sqlStatementContext, schemaMetaData) : resultDecorator.decorate(result, sqlStatementContext, schemaMetaData);
                }
            }
            return Optional.ofNullable(result);
        }
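
    基于该设计,第三方功能理论上只需实现对应的装饰器接口即可挂入处理链。下面是一个自定义路由装饰器的骨架示意(decorate签名参照上文DataNodeRouter中的调用方式;MyFeatureRule为假设的自定义规则,getOrder/getType来自OrderAware接口,具体方法集以4.1.1源码为准):

    public final class MyFeatureRouteDecorator implements RouteDecorator<MyFeatureRule> {
        
        @Override
        public RouteContext decorate(final RouteContext routeContext, final ShardingSphereMetaData metaData,
                                     final MyFeatureRule rule, final ConfigurationProperties properties) {
            // 在上一个装饰器产出的RouteContext基础上追加或调整路由结果,实现功能叠加
            return routeContext;
        }
        
        @Override
        public int getOrder() {
            return 100; // 装饰器按order顺序依次生效(假设值)
        }
        
        @Override
        public Class<MyFeatureRule> getType() {
            return MyFeatureRule.class;
        }
    }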
    
    5. 访问者模式

    ShardingSphere在SQL解析中使用了大量的访问者模式。这种模式在一般项目中并不常用,但在编译技术领域几乎是标配,antlr、druid都使用了该模式,开发者如果需要进行功能开发,都会通过这种模式实现对AST的访问。antlr4基于g4文件生成java类时,除了词法Lexer、语法Parser类,还会生成一个访问者基类,ShardingSphere各种数据库对应的Visitor都继承自此类,在实现的子类中拿到SQL中对应的各种要素,进而构建出对应的SQLStatement(如SelectStatement)。

    例如org.apache.shardingsphere.sql.parser.mysql.visitor.MySQLVisitor

    /**
     * MySQL visitor.
     */
    @Getter(AccessLevel.PROTECTED)
    public abstract class MySQLVisitor extends MySQLStatementBaseVisitor<ASTNode> {
        
        private int currentParameterIndex;
        
        @Override
        public final ASTNode visitParameterMarker(final ParameterMarkerContext ctx) {
            return new ParameterMarkerValue(currentParameterIndex++);
    }
        @Override
        public final ASTNode visitLiterals(final LiteralsContext ctx) {
            if (null != ctx.stringLiterals()) {
                return visit(ctx.stringLiterals());
            }
            if (null != ctx.numberLiterals()) {
                return visit(ctx.numberLiterals());
            }
            if (null != ctx.dateTimeLiterals()) {
                return visit(ctx.dateTimeLiterals());
            }
            if (null != ctx.hexadecimalLiterals()) {
                return visit(ctx.hexadecimalLiterals());
            }
            if (null != ctx.bitValueLiterals()) {
                return visit(ctx.bitValueLiterals());
            }
            if (null != ctx.booleanLiterals()) {
                return visit(ctx.booleanLiterals());
            }
            if (null != ctx.nullValueLiterals()) {
                return visit(ctx.nullValueLiterals());
            }
            throw new IllegalStateException("Literals must have string, number, dateTime, hex, bit, boolean or null.");
        }
        …
    }
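
    解析引擎对外的使用方式则很简单(SQLParserEngineFactory与SQLParserEngine为4.1.1中的真实类,调用方式可参见后文"内核引擎命名不统一"一节;SQL文本为示例):

    SQLParserEngine parserEngine = SQLParserEngineFactory.getSQLParserEngine("MySQL");
    // 第二个参数表示是否使用解析结果缓存
    SQLStatement sqlStatement = parserEngine.parse("SELECT order_id FROM t_order WHERE user_id = 1", false);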
    

    除了上述几个设计模式,还用到了解释器模式、单例模式、代理模式、策略模式等,出于篇幅关系这里就不一一介绍。

    代码目录结构

    我们在前面进行代码分析时,都是围绕SQL路由、改写、执行、归并这几个核心模块,这些也是ShardingSphere官方文档中所指的内核引擎,在4.1.1版本中代码位于sharding-core项目下。不过这里我们可以自己想一想什么是ShardingSphere的核心,相信ShardingSphere的主创者们对这个问题有更深的认识,这里简单说下我个人的理解。回答这个问题主要还是在于对ShardingSphere的定义:早期只有sharding-jdbc的时候,其实无所谓core,因为整个功能是明确的;但当sharding-proxy项目出来后,自然而然要考虑哪些功能、代码是jdbc和proxy可以复用的——除了接入方式不同(一个是JDBC、一个是MySQL协议),SQL解析、路由、重写、执行都可以复用,因此这些就成了core。再到后来,SQL解析器经过druid->手写简化版->antlr的演化,SQL解析变成了一个高扩展性、支持多种数据库方言的独立模块,因此被剥离成了sql-parser;与此类似,Proxy中MySQL报文协议的部分,在后来增加PostgreSQL支持后,也被独立出来成了db-protocol。

    在功能层面,前期只有分库分表、读写分离,后来增加了加解密、影子表等功能,这些功能又与原来core的代码糅合在一起。因此在5.x版本中,我们可以看到ShardingSphere作者们对其做了进一步的重构:将原来core的代码改为infra模块,把路由、重写、归并、执行模块中涉及具体功能的逻辑剥离出去,infra中的路由、重写、归并与执行就只剩下引擎以及装饰器,保留的是执行算法与流程;数据分片sharding、读写分离master-slave、加密encrypt、影子表shadow都作为功能feature拆分到了feature项目中,各自实现对应的路由、重写、归并、执行等装饰器接口。这样的调整使ShardingSphere面向开发者提供了一个统一的接入、增强、定制入口,原核心功能(分库分表、读写分离)也是以这种方式集成,第三方开发者亦然;这种一致的开发方式也更方便用户根据自身需要裁剪ShardingSphere的各组件。

    虽然目前5.x版本还未正式release,不过从github上master分支的代码目录结构中可以看出新的设计架构:feature项目下包含了分片、加密、主从、复制、影子表;infra项目下包含了原内核引擎(路由、重写、执行、归并);新增的控制面control-panel项目,将原来的orchestration、metric、opentracing都迁移到了该项目下;SQL解析与数据库报文协议也各自独立成项目,更方便开发者有选择地使用ShardingSphere工具集;另外还增加了一个kernel项目,不过目前里面只有一些context类,暂时还看不出这个项目的准确定位。

    ShardingSphere 5.x目录结构

    代码风格与质量

    ShardingSphere经过几个版本的迭代重构,代码已不止于满足功能层面,在编写风格、目录结构、扩展方式等方面都进行了精心的设计。相比于当下很多开源项目,阅读其代码给人的感觉很清爽,面条式代码很少。
    项目用了大量lombok的注解,例如@Getter、@Setter、@RequiredArgsConstructor、@Data等,通过这种方式减少了很多getter、setter、构造函数、hashCode等样板代码,让代码更加简洁。另外4.x版本在将JDK升级到8后,将数据集合遍历从for each改为了stream方式,并使用Optional减少了很多类似if (xx == null)的判空代码,也使得代码看起来更加优雅易读。
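
    lombok的效果可用下面的通用示意体会(示例类为虚构,非ShardingSphere源码):

    // 两个注解即可生成全部final字段的构造函数与getter,省去样板代码
    @RequiredArgsConstructor
    @Getter
    public final class OrderShardingValue {
        
        private final String logicTable;
        
        private final long orderId;
    }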

    不过部分代码为了简洁,存在级联调用太长、方法参数太多的问题,导致可读性比较低;这种情况我觉得通过添加一些中间变量会更好,这种写法在sharding-proxy项目中更加常见。

    例如org.apache.shardingsphere.shardingproxy.frontend.netty.FrontendChannelInboundHandler#channelRead方法:

        public void channelRead(final ChannelHandlerContext context, final Object message) {
    …        CommandExecutorSelector.getExecutor(databaseProtocolFrontendEngine.getFrontendContext().isOccupyThreadForPerConnection(), backendConnection.isSupportHint(),
                    backendConnection.getTransactionType(), context.channel().id()).execute(new CommandExecutorTask(databaseProtocolFrontendEngine, backendConnection, context, message));
        }
    …
    }
    

    org.apache.shardingsphere.shardingproxy.frontend.mysql.command.query.binary.execute.MySQLComStmtExecuteExecutor#createQueryPacket方法

    private Collection<DatabasePacket> createQueryPacket(final QueryResponse backendResponse) {
    …
            for (QueryHeader each : queryHeader) {
                result.add(new MySQLColumnDefinition41Packet(++currentSequenceId, each.getSchema(), each.getTable(), each.getTable(),
                        each.getColumnLabel(), each.getColumnName(), each.getColumnLength(), MySQLColumnType.valueOfJDBCType(each.getColumnType()), each.getDecimals()));
            }
            result.add(new MySQLEofPacket(++currentSequenceId));
            return result;
        }
    

    官方开发规范中对开发理念、编码规范有更详细的介绍https://shardingsphere.apache.org/community/cn/contribute/code-conduct/

    4.1.1中代码设计的一些问题

    引擎边界不够清晰,调用关系嵌套

    在4.1.1版本中,JDBC层调用prepare引擎,prepare引擎(PrepareEngine.prepare方法)调用路由引擎(DataNodeRouter.route方法),路由引擎内部(route方法)又调用了解析引擎(ParseEngine.parse)进行SQL的解析,然后再由prepare引擎(PrepareEngine.prepare方法)调用改写引擎(RewriteEngine.rewrite)进行SQL的重写,之后JDBC层调用执行引擎运行SQL,最后JDBC层再调用归并引擎完成结果集合并。这样的调用关系其实耦合了各内核引擎,更好的方式应该是在JDBC层或者Proxy层统一负责编排各引擎,分别调用解析引擎、路由引擎、改写引擎、执行引擎、归并引擎,而不是在路由引擎内嵌套调用SQL解析。

    这个问题在目前5.0.0-RC1-SNAPSHOT版本中已进行了重构优化,例如在ShardingSpherePreparedStatement(原ShardingPreparedStatement改名)类中,可以看到SQL解析迁移到了构造函数中,将路由、改写操作统一放到了createExecutionContext()方法中,另外将生成执行输入分组操作也从PreparedStatementExecutor中挪到了JDBC层。

    /**
     * ShardingSphere prepared statement.
     */
    public final class ShardingSpherePreparedStatement extends AbstractPreparedStatementAdapter {
    …    private ShardingSpherePreparedStatement(final ShardingSphereConnection connection, final String sql,
                                                final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final boolean returnGeneratedKeys) throws SQLException {
            if (Strings.isNullOrEmpty(sql)) {
                throw new SQLException(SQLExceptionConstant.SQL_STRING_NULL_OR_EMPTY);
            }
            this.connection = connection;
            schemaContexts = connection.getSchemaContexts();
            this.sql = sql;
            statements = new ArrayList<>();
            parameterSets = new ArrayList<>();
            sqlStatement = schemaContexts.getDefaultSchemaContext().getRuntimeContext().getSqlParserEngine().parse(sql, true);// 进行SQL解析
    …
        }
        
        @Override
        public ResultSet executeQuery() throws SQLException {
            ResultSet result;
            try {
                clearPrevious();
                executionContext = createExecutionContext();// 创建执行上下文,完成SQL的路由与改写
                List<QueryResult> queryResults;
                if (ExecutorConstant.MANAGED_RESOURCE) {
                    Collection<InputGroup<StatementExecuteUnit>> inputGroups = getInputGroups();
                    cacheStatements(inputGroups);
                    reply();
                    queryResults = preparedStatementExecutor.executeQuery(inputGroups);// SQL执行
                } else {
                    queryResults = rawExecutor.executeQuery(getRawInputGroups(), new RawSQLExecutorCallback());
                }
                MergedResult mergedResult = mergeQuery(queryResults);// 结果集归并
                result = new ShardingSphereResultSet(statements.stream().map(this::getResultSet).collect(Collectors.toList()), mergedResult, this, executionContext);
            } finally {
                clearBatch();
            }
            currentResultSet = result;
            return result;
    }
    …
    }
    

    内核引擎命名不统一,使用方式不太一致

    解析引擎:
    1. 通过SQLParserEngineFactory.getSQLParserEngine返回对应数据库的SQL解析引擎;
    2. 然后调用SQLParserEngine.parse方法解析成SQLStatement;parse内部通过SQLParserExecutor.execute方法返回抽象语法树即ParseASTNode,execute方法中通过SQLParserFactory.newInstance创建出SQL解析器即SQLParser实现类,不同数据库有不同实现类。
    路由引擎:
    1. 路由引擎并非直接创建RouteEngine实例,而是先创建DataNodeRouter类实例,调用其route方法,返回包含路由结果的RouteContext对象。
    2. route方法中,依次调用各种RouteDecorator实现类,在具体实现类中创建了具体的RouteEngine执行route操作,例如在ShardingRouteDecorator中,通过ShardingRouteEngineFactory.newInstance创建了ShardingRouteEngine来实现真正的路由逻辑。
    重写引擎:
    1. 通过SQLRewriteEntry类的createSQLRewriteContext方法返回SQLRewriteContext;
    2. 然后调用SQLRewriteEngine或者SQLRouteRewriteEngine完成SQL改写;
    3. 在RewriteEngine中主要的逻辑其实只是通过SQLBuilder实现类,将SQLRewriteContext中的Token转换成SQL。
    执行引擎:
    1. 创建PreparedStatementExecutor(Proxy方式为JDBCExecuteEngine)实例;
    2. 之后ExecutePrepareTemplate实例生成执行单元分组InputGroup<StatementExecuteUnit>(在5.x中此功能由ExecuteGroupEngine接口实现类完成);
    3. 最后创建SQLExecuteTemplate实例,其内部通过ExecutorEngine执行分组后的StatementExecuteUnit。
    合并引擎:
    1. 创建MergeEngine,其内部会创建一个MergeEntry;MergeEntry的merge方法顺序调用ResultMergerEngine实现类(目前只有ShardingResultMergerEngine),例如对于Select SQL会创建ShardingDQLResultMerger,其merge方法再根据SelectStatementContext创建对应的stream/memory类型MergedResult;
    2. 最后由ResultDecoratorEngine实现类完成对MergedResult的再次处理。

    可以看到这些引擎的使用方式各不相同:有的从Entry类开始,有的从Engine类开始,有的则是Engine内部嵌套Entry类。这种不一致给开发者带来了更高的心智负荷,这些设计问题在5.x中正在得到优化。

    部分功能还缺乏扩展点

    某些功能的扩展还不够灵活。例如事务这块,目前ShardingSphere的事务类型只支持LOCAL、XA、BASE,并不支持通过SPI新增;之前项目中基于sharding-jdbc新增一个严格单库事务管理器时,只能修改TransactionType类源码。
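
    TransactionType大致形如下面的枚举(示意,成员以源码为准),新增类型只能改源码,而无法像路由、改写那样通过SPI注入:

    public enum TransactionType {
        
        LOCAL, XA, BASE
    }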

    一个好的开源项目应该是不断演变进步的。ShardingSphere 5.x版本目前正在接近release,目标是pluggable,相信上述问题会通过更好的设计得到解决。另外ShardingSphere现在的定位也不只是一个分库分表中间件,而是一个分布式数据库,未来后端除了关系数据库还会有其它存储方式,所以内核层面可能还会增加查询优化、执行计划、数据统计信息(与数据库类似)等概念,接入端也可能支持更多的数据接入方式(前端增加REST、GraphQL、Redis等;后端增加Redis、MongoDB、HBase等)。

    总览篇到此为止,在接下来的篇章里,会对各内核引擎进行更详细的源码分析。
