Understanding ShardingSphere's Design from the Source Code: The Route Engine

Author: 蚊子squirrel | Published 2020-09-09 21:55

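Whether for database/table sharding or read-write splitting, a SQL statement must go through rule-based computation before it runs on a database, so that its target databases and tables are known. The route engine's job is to compute which database and which table a SQL statement should execute on. The database result is handed to the execution engine, which uses the data source name to obtain the corresponding database connection; the table result is handed to the rewrite engine, which rewrites table names before execution, replacing each logical table name with the correct physical table name.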

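The database is computed according to the user-configured database routing rule, and the table according to the user-configured table routing rule. At present two ShardingSphere feature modules require routing: sharding (database and table sharding) and master-slave (read-write splitting).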
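
To make the two computations concrete, here is a minimal sketch, assuming a modulo rule for both the database and the table. The class name, method names and the modulo rule are illustrative assumptions, not ShardingSphere code.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration: the names and the modulo rule are assumptions, not ShardingSphere code.
final class SimpleRouteSketch {

    // Database routing rule: pick a data source from the user id.
    static String routeDatabase(final List<String> dataSources, final long userId) {
        return dataSources.get((int) Math.floorMod(userId, (long) dataSources.size()));
    }

    // Table routing rule: derive the physical table name from the order id.
    static String routeTable(final String logicTable, final int tableCount, final long orderId) {
        return logicTable + "_" + Math.floorMod(orderId, (long) tableCount);
    }

    public static void main(final String[] args) {
        List<String> dataSources = Arrays.asList("ds_0", "ds_1");
        // The first value would go to the execution engine, the second to the rewrite engine.
        System.out.println(routeDatabase(dataSources, 7L) + "." + routeTable("t_order", 4, 10L));
    }
}
```

In the real engine these rules come from user configuration rather than being hard-coded.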

Code call analysis

Back in the BasePrepareEngine class: before routing runs, the route decorators are registered.
org.apache.shardingsphere.underlying.pluggble.prepare.BasePrepareEngine#registerRouteDecorator

private void registerRouteDecorator() {
        for (Class<? extends RouteDecorator> each : OrderedRegistry.getRegisteredClasses(RouteDecorator.class)) {
            RouteDecorator routeDecorator = createRouteDecorator(each);
            Class<?> ruleClass = (Class<?>) routeDecorator.getType();
            // FIXME rule.getClass().getSuperclass() == ruleClass for orchestration, should decouple extend between orchestration rule and sharding rule
            rules.stream().filter(rule -> rule.getClass() == ruleClass || rule.getClass().getSuperclass() == ruleClass).collect(Collectors.toList())
                    .forEach(rule -> router.registerDecorator(rule, routeDecorator));
        }
    }

After that the actual routing flow begins. The routing entry class is DataNodeRouter.

org.apache.shardingsphere.underlying.route.DataNodeRouter

/**
 * Data node router.
 */
@RequiredArgsConstructor
public final class DataNodeRouter {
    
    private final ShardingSphereMetaData metaData;
    
    private final ConfigurationProperties properties;
    
    private final SQLParserEngine parserEngine;
    
    private final Map<BaseRule, RouteDecorator> decorators = new LinkedHashMap<>();
    
    private SPIRoutingHook routingHook = new SPIRoutingHook();
    
    /**
     * Register route decorator.
     *
     * @param rule rule
     * @param decorator route decorator
     */
    public void registerDecorator(final BaseRule rule, final RouteDecorator decorator) {
        decorators.put(rule, decorator);
    }
    
    /**
     * Route SQL.
     *
     * @param sql SQL
     * @param parameters SQL parameters
     * @param useCache whether cache SQL parse result
     * @return route context
     */
    public RouteContext route(final String sql, final List<Object> parameters, final boolean useCache) {
        routingHook.start(sql);
        try {
            RouteContext result = executeRoute(sql, parameters, useCache); // compute the route and build the route result
            routingHook.finishSuccess(result, metaData.getSchema());
            return result;
            // CHECKSTYLE:OFF
        } catch (final Exception ex) {
            // CHECKSTYLE:ON
            routingHook.finishFailure(ex);
            throw ex;
        }
    }
    
    @SuppressWarnings("unchecked")
    private RouteContext executeRoute(final String sql, final List<Object> parameters, final boolean useCache) {
        RouteContext result = createRouteContext(sql, parameters, useCache);
        for (Entry<BaseRule, RouteDecorator> entry : decorators.entrySet()) {
            result = entry.getValue().decorate(result, metaData, entry.getKey(), properties);
        }
        return result;
    }
    
    private RouteContext createRouteContext(final String sql, final List<Object> parameters, final boolean useCache) {
        SQLStatement sqlStatement = parserEngine.parse(sql, useCache); // parse the SQL into its AST
        try {
            SQLStatementContext sqlStatementContext = SQLStatementContextFactory.newInstance(metaData.getSchema(), sql, parameters, sqlStatement); // build the SQL statement context, effectively part of the semantic analysis
            return new RouteContext(sqlStatementContext, parameters, new RouteResult());
            // TODO should pass parameters for master-slave
        } catch (final IndexOutOfBoundsException ex) {
            return new RouteContext(new CommonSQLStatementContext(sqlStatement), parameters, new RouteResult());
        }
    }
}

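As shown above, after the parse engine returns a SQLStatement instance, the SQLStatementContext factory class SQLStatementContextFactory creates the SQLStatementContext object. That logic was covered in the SQL parsing chapter, so it is not expanded here. RouteDecorator currently has only two implementations: ShardingRouteDecorator for sharding and MasterSlaveRouteDecorator for master-slave.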
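
The decorator chain in executeRoute can be sketched in isolation. The following is a simplified stand-in, assuming route units are plain strings and rules are plain names; the interface and class names are hypothetical and only mirror the shape of DataNodeRouter.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-ins for RouteContext and RouteDecorator; all names here are hypothetical.
final class DecoratorChainSketch {

    interface Decorator {
        List<String> decorate(List<String> routeUnits, String rule);
    }

    // LinkedHashMap keeps insertion order, so decorators run in the order they were registered.
    private final Map<String, Decorator> decorators = new LinkedHashMap<>();

    void register(final String rule, final Decorator decorator) {
        decorators.put(rule, decorator);
    }

    List<String> route() {
        List<String> result = new ArrayList<>(); // starts empty, like new RouteResult()
        for (Map.Entry<String, Decorator> entry : decorators.entrySet()) {
            // Each decorator receives the previous result and returns the next one.
            result = entry.getValue().decorate(result, entry.getKey());
        }
        return result;
    }

    static List<String> demo() {
        DecoratorChainSketch router = new DecoratorChainSketch();
        // The "sharding" step produces a route unit on a logical data source.
        router.register("sharding", (units, rule) -> {
            List<String> next = new ArrayList<>(units);
            next.add("ms_ds.t_order_1");
            return next;
        });
        // The "master-slave" step swaps the logical data source for a physical one.
        router.register("master-slave", (units, rule) -> {
            List<String> next = new ArrayList<>();
            for (String each : units) {
                next.add(each.replace("ms_ds", "slave_0"));
            }
            return next;
        });
        return router.route();
    }
}
```

Because each decorator sees the output of the previous one, the registration order matters: the sharding step has to run before the master-slave step can replace its data source names.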


Sharding route decorator

Let's first look at the most common one, the sharding route decorator ShardingRouteDecorator.
org.apache.shardingsphere.sharding.route.engine.ShardingRouteDecorator

/**
* Sharding route decorator.
*/
public final class ShardingRouteDecorator implements RouteDecorator<ShardingRule> {
 
 @SuppressWarnings("unchecked")
 @Override
 public RouteContext decorate(final RouteContext routeContext, final ShardingSphereMetaData metaData, final ShardingRule shardingRule, final ConfigurationProperties properties) {
     SQLStatementContext sqlStatementContext = routeContext.getSqlStatementContext();
     List<Object> parameters = routeContext.getParameters();
     // Validate the SQL to reject unsupported statements; with sharding, INSERT INTO ... ON DUPLICATE KEY and updates to the sharding key are not supported
     ShardingStatementValidatorFactory.newInstance(
             sqlStatementContext.getSqlStatement()).ifPresent(validator -> validator.validate(shardingRule, sqlStatementContext.getSqlStatement(), parameters));
     // Extract the sharding conditions from the SQL
     ShardingConditions shardingConditions = getShardingConditions(parameters, sqlStatementContext, metaData.getSchema(), shardingRule);
     boolean needMergeShardingValues = isNeedMergeShardingValues(sqlStatementContext, shardingRule);
     if (sqlStatementContext.getSqlStatement() instanceof DMLStatement && needMergeShardingValues) {
         // Check that all sharding values (table, column, value) are identical; throw an exception if they are not
         checkSubqueryShardingValues(sqlStatementContext, shardingRule, shardingConditions);
         // Remove duplicate sharding conditions
         mergeShardingConditions(shardingConditions);
     }
     // Create the sharding route engine
     ShardingRouteEngine shardingRouteEngine = ShardingRouteEngineFactory.newInstance(shardingRule, metaData, sqlStatementContext, shardingConditions, properties);
     // Route and build the route result
     RouteResult routeResult = shardingRouteEngine.route(shardingRule);
     if (needMergeShardingValues) {
         Preconditions.checkState(1 == routeResult.getRouteUnits().size(), "Must have one sharding with subquery.");
     }
     return new RouteContext(sqlStatementContext, parameters, routeResult);
 }

 private ShardingConditions getShardingConditions(final List<Object> parameters, 
                                                  final SQLStatementContext sqlStatementContext, final SchemaMetaData schemaMetaData, final ShardingRule shardingRule) {
     if (sqlStatementContext.getSqlStatement() instanceof DMLStatement) {
         if (sqlStatementContext instanceof InsertStatementContext) {
             // Build the sharding conditions from the INSERT VALUES clause
             return new ShardingConditions(new InsertClauseShardingConditionEngine(shardingRule).createShardingConditions((InsertStatementContext) sqlStatementContext, parameters));
         }
         // Build the sharding conditions from the WHERE clause
         return new ShardingConditions(new WhereClauseShardingConditionEngine(shardingRule, schemaMetaData).createShardingConditions(sqlStatementContext, parameters));
     }
     return new ShardingConditions(Collections.emptyList());
 }

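The code shows the internal logic:

  1. First, the SQLStatement is validated, mainly to check that the SQL falls within the supported range.
  2. The sharding conditions object ShardingConditions (a wrapper around a list of ShardingCondition) is created by the getShardingConditions method above. Let's expand the most common case, building sharding conditions from the WHERE clause.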
    org.apache.shardingsphere.sharding.route.engine.condition.engine.WhereClauseShardingConditionEngine
/**
 * Sharding condition engine for where clause.
 */
@RequiredArgsConstructor
public final class WhereClauseShardingConditionEngine {
    
    private final ShardingRule shardingRule;
    
    private final SchemaMetaData schemaMetaData;
    
    /**
     * Create sharding conditions.
     * 
     * @param sqlStatementContext SQL statement context
     * @param parameters SQL parameters
     * @return sharding conditions
     */
    public List<ShardingCondition> createShardingConditions(final SQLStatementContext sqlStatementContext, final List<Object> parameters) {
        if (!(sqlStatementContext instanceof WhereAvailable)) {
            return Collections.emptyList();
        }
        List<ShardingCondition> result = new ArrayList<>();
        Optional<WhereSegment> whereSegment = ((WhereAvailable) sqlStatementContext).getWhere();
        if (whereSegment.isPresent()) {
            result.addAll(createShardingConditions(sqlStatementContext, whereSegment.get().getAndPredicates(), parameters));
        }
        return result;
    }
    
    private Collection<ShardingCondition> createShardingConditions(final SQLStatementContext sqlStatementContext, final Collection<AndPredicate> andPredicates, final List<Object> parameters) {
        Collection<ShardingCondition> result = new LinkedList<>();
        for (AndPredicate each : andPredicates) {
            Map<Column, Collection<RouteValue>> routeValueMap = createRouteValueMap(sqlStatementContext, each, parameters); // route values per column in the WHERE clause; only columns configured as sharding columns are added
            if (routeValueMap.isEmpty()) {
                return Collections.emptyList();
            }
            result.add(createShardingCondition(routeValueMap)); // build a sharding condition from the column-to-route-values map, merging the route values of each column
        }
        return result;
    }
    
    private Map<Column, Collection<RouteValue>> createRouteValueMap(final SQLStatementContext sqlStatementContext, final AndPredicate andPredicate, final List<Object> parameters) {
        Map<Column, Collection<RouteValue>> result = new HashMap<>();
        for (PredicateSegment each : andPredicate.getPredicates()) {
            Optional<String> tableName = sqlStatementContext.getTablesContext().findTableName(each.getColumn(), schemaMetaData);
            if (!tableName.isPresent() || !shardingRule.isShardingColumn(each.getColumn().getIdentifier().getValue(), tableName.get())) {
                continue;
            }
            Column column = new Column(each.getColumn().getIdentifier().getValue(), tableName.get());
            // Create the route value for the operator: = and IN yield a ListRouteValue; range operators such as >, < and BETWEEN yield a RangeRouteValue
            Optional<RouteValue> routeValue = ConditionValueGeneratorFactory.generate(each.getRightValue(), column, parameters);
            if (!routeValue.isPresent()) {
                continue;
            }
            if (!result.containsKey(column)) {
                result.put(column, new LinkedList<>());
            }
            result.get(column).add(routeValue.get());
        }
        return result;
    }
    
    private ShardingCondition createShardingCondition(final Map<Column, Collection<RouteValue>> routeValueMap) {
        ShardingCondition result = new ShardingCondition();
        for (Entry<Column, Collection<RouteValue>> entry : routeValueMap.entrySet()) {
            try {
                RouteValue routeValue = mergeRouteValues(entry.getKey(), entry.getValue());
                if (routeValue instanceof AlwaysFalseRouteValue) {
                    return new AlwaysFalseShardingCondition();
                }
                result.getRouteValues().add(routeValue);
            } catch (final ClassCastException ex) {
                throw new ShardingSphereException("Found different types for sharding value `%s`.", entry.getKey());
            }
        }
        return result;
    }
  3. Next, the sharding route engine factory creates the matching ShardingRouteEngine instance. Here is the factory class:

org.apache.shardingsphere.sharding.route.engine.type.ShardingRouteEngineFactory

/**
 * Sharding routing engine factory.
 */
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class ShardingRouteEngineFactory {
    
    /**
     * Create new instance of routing engine.
     * 
     * @param shardingRule sharding rule
     * @param metaData meta data of ShardingSphere
     * @param sqlStatementContext SQL statement context
     * @param shardingConditions shardingConditions
     * @param properties sharding sphere properties
     * @return new instance of routing engine
     */
    public static ShardingRouteEngine newInstance(final ShardingRule shardingRule,
                                                  final ShardingSphereMetaData metaData, final SQLStatementContext sqlStatementContext,
                                                  final ShardingConditions shardingConditions, final ConfigurationProperties properties) {
        SQLStatement sqlStatement = sqlStatementContext.getSqlStatement();
        Collection<String> tableNames = sqlStatementContext.getTablesContext().getTableNames();
        if (sqlStatement instanceof TCLStatement) { // transaction control SQL (COMMIT, ROLLBACK, SAVEPOINT, SET TRANSACTION): database broadcast routing
            return new ShardingDatabaseBroadcastRoutingEngine();
        }
        if (sqlStatement instanceof DDLStatement) { // DDL SQL (CREATE, ALTER, DROP, TRUNCATE, ...): table broadcast routing
            return new ShardingTableBroadcastRoutingEngine(metaData.getSchema(), sqlStatementContext);
        }
        if (sqlStatement instanceof DALStatement) { // DAL SQL (SHOW DATABASES, SHOW TABLES, ...): the engine depends on the statement, see getDALRoutingEngine
            return getDALRoutingEngine(shardingRule, sqlStatement, tableNames);
        }
        if (sqlStatement instanceof DCLStatement) { // DCL SQL: table broadcast routing or master instance broadcast routing
            return getDCLRoutingEngine(sqlStatementContext, metaData);
        }
        if (shardingRule.isAllInDefaultDataSource(tableNames)) { // all tables belong to the default data source: default database routing
            return new ShardingDefaultDatabaseRoutingEngine(tableNames);
        }
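        if (shardingRule.isAllBroadcastTables(tableNames)) { // all tables are configured broadcast tables: SELECT uses unicast routing (one randomly chosen data source), other statements use database broadcast routing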
            return sqlStatement instanceof SelectStatement ? new ShardingUnicastRoutingEngine(tableNames) : new ShardingDatabaseBroadcastRoutingEngine();
        }
        if (sqlStatementContext.getSqlStatement() instanceof DMLStatement && tableNames.isEmpty() && shardingRule.hasDefaultDataSourceName()) { // DML SQL without table names and with a default data source configured: default database routing
            return new ShardingDefaultDatabaseRoutingEngine(tableNames);
        }
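        if (sqlStatementContext.getSqlStatement() instanceof DMLStatement && shardingConditions.isAlwaysFalse() || tableNames.isEmpty() || !shardingRule.tableRuleExists(tableNames)) { // DML SQL whose sharding conditions are always false, or no table names, or no sharding rule for the tables: unicast routing to a randomly chosen data source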
            return new ShardingUnicastRoutingEngine(tableNames);
        }
        // Everything else uses standard routing or complex routing
        return getShardingRoutingEngine(shardingRule, sqlStatementContext, shardingConditions, tableNames, properties);
    }
    
    private static ShardingRouteEngine getDALRoutingEngine(final ShardingRule shardingRule, final SQLStatement sqlStatement, final Collection<String> tableNames) {
        if (sqlStatement instanceof UseStatement) { // USE statement: ignore routing
            return new ShardingIgnoreRoutingEngine();
        }
        if (sqlStatement instanceof SetStatement || sqlStatement instanceof ResetParameterStatement || sqlStatement instanceof ShowDatabasesStatement) { // SET, RESET and SHOW DATABASES: database broadcast routing
            return new ShardingDatabaseBroadcastRoutingEngine();
        }
        if (!tableNames.isEmpty() && !shardingRule.tableRuleExists(tableNames) && shardingRule.hasDefaultDataSourceName()) { // tables have no sharding rule and a default data source is configured: default database routing
            return new ShardingDefaultDatabaseRoutingEngine(tableNames);
        }
        if (!tableNames.isEmpty()) { // table names are present: unicast routing
            return new ShardingUnicastRoutingEngine(tableNames);
        }
        return new ShardingDataSourceGroupBroadcastRoutingEngine(); // otherwise: data source group broadcast routing
    }
    
    private static ShardingRouteEngine getDCLRoutingEngine(final SQLStatementContext sqlStatementContext, final ShardingSphereMetaData metaData) {
        return isDCLForSingleTable(sqlStatementContext) 
                ? new ShardingTableBroadcastRoutingEngine(metaData.getSchema(), sqlStatementContext) : new ShardingMasterInstanceBroadcastRoutingEngine(metaData.getDataSources());
    }
    
    private static boolean isDCLForSingleTable(final SQLStatementContext sqlStatementContext) {
        if (sqlStatementContext instanceof TableAvailable) {
            TableAvailable tableSegmentsAvailable = (TableAvailable) sqlStatementContext;
            return 1 == tableSegmentsAvailable.getAllTables().size() && !"*".equals(tableSegmentsAvailable.getAllTables().iterator().next().getTableName().getIdentifier().getValue());
        }
        return false;
    }
    
    private static ShardingRouteEngine getShardingRoutingEngine(final ShardingRule shardingRule, final SQLStatementContext sqlStatementContext,
                                                                final ShardingConditions shardingConditions, final Collection<String> tableNames, final ConfigurationProperties properties) {
        Collection<String> shardingTableNames = shardingRule.getShardingLogicTableNames(tableNames);
        if (1 == shardingTableNames.size() || shardingRule.isAllBindingTables(shardingTableNames)) { // a single logic table, or all tables are binding tables: standard routing
            return new ShardingStandardRoutingEngine(shardingTableNames.iterator().next(), sqlStatementContext, shardingConditions, properties);
        }
        // TODO config for cartesian set
        return new ShardingComplexRoutingEngine(tableNames, sqlStatementContext, shardingConditions, properties);
    }
}
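
The broadcast and unicast engines selected by the factory above differ mainly in how many targets they return. The sketch below illustrates that difference under simplifying assumptions: the topology is a plain map from data source name to physical table names, route units are plain strings, and every name is hypothetical rather than taken from ShardingSphere.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical stand-ins: route units are plain strings, not ShardingSphere types.
final class BroadcastRouteSketch {

    // Database broadcast: the statement runs once on every data source (for example COMMIT or SET).
    static List<String> databaseBroadcast(final Map<String, List<String>> topology) {
        return new ArrayList<>(topology.keySet());
    }

    // Table broadcast: the statement runs on every physical table (for example DDL on a sharded table).
    static List<String> tableBroadcast(final Map<String, List<String>> topology) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : topology.entrySet()) {
            for (String table : entry.getValue()) {
                result.add(entry.getKey() + "." + table);
            }
        }
        return result;
    }

    // Unicast: any single data source will do, so one is picked at random.
    static List<String> unicast(final Map<String, List<String>> topology, final Random random) {
        List<String> dataSources = new ArrayList<>(topology.keySet());
        return Collections.singletonList(dataSources.get(random.nextInt(dataSources.size())));
    }
}
```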

As shown, different SQL types map to different routing strategies. Let's expand the most common one, the sharding standard routing engine.
org.apache.shardingsphere.sharding.route.engine.type.standard.ShardingStandardRoutingEngine


/**
 * Sharding standard routing engine.
 */
@RequiredArgsConstructor
public final class ShardingStandardRoutingEngine implements ShardingRouteEngine {
    …    
    @Override
    public RouteResult route(final ShardingRule shardingRule) {
        if (isDMLForModify(sqlStatementContext) && 1 != ((TableAvailable) sqlStatementContext).getAllTables().size()) { // check the tables involved: INSERT, UPDATE and DELETE do not support multiple tables
            throw new ShardingSphereException("Cannot support Multiple-Table for '%s'.", sqlStatementContext.getSqlStatement());
        }
        return generateRouteResult(getDataNodes(shardingRule, shardingRule.getTableRule(logicTableName)));
    }
    
    private boolean isDMLForModify(final SQLStatementContext sqlStatementContext) {
        return sqlStatementContext instanceof InsertStatementContext || sqlStatementContext instanceof UpdateStatementContext || sqlStatementContext instanceof DeleteStatementContext;
    }

    // Build the route result from the data nodes
    private RouteResult generateRouteResult(final Collection<DataNode> routedDataNodes) {
        RouteResult result = new RouteResult();
        result.getOriginalDataNodes().addAll(originalDataNodes);
        for (DataNode each : routedDataNodes) {
            result.getRouteUnits().add(
                    new RouteUnit(new RouteMapper(each.getDataSourceName(), each.getDataSourceName()), Collections.singletonList(new RouteMapper(logicTableName, each.getTableName()))));
        }
        return result;
    }

    // Compute the data nodes; this is the core routing method
    private Collection<DataNode> getDataNodes(final ShardingRule shardingRule, final TableRule tableRule) {
        if (isRoutingByHint(shardingRule, tableRule)) { // both database and table routing use hints: route by hint
            return routeByHint(shardingRule, tableRule);
        }
        if (isRoutingByShardingConditions(shardingRule, tableRule)) { // neither database nor table routing uses hints: route by sharding conditions
            return routeByShardingConditions(shardingRule, tableRule);
        }
        return routeByMixedConditions(shardingRule, tableRule); // one side uses hints and the other uses sharding conditions: mixed routing
    }
    
  …
    private List<RouteValue> getDatabaseShardingValues(final ShardingRule shardingRule, final TableRule tableRule, final ShardingCondition shardingCondition) {
        ShardingStrategy dataBaseShardingStrategy = shardingRule.getDatabaseShardingStrategy(tableRule);
        return isGettingShardingValuesFromHint(dataBaseShardingStrategy)
                ? getDatabaseShardingValuesFromHint() : getShardingValuesFromShardingConditions(shardingRule, dataBaseShardingStrategy.getShardingColumns(), shardingCondition);
    }
    
    private List<RouteValue> getTableShardingValues(final ShardingRule shardingRule, final TableRule tableRule, final ShardingCondition shardingCondition) {
        ShardingStrategy tableShardingStrategy = shardingRule.getTableShardingStrategy(tableRule);
        return isGettingShardingValuesFromHint(tableShardingStrategy)
                ? getTableShardingValuesFromHint() : getShardingValuesFromShardingConditions(shardingRule, tableShardingStrategy.getShardingColumns(), shardingCondition);
    }
    
    // Get the database sharding values set through hints (via HintManager.addDatabaseShardingValue or setDatabaseShardingValue)
    private List<RouteValue> getDatabaseShardingValuesFromHint() {
        return getRouteValues(HintManager.isDatabaseShardingOnly() ? HintManager.getDatabaseShardingValues() : HintManager.getDatabaseShardingValues(logicTableName));
    }

    // Get the table sharding values set through hints (via HintManager.addTableShardingValue)
    private List<RouteValue> getTableShardingValuesFromHint() {
        return getRouteValues(HintManager.getTableShardingValues(logicTableName));
    }
    
…    
    private Collection<DataNode> route0(final ShardingRule shardingRule, final TableRule tableRule, final List<RouteValue> databaseShardingValues, final List<RouteValue> tableShardingValues) {
        Collection<String> routedDataSources = routeDataSources(shardingRule, tableRule, databaseShardingValues); // compute which databases to route to
        Collection<DataNode> result = new LinkedList<>();
        for (String each : routedDataSources) {
            result.addAll(routeTables(shardingRule, tableRule, each, tableShardingValues)); // compute which tables to route to
        }
        return result;
    }

    // Run the configured database sharding strategy to get the routed data source names
    private Collection<String> routeDataSources(final ShardingRule shardingRule, final TableRule tableRule, final List<RouteValue> databaseShardingValues) {
        if (databaseShardingValues.isEmpty()) {
            return tableRule.getActualDatasourceNames();
        }
        Collection<String> result = new LinkedHashSet<>(shardingRule.getDatabaseShardingStrategy(tableRule).doSharding(tableRule.getActualDatasourceNames(), databaseShardingValues, this.properties));
        Preconditions.checkState(!result.isEmpty(), "no database route info");
        Preconditions.checkState(tableRule.getActualDatasourceNames().containsAll(result), 
                "Some routed data sources do not belong to configured data sources. routed data sources: `%s`, configured data sources: `%s`", result, tableRule.getActualDatasourceNames());
        return result;
    }
    // Run the configured table sharding strategy to get the actual table names
    private Collection<DataNode> routeTables(final ShardingRule shardingRule, final TableRule tableRule, final String routedDataSource, final List<RouteValue> tableShardingValues) {
        Collection<String> availableTargetTables = tableRule.getActualTableNames(routedDataSource);
        Collection<String> routedTables = new LinkedHashSet<>(tableShardingValues.isEmpty() ? availableTargetTables
                : shardingRule.getTableShardingStrategy(tableRule).doSharding(availableTargetTables, tableShardingValues, this.properties));
        Preconditions.checkState(!routedTables.isEmpty(), "no table route info");
        Collection<DataNode> result = new LinkedList<>();
        for (String each : routedTables) {
            result.add(new DataNode(routedDataSource, each));
        }
        return result;
    }
}

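As shown, the standard sharding route engine takes the database sharding strategy (databaseShardingStrategy) and the table sharding strategy (tableShardingStrategy) configured on the ShardingRule, and calls their doSharding methods to compute the target databases (data source names) and tables (actual physical table names).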
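
The two-step shape of route0 (databases first, then tables within each routed database) can be sketched as follows. This is a minimal illustration, assuming a modulo strategy on both levels and a null sharding value meaning "no condition"; the class, the helper and the topology are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of "route databases, then tables"; the modulo strategy is an assumption.
final class StandardRouteSketch {

    // With no sharding value every candidate is kept, mirroring the isEmpty() fallbacks in the listing.
    private static List<String> select(final List<String> candidates, final Long shardingValue) {
        if (null == shardingValue) {
            return candidates;
        }
        int index = (int) Math.floorMod(shardingValue.longValue(), (long) candidates.size());
        return Collections.singletonList(candidates.get(index));
    }

    // Returns data nodes as "dataSource.table" strings.
    static List<String> route(final Map<String, List<String>> topology, final Long databaseValue, final Long tableValue) {
        List<String> result = new ArrayList<>();
        for (String dataSource : select(new ArrayList<>(topology.keySet()), databaseValue)) {
            for (String table : select(topology.get(dataSource), tableValue)) {
                result.add(dataSource + "." + table);
            }
        }
        return result;
    }

    static Map<String, List<String>> sampleTopology() {
        Map<String, List<String>> result = new LinkedHashMap<>();
        result.put("ds_0", Arrays.asList("t_order_0", "t_order_1"));
        result.put("ds_1", Arrays.asList("t_order_0", "t_order_1"));
        return result;
    }
}
```

With the sample topology, supplying both values yields a single data node, supplying only the table value yields one node per data source, and supplying neither yields all four nodes, which is effectively a full scan.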

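Next, look at the standard sharding strategy class StandardShardingStrategy. Creating it requires a StandardShardingStrategyConfiguration instance, the sharding strategy configuration that the application is responsible for (with Spring XML or YAML configuration ShardingSphere creates it automatically; with the Java API the application must create it itself).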

org.apache.shardingsphere.core.strategy.route.standard.StandardShardingStrategy

/**
 * Standard sharding strategy.
 */
public final class StandardShardingStrategy implements ShardingStrategy {
    
    private final String shardingColumn;
    
    private final PreciseShardingAlgorithm preciseShardingAlgorithm;
    
    private final RangeShardingAlgorithm rangeShardingAlgorithm;
    
    public StandardShardingStrategy(final StandardShardingStrategyConfiguration standardShardingStrategyConfig) {
        Preconditions.checkNotNull(standardShardingStrategyConfig.getShardingColumn(), "Sharding column cannot be null.");
        Preconditions.checkNotNull(standardShardingStrategyConfig.getPreciseShardingAlgorithm(), "precise sharding algorithm cannot be null.");
        shardingColumn = standardShardingStrategyConfig.getShardingColumn();
        preciseShardingAlgorithm = standardShardingStrategyConfig.getPreciseShardingAlgorithm(); // required: the application supplies a class implementing PreciseShardingAlgorithm
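        rangeShardingAlgorithm = standardShardingStrategyConfig.getRangeShardingAlgorithm(); // the application supplies a class implementing RangeShardingAlgorithm; it may be null, in which case range conditions throw UnsupportedOperationException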
    }
    
    @Override
    public Collection<String> doSharding(final Collection<String> availableTargetNames, final Collection<RouteValue> shardingValues, final ConfigurationProperties properties) {
        RouteValue shardingValue = shardingValues.iterator().next();
        Collection<String> shardingResult = shardingValue instanceof ListRouteValue
                ? doSharding(availableTargetNames, (ListRouteValue) shardingValue) : doSharding(availableTargetNames, (RangeRouteValue) shardingValue);
        Collection<String> result = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
        result.addAll(shardingResult);
        return result;
    }
    
    @SuppressWarnings("unchecked")
    private Collection<String> doSharding(final Collection<String> availableTargetNames, final RangeRouteValue<?> shardingValue) {
        if (null == rangeShardingAlgorithm) {
            throw new UnsupportedOperationException("Cannot find range sharding strategy in sharding rule.");
        }
        return rangeShardingAlgorithm.doSharding(availableTargetNames, 
                new RangeShardingValue(shardingValue.getTableName(), shardingValue.getColumnName(), shardingValue.getValueRange()));
    }
    
    @SuppressWarnings("unchecked")
    private Collection<String> doSharding(final Collection<String> availableTargetNames, final ListRouteValue<?> shardingValue) {
        Collection<String> result = new LinkedList<>();
        for (Comparable<?> each : shardingValue.getValues()) {
            String target = preciseShardingAlgorithm.doSharding(availableTargetNames, new PreciseShardingValue(shardingValue.getTableName(), shardingValue.getColumnName(), each));
            if (null != target) {
                result.add(target);
            }
        }
        return result;
    }
…
}
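
The strategy above dispatches a ListRouteValue (from = or IN) to the precise algorithm one value at a time, and a RangeRouteValue to the range algorithm. The sketch below shows what an application-side pair of algorithms might compute. It is self-contained and does not implement the real PreciseShardingAlgorithm or RangeShardingAlgorithm interfaces; the modulo-suffix rule and all names are assumptions.

```java
import java.util.Collection;
import java.util.LinkedHashSet;

// Self-contained stand-ins for a precise and a range algorithm; not the ShardingSphere interfaces.
final class ShardingAlgorithmSketch {

    // Precise case (= / IN): a value maps to the target whose name ends with "_" + (value mod target count).
    // Assumes fewer than ten targets, so that suffixes such as "_1" and "_11" cannot collide.
    static String preciseSharding(final Collection<String> targets, final long value) {
        String suffix = "_" + Math.floorMod(value, (long) targets.size());
        for (String each : targets) {
            if (each.endsWith(suffix)) {
                return each;
            }
        }
        return null;
    }

    // Range case (BETWEEN, <, >): collect the targets hit by the closed range [lower, upper].
    // The modulo cycles after targets.size() consecutive values, so scanning more is unnecessary.
    static Collection<String> rangeSharding(final Collection<String> targets, final long lower, final long upper) {
        Collection<String> result = new LinkedHashSet<>();
        for (int i = 0; i < targets.size() && lower + i <= upper; i++) {
            String target = preciseSharding(targets, lower + i);
            if (null != target) {
                result.add(target);
            }
        }
        return result;
    }
}
```

A wide range touches every target, which is why range conditions on the sharding column tend to be much more expensive than equality conditions.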

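To summarize the whole flow of the route engine:

  1. DataNodeRouter first calls the parse engine to parse the SQL and obtain the SQLStatement (this couples routing to the parsing module; parsing should be pulled out and invoked by the outer orchestration, or placed uniformly in the prepare flow; this has been improved in 5.x);
  2. the SQLStatementContext factory class creates a SQLStatementContext instance from the SQLStatement;
  3. an initial RouteContext is created and passed, together with the rule (for example ShardingRule), to each RouteDecorator implementation;
  4. after the RouteDecorator's route computation, the final RouteContext is created and returned.

Master-slave route decorator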

Having covered the sharding route decorator, finally look at the master-slave route decorator MasterSlaveRouteDecorator.
org.apache.shardingsphere.masterslave.route.engine.MasterSlaveRouteDecorator

/**
 * Route decorator for master-slave.
 */
public final class MasterSlaveRouteDecorator implements RouteDecorator<MasterSlaveRule> {
    
    @Override
    public RouteContext decorate(final RouteContext routeContext, final ShardingSphereMetaData metaData, final MasterSlaveRule masterSlaveRule, final ConfigurationProperties properties) {
        if (routeContext.getRouteResult().getRouteUnits().isEmpty()) {
            // Get the name of the routed data source
            String dataSourceName = new MasterSlaveDataSourceRouter(masterSlaveRule).route(routeContext.getSqlStatementContext().getSqlStatement());
            RouteResult routeResult = new RouteResult();
            routeResult.getRouteUnits().add(new RouteUnit(new RouteMapper(dataSourceName, dataSourceName), Collections.emptyList()));
            return new RouteContext(routeContext.getSqlStatementContext(), Collections.emptyList(), routeResult);
        }
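        // In sharding + read-write splitting mode, the data source computed by sharding is really a master-slave rule name,
        // so it must be replaced with the real data source per the master-slave configuration: remove the old unit, then add the new one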
        Collection<RouteUnit> toBeRemoved = new LinkedList<>();
        Collection<RouteUnit> toBeAdded = new LinkedList<>();
        for (RouteUnit each : routeContext.getRouteResult().getRouteUnits()) {
            if (masterSlaveRule.getName().equalsIgnoreCase(each.getDataSourceMapper().getActualName())) {
                toBeRemoved.add(each);
                String actualDataSourceName = new MasterSlaveDataSourceRouter(masterSlaveRule).route(routeContext.getSqlStatementContext().getSqlStatement());
                toBeAdded.add(new RouteUnit(new RouteMapper(each.getDataSourceMapper().getLogicName(), actualDataSourceName), each.getTableMappers()));
            }
        }
        routeContext.getRouteResult().getRouteUnits().removeAll(toBeRemoved);
        routeContext.getRouteResult().getRouteUnits().addAll(toBeAdded);
        return routeContext;
    }

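As shown, MasterSlaveDataSourceRouter computes which database to route to; in the mixed sharding plus read-write splitting mode, the logical data source must additionally be replaced with the real one.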

org.apache.shardingsphere.masterslave.route.engine.impl.MasterSlaveDataSourceRouter

/**
 * Data source router for master-slave.
 */
@RequiredArgsConstructor
public final class MasterSlaveDataSourceRouter {
    
    private final MasterSlaveRule masterSlaveRule;
    
    /**
     * Route.
     * 
     * @param sqlStatement SQL statement
     * @return data source name
     */
    public String route(final SQLStatement sqlStatement) {
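        if (isMasterRoute(sqlStatement)) { // route to the master when the SQL holds a lock (e.g. SELECT ... FOR UPDATE), is not a SELECT, the master was already visited, or a hint forces master routing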
            MasterVisitedManager.setMasterVisited();
            return masterSlaveRule.getMasterDataSourceName();
        }
        return masterSlaveRule.getLoadBalanceAlgorithm().getDataSource( // otherwise pick the data source with the load-balance algorithm
                masterSlaveRule.getName(), masterSlaveRule.getMasterDataSourceName(), new ArrayList<>(masterSlaveRule.getSlaveDataSourceNames()));
    }
    
    private boolean isMasterRoute(final SQLStatement sqlStatement) {
        return containsLockSegment(sqlStatement) || !(sqlStatement instanceof SelectStatement) || MasterVisitedManager.isMasterVisited() || HintManager.isMasterRouteOnly();
    }
    
    private boolean containsLockSegment(final SQLStatement sqlStatement) {
        return sqlStatement instanceof SelectStatement && ((SelectStatement) sqlStatement).getLock().isPresent();
    }
}
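
The master-or-slave decision in isMasterRoute can be illustrated with a self-contained sketch. Note the deliberate simplifications: the real router inspects the parsed SQLStatement, while this sketch classifies the SQL by crude string matching, and the thread-local flag only imitates the role MasterVisitedManager plays in the listing. All names are hypothetical.

```java
import java.util.Locale;

// Hypothetical sketch; string matching replaces the parsed SQLStatement used by the real router.
final class MasterRouteSketch {

    // Once a thread has touched the master, its later reads also go to the master until cleared.
    private static final ThreadLocal<Boolean> MASTER_VISITED = ThreadLocal.withInitial(() -> false);

    static String route(final String sql, final String master, final String slave) {
        String normalized = sql.trim().toLowerCase(Locale.ROOT);
        boolean isSelect = normalized.startsWith("select");
        boolean hasLock = isSelect && normalized.endsWith("for update");
        if (!isSelect || hasLock || MASTER_VISITED.get()) {
            MASTER_VISITED.set(true);
            return master;
        }
        return slave;
    }

    // Resets the flag, for example when the connection is closed.
    static void clear() {
        MASTER_VISITED.remove();
    }
}
```

The sticky flag is a design choice worth noticing: after a write, a read in the same thread goes to the master so that it does not observe stale data from a slave that has not yet replicated the write.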

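As shown, the data source is actually chosen by masterSlaveRule.getLoadBalanceAlgorithm(), that is, the getDataSource method of the MasterSlaveLoadBalanceAlgorithm interface. ShardingSphere ships two built-in implementations of this interface, and developers can add their own as needed.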


org.apache.shardingsphere.core.strategy.masterslave.RoundRobinMasterSlaveLoadBalanceAlgorithm

/**
 * Round-robin slave database load-balance algorithm.
 */
@Getter
@Setter
public final class RoundRobinMasterSlaveLoadBalanceAlgorithm implements MasterSlaveLoadBalanceAlgorithm {
    
    private static final ConcurrentHashMap<String, AtomicInteger> COUNTS = new ConcurrentHashMap<>();
    
    private Properties properties = new Properties();
    
    @Override
    public String getType() {
        return "ROUND_ROBIN";
    }
    
    @Override
    public String getDataSource(final String name, final String masterDataSourceName, final List<String> slaveDataSourceNames) {
        AtomicInteger count = COUNTS.containsKey(name) ? COUNTS.get(name) : new AtomicInteger(0);
        COUNTS.putIfAbsent(name, count);
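        count.compareAndSet(slaveDataSourceNames.size(), 0); // counts accesses for this rule; reset to 0 once the count reaches the number of slaves
        return slaveDataSourceNames.get(Math.abs(count.getAndIncrement()) % slaveDataSourceNames.size()); // access count modulo the number of slaves, so the slaves are picked in turn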
    }
}

org.apache.shardingsphere.core.strategy.masterslave.RandomMasterSlaveLoadBalanceAlgorithm

/**
 * Random slave database load-balance algorithm.
 */
@Getter
@Setter
public final class RandomMasterSlaveLoadBalanceAlgorithm implements MasterSlaveLoadBalanceAlgorithm {
    
    private Properties properties = new Properties();
    
    @Override
    public String getType() {
        return "RANDOM";
    }
    
    @Override
    public String getDataSource(final String name, final String masterDataSourceName, final List<String> slaveDataSourceNames) {
        return slaveDataSourceNames.get(ThreadLocalRandom.current().nextInt(slaveDataSourceNames.size())); // pick one of the slaves at random
    }
}
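
To see the round-robin idea on its own, here is a minimal self-contained sketch. It is a variant written for illustration, not the built-in class: it uses computeIfAbsent and Math.floorMod instead of the containsKey/putIfAbsent and compareAndSet combination in the listing, and the class and method names are hypothetical.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative variant of round-robin selection; not the built-in ShardingSphere implementation.
final class RoundRobinSketch {

    // One counter per master-slave rule name, shared by all threads.
    private static final ConcurrentHashMap<String, AtomicInteger> COUNTS = new ConcurrentHashMap<>();

    static String next(final String ruleName, final List<String> slaves) {
        AtomicInteger count = COUNTS.computeIfAbsent(ruleName, key -> new AtomicInteger(0));
        // floorMod keeps the index non-negative even after the int counter overflows.
        return slaves.get(Math.floorMod(count.getAndIncrement(), slaves.size()));
    }
}
```

Calling next repeatedly for the same rule name walks through the slaves in order and wraps around after the last one.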

Finally, an overall flow diagram of the route engine:

[Figure: route engine flow architecture diagram]

For more on the route engine's features, see the official documentation: https://shardingsphere.apache.org/document/current/cn/features/sharding/principle/route/
