Sharding-JDBC: How the Database-Sharding Configuration Is Parsed

Author: 晴天哥_王志 | Published: 2020-06-14 18:01

    Introduction

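    • The example code comes from shardingsphere-example, based on the 4.0.0 tag.
    • This article walks through how the database-sharding configuration is parsed, that is, how a ShardingRule object is built from shardingRuleConfig.
    • ShardingRule holds the core field tableRules, which is essentially a collection of TableRule objects.
    • ShardingRule holds the core field defaultDatabaseShardingStrategy, the strategy used to pick a database.
    • TableRule holds the logic table name logicTable and the corresponding actual storage nodes actualDataNodes.
    Core variable relationship diagram
    • The diagram mainly covers ShardingRule and TableRule.
    • ShardingRule to TableRule is a 1:N relationship.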
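
The 1:N relationship can be pictured with a small model. This is only a sketch: RuleModelSketch, TableRuleSketch, ShardingRuleSketch and findTableRule are hypothetical names made up for illustration, the data nodes are plain "dataSource.table" strings, and the default strategy is kept as its expression string rather than a real ShardingStrategy object.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Optional;

// Simplified, hypothetical stand-ins for ShardingRule and TableRule (not the library classes).
final class RuleModelSketch {

    static final class TableRuleSketch {
        final String logicTable;
        final List<String> actualDataNodes;

        TableRuleSketch(String logicTable, List<String> actualDataNodes) {
            this.logicTable = logicTable.toLowerCase();
            this.actualDataNodes = actualDataNodes;
        }
    }

    static final class ShardingRuleSketch {
        // One sharding rule owns N table rules (the 1:N relationship).
        final Collection<TableRuleSketch> tableRules = new ArrayList<>();
        // Assumption: kept as a plain string here; the real field is a ShardingStrategy.
        final String defaultDatabaseShardingExpression;

        ShardingRuleSketch(String defaultDatabaseShardingExpression) {
            this.defaultDatabaseShardingExpression = defaultDatabaseShardingExpression;
        }

        // Looks up the table rule for a logic table name, ignoring case.
        Optional<TableRuleSketch> findTableRule(String logicTableName) {
            for (TableRuleSketch each : tableRules) {
                if (each.logicTable.equalsIgnoreCase(logicTableName)) {
                    return Optional.of(each);
                }
            }
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        ShardingRuleSketch rule = new ShardingRuleSketch("demo_ds_${user_id % 2}");
        rule.tableRules.add(new TableRuleSketch("t_order",
                Arrays.asList("demo_ds_0.t_order", "demo_ds_1.t_order")));
        rule.tableRules.add(new TableRuleSketch("t_order_item",
                Arrays.asList("demo_ds_0.t_order_item", "demo_ds_1.t_order_item")));
        System.out.println(rule.findTableRule("T_ORDER").get().actualDataNodes);
    }
}
```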

    Database and table sharding strategies

    Supported strategies

    public enum ShardingType {
        // Database sharding
        SHARDING_DATABASES,
        // Table sharding
        SHARDING_TABLES,
        // Database and table sharding
        SHARDING_DATABASES_AND_TABLES,
        // Master-slave
        MASTER_SLAVE,
        // Master-slave under sharding
        SHARDING_MASTER_SLAVE,
        // Encryption
        ENCRYPT
    }
    
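    • Six scenarios are supported: database sharding, table sharding, database and table sharding, master-slave, master-slave under sharding, and encryption.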

    Precise-value sharding strategies

    public class DataSourceFactory {
        // Precise-value sharding strategies
        public static DataSource newInstance(final ShardingType shardingType) throws SQLException {
            switch (shardingType) {
                // Database sharding
                case SHARDING_DATABASES:
                    return new ShardingDatabasesConfigurationPrecise().getDataSource();
                // Table sharding
                case SHARDING_TABLES:
                    return new ShardingTablesConfigurationPrecise().getDataSource();
                // Database and table sharding
                case SHARDING_DATABASES_AND_TABLES:
                    return new ShardingDatabasesAndTablesConfigurationPrecise().getDataSource();
                // Master-slave
                case MASTER_SLAVE:
                    return new MasterSlaveConfiguration().getDataSource();
                // Master-slave under sharding
                case SHARDING_MASTER_SLAVE:
                    return new ShardingMasterSlaveConfigurationPrecise().getDataSource();
                default:
                    throw new UnsupportedOperationException(shardingType.name());
            }
        }
    }
    
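    • The factory builds the configuration object for a precise-value sharding strategy.
    • It covers five scenarios: database sharding, table sharding, database and table sharding, master-slave, and master-slave under sharding. ENCRYPT has no case here, so it falls through to the default branch and throws UnsupportedOperationException.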
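
To make the gap between the six enum values and the five handled cases concrete, here is a minimal sketch of the same switch. FactoryDispatchSketch, ShardingTypeSketch and configurationFor are hypothetical names; the method returns the name of the configuration class as a string instead of building a real DataSource, so it runs without the example project on the classpath.

```java
// Hypothetical mirror of the factory's switch; it returns class names instead of DataSources.
final class FactoryDispatchSketch {

    static String configurationFor(ShardingTypeSketch type) {
        switch (type) {
            case SHARDING_DATABASES:
                return "ShardingDatabasesConfigurationPrecise";
            case SHARDING_TABLES:
                return "ShardingTablesConfigurationPrecise";
            case SHARDING_DATABASES_AND_TABLES:
                return "ShardingDatabasesAndTablesConfigurationPrecise";
            case MASTER_SLAVE:
                return "MasterSlaveConfiguration";
            case SHARDING_MASTER_SLAVE:
                return "ShardingMasterSlaveConfigurationPrecise";
            default:
                // ENCRYPT ends up here because no case handles it.
                throw new UnsupportedOperationException(type.name());
        }
    }

    public static void main(String[] args) {
        for (ShardingTypeSketch each : ShardingTypeSketch.values()) {
            try {
                System.out.println(each + " -> " + configurationFor(each));
            } catch (UnsupportedOperationException ex) {
                System.out.println(each + " -> unsupported");
            }
        }
    }
}

// Hypothetical copy of the ShardingType enum shown earlier.
enum ShardingTypeSketch {
    SHARDING_DATABASES,
    SHARDING_TABLES,
    SHARDING_DATABASES_AND_TABLES,
    MASTER_SLAVE,
    SHARDING_MASTER_SLAVE,
    ENCRYPT
}
```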

    Range-value sharding strategies

    public class RangeDataSourceFactory {
        // Range-value sharding strategies
        public static DataSource newInstance(final ShardingType shardingType) throws SQLException {
            switch (shardingType) {
                // Database sharding
                case SHARDING_DATABASES:
                    return new ShardingDatabasesConfigurationRange().getDataSource();
                // Table sharding
                case SHARDING_TABLES:
                    return new ShardingTablesConfigurationRange().getDataSource();
                // Database and table sharding
                case SHARDING_DATABASES_AND_TABLES:
                    return new ShardingDatabasesAndTablesConfigurationRange().getDataSource();
                // Master-slave
                case MASTER_SLAVE:
                    return new MasterSlaveConfiguration().getDataSource();
                // Master-slave under sharding
                case SHARDING_MASTER_SLAVE:
                    return new ShardingMasterSlaveConfigurationRange().getDataSource();
                default:
                    throw new UnsupportedOperationException(shardingType.name());
            }
        }
    }
    
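    • The factory builds the configuration object for a range-value sharding strategy.
    • It covers the same five scenarios: database sharding, table sharding, database and table sharding, master-slave, and master-slave under sharding.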

    Precise-value database sharding configuration

    Source code of the configuration

    public final class ShardingDatabasesConfigurationPrecise implements ExampleConfiguration {
        
        @Override
        public DataSource getDataSource() throws SQLException {
            // Create the shardingRuleConfig object
            ShardingRuleConfiguration shardingRuleConfig = new ShardingRuleConfiguration();
    
            // Register the table rule configs for t_order and t_order_item, plus the broadcast table t_address
            shardingRuleConfig.getTableRuleConfigs().add(getOrderTableRuleConfiguration());
            shardingRuleConfig.getTableRuleConfigs().add(getOrderItemTableRuleConfiguration());
            shardingRuleConfig.getBroadcastTables().add("t_address");
    
            // Set the default database sharding strategy
            shardingRuleConfig.setDefaultDatabaseShardingStrategyConfig(new InlineShardingStrategyConfiguration("user_id", "demo_ds_${user_id % 2}"));
    
            // Create the sharding DataSource from the data source map and shardingRuleConfig
            return ShardingDataSourceFactory.createDataSource(createDataSourceMap(), shardingRuleConfig, new Properties());
        }
        
        private static TableRuleConfiguration getOrderTableRuleConfiguration() {
            TableRuleConfiguration result = new TableRuleConfiguration("t_order");
            result.setKeyGeneratorConfig(new KeyGeneratorConfiguration("SNOWFLAKE", "order_id", getProperties()));
            return result;
        }
        
        private static TableRuleConfiguration getOrderItemTableRuleConfiguration() {
            TableRuleConfiguration result = new TableRuleConfiguration("t_order_item");
            result.setKeyGeneratorConfig(new KeyGeneratorConfiguration("SNOWFLAKE", "order_item_id", getProperties()));
            return result;
        }
        
        private static Map<String, DataSource> createDataSourceMap() {
            Map<String, DataSource> result = new HashMap<>();
            result.put("demo_ds_0", DataSourceUtil.createDataSource("demo_ds_0"));
            result.put("demo_ds_1", DataSourceUtil.createDataSource("demo_ds_1"));
            return result;
        }
        
        private static Properties getProperties() {
            Properties result = new Properties();
            result.setProperty("worker.id", "123");
            return result;
        }
    }
    
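    • Two data sources are configured: demo_ds_0 and demo_ds_1.
    • The database sharding logic is set through defaultDatabaseShardingStrategyConfig: the sharding column is user_id and the sharding expression is demo_ds_${user_id % 2}.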
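
What the expression demo_ds_${user_id % 2} resolves to can be sketched in plain Java. This is an illustration only: the real strategy evaluates the expression as a Groovy closure, whereas InlineExpressionSketch and targetDataSource below are hypothetical names that hard-code the same arithmetic.

```java
// Hypothetical plain-Java equivalent of the inline expression demo_ds_${user_id % 2}.
final class InlineExpressionSketch {

    // Even user IDs map to demo_ds_0, odd ones to demo_ds_1.
    static String targetDataSource(long userId) {
        return "demo_ds_" + (userId % 2);
    }

    public static void main(String[] args) {
        for (long userId : new long[] {10L, 11L}) {
            System.out.println("user_id=" + userId + " -> " + targetDataSource(userId));
        }
    }
}
```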

    The configuration in JSON form

    {
        "bindingTableGroups":[],
        "broadcastTables":["t_address"],
        "defaultDatabaseShardingStrategyConfig":{
            "algorithmExpression":"demo_ds_${user_id % 2}",
            "shardingColumn":"user_id"
        },
        "masterSlaveRuleConfigs":[
        ],
        "tableRuleConfigs":[
            {
                "keyGeneratorConfig":{
                    "column":"order_id",
                    "properties":{
                        "worker.id":"123"
                    },
                    "type":"SNOWFLAKE"
                },
                "logicTable":"t_order"
            },
            {
                "keyGeneratorConfig":{
                    "column":"order_item_id",
                    "properties":{
                        "worker.id":"123"
                    },
                    "type":"SNOWFLAKE"
                },
                "logicTable":"t_order_item"
            }]
    }
    
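    • This is the database-sharding configuration rendered as JSON; the parts to focus on are the database sharding strategy and the t_order and t_order_item tables.
    • defaultDatabaseShardingStrategyConfig is the database sharding strategy; it contains the sharding column user_id and the sharding expression demo_ds_${user_id % 2}.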

    Parsing the configuration rules

    public final class ShardingDataSourceFactory {
        
        public static DataSource createDataSource(
                final Map<String, DataSource> dataSourceMap, final ShardingRuleConfiguration shardingRuleConfig, final Properties props) throws SQLException {
            // Build a ShardingRule from the ShardingRuleConfiguration
            return new ShardingDataSource(dataSourceMap, new ShardingRule(shardingRuleConfig, dataSourceMap.keySet()), props);
        }
    }
    
    • A ShardingRule object is built from shardingRuleConfig and the key set of dataSourceMap (the data source names).
    public class ShardingRule implements BaseRule {
    
        private final ShardingRuleConfiguration ruleConfiguration;
        private final ShardingDataSourceNames shardingDataSourceNames;
        private final Collection<TableRule> tableRules;
        private final Collection<BindingTableRule> bindingTableRules;
        private final Collection<String> broadcastTables;
        private final ShardingStrategy defaultDatabaseShardingStrategy;
        private final ShardingStrategy defaultTableShardingStrategy;
        private final ShardingKeyGenerator defaultShardingKeyGenerator;
        private final Collection<MasterSlaveRule> masterSlaveRules;
        private final EncryptRule encryptRule;
    
        public ShardingRule(final ShardingRuleConfiguration shardingRuleConfig, final Collection<String> dataSourceNames) {
            Preconditions.checkArgument(null != shardingRuleConfig, "ShardingRuleConfig cannot be null.");
            Preconditions.checkArgument(null != dataSourceNames && !dataSourceNames.isEmpty(), "Data sources cannot be empty.");
            // 1. Keep the sharding rule configuration
            this.ruleConfiguration = shardingRuleConfig;
            // 2. Resolve the data source names
            shardingDataSourceNames = new ShardingDataSourceNames(shardingRuleConfig, dataSourceNames);
            // 3. Create the TableRule objects
            tableRules = createTableRules(shardingRuleConfig);
            // 4. Take the broadcastTables
            broadcastTables = shardingRuleConfig.getBroadcastTables();
            // 5. Create the BindingTableRule objects
            bindingTableRules = createBindingTableRules(shardingRuleConfig.getBindingTableGroups());
            // 6. Create the default database sharding strategy defaultDatabaseShardingStrategy
            defaultDatabaseShardingStrategy = createDefaultShardingStrategy(shardingRuleConfig.getDefaultDatabaseShardingStrategyConfig());
            // 7. Create the default table sharding strategy defaultTableShardingStrategy
            defaultTableShardingStrategy = createDefaultShardingStrategy(shardingRuleConfig.getDefaultTableShardingStrategyConfig());
            // 8. Create the default key generator defaultShardingKeyGenerator
            defaultShardingKeyGenerator = createDefaultKeyGenerator(shardingRuleConfig.getDefaultKeyGeneratorConfig());
            // 9. Create the MasterSlaveRule objects
            masterSlaveRules = createMasterSlaveRules(shardingRuleConfig.getMasterSlaveRuleConfigs());
            // 10. Create the EncryptRule
            encryptRule = createEncryptRule(shardingRuleConfig.getEncryptRuleConfig());
        }
    }
    
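    • The comments above outline the overall flow; the parts to focus on are tableRules and defaultDatabaseShardingStrategy.
    • ShardingDataSourceNames records the data sources used for sharding.
    • ShardingRule holds tableRules; each TableRule records the rule of one table after database sharding.
    • defaultDatabaseShardingStrategy is the default database sharding strategy.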
    public final class ShardingDataSourceNames {
        
        private final ShardingRuleConfiguration shardingRuleConfig;
        
        @Getter
        private final Collection<String> dataSourceNames;
        
        public ShardingDataSourceNames(final ShardingRuleConfiguration shardingRuleConfig, final Collection<String> rawDataSourceNames) {
            Preconditions.checkArgument(null != shardingRuleConfig, "can not construct ShardingDataSourceNames with null ShardingRuleConfig");
            this.shardingRuleConfig = shardingRuleConfig;
            dataSourceNames = getAllDataSourceNames(rawDataSourceNames);
        }
        
        private Collection<String> getAllDataSourceNames(final Collection<String> dataSourceNames) {
            Collection<String> result = new LinkedHashSet<>(dataSourceNames);
            for (MasterSlaveRuleConfiguration each : shardingRuleConfig.getMasterSlaveRuleConfigs()) {
                result.remove(each.getMasterDataSourceName());
                result.removeAll(each.getSlaveDataSourceNames());
                result.add(each.getName());
            }
            return result;
        }
    }
    
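    • ShardingDataSourceNames starts from the dataSourceNames passed in, removes the master and slave data source names of every MasterSlaveRuleConfiguration, and adds the name of that master-slave rule in their place.
    • In the database-sharding scenario there are no master-slave rules, so dataSourceNames is ["demo_ds_1","demo_ds_0"].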
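
The name-resolution rule can be tried out in isolation. Below is a minimal sketch that repeats the same remove-then-add loop; DataSourceNamesSketch, MasterSlaveGroup and resolve are hypothetical stand-ins, and the data source names used in main (master_0, slave_0_a and so on) are invented for the example.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;

final class DataSourceNamesSketch {

    // Hypothetical, simplified stand-in for MasterSlaveRuleConfiguration.
    static final class MasterSlaveGroup {
        final String name;
        final String master;
        final List<String> slaves;

        MasterSlaveGroup(String name, String master, List<String> slaves) {
            this.name = name;
            this.master = master;
            this.slaves = slaves;
        }
    }

    // Replaces the physical master and slave names of each group with the group name.
    static Collection<String> resolve(Collection<String> rawNames, List<MasterSlaveGroup> groups) {
        Collection<String> result = new LinkedHashSet<>(rawNames);
        for (MasterSlaveGroup each : groups) {
            result.remove(each.master);
            result.removeAll(each.slaves);
            result.add(each.name);
        }
        return result;
    }

    public static void main(String[] args) {
        // Database sharding only: no groups, so the names pass through unchanged.
        System.out.println(resolve(Arrays.asList("demo_ds_0", "demo_ds_1"),
                Collections.<MasterSlaveGroup>emptyList()));
        // With master-slave groups: physical names collapse into the group names.
        System.out.println(resolve(
                Arrays.asList("master_0", "slave_0_a", "slave_0_b", "master_1", "slave_1_a"),
                Arrays.asList(
                        new MasterSlaveGroup("ds_0", "master_0", Arrays.asList("slave_0_a", "slave_0_b")),
                        new MasterSlaveGroup("ds_1", "master_1", Arrays.asList("slave_1_a")))));
    }
}
```

Because the result is a LinkedHashSet, the names that survive keep the order in which they were passed in, and each group name is appended at the end.
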
    public class ShardingRule implements BaseRule {
    
        private Collection<TableRule> createTableRules(final ShardingRuleConfiguration shardingRuleConfig) {
            // 1. For the content of getTableRuleConfigs, see the JSON form of the database-sharding configuration
            Collection<TableRuleConfiguration> tableRuleConfigurations = shardingRuleConfig.getTableRuleConfigs();
            // Each table gets one TableRule object
            Collection<TableRule> result = new ArrayList<>(tableRuleConfigurations.size());
            // 2. Iterate over tableRuleConfigurations to create the TableRule objects
            for (TableRuleConfiguration each : tableRuleConfigurations) {
                // 3. Each table has one TableRuleConfiguration, and each TableRuleConfiguration produces one TableRule
                result.add(new TableRule(each, shardingDataSourceNames, getDefaultGenerateKeyColumn(shardingRuleConfig)));
            }
            return result;
        }
    
        private String getDefaultGenerateKeyColumn(final ShardingRuleConfiguration shardingRuleConfig) {
            return null == shardingRuleConfig.getDefaultKeyGeneratorConfig() ? null : shardingRuleConfig.getDefaultKeyGeneratorConfig().getColumn();
        }
    }
    
    
    public final class TableRule {
        
        private final String logicTable;
        private final List<DataNode> actualDataNodes;
        private final Set<String> actualTables;
        private final Map<DataNode, Integer> dataNodeIndexMap;
        private final ShardingStrategy databaseShardingStrategy;
        private final ShardingStrategy tableShardingStrategy;
        private final String generateKeyColumn;
        private final ShardingKeyGenerator shardingKeyGenerator;
        private final Collection<String> actualDatasourceNames = new LinkedHashSet<>();
        private final Map<String, Collection<String>> datasourceToTablesMap = new HashMap<>();
    
        public TableRule(final TableRuleConfiguration tableRuleConfig, final ShardingDataSourceNames shardingDataSourceNames, final String defaultGenerateKeyColumn) {
            // Set the logic table name logicTable
            logicTable = tableRuleConfig.getLogicTable().toLowerCase();
            // Parse actualDataNodes into dataNodes; in the database-sharding scenario actualDataNodes is not configured, so dataNodes is empty
            List<String> dataNodes = new InlineExpressionParser(tableRuleConfig.getActualDataNodes()).splitAndEvaluate();
            dataNodeIndexMap = new HashMap<>(dataNodes.size(), 1);
            // Build the actual DataNode list; the branch depends on whether dataNodes is empty
            actualDataNodes = isEmptyDataNodes(dataNodes)
                ? generateDataNodes(tableRuleConfig.getLogicTable(), shardingDataSourceNames.getDataSourceNames()) : generateDataNodes(dataNodes, shardingDataSourceNames.getDataSourceNames());
            // Collect the actual table names from the DataNodes in actualDataNodes
            actualTables = getActualTables();
            // Database sharding strategy
            databaseShardingStrategy = null == tableRuleConfig.getDatabaseShardingStrategyConfig() ? null : ShardingStrategyFactory.newInstance(tableRuleConfig.getDatabaseShardingStrategyConfig());
            // Table sharding strategy
            tableShardingStrategy = null == tableRuleConfig.getTableShardingStrategyConfig() ? null : ShardingStrategyFactory.newInstance(tableRuleConfig.getTableShardingStrategyConfig());
            // Column name of the generated key
            generateKeyColumn = getGenerateKeyColumn(tableRuleConfig.getKeyGeneratorConfig(), defaultGenerateKeyColumn);
            // Key generator
            shardingKeyGenerator = containsKeyGeneratorConfiguration(tableRuleConfig)
                    ? new ShardingKeyGeneratorServiceLoader().newService(tableRuleConfig.getKeyGeneratorConfig().getType(), tableRuleConfig.getKeyGeneratorConfig().getProperties()) : null;
            checkRule(dataNodes);
        }
    
        private List<DataNode> generateDataNodes(final String logicTable, final Collection<String> dataSourceNames) {
            List<DataNode> result = new LinkedList<>();
            int index = 0;
            for (String each : dataSourceNames) {
                // A DataNode consists of a data source name and a table name (here the logic table name)
                DataNode dataNode = new DataNode(each, logicTable);
                result.add(dataNode);
                dataNodeIndexMap.put(dataNode, index);
                actualDatasourceNames.add(each);
                addActualTable(dataNode.getDataSourceName(), dataNode.getTableName());
                index++;
            }
            return result;
        }
    }
    
    public final class DataNode {
        private static final String DELIMITER = ".";
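        // Data source the node belongs to
        private final String dataSourceName;
        // Table name (in the database-sharding scenario this equals the logic table name)
        private final String tableName;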
    }
    
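    • TableRule records the result of database sharding for each table.
    • The core fields of TableRule are logicTable (the logic table name), actualDataNodes (the actual storage nodes) and databaseShardingStrategy (the database sharding strategy).
    • In the database-sharding scenario the logic tables are the ones named in the configuration, such as t_order and t_order_item.
    • In the database-sharding scenario actualDataNodes is the Cartesian product of the logic table name and the data sources; in effect, one logic table maps to multiple data sources.
    • Both t_order and t_order_item span the two data sources demo_ds_0 and demo_ds_1, so t_order combined with the data sources yields two DataNodes, and so does t_order_item.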
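
The node generation for the database-only case can be sketched as follows. It is a simplified illustration: DataNodeSketch is a hypothetical class, each node is represented as a "dataSource.table" string (joined with the "." delimiter that DataNode declares), and the returned map plays the role of dataNodeIndexMap.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

final class DataNodeSketch {

    // Pairs the logic table with every data source and records each node's index.
    static Map<String, Integer> generateDataNodes(String logicTable, Collection<String> dataSourceNames) {
        Map<String, Integer> nodeToIndex = new LinkedHashMap<>();
        int index = 0;
        for (String each : dataSourceNames) {
            nodeToIndex.put(each + "." + logicTable, index);
            index++;
        }
        return nodeToIndex;
    }

    public static void main(String[] args) {
        Collection<String> dataSourceNames = Arrays.asList("demo_ds_0", "demo_ds_1");
        System.out.println(generateDataNodes("t_order", dataSourceNames).keySet());
        System.out.println(generateDataNodes("t_order_item", dataSourceNames).keySet());
    }
}
```
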
    public final class ShardingStrategyFactory {
        
        public static ShardingStrategy newInstance(final ShardingStrategyConfiguration shardingStrategyConfig) {
            if (shardingStrategyConfig instanceof StandardShardingStrategyConfiguration) {
                return new StandardShardingStrategy((StandardShardingStrategyConfiguration) shardingStrategyConfig);
            }
            if (shardingStrategyConfig instanceof InlineShardingStrategyConfiguration) {
                // In the database-sharding scenario the strategy is an InlineShardingStrategy
                return new InlineShardingStrategy((InlineShardingStrategyConfiguration) shardingStrategyConfig);
            }
            if (shardingStrategyConfig instanceof ComplexShardingStrategyConfiguration) {
                return new ComplexShardingStrategy((ComplexShardingStrategyConfiguration) shardingStrategyConfig);
            }
            if (shardingStrategyConfig instanceof HintShardingStrategyConfiguration) {
                return new HintShardingStrategy((HintShardingStrategyConfiguration) shardingStrategyConfig);
            }
            return new NoneShardingStrategy();
        }
    }
    
    
    public interface ShardingStrategy {
        Collection<String> getShardingColumns();
        Collection<String> doSharding(Collection<String> availableTargetNames, Collection<RouteValue> shardingValues);
    }
    
    
    public final class InlineShardingStrategy implements ShardingStrategy {
        
        private final String shardingColumn;
        private final Closure<?> closure;
        
        public InlineShardingStrategy(final InlineShardingStrategyConfiguration inlineShardingStrategyConfig) {
            Preconditions.checkNotNull(inlineShardingStrategyConfig.getShardingColumn(), "Sharding column cannot be null.");
            Preconditions.checkNotNull(inlineShardingStrategyConfig.getAlgorithmExpression(), "Sharding algorithm expression cannot be null.");
            // Sharding column name
            shardingColumn = inlineShardingStrategyConfig.getShardingColumn();
            // Sharding expression
            String algorithmExpression = InlineExpressionParser.handlePlaceHolder(inlineShardingStrategyConfig.getAlgorithmExpression().trim());
            // Build the executable closure from the sharding expression
            closure = new InlineExpressionParser(algorithmExpression).evaluateClosure();
        }
    
        @Override
        public Collection<String> doSharding(final Collection<String> availableTargetNames, final Collection<RouteValue> shardingValues) {
            RouteValue shardingValue = shardingValues.iterator().next();
            Preconditions.checkState(shardingValue instanceof ListRouteValue, "Inline strategy cannot support range sharding.");
            // Shard according to the ListRouteValue taken from the query
            Collection<String> shardingResult = doSharding((ListRouteValue) shardingValue);
            Collection<String> result = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
            for (String each : shardingResult) {
                if (availableTargetNames.contains(each)) {
                    result.add(each);
                }
            }
            return result;
        }
        
        private Collection<String> doSharding(final ListRouteValue shardingValue) {
            Collection<String> result = new LinkedList<>();
            // Convert shardingValue into PreciseShardingValue objects
            for (PreciseShardingValue<?> each : transferToPreciseShardingValues(shardingValue)) {
                // execute() evaluates the sharding expression against each sharding value and returns the result
                result.add(execute(each));
            }
            return result;
        }
        
        @SuppressWarnings("unchecked")
        private List<PreciseShardingValue> transferToPreciseShardingValues(final ListRouteValue<?> shardingValue) {
            // Convert each sharding value into a PreciseShardingValue object
            List<PreciseShardingValue> result = new ArrayList<>(shardingValue.getValues().size());
            for (Comparable<?> each : shardingValue.getValues()) {
                result.add(new PreciseShardingValue(shardingValue.getTableName(), shardingValue.getColumnName(), each));
            }
            return result;
        }
        
        private String execute(final PreciseShardingValue shardingValue) {
            // Evaluate the sharding expression closure against the sharding value and return the result
            Closure<?> result = closure.rehydrate(new Expando(), null, null);
            result.setResolveStrategy(Closure.DELEGATE_ONLY);
            result.setProperty(shardingColumn, shardingValue.getValue());
            return result.call().toString();
        }
    }
    
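    • ShardingStrategyFactory is the factory for sharding strategies; in the database-sharding scenario it returns an InlineShardingStrategy.
    • InlineShardingStrategy holds the sharding column shardingColumn and builds the executable closure from the sharding expression algorithmExpression.
    • doSharding evaluates the closure against each value in the ListRouteValue to compute the sharding result, and keeps only the targets present in availableTargetNames.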
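
The doSharding flow (evaluate per value, filter by the available targets, deduplicate) can be sketched without Groovy. Note the swap: a java.util.function.Function stands in for the Groovy closure, sharding values are assumed to be Long, and InlineShardingSketch is a hypothetical name, so this shows the shape of the logic rather than the library's implementation.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.TreeSet;
import java.util.function.Function;

final class InlineShardingSketch {

    // Evaluates the expression for every sharding value and keeps only known targets.
    static Collection<String> doSharding(Collection<String> availableTargetNames,
                                         Collection<Long> shardingValues,
                                         Function<Long, String> expression) {
        // Same container as the original: sorted, case-insensitive, no duplicates.
        Collection<String> result = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
        for (Long each : shardingValues) {
            String target = expression.apply(each);
            if (availableTargetNames.contains(target)) {
                result.add(target);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Function<Long, String> expression = userId -> "demo_ds_" + (userId % 2);
        Collection<String> targets = doSharding(
                Arrays.asList("demo_ds_0", "demo_ds_1"), Arrays.asList(1L, 3L, 4L), expression);
        System.out.println(targets);
    }
}
```

In this sketch, values 1 and 3 both resolve to demo_ds_1, and the set keeps a single entry for it; a value whose target is not in availableTargetNames is silently dropped.
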
    public final class ListRouteValue<T extends Comparable<?>> implements RouteValue {
        // Column name
        private final String columnName;
        // Table name
        private final String tableName;
        // Corresponding values
        private final Collection<T> values;
        
        @Override
        public String toString() {
            return tableName + "." + columnName + (1 == values.size() ? " = " + new ArrayList<>(values).get(0) : " in (" + Joiner.on(",").join(values) + ")");
        }
    }
    
    • A ListRouteValue holds the table name, the column name and the corresponding values.
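
The toString format above distinguishes a single value from a list of values. The sketch below reproduces that formatting with a StringJoiner in place of Guava's Joiner; RouteValueSketch and describe are hypothetical names, and the t_order / user_id inputs are only sample data.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.StringJoiner;

final class RouteValueSketch {

    // Formats one value as "table.column = v" and several as "table.column in (v1,v2)".
    static String describe(String tableName, String columnName, Collection<?> values) {
        if (1 == values.size()) {
            return tableName + "." + columnName + " = " + values.iterator().next();
        }
        StringJoiner joined = new StringJoiner(",");
        for (Object each : values) {
            joined.add(String.valueOf(each));
        }
        return tableName + "." + columnName + " in (" + joined + ")";
    }

    public static void main(String[] args) {
        System.out.println(describe("t_order", "user_id", Arrays.asList(10)));
        System.out.println(describe("t_order", "user_id", Arrays.asList(10, 11)));
    }
}
```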



    ShardingStrategy
    • Class relationship diagram of the sharding strategies.
