如果是早期使用过ShardingSphere的开发者,大多都知道ShardingSphere源于sharding-jdbc。sharding-jdbc 1.x版本主要功能就是分库分表、读写分离、分布式主键;在2.x版本增加了orchestration,3.x版本中增加了sharding-proxy、opentracing等功能,4.x版本进入了apache基金会,增加了加解密、影子表、扩容等。除了这些功能层面,在代码上其实各个版本对代码都进行了大量的重构,例如早期SQL的解析使用的alibaba的druid,后来又实现了一个简化版的SQL解析器,再后来统一基于antlr重新改写;早期内核引擎还有优化引擎等,后面重构到了重写引擎了;正在开发的5.x目标是pluggable,对代码结构、包类名能都有较大调整,对开发者也更友好。作为一名开发者,我觉得ShardingSphere不止是一个数据库中间件,而是一个围绕SQL、DB的开发平台和工具集。同时其代码质量也很高,对其进行源码分析更是可以学到很多软件设计与开发的知识。
本文不介绍ShardingSphere的具体功能,关于ShardingSphere功能,官网有更完整的介绍https://shardingsphere.apache.org/document/current/en/overview
本系列基于的源码是当前最新release版本4.1.1,但涉及到差异较大的也会跟5.0.0-RC1-SNAPSHOT进行比较。4.1.1版本共有18个子项目,具体名称分类如下:
sharding-sphere 4.1.1代码目录
根据功能划分,本系列会包含多个章节篇幅:
- 总览
- 解析引擎
- 路由引擎
- 改写引擎
- 执行引擎
- 归并引擎
- 事务篇
- JDBC篇
- Proxy篇
- Ochestration篇
- 扩容篇
- 5.x代码变化
本文为总览篇,会通过快速浏览下其执行流程,了解涉及到的类与方法的职责与定位,从而对ShardingSphere的内核形成一个整体认识,另外会总结下使用的设计模式、代码结构、代码风格,以及4.1.1版本中目前存在的问题。
代码调用分析
我们看源代码,需要一个入口,ShardingSphere中最成熟、使用率最高的莫过于sharding-jdbc,因此我们就从sharding-jdbc作为代码分析的切入点。从名字就可以看出sharding-jdbc支持JDBC,熟悉JDBC规范的开发者都知道其核心就是DataSource、Connection、Statement、PrepareStatement等接口,在sharding-jdbc中,这些接口的实现类分别对应ShardingDataSource、ShardingConnection、ShardingStatment、ShardingPreparedStatement类。接下来就从一条查询SQL出发,顺着方法的调用脉络看下这些类的代码:
为了在代码分析过程中更好的定位在调用链所处位置,会在通过加标题来注明接下来代码所属于的功能范畴。
JDBC
org.apache.shardingsphere.shardingjdbc.jdbc.core.datasource.ShardingDataSource
public class ShardingDataSource extends AbstractDataSourceAdapter {
private final ShardingRuntimeContext runtimeContext;
…
@Override
public final ShardingConnection getConnection() {
return new ShardingConnection(getDataSourceMap(), runtimeContext, TransactionTypeHolder.get());
}
}
org.apache.shardingsphere.shardingjdbc.jdbc.core.connection.ShardingConnection
public final class ShardingConnection extends AbstractConnectionAdapter {
…
@Override
public PreparedStatement prepareStatement(final String sql) throws SQLException {
return new ShardingPreparedStatement(this, sql);
}
…
@Override
public Statement createStatement(final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) {
return new ShardingStatement(this, resultSetType, resultSetConcurrency, resultSetHoldability);
}
…
}
org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingStatement
/**
* Statement that support sharding.
*/
public final class ShardingStatement extends AbstractStatementAdapter {
…
@Override
public ResultSet executeQuery(final String sql) throws SQLException {
if (Strings.isNullOrEmpty(sql)) {
throw new SQLException(SQLExceptionConstant.SQL_STRING_NULL_OR_EMPTY);
}
ResultSet result;
try {
executionContext = prepare(sql);
List<QueryResult> queryResults = statementExecutor.executeQuery();
MergedResult mergedResult = mergeQuery(queryResults);
result = new ShardingResultSet(statementExecutor.getResultSets(), mergedResult, this, executionContext);
} finally {
currentResultSet = null;
}
currentResultSet = result;
return result;
}
…
}
org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement
/**
* PreparedStatement that support sharding.
*/
public final class ShardingPreparedStatement extends AbstractShardingPreparedStatementAdapter {
…
public ResultSet executeQuery() throws SQLException {
ResultSet result;
try {
clearPrevious();
prepare();
initPreparedStatementExecutor();
MergedResult mergedResult = mergeQuery(preparedStatementExecutor.executeQuery());
result = new ShardingResultSet(preparedStatementExecutor.getResultSets(), mergedResult, this, executionContext);
} finally {
clearBatch();
}
currentResultSet = result;
return result;
}
可以看到,真正进行sql解析、路由都在Statement实现类中。
我们以ShardingPreparedStatement.executeQuery()为例,看看整个流程。
clearPrevious()负责PreparedStatementExecutor的重置,因为一个Statement可以多次执行多个SQL,每次执行完SQL,PreparedStatementExecutor会记录真实的Statement,connection,该方法负责关闭statement,清理记录的参数、连接等。
public void clear() throws SQLException {
clearStatements();
statements.clear();
parameterSets.clear();
connections.clear();
resultSets.clear();
inputGroups.clear();
}
executeQuery方法中剩余的代码则完成SQL的解析、路由、改写、执行与合并。
SQL处理流程Prepare引擎
接下来看下prepare,这个方法其实完成prepare引擎的执行和自增key的生成添加。
private void prepare() {
executionContext = prepareEngine.prepare(sql, getParameters());
findGeneratedKey().ifPresent(generatedKey -> generatedValues.add(generatedKey.getGeneratedValues().getLast()));
}
prepare引擎并不在官方内核引擎范围,因为它完成的事其实就是调用解析引擎、路由引擎、改写引擎进行SQL的解析、路由操作,相当于这些内核引擎的编排执行,其对应的类图如下:
其对应的类层次如下:
看下prepare引擎的prepare方法
org.apache.shardingsphere.underlying.pluggble.prepare.BasePrepareEngine
public ExecutionContext prepare(final String sql, final List<Object> parameters) {
List<Object> clonedParameters = cloneParameters(parameters);
RouteContext routeContext = executeRoute(sql, clonedParameters);// SQL路由
ExecutionContext result = new ExecutionContext(routeContext.getSqlStatementContext());
result.getExecutionUnits().addAll(executeRewrite(sql, clonedParameters, routeContext));// SQL改写
if (properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SHOW)) {
SQLLogger.logSQL(sql, properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SIMPLE), result.getSqlStatementContext(), result.getExecutionUnits());
}
return result;
}
…
private RouteContext executeRoute(final String sql, final List<Object> clonedParameters) {
registerRouteDecorator();
return route(router, sql, clonedParameters);
}
private void registerRouteDecorator() {
for (Class<? extends RouteDecorator> each : OrderedRegistry.getRegisteredClasses(RouteDecorator.class)) {
RouteDecorator routeDecorator = createRouteDecorator(each);
Class<?> ruleClass = (Class<?>) routeDecorator.getType();
rules.stream().filter(rule -> rule.getClass() == ruleClass || rule.getClass().getSuperclass() == ruleClass).collect(Collectors.toList())
.forEach(rule -> router.registerDecorator(rule, routeDecorator));
}
}
…
protected abstract RouteContext route(DataNodeRouter dataNodeRouter, String sql, List<Object> parameters);
…
}
可以看到prepare方法中核心的就是路由(executeRoute)和改写(executeRewrite)操作,分别看下这两个方法:
executeRoute方法实现了SQL的路由,包括registerRouteDecorator方法注册路由的装饰器(基于SPI的自定义扩展)以及route方法进行路由(计算应该运行在哪个库和操作哪张表),展开看看route的是怎么实现:
路由route
route方法由子类完成,即调用DataNodeRouter的route方法,看下BasePrepareEngine的子类PreparedQueryPrepareEngine。
org.apache.shardingsphere.underlying.pluggble.prepare.PreparedQueryPrepareEngine
@Override
protected RouteContext route(final DataNodeRouter dataNodeRouter, final String sql, final List<Object> parameters) {
return dataNodeRouter.route(sql, parameters, true);
}
继续看org.apache.shardingsphere.underlying.route.DataNodeRouter,这个类中完成了路由上下文RouteContext的生成。在executeRoute方法中有两个操作:
- 调用createRouteContext方法,该方法中调用解析引擎完成了SQL的解析,创建了一个初始的RouteContext实例;
- 依次调用注册的RouteDecorator实现类decorate方法,根据路由结果对RouteContext对象进行对应的操作。
org.apache.shardingsphere.underlying.route.DataNodeRouter
public final class DataNodeRouter {
…
public RouteContext route(final String sql, final List<Object> parameters, final boolean useCache) {
routingHook.start(sql);
try {
RouteContext result = executeRoute(sql, parameters, useCache);//进行路由计算,生成路由结果
routingHook.finishSuccess(result, metaData.getSchema());
return result;
} catch (final Exception ex) {
routingHook.finishFailure(ex);
throw ex;
}
}
@SuppressWarnings("unchecked")
private RouteContext executeRoute(final String sql, final List<Object> parameters, final boolean useCache) {
RouteContext result = createRouteContext(sql, parameters, useCache);
for (Entry<BaseRule, RouteDecorator> entry : decorators.entrySet()) {
result = entry.getValue().decorate(result, metaData, entry.getKey(), properties);
}
return result;
}
private RouteContext createRouteContext(final String sql, final List<Object> parameters, final boolean useCache) {
SQLStatement sqlStatement = parserEngine.parse(sql, useCache);//解析SQL,生成SQL对应AST
try {
SQLStatementContext sqlStatementContext = SQLStatementContextFactory.newInstance(metaData.getSchema(), sql, parameters, sqlStatement);// 生成SQL Statement上下文,相当于一部分语义分析
return new RouteContext(sqlStatementContext, parameters, new RouteResult());
} catch (final IndexOutOfBoundsException ex) {
return new RouteContext(new CommonSQLStatementContext(sqlStatement), parameters, new RouteResult());
}
}
}
SQL解析
可以看到在createRouteContext 方法,完成了两个操作:
- 调用parserEngine.parse方法,即通过SQL解析引擎对SQL进行解析,进而转化成AST,即SQLStatement接口实现类;
- 通过SQLStatementContextFactory方法将SQLStatement实例转化为SQLStatementContext实例。
SQLStatement的类层级关系如下:
目前sharingsphere4.1.1中SQLStatement的实现类有83个,上图中为了展示方面只包含了主要几个。
这里涉及到了ShardingSphere的一个内核引擎:SQL解析引擎,这里我们进入SQLParserEngine类,看下SQL解析引擎的实现。
org.apache.shardingsphere.sql.parser.SQLParserEngine
public final class SQLParserEngine {
private final String databaseTypeName;
private final SQLParseResultCache cache = new SQLParseResultCache();
…
/**
* Parse SQL.
*
* @param sql SQL
* @param useCache use cache or not
* @return SQL statement
*/
public SQLStatement parse(final String sql, final boolean useCache) {
ParsingHook parsingHook = new SPIParsingHook();// SQL解析hook
parsingHook.start(sql);
try {
SQLStatement result = parse0(sql, useCache);
parsingHook.finishSuccess(result);
return result;
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
parsingHook.finishFailure(ex);
throw ex;
}
}
private SQLStatement parse0(final String sql, final boolean useCache) {
if (useCache) {
Optional<SQLStatement> cachedSQLStatement = cache.getSQLStatement(sql);
if (cachedSQLStatement.isPresent()) {// 如果缓存中有该SQL的解析结果,则直接复用
return cachedSQLStatement.get();
}
}
ParseTree parseTree = new SQLParserExecutor(databaseTypeName, sql).execute().getRootNode();// 1. 解析SQL生成AST,ParseTree是antlr对应的解析树接口
SQLStatement result = (SQLStatement) ParseTreeVisitorFactory.newInstance(databaseTypeName, VisitorRule.valueOf(parseTree.getClass())).visit(parseTree);//2. 通过访问者模式,将antlr的解析树转化为SQLStatement
if (useCache) {
cache.put(sql, result);
}
return result;
}
可以看到其内部通过调用SQLParserExecutor.execute()方法生成的解析树ParseASTNode,然后通过ParseTreeVisitor访问解析树,生成SQLStatement,看下SQLParserExecutor和ParseTreeVisitorFactory类
org.apache.shardingsphere.sql.parser.core.parser.SQLParserExecutor
public final class SQLParserExecutor {
private final String databaseTypeName;
private final String sql;
/**
* Execute to parse SQL.
*
* @return AST node
*/
public ParseASTNode execute() {
ParseASTNode result = towPhaseParse();
if (result.getRootNode() instanceof ErrorNode) {
throw new SQLParsingException(String.format("Unsupported SQL of `%s`", sql));
}
return result;
}
private ParseASTNode towPhaseParse() {//名称可能拼错了,可能是two towPhaseParse?
SQLParser sqlParser = SQLParserFactory.newInstance(databaseTypeName, sql);//创建该类型数据库对应的SQL解析器
try {
((Parser) sqlParser).setErrorHandler(new BailErrorStrategy());
((Parser) sqlParser).getInterpreter().setPredictionMode(PredictionMode.SLL);
return (ParseASTNode) sqlParser.parse();
} catch (final ParseCancellationException ex) {
((Parser) sqlParser).reset();
((Parser) sqlParser).setErrorHandler(new DefaultErrorStrategy());
((Parser) sqlParser).getInterpreter().setPredictionMode(PredictionMode.LL);
return (ParseASTNode) sqlParser.parse();
}
}
org.apache.shardingsphere.sql.parser.core.visitor.ParseTreeVisitorFactory
/**
* Parse tree visitor factory.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class ParseTreeVisitorFactory {
/**
* New instance of SQL visitor.
*
* @param databaseTypeName name of database type
* @param visitorRule visitor rule
* @return parse tree visitor
*/
public static ParseTreeVisitor newInstance(final String databaseTypeName, final VisitorRule visitorRule) {
for (SQLParserConfiguration each : NewInstanceServiceLoader.newServiceInstances(SQLParserConfiguration.class)) {
if (each.getDatabaseTypeName().equals(databaseTypeName)) {
return createParseTreeVisitor(each, visitorRule.getType());
}
}
throw new UnsupportedOperationException(String.format("Cannot support database type '%s'", databaseTypeName));
}
@SneakyThrows
private static ParseTreeVisitor createParseTreeVisitor(final SQLParserConfiguration configuration, final SQLStatementType type) {
SQLVisitorFacade visitorFacade = configuration.getVisitorFacadeClass().getConstructor().newInstance();
switch (type) {
case DML:
return (ParseTreeVisitor) visitorFacade.getDMLVisitorClass().getConstructor().newInstance();
case DDL:
return (ParseTreeVisitor) visitorFacade.getDDLVisitorClass().getConstructor().newInstance();
case TCL:
return (ParseTreeVisitor) visitorFacade.getTCLVisitorClass().getConstructor().newInstance();
case DCL:
return (ParseTreeVisitor) visitorFacade.getDCLVisitorClass().getConstructor().newInstance();
case DAL:
return (ParseTreeVisitor) visitorFacade.getDALVisitorClass().getConstructor().newInstance();
case RL:
return (ParseTreeVisitor) visitorFacade.getRLVisitorClass().getConstructor().newInstance();
default:
throw new SQLParsingException("Can not support SQL statement type: `%s`", type);
}
}
}
SQLParserExecutor和ParseTreeVisitorFactory已经开始使用了antlr的类,这里我们先跳过,在后面的SQL解析篇章中再详细分析。
回到DataNodeRouter类createRouteContext方法,在获得SQL对应的AST(SQLStatement)对象后,通过SQLStatementContextFactory将SQLStatement转换为SQLStatementContext对象。
SQLStatementContext根据不同的SQL类型,有多种实现类,以Select类型为例org.apache.shardingsphere.sql.parser.binder.statement.dml.SelectStatementContext,其类结构如下:
可以看到其包含了Select SQL中包含的table、project、groupby、orderby、分页等上下文信息。
路由route
到此并未看到分库分表或者主从时真正的路由逻辑,其实这些操作都放到了这些RouteDecorator,看下RouterDecorator接口的实现类。
我们看下分库分表功能对应的路由修饰器类ShardingRouteDecorator类。
org.apache.shardingsphere.sharding.route.engine.ShardingRouteDecorator
public final class ShardingRouteDecorator implements RouteDecorator<ShardingRule> {
@Override
public RouteContext decorate(final RouteContext routeContext, final ShardingSphereMetaData metaData, final ShardingRule shardingRule, final ConfigurationProperties properties) {
SQLStatementContext sqlStatementContext = routeContext.getSqlStatementContext();
List<Object> parameters = routeContext.getParameters();
// 对SQL进行验证,主要用于判断一些不支持的SQL,在分片功能中不支持INSERT INTO .... ON DUPLICATE KEY、不支持更新sharding key
ShardingStatementValidatorFactory.newInstance(
sqlStatementContext.getSqlStatement()).ifPresent(validator -> validator.validate(shardingRule, sqlStatementContext.getSqlStatement(), parameters));
// 获取SQL的条件信息
ShardingConditions shardingConditions = getShardingConditions(parameters, sqlStatementContext, metaData.getSchema(), shardingRule);
boolean needMergeShardingValues = isNeedMergeShardingValues(sqlStatementContext, shardingRule);
if (sqlStatementContext.getSqlStatement() instanceof DMLStatement && needMergeShardingValues) {
// 检查所有Sharding值(表、列、值)是不是相同,如果不相同则抛出异常
checkSubqueryShardingValues(sqlStatementContext, shardingRule, shardingConditions);
//剔除重复的sharding条件信息
mergeShardingConditions(shardingConditions);
}
// 创建分片路由引擎
ShardingRouteEngine shardingRouteEngine = ShardingRouteEngineFactory.newInstance(shardingRule, metaData, sqlStatementContext, shardingConditions, properties);
// 进行路由,生成路由结果
RouteResult routeResult = shardingRouteEngine.route(shardingRule);
if (needMergeShardingValues) {
Preconditions.checkState(1 == routeResult.getRouteUnits().size(), "Must have one sharding with subquery.");
}
return new RouteContext(sqlStatementContext, parameters, routeResult);
}
…
}
其中最关键的以下代码:
ShardingRouteEngine shardingRouteEngine = ShardingRouteEngineFactory.newInstance(shardingRule, metaData, sqlStatementContext, shardingConditions, properties);
RouteResult routeResult = shardingRouteEngine.route(shardingRule);
…
return new RouteContext(sqlStatementContext, parameters, routeResult);
通过ShardingRouteEngineFactory创建了Sharding路由引擎,调用route方法,然后根据生成的RouteResult,创建新的RouteContext进行了返回。
ShardingRouteEngineFactory是ShardingRouteEngine的工厂类,会根据SQL类型创建不同ShardingRouteEngine,因为不同的类型的SQL对应着的不同的路由策略,例如全库路由、全库表路由、单库路由、标准路由等。
org.apache.shardingsphere.sharding.route.engine.type.ShardingRouteEngineFactory
/**
* Sharding routing engine factory.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class ShardingRouteEngineFactory {
/**
* Create new instance of routing engine.
*
* @param shardingRule sharding rule
* @param metaData meta data of ShardingSphere
* @param sqlStatementContext SQL statement context
* @param shardingConditions shardingConditions
* @param properties sharding sphere properties
* @return new instance of routing engine
*/
public static ShardingRouteEngine newInstance(final ShardingRule shardingRule,
final ShardingSphereMetaData metaData, final SQLStatementContext sqlStatementContext,
final ShardingConditions shardingConditions, final ConfigurationProperties properties) {
SQLStatement sqlStatement = sqlStatementContext.getSqlStatement();
Collection<String> tableNames = sqlStatementContext.getTablesContext().getTableNames();
if (sqlStatement instanceof TCLStatement) {
return new ShardingDatabaseBroadcastRoutingEngine();
}
if (sqlStatement instanceof DDLStatement) {
return new ShardingTableBroadcastRoutingEngine(metaData.getSchema(), sqlStatementContext);
}
if (sqlStatement instanceof DALStatement) {
return getDALRoutingEngine(shardingRule, sqlStatement, tableNames);
}
if (sqlStatement instanceof DCLStatement) {
return getDCLRoutingEngine(sqlStatementContext, metaData);
}
if (shardingRule.isAllInDefaultDataSource(tableNames)) {
return new ShardingDefaultDatabaseRoutingEngine(tableNames);
}
if (shardingRule.isAllBroadcastTables(tableNames)) {
return sqlStatement instanceof SelectStatement ? new ShardingUnicastRoutingEngine(tableNames) : new ShardingDatabaseBroadcastRoutingEngine();
}
if (sqlStatementContext.getSqlStatement() instanceof DMLStatement && tableNames.isEmpty() && shardingRule.hasDefaultDataSourceName()) {
return new ShardingDefaultDatabaseRoutingEngine(tableNames);
}
if (sqlStatementContext.getSqlStatement() instanceof DMLStatement && shardingConditions.isAlwaysFalse() || tableNames.isEmpty() || !shardingRule.tableRuleExists(tableNames)) {
return new ShardingUnicastRoutingEngine(tableNames);
}
return getShardingRoutingEngine(shardingRule, sqlStatementContext, shardingConditions, tableNames, properties);
}
看下最常见的标准SQL的路由引擎
org.apache.shardingsphere.sharding.route.engine.type.standard.ShardingStandardRoutingEngine,这类中方法较多,从route看,分别通过调用getDataNodes->routeByShardingConditions->route0->generateRouteResult进行返回。
/**
* Sharding standard routing engine.
*/
@RequiredArgsConstructor
public final class ShardingStandardRoutingEngine implements ShardingRouteEngine {
…
public RouteResult route(final ShardingRule shardingRule) {
if (isDMLForModify(sqlStatementContext) && 1 != ((TableAvailable) sqlStatementContext).getAllTables().size()) {// 判断SQL中涉及的表,insert、update、delete不支持多张表
throw new ShardingSphereException("Cannot support Multiple-Table for '%s'.", sqlStatementContext.getSqlStatement());
}
return generateRouteResult(getDataNodes(shardingRule, shardingRule.getTableRule(logicTableName)));
}
private boolean isDMLForModify(final SQLStatementContext sqlStatementContext) {
return sqlStatementContext instanceof InsertStatementContext || sqlStatementContext instanceof UpdateStatementContext || sqlStatementContext instanceof DeleteStatementContext;
}
// 根据数据节点生成路由结果
private RouteResult generateRouteResult(final Collection<DataNode> routedDataNodes) {
RouteResult result = new RouteResult();
result.getOriginalDataNodes().addAll(originalDataNodes);
for (DataNode each : routedDataNodes) {
result.getRouteUnits().add(
new RouteUnit(new RouteMapper(each.getDataSourceName(), each.getDataSourceName()), Collections.singletonList(new RouteMapper(logicTableName, each.getTableName()))));
}
return result;
}
// 计算数据节点,计算路由的核心方法
private Collection<DataNode> getDataNodes(final ShardingRule shardingRule, final TableRule tableRule) {
if (isRoutingByHint(shardingRule, tableRule)) {// 库表路由都是hint方式,通过hint路由
return routeByHint(shardingRule, tableRule);
}
if (isRoutingByShardingConditions(shardingRule, tableRule)) {// 库表路由都不是通过hint方式,则通过sharding条件进行路由
return routeByShardingConditions(shardingRule, tableRule);
}
return routeByMixedConditions(shardingRule, tableRule);// 库表路由中既有hint,又有sharding条件,则进行混合路由
}
…
}
在以上调用链中,关键逻辑在route0方法中调用了routeDataSources和routeTables方法,这两方法实现如下:
// 执行配置的库路由计算方法,得到路由到数据库标识
private Collection<String> routeDataSources(final ShardingRule shardingRule, final TableRule tableRule, final List<RouteValue> databaseShardingValues) {
if (databaseShardingValues.isEmpty()) {
return tableRule.getActualDatasourceNames();
}
Collection<String> result = new LinkedHashSet<>(shardingRule.getDatabaseShardingStrategy(tableRule).doSharding(tableRule.getActualDatasourceNames(), databaseShardingValues, this.properties));
Preconditions.checkState(!result.isEmpty(), "no database route info");
Preconditions.checkState(tableRule.getActualDatasourceNames().containsAll(result),
"Some routed data sources do not belong to configured data sources. routed data sources: `%s`, configured data sources: `%s`", result, tableRule.getActualDatasourceNames());
return result;
}
// 执行配置的表路由计算方法,得到实际表名
private Collection<DataNode> routeTables(final ShardingRule shardingRule, final TableRule tableRule, final String routedDataSource, final List<RouteValue> tableShardingValues) {
Collection<String> availableTargetTables = tableRule.getActualTableNames(routedDataSource);
Collection<String> routedTables = new LinkedHashSet<>(tableShardingValues.isEmpty() ? availableTargetTables
: shardingRule.getTableShardingStrategy(tableRule).doSharding(availableTargetTables, tableShardingValues, this.properties));
Preconditions.checkState(!routedTables.isEmpty(), "no table route info");
Collection<DataNode> result = new LinkedList<>();
for (String each : routedTables) {
result.add(new DataNode(routedDataSource, each));
}
return result;
}
这两个方法中我们可以看到
shardingRule.getDatabaseShardingStrategy(tableRule).doSharding(tableRule.getActualDatasourceNames(), databaseShardingValues, this.properties)与
shardingRule.getTableShardingStrategy(tableRule).doSharding(availableTargetTables, tableShardingValues, this.properties);
shardingRule.getDatabaseShardingStrategy返回的就是ShardingStrategy接口,根据配置方式的不同,有多种实现类。
我们看下常用的org.apache.shardingsphere.core.strategy.route.standard.StandardShardingStrategy类
@Override
public Collection<String> doSharding(final Collection<String> availableTargetNames, final Collection<RouteValue> shardingValues, final ConfigurationProperties properties) {
RouteValue shardingValue = shardingValues.iterator().next();
Collection<String> shardingResult = shardingValue instanceof ListRouteValue
? doSharding(availableTargetNames, (ListRouteValue) shardingValue) : doSharding(availableTargetNames, (RangeRouteValue) shardingValue);
Collection<String> result = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
result.addAll(shardingResult);
return result;
}
@SuppressWarnings("unchecked")
private Collection<String> doSharding(final Collection<String> availableTargetNames, final RangeRouteValue<?> shardingValue) {
if (null == rangeShardingAlgorithm) {
throw new UnsupportedOperationException("Cannot find range sharding strategy in sharding rule.");
}
return rangeShardingAlgorithm.doSharding(availableTargetNames,
new RangeShardingValue(shardingValue.getTableName(), shardingValue.getColumnName(), shardingValue.getValueRange()));
}
@SuppressWarnings("unchecked")
private Collection<String> doSharding(final Collection<String> availableTargetNames, final ListRouteValue<?> shardingValue) {
Collection<String> result = new LinkedList<>();
for (Comparable<?> each : shardingValue.getValues()) {
String target = preciseShardingAlgorithm.doSharding(availableTargetNames, new PreciseShardingValue(shardingValue.getTableName(), shardingValue.getColumnName(), each));
if (null != target) {
result.add(target);
}
}
return result;
}
在第一个doSharding方法中可以看到,会根据SQL中实际的ShardingValue分别调用PreciseShardingAlgorithm和RangeShardingAlgorithm的doSharding,而这个两个算法类则是需要应用根据应用自定义实现的接口。
/**
* Precise sharding algorithm.
*
* @param <T> class type of sharding value
*/
public interface PreciseShardingAlgorithm<T extends Comparable<?>> extends ShardingAlgorithm {
/**
* Sharding.
*
* @param availableTargetNames available data sources or tables's names
* @param shardingValue sharding value
* @return sharding result for data source or table's name
*/
String doSharding(Collection<String> availableTargetNames, PreciseShardingValue<T> shardingValue);
}
/**
* Range sharding algorithm.
*
* @param <T> class type of sharding value
*/
public interface RangeShardingAlgorithm<T extends Comparable<?>> extends ShardingAlgorithm {
/**
* Sharding.
*
* @param availableTargetNames available data sources or tables's names
* @param shardingValue sharding value
* @return sharding results for data sources or tables's names
*/
Collection<String> doSharding(Collection<String> availableTargetNames, RangeShardingValue<T> shardingValue);
}
改写rewrite
看完路由部分,回到BasePrepareEngine#prepare方法中看看重写rewrite部分:
public ExecutionContext prepare(final String sql, final List<Object> parameters) {
List<Object> clonedParameters = cloneParameters(parameters);
RouteContext routeContext = executeRoute(sql, clonedParameters);// SQL路由
ExecutionContext result = new ExecutionContext(routeContext.getSqlStatementContext());
result.getExecutionUnits().addAll(executeRewrite(sql, clonedParameters, routeContext));// SQL改写
if (properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SHOW)) {
SQLLogger.logSQL(sql, properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SIMPLE), result.getSqlStatementContext(), result.getExecutionUnits());
}
return result;
}
…
private Collection<ExecutionUnit> executeRewrite(final String sql, final List<Object> parameters, final RouteContext routeContext) {
registerRewriteDecorator();
SQLRewriteContext sqlRewriteContext = rewriter.createSQLRewriteContext(sql, parameters, routeContext.getSqlStatementContext(), routeContext);
return routeContext.getRouteResult().getRouteUnits().isEmpty() ? rewrite(sqlRewriteContext) : rewrite(routeContext, sqlRewriteContext);
}
在executeRewrite方法中,rewriter是个SQLRewriteEntry对象,我们看下该类createSQLRewriteContext方法:
org.apache.shardingsphere.underlying.rewrite.SQLRewriteEntry
public SQLRewriteContext createSQLRewriteContext(final String sql, final List<Object> parameters, final SQLStatementContext sqlStatementContext, final RouteContext routeContext) {
SQLRewriteContext result = new SQLRewriteContext(schemaMetaData, sqlStatementContext, sql, parameters);
decorate(decorators, result, routeContext);
result.generateSQLTokens();
return result;
}
@SuppressWarnings("unchecked")
private void decorate(final Map<BaseRule, SQLRewriteContextDecorator> decorators, final SQLRewriteContext sqlRewriteContext, final RouteContext routeContext) {
for (Entry<BaseRule, SQLRewriteContextDecorator> entry : decorators.entrySet()) {
BaseRule rule = entry.getKey();
SQLRewriteContextDecorator decorator = entry.getValue();
if (decorator instanceof RouteContextAware) {
((RouteContextAware) decorator).setRouteContext(routeContext);
}
decorator.decorate(rule, properties, sqlRewriteContext);
}
}
该方法有三个操作:
- 创建一个初始的SQL重写上下文SQLRewriteContext对象;
- 依次调用注册的SQLRewriteContextDecorator实现类的decorate方法,根据配置指定的功能以及SQL的类型添加相应的SQLToken生成器。
- 调用SQLRewriteContext的generateSQLTokens方法,生成此SQL对应的Token,然后返回SQLRewriteContext对象。
首先我们看下SQLRewriteContext类:
public SQLRewriteContext(final SchemaMetaData schemaMetaData, final SQLStatementContext sqlStatementContext, final String sql, final List<Object> parameters) {
this.schemaMetaData = schemaMetaData;
this.sqlStatementContext = sqlStatementContext;
this.sql = sql;
this.parameters = parameters;
addSQLTokenGenerators(new DefaultTokenGeneratorBuilder().getSQLTokenGenerators());
parameterBuilder = sqlStatementContext instanceof InsertStatementContext
? new GroupedParameterBuilder(((InsertStatementContext) sqlStatementContext).getGroupedParameters()) : new StandardParameterBuilder(parameters);
}
/**
* Add SQL token generators.
*
* @param sqlTokenGenerators SQL token generators
*/
public void addSQLTokenGenerators(final Collection<SQLTokenGenerator> sqlTokenGenerators) {
this.sqlTokenGenerators.addAll(sqlTokenGenerators);
}
/**
* Generate SQL tokens.
*/
public void generateSQLTokens() {
sqlTokens.addAll(sqlTokenGenerators.generateSQLTokens(sqlStatementContext, parameters, schemaMetaData));
}
addSQLTokenGenerators方法负责添加Token生成器,generateSQLTokens方法根据添加的Token生成器生成Token,在SQLRewriteEntry类的decorate方法中,会根据注册的SQLRewriteContextDecorator,对于RouteContext进行处理,目前项目中已实现的SQLRewriteContextDecorate如图下所示:
分别对应数据分片SQL重写装饰器、影子表SQL重写装饰器、加密SQL重写装饰器。
我们看下最常用的数据分片SQL重写装饰器:
org.apache.shardingsphere.sharding.rewrite.context.ShardingSQLRewriteContextDecorator
/**
* SQL rewrite context decorator for sharding.
*/
@Setter
public final class ShardingSQLRewriteContextDecorator implements SQLRewriteContextDecorator<ShardingRule>, RouteContextAware {
private RouteContext routeContext;
@SuppressWarnings("unchecked")
@Override
public void decorate(final ShardingRule shardingRule, final ConfigurationProperties properties, final SQLRewriteContext sqlRewriteContext) {
// 获取参数重写器(参数化SQL才需要),然后依次对SQL重写上下文中的参数构造器parameterBuilder进行重写操作,分片功能下主要是自增键以及分页参数
for (ParameterRewriter each : new ShardingParameterRewriterBuilder(shardingRule, routeContext).getParameterRewriters(sqlRewriteContext.getSchemaMetaData())) {
if (!sqlRewriteContext.getParameters().isEmpty() && each.isNeedRewrite(sqlRewriteContext.getSqlStatementContext())) {
each.rewrite(sqlRewriteContext.getParameterBuilder(), sqlRewriteContext.getSqlStatementContext(), sqlRewriteContext.getParameters());
}
}
//添加分片功能下对应的Token生成器
sqlRewriteContext.addSQLTokenGenerators(new ShardingTokenGenerateBuilder(shardingRule, routeContext).getSQLTokenGenerators());
}…
}
在decorate方法中,首先通过ShardingParameterRewriterBuilder类生成了对应的ParameterRewriter对象集合,然后依次调用其rewrite方法,对sqlRewriteContext 的parameterBuilder进行相应的设置,在ShardingParameterRewriterBuilder中对应的ParameterRewriter包括ShardingGeneratedKeyInsertValueParameterRewriter与ShardingPaginationParameterRewriter,分别对应insert分布式自增参数重写和分页参数重写。具体代码如下:
org.apache.shardingsphere.sharding.rewrite.parameter.ShardingParameterRewriterBuilder
public Collection<ParameterRewriter> getParameterRewriters(final SchemaMetaData schemaMetaData) {
Collection<ParameterRewriter> result = getParameterRewriters();
for (ParameterRewriter each : result) {
setUpParameterRewriters(each, schemaMetaData);
}
return result;
}
private static Collection<ParameterRewriter> getParameterRewriters() {
Collection<ParameterRewriter> result = new LinkedList<>();
result.add(new ShardingGeneratedKeyInsertValueParameterRewriter());
result.add(new ShardingPaginationParameterRewriter());
return result;
}
ShardingSQLRewriteContextDecorator的decorate方法中之后添加了ShardingTokenGenerateBuilder. getSQLTokenGenerators()返回的一系列TokenGenerator,接下来我们看下此类:
org.apache.shardingsphere.sharding.rewrite.token.pojo.ShardingTokenGenerateBuilder
/**
* SQL token generator builder for sharding.
*/
@RequiredArgsConstructor
public final class ShardingTokenGenerateBuilder implements SQLTokenGeneratorBuilder {
private final ShardingRule shardingRule;
private final RouteContext routeContext;
@Override
public Collection<SQLTokenGenerator> getSQLTokenGenerators() {
Collection<SQLTokenGenerator> result = buildSQLTokenGenerators();
for (SQLTokenGenerator each : result) {
if (each instanceof ShardingRuleAware) {
((ShardingRuleAware) each).setShardingRule(shardingRule);
}
if (each instanceof RouteContextAware) {
((RouteContextAware) each).setRouteContext(routeContext);
}
}
return result;
}
private Collection<SQLTokenGenerator> buildSQLTokenGenerators() {
Collection<SQLTokenGenerator> result = new LinkedList<>();
addSQLTokenGenerator(result, new TableTokenGenerator());// 表名token处理,用于真实表名替换
addSQLTokenGenerator(result, new DistinctProjectionPrefixTokenGenerator());// select distinct关键字处理
addSQLTokenGenerator(result, new ProjectionsTokenGenerator());// select列名处理,主要是衍生列avg处理
addSQLTokenGenerator(result, new OrderByTokenGenerator());// Order by Token处理
addSQLTokenGenerator(result, new AggregationDistinctTokenGenerator());// 聚合函数的distinct关键字处理
addSQLTokenGenerator(result, new IndexTokenGenerator());// 索引重命名
addSQLTokenGenerator(result, new OffsetTokenGenerator());// offset 重写
addSQLTokenGenerator(result, new RowCountTokenGenerator());// rowCount重写
addSQLTokenGenerator(result, new GeneratedKeyInsertColumnTokenGenerator());// 分布式主键列添加,在insert sql列最后添加
addSQLTokenGenerator(result, new GeneratedKeyForUseDefaultInsertColumnsTokenGenerator());// insert SQL使用默认列名时需要完成补齐真实列名,包括自增列
addSQLTokenGenerator(result, new GeneratedKeyAssignmentTokenGenerator());// SET自增键生成
addSQLTokenGenerator(result, new ShardingInsertValuesTokenGenerator());// insert SQL 的values Token解析,为后续添加自增值做准备
addSQLTokenGenerator(result, new GeneratedKeyInsertValuesTokenGenerator());//为insert values添加自增列值
return result;
}
private void addSQLTokenGenerator(final Collection<SQLTokenGenerator> sqlTokenGenerators, final SQLTokenGenerator toBeAddedSQLTokenGenerator) {
if (toBeAddedSQLTokenGenerator instanceof IgnoreForSingleRoute && routeContext.getRouteResult().isSingleRouting()) {
return;
}
sqlTokenGenerators.add(toBeAddedSQLTokenGenerator);
}
}
ShardingTokenGenerateBuilder(名称可能拼错了,应为ShardingTokenGeneratorBuilder)为ShardingTokenGenerator的建造者类,可以看到其添加了TableTokenGenerator、DistinctProjectionPrefixTokenGenerator、ProjectionsTokenGenerator等很多TokenGenerator,这些Generator会生成对应的Token集合。
回到BasePrepareEngine中executeRewrite方法,在创建完SQLRewriteContext对象后调用rewrite,其中调用的是SQLRouteRewriteEngine#rewrite方法:
private Collection<ExecutionUnit> executeRewrite(final String sql, final List<Object> parameters, final RouteContext routeContext) {
registerRewriteDecorator();
SQLRewriteContext sqlRewriteContext = rewriter.createSQLRewriteContext(sql, parameters, routeContext.getSqlStatementContext(), routeContext);
return routeContext.getRouteResult().getRouteUnits().isEmpty() ? rewrite(sqlRewriteContext) : rewrite(routeContext, sqlRewriteContext);
}
private Collection<ExecutionUnit> rewrite(final RouteContext routeContext, final SQLRewriteContext sqlRewriteContext) {
Collection<ExecutionUnit> result = new LinkedHashSet<>();
for (Entry<RouteUnit, SQLRewriteResult> entry : new SQLRouteRewriteEngine().rewrite(sqlRewriteContext, routeContext.getRouteResult()).entrySet()) {
result.add(new ExecutionUnit(entry.getKey().getDataSourceMapper().getActualName(), new SQLUnit(entry.getValue().getSql(), entry.getValue().getParameters())));
}
return result;
}
接下来我们看下SQL路由改写引擎org.apache.shardingsphere.underlying.rewrite.engine.SQLRouteRewriteEngine类
public final class SQLRouteRewriteEngine {
/**
* Rewrite SQL and parameters.
*
* @param sqlRewriteContext SQL rewrite context
* @param routeResult route result
* @return SQL map of route unit and rewrite result
*/
public Map<RouteUnit, SQLRewriteResult> rewrite(final SQLRewriteContext sqlRewriteContext, final RouteResult routeResult) {
Map<RouteUnit, SQLRewriteResult> result = new LinkedHashMap<>(routeResult.getRouteUnits().size(), 1);
for (RouteUnit each : routeResult.getRouteUnits()) {
result.put(each, new SQLRewriteResult(new RouteSQLBuilder(sqlRewriteContext, each).toSQL(), getParameters(sqlRewriteContext.getParameterBuilder(), routeResult, each)));
}
return result;
}
…
}
在创建SQLRewriteResult时,调用了RouteSQLBuilder的toSQL方法,此类为SQLBuilder接口实现类,负责进行SQL的改写SQL的生成。
查看其toSQL方法,可以看到其调用SQLRewriteContext的getSqlTokens方法,获得其上绑定的所有SQL Token,然后调用其SQLToken的toString(),最后拼接出重写后的SQL。
public abstract class AbstractSQLBuilder implements SQLBuilder {
private final SQLRewriteContext context;
@Override
public final String toSQL() {
if (context.getSqlTokens().isEmpty()) {
return context.getSql();
}
Collections.sort(context.getSqlTokens());// 按照Token的起始位置排序
StringBuilder result = new StringBuilder();
result.append(context.getSql().substring(0, context.getSqlTokens().get(0).getStartIndex()));// 添加第一个Token之前的原始SQL
for (SQLToken each : context.getSqlTokens()) {
result.append(getSQLTokenText(each));// 添加Token对应的SQL片段
result.append(getConjunctionText(each));// 添加Token之间的连接字符
}
return result.toString();
}
…
}
org.apache.shardingsphere.underlying.rewrite.sql.impl.RouteSQLBuilder
protected String getSQLTokenText(final SQLToken sqlToken) {
if (sqlToken instanceof RouteUnitAware) {
return ((RouteUnitAware) sqlToken).toString(routeUnit);
}
return sqlToken.toString();
}
回到BasePrepareEngine,完成SQL的重写后,就可以构建SQLUnit,进而构建出ExecutionUnit,然后添加到ExecutionContext,然后返回,完成PrepareEngine的prepare操作:
private Collection<ExecutionUnit> rewrite(final RouteContext routeContext, final SQLRewriteContext sqlRewriteContext) {
Collection<ExecutionUnit> result = new LinkedHashSet<>();
for (Entry<RouteUnit, SQLRewriteResult> entry : new SQLRouteRewriteEngine().rewrite(sqlRewriteContext, routeContext.getRouteResult()).entrySet()) {
result.add(new ExecutionUnit(entry.getKey().getDataSourceMapper().getActualName(), new SQLUnit(entry.getValue().getSql(), entry.getValue().getParameters())));
}
return result;
}
prepare方法中,生成executionContext后,将分布式自增key进行了添加。generatedKeyContext的生成的自增值是由ShardingRouteDecorator类中getShardingConditions方法中添加的InsertClauseShardingConditionEngine负责生成。
private void prepare() {
executionContext = prepareEngine.prepare(sql, getParameters());
findGeneratedKey().ifPresent(generatedKey -> generatedValues.add(generatedKey.getGeneratedValues().getLast()));
}
private Optional<GeneratedKeyContext> findGeneratedKey() {
return executionContext.getSqlStatementContext() instanceof InsertStatementContext
? ((InsertStatementContext) executionContext.getSqlStatementContext()).getGeneratedKeyContext() : Optional.empty();
}
执行引擎executor
回到ShardingPrepareStatement的executeQuery()方法,在前面执行完prepare,下面就是SQL执行以及结合集合并,SQL的执行主要是由PreparedStatementExecutor负责。
public ResultSet executeQuery() throws SQLException {
ResultSet result;
try {
clearPrevious();
prepare();
initPreparedStatementExecutor();
MergedResult mergedResult = mergeQuery(preparedStatementExecutor.executeQuery());
result = new ShardingResultSet(preparedStatementExecutor.getResultSets(), mergedResult, this, executionContext);
} finally {
clearBatch();
}
currentResultSet = result;
return result;
}
接下来看下initPreparedStatementExecutor()方法,首先进行了preparedStatementExecutor的初始化,然后设置了Statement的参数,之后对statement的一些方法进行回放设置。分别看下这几个方法。
回放是JDBC接入端的一个设计,WrapperAdapter类中有两个方法recordMethodInvocation和replayMethodsInvocation,前者记录调用的方法名称,例如setAutoCommit、setReadOnly、setFetchSize、setMaxFieldSize等,在外围程序调用ShardingConnection的setAutoCommit、setReadOnly以及ShardingPreparedStatement的setFetchSize、setMaxFieldSize时进行调用,后者则进行对这些方法的真实调用,分别在底层真实JDBC类(DB driver、数据库连接池等)时进行重新调用。
private void initPreparedStatementExecutor() throws SQLException {
preparedStatementExecutor.init(executionContext);
setParametersForStatements();
replayMethodForStatements();
}
obtainExecuteGroups方法对执行单元进行分组,真正调用的是SQLExecutePrepareTemplate. getExecuteUnitGroups方法,
org.apache.shardingsphere.shardingjdbc.executor.PreparedStatementExecutor
/**
* Initialize executor.
*
* @param executionContext execution context
* @throws SQLException SQL exception
*/
public void init(final ExecutionContext executionContext) throws SQLException {
setSqlStatementContext(executionContext.getSqlStatementContext());
getInputGroups().addAll(obtainExecuteGroups(executionContext.getExecutionUnits()));
cacheStatements();
}
…
private Collection<InputGroup<StatementExecuteUnit>> obtainExecuteGroups(final Collection<ExecutionUnit> executionUnits) throws SQLException {
return getSqlExecutePrepareTemplate().getExecuteUnitGroups(executionUnits, new SQLExecutePrepareCallback() {
@Override
// 在指定数据源上创建要求数量的数据库连接
public List<Connection> getConnections(final ConnectionMode connectionMode, final String dataSourceName, final int connectionSize) throws SQLException {
return PreparedStatementExecutor.super.getConnection().getConnections(connectionMode, dataSourceName, connectionSize);
}
@Override
//根据执行单元信息 创建Statement执行单元对象
public StatementExecuteUnit createStatementExecuteUnit(final Connection connection, final ExecutionUnit executionUnit, final ConnectionMode connectionMode) throws SQLException {
return new StatementExecuteUnit(executionUnit, createPreparedStatement(connection, executionUnit.getSqlUnit().getSql()), connectionMode);
}
});
}…
protected final void cacheStatements() {
for (InputGroup<StatementExecuteUnit> each : inputGroups) {
statements.addAll(each.getInputs().stream().map(StatementExecuteUnit::getStatement).collect(Collectors.toList()));
parameterSets.addAll(each.getInputs().stream().map(input -> input.getExecutionUnit().getSqlUnit().getParameters()).collect(Collectors.toList()));
}
}
getExecuteUnitGroups方法首先按照数据库进行分组,然后根据SQL单元的数量与maxConnectionsSizePerQuery计算得出采用连接限制类型还是内存限制类型,最后生成SQL执行单元的最后分组。
org.apache.shardingsphere.sharding.execute.sql.prepare.SQLExecutePrepareTemplate
/**
* Get execute unit groups.
*
* @param executionUnits execution units
* @param callback SQL execute prepare callback
* @return statement execute unit groups
* @throws SQLException SQL exception
*/
public Collection<InputGroup<StatementExecuteUnit>> getExecuteUnitGroups(final Collection<ExecutionUnit> executionUnits, final SQLExecutePrepareCallback callback) throws SQLException {
return getSynchronizedExecuteUnitGroups(executionUnits, callback);
}
// 生成同步执行单元分组
private Collection<InputGroup<StatementExecuteUnit>> getSynchronizedExecuteUnitGroups(
final Collection<ExecutionUnit> executionUnits, final SQLExecutePrepareCallback callback) throws SQLException {
Map<String, List<SQLUnit>> sqlUnitGroups = getSQLUnitGroups(executionUnits);// 生成数据源与其SQLUnit的对应映射
Collection<InputGroup<StatementExecuteUnit>> result = new LinkedList<>();
for (Entry<String, List<SQLUnit>> entry : sqlUnitGroups.entrySet()) {
result.addAll(getSQLExecuteGroups(entry.getKey(), entry.getValue(), callback));// 将SQLUnit转化为InputGroup<StatementExecuteUnit>,对应关系为1:1
}
return result;
}
…
// 生成SQL执行分组
private List<InputGroup<StatementExecuteUnit>> getSQLExecuteGroups(final String dataSourceName,
final List<SQLUnit> sqlUnits, final SQLExecutePrepareCallback callback) throws SQLException {
List<InputGroup<StatementExecuteUnit>> result = new LinkedList<>();
int desiredPartitionSize = Math.max(0 == sqlUnits.size() % maxConnectionsSizePerQuery ? sqlUnits.size() / maxConnectionsSizePerQuery : sqlUnits.size() / maxConnectionsSizePerQuery + 1, 1);
List<List<SQLUnit>> sqlUnitPartitions = Lists.partition(sqlUnits, desiredPartitionSize);
ConnectionMode connectionMode = maxConnectionsSizePerQuery < sqlUnits.size() ? ConnectionMode.CONNECTION_STRICTLY : ConnectionMode.MEMORY_STRICTLY;
List<Connection> connections = callback.getConnections(connectionMode, dataSourceName, sqlUnitPartitions.size()); // 根据要执行的SQL数量和maxConnectionsSizePerQuery配置,计算
int count = 0;
for (List<SQLUnit> each : sqlUnitPartitions) {
result.add(getSQLExecuteGroup(connectionMode, connections.get(count++), dataSourceName, each, callback));// 根据要执行的SQLUnit,生成对应StatementExecuteUnit对象,添加到返回结果集中
}
return result;
}
返回ShardingPreparedStament的executeQuery(),接下来执行的语句
MergedResult mergedResult = mergeQuery(preparedStatementExecutor.executeQuery());
看下PreparedStatementExecutor的executeQuery方法,可以看到该方法就是通过callback方式,最后通过SqlExecuteTemplate.execute
org.apache.shardingsphere.shardingjdbc.executor.PreparedStatementExecutor
/**
* Execute query.
*
* @return result set list
* @throws SQLException SQL exception
*/
public List<QueryResult> executeQuery() throws SQLException {
final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
SQLExecuteCallback<QueryResult> executeCallback = new SQLExecuteCallback<QueryResult>(getDatabaseType(), isExceptionThrown) {
@Override
// 在指定的Statement上执行SQL,将JDBC结果集包装成查询QueryResult对象(基于流模式、基于内存模式两类)
protected QueryResult executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
return getQueryResult(statement, connectionMode);
}
};
return executeCallback(executeCallback);// 通过executeCallback操作
}
// 执行SQL,然后将结果集转成QueryResult对象
private QueryResult getQueryResult(final Statement statement, final ConnectionMode connectionMode) throws SQLException {
PreparedStatement preparedStatement = (PreparedStatement) statement;
ResultSet resultSet = preparedStatement.executeQuery();
getResultSets().add(resultSet);
return ConnectionMode.MEMORY_STRICTLY == connectionMode ? new StreamQueryResult(resultSet) : new MemoryQueryResult(resultSet);
}
org.apache.shardingsphere.shardingjdbc.executor.AbstractStatementExecutor
protected final <T> List<T> executeCallback(final SQLExecuteCallback<T> executeCallback) throws SQLException {
List<T> result = sqlExecuteTemplate.execute((Collection) inputGroups, executeCallback);
refreshMetaDataIfNeeded(connection.getRuntimeContext(), sqlStatementContext);
return result;
}
public <T> List<T> execute(final Collection<InputGroup<? extends StatementExecuteUnit>> inputGroups, final SQLExecuteCallback<T> callback) throws SQLException {
return execute(inputGroups, null, callback);
}
// org.apache.shardingsphere.sharding.execute.sql.execute.SQLExecuteTemplate
@SuppressWarnings("unchecked")
public <T> List<T> execute(final Collection<InputGroup<? extends StatementExecuteUnit>> inputGroups,
final SQLExecuteCallback<T> firstCallback, final SQLExecuteCallback<T> callback) throws SQLException {
try {
return executorEngine.execute((Collection) inputGroups, firstCallback, callback, serial);
} catch (final SQLException ex) {
ExecutorExceptionHandler.handleException(ex);
return Collections.emptyList();
}
}
可以看到在SQLExecuteTemplate中,真正调用的是executorEngine.execute方法。
org.apache.shardingsphere.underlying.executor.engine.ExecutorEngine
/**
* Execute.
*
* @param inputGroups input groups
* @param firstCallback first grouped callback
* @param callback other grouped callback
* @param serial whether using multi thread execute or not
* @param <I> type of input value
* @param <O> type of return value
* @return execute result
* @throws SQLException throw if execute failure
*/
public <I, O> List<O> execute(final Collection<InputGroup<I>> inputGroups,
final GroupedCallback<I, O> firstCallback, final GroupedCallback<I, O> callback, final boolean serial) throws SQLException {
if (inputGroups.isEmpty()) {
return Collections.emptyList();
}
return serial ? serialExecute(inputGroups, firstCallback, callback) : parallelExecute(inputGroups, firstCallback, callback);
}
// 串行执行
private <I, O> List<O> serialExecute(final Collection<InputGroup<I>> inputGroups, final GroupedCallback<I, O> firstCallback, final GroupedCallback<I, O> callback) throws SQLException {
Iterator<InputGroup<I>> inputGroupsIterator = inputGroups.iterator();
InputGroup<I> firstInputs = inputGroupsIterator.next();
List<O> result = new LinkedList<>(syncExecute(firstInputs, null == firstCallback ? callback : firstCallback));
for (InputGroup<I> each : Lists.newArrayList(inputGroupsIterator)) {
result.addAll(syncExecute(each, callback));
}
return result;
}
// 并行执行,可以支持两个回调函数,第一条记录执行第一个回调函数,其它的执行第二个回调函数
private <I, O> List<O> parallelExecute(final Collection<InputGroup<I>> inputGroups, final GroupedCallback<I, O> firstCallback, final GroupedCallback<I, O> callback) throws SQLException {
Iterator<InputGroup<I>> inputGroupsIterator = inputGroups.iterator();
InputGroup<I> firstInputs = inputGroupsIterator.next();
Collection<ListenableFuture<Collection<O>>> restResultFutures = asyncExecute(Lists.newArrayList(inputGroupsIterator), callback);
return getGroupResults(syncExecute(firstInputs, null == firstCallback ? callback : firstCallback), restResultFutures);
}
…
}
其中分为串行执行和并行执行两种方式,同步就在当前线程中执行callback逻辑,异步就在线程池中执行。而callback即为PreparedStatementExecutor类中executeQuery()所定义。
public List<QueryResult> executeQuery() throws SQLException {
final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
SQLExecuteCallback<QueryResult> executeCallback = new SQLExecuteCallback<QueryResult>(getDatabaseType(), isExceptionThrown) {
@Override
protected QueryResult executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
return getQueryResult(statement, connectionMode);
}
};
return executeCallback(executeCallback);
}
归并Merge
回到ShardingPreparedStatement类中mergeQuery方法,主要逻辑即为创建MergeEngine对象,执行merge方法。
public ResultSet executeQuery() throws SQLException {
ResultSet result;
try {
…
MergedResult mergedResult = mergeQuery(preparedStatementExecutor.executeQuery());
…
}
…
}
private MergedResult mergeQuery(final List<QueryResult> queryResults) throws SQLException {
ShardingRuntimeContext runtimeContext = connection.getRuntimeContext();
MergeEngine mergeEngine = new MergeEngine(runtimeContext.getRule().toRules(), runtimeContext.getProperties(), runtimeContext.getDatabaseType(), runtimeContext.getMetaData().getSchema());
return mergeEngine.merge(queryResults, executionContext.getSqlStatementContext());
}
可以看到mergeQuery方法中新建了MergeEngine实例,然后调用了其merge方法。展开看下MergeEngine中merge方法:
org.apache.shardingsphere.underlying.pluggble.merge.MergeEngine
public MergedResult merge(final List<QueryResult> queryResults, final SQLStatementContext sqlStatementContext) throws SQLException {
registerMergeDecorator();// 注册归并装饰器
return merger.process(queryResults, sqlStatementContext);
}
private Optional<MergedResult> merge(final List<QueryResult> queryResults, final SQLStatementContext sqlStatementContext) throws SQLException {
for (Entry<BaseRule, ResultProcessEngine> entry : engines.entrySet()) {
if (entry.getValue() instanceof ResultMergerEngine) {
ResultMerger resultMerger = ((ResultMergerEngine) entry.getValue()).newInstance(databaseType, entry.getKey(), properties, sqlStatementContext);
return Optional.of(resultMerger.merge(queryResults, sqlStatementContext, schemaMetaData));
}
}
return Optional.empty();
}
private void registerMergeDecorator() {
for (Class<? extends ResultProcessEngine> each : OrderedRegistry.getRegisteredClasses(ResultProcessEngine.class)) {
ResultProcessEngine processEngine = createProcessEngine(each);
Class<?> ruleClass = (Class<?>) processEngine.getType();
rules.stream().filter(rule -> rule.getClass() == ruleClass || rule.getClass().getSuperclass() == ruleClass).collect(Collectors.toList())
.forEach(rule -> merger.registerProcessEngine(rule, processEngine));
}
}
在注册的ResultProcessEngine实现类后,则直接调用merger.process方法即
org.apache.shardingsphere.underlying.merge.MergeEntry#process
/**
* Merge entry.
*/
public final class MergeEntry {
…
/**
* Process query results.
*
* @param queryResults query results
* @param sqlStatementContext SQL statement context
* @return merged result
* @throws SQLException SQL exception
*/
public MergedResult process(final List<QueryResult> queryResults, final SQLStatementContext sqlStatementContext) throws SQLException {
Optional<MergedResult> mergedResult = merge(queryResults, sqlStatementContext);
Optional<MergedResult> result = mergedResult.isPresent() ? Optional.of(decorate(mergedResult.get(), sqlStatementContext)) : decorate(queryResults.get(0), sqlStatementContext);
return result.orElseGet(() -> new TransparentMergedResult(queryResults.get(0)));
}
@SuppressWarnings("unchecked")
private Optional<MergedResult> merge(final List<QueryResult> queryResults, final SQLStatementContext sqlStatementContext) throws SQLException {
for (Entry<BaseRule, ResultProcessEngine> entry : engines.entrySet()) {
if (entry.getValue() instanceof ResultMergerEngine) {
ResultMerger resultMerger = ((ResultMergerEngine) entry.getValue()).newInstance(databaseType, entry.getKey(), properties, sqlStatementContext);
return Optional.of(resultMerger.merge(queryResults, sqlStatementContext, schemaMetaData));
}
}
return Optional.empty();
}
@SuppressWarnings("unchecked")
private MergedResult decorate(final MergedResult mergedResult, final SQLStatementContext sqlStatementContext) throws SQLException {
MergedResult result = null;
for (Entry<BaseRule, ResultProcessEngine> entry : engines.entrySet()) {
if (entry.getValue() instanceof ResultDecoratorEngine) {
ResultDecorator resultDecorator = ((ResultDecoratorEngine) entry.getValue()).newInstance(databaseType, schemaMetaData, entry.getKey(), properties, sqlStatementContext);
result = null == result ? resultDecorator.decorate(mergedResult, sqlStatementContext, schemaMetaData) : resultDecorator.decorate(result, sqlStatementContext, schemaMetaData);
}
}
return null == result ? mergedResult : result;
}
@SuppressWarnings("unchecked")
private Optional<MergedResult> decorate(final QueryResult queryResult, final SQLStatementContext sqlStatementContext) throws SQLException {
MergedResult result = null;
for (Entry<BaseRule, ResultProcessEngine> entry : engines.entrySet()) {
if (entry.getValue() instanceof ResultDecoratorEngine) {
ResultDecorator resultDecorator = ((ResultDecoratorEngine) entry.getValue()).newInstance(databaseType, schemaMetaData, entry.getKey(), properties, sqlStatementContext);
result = null == result ? resultDecorator.decorate(queryResult, sqlStatementContext, schemaMetaData) : resultDecorator.decorate(result, sqlStatementContext, schemaMetaData);
}
}
return Optional.ofNullable(result);
}
}
在MergeEntry#process方法中,先调用了本地私有方法merge,这个方法中顺序执行ResultMergerEngine实现类的newInstance方法创建ResultMerger实例,然后执行其merge方法。
ResultMergerEngine接口实现类目前只有一个ShardingResultMergerEngine,看下该类的newInstance方法代码:
org.apache.shardingsphere.sharding.merge.ShardingResultMergerEngine
public final class ShardingResultMergerEngine implements ResultMergerEngine<ShardingRule> {
@Override
public ResultMerger newInstance(final DatabaseType databaseType, final ShardingRule shardingRule, final ConfigurationProperties properties, final SQLStatementContext sqlStatementContext) {
if (sqlStatementContext instanceof SelectStatementContext) {
return new ShardingDQLResultMerger(databaseType);
}
if (sqlStatementContext.getSqlStatement() instanceof DALStatement) {
return new ShardingDALResultMerger(shardingRule);
}
return new TransparentResultMerger();
}
可以看到newInstance方法中会根据当前SQL的类型创建对应的ResultMerger实例,以最常用的数据分片结果归并器ShardingDQLResultMerger为例,看下其处理逻辑
org.apache.shardingsphere.sharding.merge.dql.ShardingDQLResultMerger
/**
* DQL result merger for Sharding.
*/
public final class ShardingDQLResultMerger implements ResultMerger {
private final DatabaseType databaseType;
public MergedResult merge(final List<QueryResult> queryResults, final SQLStatementContext sqlStatementContext, final SchemaMetaData schemaMetaData) throws SQLException {
if (1 == queryResults.size()) {
return new IteratorStreamMergedResult(queryResults);
}
Map<String, Integer> columnLabelIndexMap = getColumnLabelIndexMap(queryResults.get(0));
SelectStatementContext selectStatementContext = (SelectStatementContext) sqlStatementContext;
selectStatementContext.setIndexes(columnLabelIndexMap);
MergedResult mergedResult = build(queryResults, selectStatementContext, columnLabelIndexMap, schemaMetaData);
return decorate(queryResults, selectStatementContext, mergedResult);
}
…
private MergedResult build(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext,
final Map<String, Integer> columnLabelIndexMap, final SchemaMetaData schemaMetaData) throws SQLException {
if (isNeedProcessGroupBy(selectStatementContext)) {
return getGroupByMergedResult(queryResults, selectStatementContext, columnLabelIndexMap, schemaMetaData);
}
if (isNeedProcessDistinctRow(selectStatementContext)) {
setGroupByForDistinctRow(selectStatementContext);
return getGroupByMergedResult(queryResults, selectStatementContext, columnLabelIndexMap, schemaMetaData);
}
if (isNeedProcessOrderBy(selectStatementContext)) {
return new OrderByStreamMergedResult(queryResults, selectStatementContext, schemaMetaData);
}
return new IteratorStreamMergedResult(queryResults);
}
…
private MergedResult getGroupByMergedResult(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext,
final Map<String, Integer> columnLabelIndexMap, final SchemaMetaData schemaMetaData) throws SQLException {
return selectStatementContext.isSameGroupByAndOrderByItems()
? new GroupByStreamMergedResult(columnLabelIndexMap, queryResults, selectStatementContext, schemaMetaData)
: new GroupByMemoryMergedResult(queryResults, selectStatementContext, schemaMetaData);
}
...
}
在merge方法中会根据判断SQL中是否包含group by、distinct row和order by,然后构建GroupByStreamMergedResult或者GroupByMemoryMergedResult、OrderByStreamMergedResult,这些MergedResult对象实现了MergedResult接口,支持next和getValue等方法,在ShardingPrepareStatement的executeQuery()方法中,最后使用此MergedResult构建了ShardingResultSet返回。
org.apache.shardingsphere.underlying.merge.result.MergedResult
/**
* Merged result after merge engine.
*/
public interface MergedResult {
/**
* Iterate next data.
*
* @return has next data
* @throws SQLException SQL Exception
*/
boolean next() throws SQLException;
/**
* Get data value.
*
* @param columnIndex column index
* @param type class type of data value
* @return data value
* @throws SQLException SQL Exception
*/
Object getValue(int columnIndex, Class<?> type) throws SQLException;
…
}
JDBC
org.apache.shardingsphere.shardingjdbc.jdbc.core.resultset.ShardingResultSet
/**
* Result that support sharding.
*/
public final class ShardingResultSet extends AbstractResultSetAdapter {
private final MergedResult mergeResultSet;
private final Map<String, Integer> columnLabelAndIndexMap;
…
@Override
public boolean next() throws SQLException {
return mergeResultSet.next();
}
…
}
ShardingResultSet实现了JDBC的ResultSet接口,在应用拿到ResultSet时,调用其next()方法时,实际调用的是这些MergedResult实现类的next()方法。ShardingResultSet的类层次关系如下:
此粗略的将sharding-jdbc的处理流程浏览了一遍,虽然并没有深入各类的细节,但可以比较清晰的看到各个内核引擎的职责与边界,按照SQL处理的流向以及各引擎的输入输出,可以画一个整体的内核引擎处理流程图如下:
设计模式
在整个代码浏览中,我们可以看到ShardingSphere使用率很高的几个设计模式,熟悉这几个设计模式,可以更快的理解作者的设计意图,也是ShardingSphere主要的扩展点。
1. 工厂模式
在ShardingSphere中工厂类主要为简单工厂模式,覆盖了大部分模块,主要职责就是根据配置类型、数据库类型、SQL类型等创建对应的接口实现类。工厂模式将创建各种用途的接口类细节进行了封装,从而将处理流程更加通用。例如在使用sharding-jdbc时, ShardingSphere提供了不同功能对应的DataSource工厂类,根据提供的配置与连接池创建对应的DataSource实现类。
分库分表DataSource工厂类
/**
* Sharding data source factory.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class ShardingDataSourceFactory {
/**
* Create sharding data source.
*
* @param dataSourceMap data source map
* @param shardingRuleConfig rule configuration for databases and tables sharding
* @param props properties for data source
* @return sharding data source
* @throws SQLException SQL exception
*/
public static DataSource createDataSource(
final Map<String, DataSource> dataSourceMap, final ShardingRuleConfiguration shardingRuleConfig, final Properties props) throws SQLException {
return new ShardingDataSource(dataSourceMap, new ShardingRule(shardingRuleConfig, dataSourceMap.keySet()), props);
}
}
读写分离DataSource工厂类
/**
* Master-slave data source factory.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class MasterSlaveDataSourceFactory {
/**
* Create master-slave data source.
*
* @param dataSourceMap data source map
* @param masterSlaveRuleConfig master-slave rule configuration
* @param props props
* @return master-slave data source
* @throws SQLException SQL exception
*/
public static DataSource createDataSource(final Map<String, DataSource> dataSourceMap, final MasterSlaveRuleConfiguration masterSlaveRuleConfig, final Properties props) throws SQLException {
return new MasterSlaveDataSource(dataSourceMap, new MasterSlaveRule(masterSlaveRuleConfig), props);
}
}
2. 建造者模式
与工厂类类似,建造者Builder模式也是创建型设计模式,常见的builder设计多为fluent风格,提供类似set、add等方法设置或者添加属性,最后通过build方法完成实例的创建,主要解决的需要设置多个关联属性,创建实例时需要统一校验的类创建。
不过ShardingSphere中的建造者类并没有这么设计,它的作用是一些*Builder接口的实现,其中方法返回需要创建的类对象,与上面工厂类区别是,其创建的往往是一些确定的类实例。
默认SQL构建器类
/**
* Default SQL builder.
*/
public final class DefaultSQLBuilder extends AbstractSQLBuilder {
public DefaultSQLBuilder(final SQLRewriteContext context) {
super(context);
}
@Override
protected String getSQLTokenText(final SQLToken sqlToken) {
return sqlToken.toString();// 返回Token对应的文本字符
}
}
3. Callback回调模式
执行引擎中的大量使用该设计模式,与Spring的JDBCTemplate、TransactionTemplate类似,ShardingSphere中的SQLExecuteTemplate、ExecutorEngine也是如此设计,引擎使用者提供CallBack实现类,使用该模式是因为在SQL执行时,需要支持更多类型的SQL,不同的SQL如DQL、DML、DDL、不带参数的SQL、参数化SQL等,不同的SQL操作逻辑并不一样,但执行引擎需要提供一个通用的执行策略。
public final class SQLExecuteTemplate {
…
public <T> List<T> execute(final Collection<InputGroup<? extends StatementExecuteUnit>> inputGroups,
final SQLExecuteCallback<T> firstCallback, final SQLExecuteCallback<T> callback) throws SQLException {
try {
return executorEngine.execute((Collection) inputGroups, firstCallback, callback, serial);
} catch (final SQLException ex) {
ExecutorExceptionHandler.handleException(ex);
return Collections.emptyList();
}
}
public final class ExecutorEngine implements AutoCloseable {
…
public <I, O> List<O> execute(final Collection<InputGroup<I>> inputGroups,
final GroupedCallback<I, O> firstCallback, final GroupedCallback<I, O> callback, final boolean serial) throws SQLException {
if (inputGroups.isEmpty()) {
return Collections.emptyList();
}
return serial ? serialExecute(inputGroups, firstCallback, callback) : parallelExecute(inputGroups, firstCallback, callback);
}
…
}
注意虽然这些类叫Template,但与经典设计模式中的模板模式并不一样。*
4. 装饰器模式
装饰器decorate模式在路由引擎、改写引擎、归并引擎中进行了使用,也是5.x版本中实现pluggable的主要的设计模式,在4.1.1中也已经可以看到数据分片、读写分离、加密、影子表等都做为一个功能实现以装饰器方式进行了重构。装饰器模式可以实现功能组合,例如既要使用数据分片,又要使用加密和影子表功能,这些功能够可以看做对改写、路由的增强。
在DataNodeRouter类中
private RouteContext executeRoute(final String sql, final List<Object> parameters, final boolean useCache) {
RouteContext result = createRouteContext(sql, parameters, useCache);
for (Entry<BaseRule, RouteDecorator> entry : decorators.entrySet()) {
result = entry.getValue().decorate(result, metaData, entry.getKey(), properties);
}
return result;
}
在SQLRewriteEntry类中
private void decorate(final Map<BaseRule, SQLRewriteContextDecorator> decorators, final SQLRewriteContext sqlRewriteContext, final RouteContext routeContext) {
for (Entry<BaseRule, SQLRewriteContextDecorator> entry : decorators.entrySet()) {
BaseRule rule = entry.getKey();
SQLRewriteContextDecorator decorator = entry.getValue();
if (decorator instanceof RouteContextAware) {
((RouteContextAware) decorator).setRouteContext(routeContext);
}
decorator.decorate(rule, properties, sqlRewriteContext);
}
}
在MergeEntry类中
private Optional<MergedResult> decorate(final QueryResult queryResult, final SQLStatementContext sqlStatementContext) throws SQLException {
MergedResult result = null;
for (Entry<BaseRule, ResultProcessEngine> entry : engines.entrySet()) {
if (entry.getValue() instanceof ResultDecoratorEngine) {
ResultDecorator resultDecorator = ((ResultDecoratorEngine) entry.getValue()).newInstance(databaseType, schemaMetaData, entry.getKey(), properties, sqlStatementContext);
result = null == result ? resultDecorator.decorate(queryResult, sqlStatementContext, schemaMetaData) : resultDecorator.decorate(result, sqlStatementContext, schemaMetaData);
}
}
return Optional.ofNullable(result);
}
5. 访问者模式
ShardingSphere在SQL解析中使用了大量的访问者模式,这种模式我们在一般项目中并不经常使用,不过在编译技术领域却几乎是标配,antlr、druid都使用了该种模式,开发者如果需要进行功能开发都会通过这种模式实现对AST的访问。antlr4基于g4文件生成java类时除了词法Lexer、语法Parser类也生成一个访问者基类,ShardingSphere各种数据库对应的Vistor都是继承自此类,在实现的子类中拿到SQL中对应的各种要素,进而构建出对应的SQLSelectStatement。
例如org.apache.shardingsphere.sql.parser.mysql.visitor.MySQLVisitor
/**
* MySQL visitor.
*/
@Getter(AccessLevel.PROTECTED)
public abstract class MySQLVisitor extends MySQLStatementBaseVisitor<ASTNode> {
private int currentParameterIndex;
@Override
public final ASTNode visitParameterMarker(final ParameterMarkerContext ctx) {
return new ParameterMarkerValue(currentParameterIndex++);
}
@Override
public final ASTNode visitLiterals(final LiteralsContext ctx) {
if (null != ctx.stringLiterals()) {
return visit(ctx.stringLiterals());
}
if (null != ctx.numberLiterals()) {
return visit(ctx.numberLiterals());
}
if (null != ctx.dateTimeLiterals()) {
return visit(ctx.dateTimeLiterals());
}
if (null != ctx.hexadecimalLiterals()) {
return visit(ctx.hexadecimalLiterals());
}
if (null != ctx.bitValueLiterals()) {
return visit(ctx.bitValueLiterals());
}
if (null != ctx.booleanLiterals()) {
return visit(ctx.booleanLiterals());
}
if (null != ctx.nullValueLiterals()) {
return visit(ctx.nullValueLiterals());
}
throw new IllegalStateException("Literals must have string, number, dateTime, hex, bit, boolean or null.");
}
…
}
除了上述几个设计模式,还用到了解释器模式、单例模式、代理模式、策略模式等,出于篇幅关系这里就不一一介绍。
代码目录结构
我们在前面进行代码分析都是围绕SQL路由、改写、执行、归并这几个核心模块,这些也是ShardingSphere官方文档中所指的内核引擎,在4.1.1版本中代码位于是在sharding-core项目下。不过这里我们自己想一想什么是ShardingSphere的核心,这个问题相信ShardingSphere的主创者们有更深的认识,这里简单的说下我个人的理解、回答这个问题主要还是在于对ShardingSphere的定义,早期只有sharding-jdbc的时候,其实无所谓core,因为整个功能是明确的。但当sharding-proxy项目出来后,自然而然的想到的就是哪些功能、代码是jdbc和proxy可以复用的,除了接入方式不一样(一个是JDBC、一个是MySQL协议),SQL解析、路由、重写、执行都是可以复用的,因此这些就是成了core;再到后来,SQL解析器在经过druid->手写简化版->antlr的演化后,SQL解析现在变成了一个高扩展性、支持多种数据库方言的独立模块,因此将其进行剥离成了sql-parser, 与此类似,Proxy版本中MySQL报文协议的部分,后来增加了PostgreSQL,因此将其独立出来成了db-protocol部分。
在功能层面,前期只是分库分表、读写分离,后来增加了加解密、影子表等功能,这些功能又与原来core的代码柔合在一起,因此在5.x版本中,我们可以看到ShardingSphere作者们对其做了进一步的重构,将原来core的代码修改为infra模块,将路由、重写、合并、执行模块中涉及到具体功能的逻辑进行剥离,在infra中路由、重写、合并与执行就只剩下引擎以及装饰器,保留的就是执行算法与流程,数据分片sharding、读写分离master-slave、加密encrypt、影子表shadow都作为功能feature拆分到了feature项目中,实现自身对应的路由、重写、合并、执行等装饰器接口,这样的调整就使得ShardingSphere面向开发者提供了一个统一的接入、增强、定制的入口,同样ShardingSphere的原核心功能(分录分表、读写分离)也是这种方式集成,第三方开发者也是如此,这种一致的开发方式也更方便用户可以根据自身需要裁减ShardingSphere的各组件。
虽然目前5.x版本还未正式release,不过从github上master分支中代码目录结构中可以看出新的设计架构,feature项目下包含了分片、加密、主从、复制、影子表;infra项目下包含了原内核引擎(路由、重写、执行、归并);增加了控制面control-panel项目,将原来orchestration、metric、opentracing都迁移到了该项目下;将SQL解析与数据库报文协议都独立成为一个项目,这些更方便开发者有选择的ShardingSphere工具集;另外还增加了一个kernel项目,不过目前里面只有一些context类,目前还不知道这个项目的准确定位。
ShardingSphere 5.x目录结构代码风格与质量
ShardingSpherede 经过几个版本的迭代重构,代码已不止于满足功能层面,在编写风格、目录结构、扩展方式等方面等进行了精心的设计。相比于当下很多开源项目,阅读其代码给人的感觉很清爽,面条式代码很少。
项目用了大量lombok的注解,例如@Getter、@Setter、@RequiredArgsConstructor、@Data等,通过这种方式,减少了很多getter、setter、构造函数、hashCode等代码,从而让代码更加简洁。另外4.x版本在对JDK版本升级到8后,将数据集合遍历从for each修改为了stream方式,使用了Optional方式减少了很多判空类似if(xx==null)这样的代码,也使得代码看起来更加优雅易读。
不过部分代码为了简洁,存在级联调用太长、方法参数太多问题,导致可读性比较低,这种情况我觉得还是通过添加部分中间变量更好些,这种情况在sharding-proxy项目中更加常见。
例如org.apache.shardingsphere.shardingproxy.frontend.netty.FrontendChannelInboundHandler#channelRead方法:
public void channelRead(final ChannelHandlerContext context, final Object message) {
… CommandExecutorSelector.getExecutor(databaseProtocolFrontendEngine.getFrontendContext().isOccupyThreadForPerConnection(), backendConnection.isSupportHint(),
backendConnection.getTransactionType(), context.channel().id()).execute(new CommandExecutorTask(databaseProtocolFrontendEngine, backendConnection, context, message));
}
…
}
org.apache.shardingsphere.shardingproxy.frontend.mysql.command.query.binary.execute.MySQLComStmtExecuteExecutor#createQueryPacket方法
private Collection<DatabasePacket> createQueryPacket(final QueryResponse backendResponse) {
…
for (QueryHeader each : queryHeader) {
result.add(new MySQLColumnDefinition41Packet(++currentSequenceId, each.getSchema(), each.getTable(), each.getTable(),
each.getColumnLabel(), each.getColumnName(), each.getColumnLength(), MySQLColumnType.valueOfJDBCType(each.getColumnType()), each.getDecimals()));
}
result.add(new MySQLEofPacket(++currentSequenceId));
return result;
}
官方开发规范中对开发理念、编码规范有更详细的介绍https://shardingsphere.apache.org/community/cn/contribute/code-conduct/
4.1.1中代码设计的一些问题
引擎边界不够清晰,调用关系嵌套
在4.1.1版本中是JDBC层调用prepare引擎,prepare引擎(PrepareEngine.preapre方法)调用路由引擎(DataNodeRouter.route方法),路由引擎内部(route方法)又调用了解析引擎(ParseEngine.parse)进行SQL的解析,然后再由prepare引擎(PrepareEngine.preapre方法)中再调用改写引擎(RewriteEngine.rewrite)进行SQL的重写,JDBC层再调用执行引擎运行SQL,最后JDBC层再调用执行引擎完成结果集合并。这样的调用关系其实耦合了各内核引擎,更好的方式应该是在JDBC层或者Proxy层统一负责编排各引擎,分别调用SQL引擎、路由引擎、改写引擎、执行引擎、归并引擎,而不是在路由引擎内嵌套调用SQL解析。
这个问题在目前5.0.0-RC1-SNAPSHOT版本中已进行了重构优化,例如在ShardingSpherePreparedStatement(原ShardingPreparedStatement改名)类中,可以看到SQL解析迁移到了构造函数中,将路由、改写操作统一放到了createExecutionContext()方法中,另外将生成执行输入分组操作也从PreparedStatementExecutor中挪到了JDBC层。
/**
* ShardingSphere prepared statement.
*/
public final class ShardingSpherePreparedStatement extends AbstractPreparedStatementAdapter {
… private ShardingSpherePreparedStatement(final ShardingSphereConnection connection, final String sql,
final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final boolean returnGeneratedKeys) throws SQLException {
if (Strings.isNullOrEmpty(sql)) {
throw new SQLException(SQLExceptionConstant.SQL_STRING_NULL_OR_EMPTY);
}
this.connection = connection;
schemaContexts = connection.getSchemaContexts();
this.sql = sql;
statements = new ArrayList<>();
parameterSets = new ArrayList<>();
sqlStatement = schemaContexts.getDefaultSchemaContext().getRuntimeContext().getSqlParserEngine().parse(sql, true);// 进行SQL解析
…
}
@Override
public ResultSet executeQuery() throws SQLException {
ResultSet result;
try {
clearPrevious();
executionContext = createExecutionContext();// 创建执行上下文,完成SQL的路由与改写
List<QueryResult> queryResults;
if (ExecutorConstant.MANAGED_RESOURCE) {
Collection<InputGroup<StatementExecuteUnit>> inputGroups = getInputGroups();
cacheStatements(inputGroups);
reply();
queryResults = preparedStatementExecutor.executeQuery(inputGroups);// SQL执行
} else {
queryResults = rawExecutor.executeQuery(getRawInputGroups(), new RawSQLExecutorCallback());
}
MergedResult mergedResult = mergeQuery(queryResults);// 结果集归并
result = new ShardingSphereResultSet(statements.stream().map(this::getResultSet).collect(Collectors.toList()), mergedResult, this, executionContext);
} finally {
clearBatch();
}
currentResultSet = result;
return result;
}
…
}
内核引擎命名不统一,使用方式不太一致
解析引擎:
- 通过SQLParserEngineFactory. getSQLParserEngine返回对应的数据库SQL解析器
- 然后调用SQLParserEngine.parse方法解析成SQLStatement。
通过SQLParserExecutor. execute方法返回解析抽象语法树即ParseASTNode,execute方法中通过SQLParserFactory. newInstance创建出SQL解析器即SQLParser实现类,不同数据库有不同实现类。
路由引擎:
- 路由引擎并非直接创建RouteEngine实例,而是先要创建DataNodeRouter类实例,调用其 route方法,返回RouteContext对象,其中包含了路由后的结果。
- route方法中,依次调用各种RouteDecorator实现类,在具体实现类中创建了具体的RouteEngine执行route操作,例如在ShardingRouteDecorator中,通过ShardingRouteEngineFactory.newInstance创建了ShardingRouteEngine来实现真正的路由逻辑。
重写引擎:
- 通过SQLRewriteEntry类createSQLRewriteContext方法返回SQLRewriteContext;
- 然后调用SQLRewriteEngine或者SQLRouteRewriteEngine来完成路由;
- 在RewriteEngine中主要的逻辑其实只是通过SQLBuilder实现类,将SQLRewriteContext中的Token转换成SQL。
执行引擎:
- 创建PreparedStatementExecutor(Proxy方式为JDBCExecuteEngine)实例;
- 之后ExecutePrepareTemplate实例生成执行单元分组InputGroup<StatementExecuteUnit>(在5.x中此功能由ExecuteGroupEngine接口实现类完成);
- 最后创建SQLExecuteTemplate实例,其内部通过ExecutorEngine执行分组后的StatementExecuteUnit。
合并引擎:
- 创建MergeEngine,其内部会创建一个MergeEntry,其merge方法主要就是顺序调用ResultMergerEngine实现类,目前就是ShardingResultMergerEngine ,例如对于Select SQL,会创建ShardingDQLResultMerger,其merge方法会根据SelectStatementContext创建对应的stream/memory类型MergedResult;
- 最后有ResultDecoratorEngine实现类完成对MergedResult的再次处理。
可以看到这些引擎的使用方式各不相同,有的是由Entry类开始,有的是Engine,有的是Engine内部嵌套Entry类,这种不一致给开发者带来更高的心智负荷,这些设计问题在5.x正得到优化。
部分功能还缺乏扩展点
某些功能扩展还不够灵活,例如在事务这块,目前的ShardingSphere的事务类型只支持LOCAL、BASE、XA,并不支持SPI进行新增,之前项目中基于sharding-jdbc新增一个严格单库事务管理器时,只能修改TransactionType类源码。
好的一个开源项目应该是不断演变进步的。ShardingSphere 5.x版本的目前正在接近release,目标是pluggable,相信上述问题会通过更好的设计得到解决。另外ShardingSphere现在定位也不只是一个分库分表中间件,而是一个分布式数据库,未来后端除了关系数据库还会有其它存储方式,所以内核层面可能还会增加查询优化、执行计划、数据统计信息(与数据库类似)等概念,接入端可能还会支持更多的数据接入方式(前端增加REST、GraphQL、Redis等;后端增加Redis、MongoDB、HBase等)。
总览篇到此为止,在接下来的篇章里,会对各内核引擎进行更详细的源码分析。
网友评论