Introduction to the Core Components of Sharding-JDBC


Author: 晴天哥_王志 | Published: 2020-05-24 20:25

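    Sharding-JDBC Series

    Introduction to ShardingSphere

    • ShardingSphere is an ecosystem of open-source distributed database middleware solutions. It consists of three mutually independent products: Sharding-JDBC, Sharding-Proxy, and Sharding-Sidecar (planned). All three provide standardized data sharding, distributed transactions, and database governance, and they suit a wide range of scenarios, including homogeneous Java applications, heterogeneous languages, and cloud-native deployments.

    • ShardingSphere is positioned as relational database middleware. Its goal is to make full, sensible use of the compute and storage capabilities of relational databases in distributed scenarios, not to implement a brand-new relational database.

    • For the official documentation, see the ShardingSphere website; it is very detailed and recommended reading.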

    Sharding-JDBC

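    Rule: Sharding Information

    • The core rule and configuration classes are, in order, ShardingRule (sharding rule), TableRule (table rule), ShardingRuleConfiguration (sharding rule configuration), TableRuleConfiguration (table rule configuration), and MasterSlaveRuleConfiguration (master-slave rule configuration).

    ShardingRule: Sharding Rule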

    @Getter
    public class ShardingRule implements BaseRule {
        // Sharding rule configuration
        private final ShardingRuleConfiguration ruleConfiguration;
        // Names of the sharded data sources
        private final ShardingDataSourceNames shardingDataSourceNames;
        // Table rules: the databases and tables behind each logic table
        private final Collection<TableRule> tableRules;
        // Groups of tables that share the same sharding rule (binding tables)
        private final Collection<BindingTableRule> bindingTableRules;
        // Names of the broadcast tables
        private final Collection<String> broadcastTables;
        // Default database sharding strategy
        private final ShardingStrategy defaultDatabaseShardingStrategy;
        // Default table sharding strategy
        private final ShardingStrategy defaultTableShardingStrategy;
        // Default key generator
        private final ShardingKeyGenerator defaultShardingKeyGenerator;
        // Master-slave rules
        private final Collection<MasterSlaveRule> masterSlaveRules;
        // Encryption rule
        private final EncryptRule encryptRule;
        
        public ShardingRule(final ShardingRuleConfiguration shardingRuleConfig, final Collection<String> dataSourceNames) {
            Preconditions.checkArgument(null != shardingRuleConfig, "ShardingRuleConfig cannot be null.");
            Preconditions.checkArgument(null != dataSourceNames && !dataSourceNames.isEmpty(), "Data sources cannot be empty.");
            this.ruleConfiguration = shardingRuleConfig;
            shardingDataSourceNames = new ShardingDataSourceNames(shardingRuleConfig, dataSourceNames);
            tableRules = createTableRules(shardingRuleConfig);
            broadcastTables = shardingRuleConfig.getBroadcastTables();
            bindingTableRules = createBindingTableRules(shardingRuleConfig.getBindingTableGroups());
            defaultDatabaseShardingStrategy = createDefaultShardingStrategy(shardingRuleConfig.getDefaultDatabaseShardingStrategyConfig());
            defaultTableShardingStrategy = createDefaultShardingStrategy(shardingRuleConfig.getDefaultTableShardingStrategyConfig());
            defaultShardingKeyGenerator = createDefaultKeyGenerator(shardingRuleConfig.getDefaultKeyGeneratorConfig());
            masterSlaveRules = createMasterSlaveRules(shardingRuleConfig.getMasterSlaveRuleConfigs());
            encryptRule = createEncryptRule(shardingRuleConfig.getEncryptRuleConfig());
        }
    }
    
    [Figure: ShardingRule]
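ShardingRule keeps rule-level default strategies next to the per-table TableRule entries, and the single-table TableRule constructor sets its own strategies to null. One plausible reading is that a table without its own strategy falls back to the default. The sketch below illustrates only that fallback idea in plain Java. The class StrategyFallbackSketch and its string-valued "strategies" are hypothetical stand-ins, not Sharding-JDBC types.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in: resolves a table-level strategy, falling back to a rule-level default.
final class StrategyFallbackSketch {

    private final Map<String, String> tableStrategies = new HashMap<>();
    private final String defaultStrategy;

    StrategyFallbackSketch(final String defaultStrategy) {
        this.defaultStrategy = defaultStrategy;
    }

    void setTableStrategy(final String logicTable, final String strategy) {
        // Store logic table names in lower case, similar to the toLowerCase() call in TableRule.
        tableStrategies.put(logicTable.toLowerCase(), strategy);
    }

    String resolve(final String logicTable) {
        String own = tableStrategies.get(logicTable.toLowerCase());
        // Use the table's own strategy when present, otherwise the default.
        return null != own ? own : defaultStrategy;
    }
}
```

With a default of "order_id % 2" and an override registered for t_order_item, resolve("t_order") returns the default while resolve("t_order_item") returns the override.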

    TableRule: Table Rule

    @Getter
    @ToString(exclude = {"dataNodeIndexMap", "actualTables", "actualDatasourceNames", "datasourceToTablesMap"})
    public final class TableRule {
        // Logic table name
        private final String logicTable;
        // Actual data nodes
        private final List<DataNode> actualDataNodes;
        // Actual table names
        @Getter(AccessLevel.NONE)
        private final Set<String> actualTables;
        @Getter(AccessLevel.NONE)
        private final Map<DataNode, Integer> dataNodeIndexMap;
        private final ShardingStrategy databaseShardingStrategy;
        private final ShardingStrategy tableShardingStrategy;
        private final String generateKeyColumn;
        private final ShardingKeyGenerator shardingKeyGenerator;
        private final Collection<String> actualDatasourceNames = new LinkedHashSet<>();
        private final Map<String, Collection<String>> datasourceToTablesMap = new HashMap<>();
        
        public TableRule(final String defaultDataSourceName, final String logicTableName) {
            logicTable = logicTableName.toLowerCase();
            actualDataNodes = Collections.singletonList(new DataNode(defaultDataSourceName, logicTableName));
            actualTables = getActualTables();
            cacheActualDatasourcesAndTables();
            dataNodeIndexMap = Collections.emptyMap();
            databaseShardingStrategy = null;
            tableShardingStrategy = null;
            generateKeyColumn = null;
            shardingKeyGenerator = null;
        }
    }
    
    [Figure: TableRule]

    ShardingRuleConfiguration: Sharding Rule Configuration

    @Getter
    @Setter
    public final class ShardingRuleConfiguration implements RuleConfiguration {
        // Table rule configurations
        private Collection<TableRuleConfiguration> tableRuleConfigs = new LinkedList<>();
        // Binding table groups: tables that share the same sharding rule can be placed in one group
        private Collection<String> bindingTableGroups = new LinkedList<>();
        // Broadcast tables
        private Collection<String> broadcastTables = new LinkedList<>();
        // Default data source name
        private String defaultDataSourceName;
        // Default database sharding strategy configuration
        private ShardingStrategyConfiguration defaultDatabaseShardingStrategyConfig;
        // Default table sharding strategy configuration
        private ShardingStrategyConfiguration defaultTableShardingStrategyConfig;
        // Default key generator configuration
        private KeyGeneratorConfiguration defaultKeyGeneratorConfig;
        // Master-slave rule configurations
        private Collection<MasterSlaveRuleConfiguration> masterSlaveRuleConfigs = new LinkedList<>();
        // Encryption rule configuration
        private EncryptRuleConfiguration encryptRuleConfig;
    }
    
    [Figure: ShardingRuleConfiguration]

    TableRuleConfiguration: Table Configuration

    @Getter
    @Setter
    public final class TableRuleConfiguration {
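        // Logic table name
        private final String logicTable;
        // Actual data nodes, expressed as database + table (for example ds${0..1}.t_order${0..1})
        private final String actualDataNodes;
        // Database sharding strategy configuration
        private ShardingStrategyConfiguration databaseShardingStrategyConfig;
        // Table sharding strategy configuration
        private ShardingStrategyConfiguration tableShardingStrategyConfig;
        // Key generator configuration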
        private KeyGeneratorConfiguration keyGeneratorConfig;
        
        public TableRuleConfiguration(final String logicTable) {
            this(logicTable, null);
        }
        
        public TableRuleConfiguration(final String logicTable, final String actualDataNodes) {
            Preconditions.checkArgument(!Strings.isNullOrEmpty(logicTable), "LogicTable is required.");
            this.logicTable = logicTable;
            this.actualDataNodes = actualDataNodes;
        }
    }
    
    [Figure: TableRuleConfiguration]
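The actualDataNodes field is a single string, yet it describes several physical tables through an inline expression such as ds${0..1}.t_order${0..1}. The sketch below shows how such a string could be expanded into concrete data nodes. It is a simplified assumption written in plain Java: the class InlineRangeSketch is hypothetical and handles only numeric ranges of the form ${m..n}, whereas the library's own expression support is richer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Simplified sketch: expands only numeric ranges written as ${m..n}.
final class InlineRangeSketch {

    private static final Pattern RANGE = Pattern.compile("\\$\\{(\\d+)\\.\\.(\\d+)\\}");

    static List<String> expand(final String expression) {
        List<String> result = new ArrayList<>();
        Matcher matcher = RANGE.matcher(expression);
        if (!matcher.find()) {
            // No range left, so the expression is already a concrete name.
            result.add(expression);
            return result;
        }
        int from = Integer.parseInt(matcher.group(1));
        int to = Integer.parseInt(matcher.group(2));
        String head = expression.substring(0, matcher.start());
        String tail = expression.substring(matcher.end());
        for (int i = from; i <= to; i++) {
            // Substitute the first range, then expand any remaining ranges recursively.
            result.addAll(expand(head + i + tail));
        }
        return result;
    }
}
```

Calling InlineRangeSketch.expand("ds${0..1}.t_order${0..1}") yields the four nodes ds0.t_order0, ds0.t_order1, ds1.t_order0, and ds1.t_order1, in that order.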

    MasterSlaveRuleConfiguration: Master-Slave Configuration

    @Getter
    public class MasterSlaveRuleConfiguration implements RuleConfiguration {
        // Rule name
        private final String name;
        // Master data source name
        private final String masterDataSourceName;
        // Slave data source names
        private final List<String> slaveDataSourceNames;
        // Load-balance strategy configuration
        private final LoadBalanceStrategyConfiguration loadBalanceStrategyConfiguration;
        
        public MasterSlaveRuleConfiguration(final String name, final String masterDataSourceName, final List<String> slaveDataSourceNames) {
            this(name, masterDataSourceName, slaveDataSourceNames, null);
        }
        
        public MasterSlaveRuleConfiguration(final String name, 
                                            final String masterDataSourceName, final List<String> slaveDataSourceNames, final LoadBalanceStrategyConfiguration loadBalanceStrategyConfiguration) {
            Preconditions.checkArgument(!Strings.isNullOrEmpty(name), "Name is required.");
            Preconditions.checkArgument(!Strings.isNullOrEmpty(masterDataSourceName), "MasterDataSourceName is required.");
            Preconditions.checkArgument(null != slaveDataSourceNames && !slaveDataSourceNames.isEmpty(), "SlaveDataSourceNames is required.");
            this.name = name;
            this.masterDataSourceName = masterDataSourceName;
            this.slaveDataSourceNames = slaveDataSourceNames;
            this.loadBalanceStrategyConfiguration = loadBalanceStrategyConfiguration;
        }
    }
    
    [Figure: MasterSlaveRuleConfiguration]
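MasterSlaveRuleConfiguration carries a list of slave data sources plus an optional load-balance strategy. To make the idea of load balancing across slaves concrete, here is a minimal round-robin sketch in plain Java. The class RoundRobinSlaveSketch is hypothetical and is not the library's load-balance implementation. It only shows one common way to rotate through the configured slave names.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical round-robin selector over slave data source names.
final class RoundRobinSlaveSketch {

    private final List<String> slaveDataSourceNames;
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobinSlaveSketch(final List<String> slaveDataSourceNames) {
        if (null == slaveDataSourceNames || slaveDataSourceNames.isEmpty()) {
            throw new IllegalArgumentException("SlaveDataSourceNames is required.");
        }
        this.slaveDataSourceNames = slaveDataSourceNames;
    }

    String next() {
        // floorMod keeps the index non-negative even after the counter overflows.
        int index = Math.floorMod(counter.getAndIncrement(), slaveDataSourceNames.size());
        return slaveDataSourceNames.get(index);
    }
}
```

With the slaves ds_slave0 and ds_slave1, successive calls to next() alternate between the two names, starting with ds_slave0.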

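    Configuration Relationship Diagram

    [Figure: configuration relationship diagram]

    JDBC Core Components

    • The core JDBC abstractions are DataSource, Connection, PreparedStatement and Statement, and ResultSet.
    • Sharding-JDBC wraps each of these abstractions to implement database and table sharding.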

    DataSource

    ShardingDataSource

     DataSource getShardingDataSource() throws SQLException {
         // Create the sharding rule configuration
         ShardingRuleConfiguration shardingRuleConfig = new ShardingRuleConfiguration();
         // Register the table rule configuration for t_order
         shardingRuleConfig.getTableRuleConfigs().add(getOrderTableRuleConfiguration());
         // Register the table rule configuration for t_order_item
         shardingRuleConfig.getTableRuleConfigs().add(getOrderItemTableRuleConfiguration());
         // Bind tables that share the same sharding rule
         shardingRuleConfig.getBindingTableGroups().add("t_order, t_order_item");
         // Register the broadcast table
         shardingRuleConfig.getBroadcastTables().add("t_config");
         // Set the default database and table sharding strategies
         shardingRuleConfig.setDefaultDatabaseShardingStrategyConfig(new InlineShardingStrategyConfiguration("user_id", "ds${user_id % 2}"));
         shardingRuleConfig.setDefaultTableShardingStrategyConfig(new StandardShardingStrategyConfiguration("order_id", new ModuloShardingTableAlgorithm()));
         // Create the ShardingDataSource
         return ShardingDataSourceFactory.createDataSource(createDataSourceMap(), shardingRuleConfig, new Properties());
     }
     
     private static KeyGeneratorConfiguration getKeyGeneratorConfiguration() {
         KeyGeneratorConfiguration result = new KeyGeneratorConfiguration("SNOWFLAKE", "order_id");
         return result;
     }
     
     TableRuleConfiguration getOrderTableRuleConfiguration() {
         TableRuleConfiguration result = new TableRuleConfiguration("t_order", "ds${0..1}.t_order${0..1}");
         result.setKeyGeneratorConfig(getKeyGeneratorConfiguration());
         return result;
     }
     
     TableRuleConfiguration getOrderItemTableRuleConfiguration() {
         TableRuleConfiguration result = new TableRuleConfiguration("t_order_item", "ds${0..1}.t_order_item${0..1}");
         return result;
     }
     
     Map<String, DataSource> createDataSourceMap() {
         Map<String, DataSource> result = new HashMap<>();
         result.put("ds0", DataSourceUtil.createDataSource("ds0"));
         result.put("ds1", DataSourceUtil.createDataSource("ds1"));
         return result;
     }
    
    public final class DataSourceUtil {
        
        private static final String HOST = "localhost";
        private static final int PORT = 3306;
        private static final String USER_NAME = "root";
        private static final String PASSWORD = "123456";
        
        public static DataSource createDataSource(final String dataSourceName) {
            HikariDataSource result = new HikariDataSource();
            result.setDriverClassName(com.mysql.jdbc.Driver.class.getName());
            result.setJdbcUrl(String.format("jdbc:mysql://%s:%s/%s?serverTimezone=UTC&useSSL=false&useUnicode=true&characterEncoding=UTF-8", HOST, PORT, dataSourceName));
            result.setUsername(USER_NAME);
            result.setPassword(PASSWORD);
            return result;
        }
    }
    
    @NoArgsConstructor(access = AccessLevel.PRIVATE)
    public final class ShardingDataSourceFactory {
        
        public static DataSource createDataSource(
                final Map<String, DataSource> dataSourceMap, final ShardingRuleConfiguration shardingRuleConfig, final Properties props) throws SQLException {
            return new ShardingDataSource(dataSourceMap, new ShardingRule(shardingRuleConfig, dataSourceMap.keySet()), props);
        }
    }
    
    @Getter
    public abstract class AbstractDataSourceAdapter extends AbstractUnsupportedOperationDataSource implements AutoCloseable {
        
        private final Map<String, DataSource> dataSourceMap;
        private final DatabaseType databaseType;
        
        public AbstractDataSourceAdapter(final Map<String, DataSource> dataSourceMap) throws SQLException {
            this.dataSourceMap = dataSourceMap;
            databaseType = createDatabaseType();
        }
    }
    
    @Getter
    public class ShardingDataSource extends AbstractDataSourceAdapter {
        
        private final ShardingRuntimeContext runtimeContext;
        
        public ShardingDataSource(final Map<String, DataSource> dataSourceMap, final ShardingRule shardingRule, final Properties props) throws SQLException {
            super(dataSourceMap);
            checkDataSourceType(dataSourceMap);
            runtimeContext = new ShardingRuntimeContext(dataSourceMap, shardingRule, props, getDatabaseType());
        }
        
        @Override
        public final ShardingConnection getConnection() {
            return new ShardingConnection(getDataSourceMap(), runtimeContext, TransactionTypeHolder.get());
        }
    }
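The configuration above routes by user_id % 2 at the database level and hands order_id to a ModuloShardingTableAlgorithm whose body is not shown. The following plain-Java sketch is an assumption about what such modulo routing amounts to, not the article's or the library's code. ModuloRoutingSketch and its methods are hypothetical names, and the suffix match is only safe for fewer than ten targets.

```java
import java.util.Arrays;
import java.util.Collection;

// Hypothetical sketch of modulo-based routing to a data source and a table.
final class ModuloRoutingSketch {

    // Picks the target whose name ends with (shardingValue mod number of targets).
    static String route(final Collection<String> availableTargetNames, final long shardingValue) {
        String suffix = String.valueOf(Math.floorMod(shardingValue, (long) availableTargetNames.size()));
        for (String each : availableTargetNames) {
            if (each.endsWith(suffix)) {
                return each;
            }
        }
        throw new UnsupportedOperationException("No target matches suffix " + suffix);
    }

    // Combines the database and table decisions into a data node such as "ds1.t_order0".
    static String routeDataNode(final long userId, final long orderId) {
        String dataSource = route(Arrays.asList("ds0", "ds1"), userId);
        String table = route(Arrays.asList("t_order0", "t_order1"), orderId);
        return dataSource + "." + table;
    }
}
```

For example, routeDataNode(7, 10) selects ds1 because 7 % 2 is 1 and t_order0 because 10 % 2 is 0, giving ds1.t_order0.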
    

    MasterSlaveDataSource

     DataSource getMasterSlaveDataSource() throws SQLException {
         MasterSlaveRuleConfiguration masterSlaveRuleConfig = new MasterSlaveRuleConfiguration("ds_master_slave", "ds_master", Arrays.asList("ds_slave0", "ds_slave1"));
         return MasterSlaveDataSourceFactory.createDataSource(createDataSourceMap(), masterSlaveRuleConfig, new Properties());
     }
     
     Map<String, DataSource> createDataSourceMap() {
         Map<String, DataSource> result = new HashMap<>();
         result.put("ds_master", DataSourceUtil.createDataSource("ds_master"));
         result.put("ds_slave0", DataSourceUtil.createDataSource("ds_slave0"));
         result.put("ds_slave1", DataSourceUtil.createDataSource("ds_slave1"));
         return result;
     }
    
    @NoArgsConstructor(access = AccessLevel.PRIVATE)
    public final class MasterSlaveDataSourceFactory {
        
        public static DataSource createDataSource(final Map<String, DataSource> dataSourceMap, final MasterSlaveRuleConfiguration masterSlaveRuleConfig, final Properties props) throws SQLException {
            return new MasterSlaveDataSource(dataSourceMap, new MasterSlaveRule(masterSlaveRuleConfig), props);
        }
    }
    
    @Getter
    public class MasterSlaveDataSource extends AbstractDataSourceAdapter {
        
        private final MasterSlaveRuntimeContext runtimeContext;
        
        public MasterSlaveDataSource(final Map<String, DataSource> dataSourceMap, final MasterSlaveRule masterSlaveRule, final Properties props) throws SQLException {
            super(dataSourceMap);
            runtimeContext = new MasterSlaveRuntimeContext(dataSourceMap, masterSlaveRule, props, getDatabaseType());
        }
        
        @Override
        public final MasterSlaveConnection getConnection() {
            return new MasterSlaveConnection(getDataSourceMap(), runtimeContext);
        }
    }
    

    DataSource Class Diagram

    [Figure: DataSource class diagram]

    Connection

    ShardingConnection

    @Getter
    public final class ShardingConnection extends AbstractConnectionAdapter {
        // The bound data source map
        private final Map<String, DataSource> dataSourceMap;
        private final ShardingRuntimeContext runtimeContext;
        private final TransactionType transactionType;
        private final ShardingTransactionManager shardingTransactionManager;
        
        public ShardingConnection(final Map<String, DataSource> dataSourceMap, final ShardingRuntimeContext runtimeContext, final TransactionType transactionType) {
            this.dataSourceMap = dataSourceMap;
            this.runtimeContext = runtimeContext;
            this.transactionType = transactionType;
            shardingTransactionManager = runtimeContext.getShardingTransactionManagerEngine().getTransactionManager(transactionType);
        }
    
        @Override
        public PreparedStatement prepareStatement(final String sql) throws SQLException {
            return new ShardingPreparedStatement(this, sql);
        }
    
        @Override
        public Statement createStatement() {
            return new ShardingStatement(this);
        }
    }
    

    MasterSlaveConnection

    @RequiredArgsConstructor
    @Getter
    public final class MasterSlaveConnection extends AbstractConnectionAdapter {
        // The bound data source map
        private final Map<String, DataSource> dataSourceMap;
        private final MasterSlaveRuntimeContext runtimeContext;
    
        @Override
        public Statement createStatement() {
            return new MasterSlaveStatement(this);
        }
        
        @Override
        public PreparedStatement prepareStatement(final String sql) throws SQLException {
            return new MasterSlavePreparedStatement(this, sql);
        }
    }
    

    Connection Class Diagram

    [Figure: Connection class diagram]

    PreparedStatement

    ShardingPreparedStatement

    public final class ShardingPreparedStatement extends AbstractShardingPreparedStatementAdapter {
        
        @Getter
        private final ShardingConnection connection;
        private final String sql;
        private final PreparedQueryShardingEngine shardingEngine;
        private final PreparedStatementExecutor preparedStatementExecutor;
        private final BatchPreparedStatementExecutor batchPreparedStatementExecutor;
        private SQLRouteResult sqlRouteResult;
        private ResultSet currentResultSet;
    
        private ShardingPreparedStatement(
                final ShardingConnection connection, final String sql, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final boolean returnGeneratedKeys)
                throws SQLException {
            if (Strings.isNullOrEmpty(sql)) {
                throw new SQLException(SQLExceptionConstant.SQL_STRING_NULL_OR_EMPTY);
            }
            this.connection = connection;
            this.sql = sql;
            ShardingRuntimeContext runtimeContext = connection.getRuntimeContext();
            shardingEngine = new PreparedQueryShardingEngine(sql, runtimeContext.getRule(), runtimeContext.getProps(), runtimeContext.getMetaData(), runtimeContext.getParseEngine());
            preparedStatementExecutor = new PreparedStatementExecutor(resultSetType, resultSetConcurrency, resultSetHoldability, returnGeneratedKeys, connection);
            batchPreparedStatementExecutor = new BatchPreparedStatementExecutor(resultSetType, resultSetConcurrency, resultSetHoldability, returnGeneratedKeys, connection);
        }
        
        @Override
        public ResultSet executeQuery() throws SQLException {
            ResultSet result;
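            try {
                // Reset state left over from a previous execution
                clearPrevious();
                // Route the SQL; the outcome is kept in sqlRouteResult
                shard();
                // Prepare the executor for the routed SQL
                initPreparedStatementExecutor();
                // Execute the query on the routed targets and hand the results to the merge engine
                MergeEngine mergeEngine = MergeEngineFactory.newInstance(connection.getRuntimeContext().getDatabaseType(), 
                        connection.getRuntimeContext().getRule(), sqlRouteResult, connection.getRuntimeContext().getMetaData().getRelationMetas(), preparedStatementExecutor.executeQuery());
                result = getResultSet(mergeEngine);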
            } finally {
                clearBatch();
            }
            currentResultSet = result;
            return result;
        }
    
        @Override
        public int executeUpdate() throws SQLException {
            try {
                clearPrevious();
                shard();
                initPreparedStatementExecutor();
                return preparedStatementExecutor.executeUpdate();
            } finally {
                clearBatch();
            }
        }
        
        @Override
        public boolean execute() throws SQLException {
            try {
                clearPrevious();
                shard();
                initPreparedStatementExecutor();
                return preparedStatementExecutor.execute();
            } finally {
                clearBatch();
            }
        }
    }
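In executeQuery above, the per-shard query results are passed to a MergeEngine before a single ResultSet is returned. For queries with an ORDER BY clause, one well-known way to combine already-sorted partial results is a k-way merge driven by a priority queue. The sketch below illustrates that technique in plain Java under simplifying assumptions: rows are reduced to Long values, and OrderedMergeSketch is a hypothetical class, not the MergeEngine itself.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical k-way merge of per-shard results that are each sorted ascending.
final class OrderedMergeSketch {

    // Holds the current head value of one shard together with the rest of its rows.
    private static final class Cursor implements Comparable<Cursor> {

        private final Iterator<Long> rest;
        private long current;

        Cursor(final Iterator<Long> rows) {
            this.rest = rows;
            this.current = rows.next();
        }

        @Override
        public int compareTo(final Cursor other) {
            return Long.compare(current, other.current);
        }
    }

    static List<Long> merge(final List<List<Long>> shardResults) {
        PriorityQueue<Cursor> queue = new PriorityQueue<>();
        for (List<Long> each : shardResults) {
            // Empty shard results contribute nothing and are skipped.
            if (!each.isEmpty()) {
                queue.add(new Cursor(each.iterator()));
            }
        }
        List<Long> result = new ArrayList<>();
        while (!queue.isEmpty()) {
            // The queue head is the smallest value among all shard heads.
            Cursor smallest = queue.poll();
            result.add(smallest.current);
            if (smallest.rest.hasNext()) {
                smallest.current = smallest.rest.next();
                queue.add(smallest);
            }
        }
        return result;
    }
}
```

Merging the shard results [1, 4, 7] and [2, 3, 9] this way produces [1, 2, 3, 4, 7, 9] without re-sorting the whole data set.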
    

    MasterSlavePreparedStatement

    @Getter
    public final class MasterSlavePreparedStatement extends AbstractMasterSlavePreparedStatementAdapter {
        
        private final MasterSlaveConnection connection;
        
        @Getter(AccessLevel.NONE)
        private final MasterSlaveRouter masterSlaveRouter;
    
        public MasterSlavePreparedStatement(
                final MasterSlaveConnection connection, final String sql, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) throws SQLException {
            if (Strings.isNullOrEmpty(sql)) {
                throw new SQLException(SQLExceptionConstant.SQL_STRING_NULL_OR_EMPTY);
            }
            this.connection = connection;
            masterSlaveRouter = new MasterSlaveRouter(connection.getRuntimeContext().getRule(), connection.getRuntimeContext().getParseEngine(), 
                    connection.getRuntimeContext().getProps().<Boolean>getValue(ShardingPropertiesConstant.SQL_SHOW));
            for (String each : masterSlaveRouter.route(sql, true)) {
                PreparedStatement preparedStatement = connection.getConnection(each).prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
                routedStatements.add(preparedStatement);
            }
        }
    
        @Override
        public ResultSet executeQuery() throws SQLException {
            Preconditions.checkArgument(1 == routedStatements.size(), "Cannot support executeQuery for DDL");
            return routedStatements.iterator().next().executeQuery();
        }
        
        @Override
        public int executeUpdate() throws SQLException {
            int result = 0;
            for (PreparedStatement each : routedStatements) {
                result += each.executeUpdate();
            }
            return result;
        }
        
        @Override
        public boolean execute() throws SQLException {
            boolean result = false;
            for (PreparedStatement each : routedStatements) {
                result = each.execute();
            }
            return result;
        }
    }
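MasterSlavePreparedStatement asks a MasterSlaveRouter which data sources a statement should run on. The essence of read-write splitting is that queries may go to a slave while everything else goes to the master. The sketch below captures only that essence in plain Java. ReadWriteRoutingSketch is hypothetical, and classifying SQL by its leading keyword is a deliberate simplification. The real router works with a parse engine, as its constructor arguments above suggest, and may apply further conditions not covered here.

```java
import java.util.Locale;

// Hypothetical read-write split: SELECT goes to a slave, everything else to the master.
final class ReadWriteRoutingSketch {

    static String route(final String sql, final String masterDataSourceName, final String slaveDataSourceName) {
        if (null == sql || sql.trim().isEmpty()) {
            throw new IllegalArgumentException("SQL is required.");
        }
        // Crude classification by leading keyword; a real router would inspect the parsed statement.
        boolean isQuery = sql.trim().toLowerCase(Locale.ROOT).startsWith("select");
        return isQuery ? slaveDataSourceName : masterDataSourceName;
    }
}
```

Under this sketch, a SELECT against t_order is routed to the slave name passed in, while an UPDATE or INSERT is routed to the master name.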
    

    PreparedStatement Class Diagram

    [Figure: PreparedStatement class diagram]

    Statement

    ShardingStatement

    public final class ShardingStatement extends AbstractStatementAdapter {
        
        @Getter
        private final ShardingConnection connection;
        private final StatementExecutor statementExecutor;
        private boolean returnGeneratedKeys;
        private SQLRouteResult sqlRouteResult;
        private ResultSet currentResultSet;
    
        public ShardingStatement(final ShardingConnection connection, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) {
            super(Statement.class);
            this.connection = connection;
            statementExecutor = new StatementExecutor(resultSetType, resultSetConcurrency, resultSetHoldability, connection);
        }
    
        @Override
        public ResultSet executeQuery(final String sql) throws SQLException {
            if (Strings.isNullOrEmpty(sql)) {
                throw new SQLException(SQLExceptionConstant.SQL_STRING_NULL_OR_EMPTY);
            }
            ResultSet result;
            try {
                clearPrevious();
                shard(sql);
                initStatementExecutor();
                MergeEngine mergeEngine = MergeEngineFactory.newInstance(connection.getRuntimeContext().getDatabaseType(), 
                        connection.getRuntimeContext().getRule(), sqlRouteResult, connection.getRuntimeContext().getMetaData().getRelationMetas(), statementExecutor.executeQuery());
                result = getResultSet(mergeEngine);
            } finally {
                currentResultSet = null;
            }
            currentResultSet = result;
            return result;
        }
    
        @Override
        public int executeUpdate(final String sql) throws SQLException {
            try {
                clearPrevious();
                shard(sql);
                initStatementExecutor();
                return statementExecutor.executeUpdate();
            } finally {
                currentResultSet = null;
            }
        }
        
        @Override
        public int executeUpdate(final String sql, final int autoGeneratedKeys) throws SQLException {
            if (RETURN_GENERATED_KEYS == autoGeneratedKeys) {
                returnGeneratedKeys = true;
            }
            try {
                clearPrevious();
                shard(sql);
                initStatementExecutor();
                return statementExecutor.executeUpdate(autoGeneratedKeys);
            } finally {
                currentResultSet = null;
            }
        }
    }
    

    MasterSlaveStatement

    @Getter
    public final class MasterSlaveStatement extends AbstractStatementAdapter {
        
        private final MasterSlaveConnection connection;
        @Getter(AccessLevel.NONE)
        private final MasterSlaveRouter masterSlaveRouter;
        private final int resultSetType;
        private final int resultSetConcurrency;
        private final int resultSetHoldability;
        private final Collection<Statement> routedStatements = new LinkedList<>();
    
        public MasterSlaveStatement(final MasterSlaveConnection connection, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) {
            super(Statement.class);
            this.connection = connection;
            masterSlaveRouter = new MasterSlaveRouter(connection.getRuntimeContext().getRule(), connection.getRuntimeContext().getParseEngine(),
                    connection.getRuntimeContext().getProps().<Boolean>getValue(ShardingPropertiesConstant.SQL_SHOW));
            this.resultSetType = resultSetType;
            this.resultSetConcurrency = resultSetConcurrency;
            this.resultSetHoldability = resultSetHoldability;
        }
    
        @Override
        public ResultSet executeQuery(final String sql) throws SQLException {
            if (Strings.isNullOrEmpty(sql)) {
                throw new SQLException(SQLExceptionConstant.SQL_STRING_NULL_OR_EMPTY);
            }
            clearPrevious();
            Collection<String> dataSourceNames = masterSlaveRouter.route(sql, false);
            Preconditions.checkState(1 == dataSourceNames.size(), "Cannot support executeQuery for DML or DDL");
            Statement statement = connection.getConnection(dataSourceNames.iterator().next()).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
            routedStatements.add(statement);
            return statement.executeQuery(sql);
        }
        
        @Override
        public int executeUpdate(final String sql) throws SQLException {
            clearPrevious();
            int result = 0;
            for (String each : masterSlaveRouter.route(sql, false)) {
                Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
                routedStatements.add(statement);
                result += statement.executeUpdate(sql);
            }
            return result;
        }
    }
    

    Statement Class Diagram

    [Figure: Statement class diagram]

    ResultSet

    ShardingResultSet

    public final class ShardingResultSet extends AbstractResultSetAdapter {
        
        private final MergedResult mergeResultSet;
        
        private final Map<String, Integer> columnLabelAndIndexMap;
        
        public ShardingResultSet(final List<ResultSet> resultSets, final MergedResult mergeResultSet, final Statement statement, final SQLRouteResult sqlRouteResult) throws SQLException {
            super(resultSets, statement, sqlRouteResult);
            this.mergeResultSet = mergeResultSet;
            columnLabelAndIndexMap = createColumnLabelAndIndexMap(resultSets.get(0).getMetaData());
        }
        
        private Map<String, Integer> createColumnLabelAndIndexMap(final ResultSetMetaData resultSetMetaData) throws SQLException {
            Map<String, Integer> result = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
            for (int columnIndex = resultSetMetaData.getColumnCount(); columnIndex > 0; columnIndex--) {
                result.put(resultSetMetaData.getColumnLabel(columnIndex), columnIndex);
            }
            return result;
        }
    }
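createColumnLabelAndIndexMap builds a case-insensitive map and walks the columns from last to first. The effect of that loop direction is easy to miss: when a label appears more than once, the later put overwrites the earlier one, so the lowest column index is the one kept. The sketch below reproduces the same loop over a plain list of labels so the behavior can be checked without a database. ColumnLabelIndexSketch and the List parameter are stand-ins for the ResultSetMetaData used in the original.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Stand-in for the metadata-based method: labels come from a list instead of ResultSetMetaData.
final class ColumnLabelIndexSketch {

    // Column indexes are 1-based, as in JDBC.
    static Map<String, Integer> createColumnLabelAndIndexMap(final List<String> columnLabels) {
        Map<String, Integer> result = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        for (int columnIndex = columnLabels.size(); columnIndex > 0; columnIndex--) {
            // Later puts overwrite earlier ones, so the lowest index of a repeated label is kept.
            result.put(columnLabels.get(columnIndex - 1), columnIndex);
        }
        return result;
    }
}
```

For the labels order_id, user_id, ORDER_ID, a lookup of "Order_Id" returns 1 and a lookup of "USER_ID" returns 2, and the map holds two entries.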
    
    ResultSet Class Diagram

    shardingjdbc Directory Structure

    ├── api
    │   ├── EncryptDataSourceFactory.java
    │   ├── MasterSlaveDataSourceFactory.java
    │   ├── ShardingDataSourceFactory.java
    │   └── yaml
    │       ├── YamlEncryptDataSourceFactory.java
    │       ├── YamlMasterSlaveDataSourceFactory.java
    │       └── YamlShardingDataSourceFactory.java
    ├── executor
    │   ├── AbstractStatementExecutor.java
    │   ├── BatchPreparedStatementExecutor.java
    │   ├── PreparedStatementExecutor.java
    │   ├── SQLExecuteCallbackFactory.java
    │   └── StatementExecutor.java
    └── jdbc
        ├── adapter
        │   ├── AbstractConnectionAdapter.java
        │   ├── AbstractDataSourceAdapter.java
        │   ├── AbstractMasterSlavePreparedStatementAdapter.java
        │   ├── AbstractResultSetAdapter.java
        │   ├── AbstractShardingPreparedStatementAdapter.java
        │   ├── AbstractStatementAdapter.java
        │   ├── WrapperAdapter.java
        │   ├── executor
        │   └── invocation
        ├── core
        │   ├── connection
        │   ├── constant
        │   ├── context
        │   ├── datasource
        │   ├── resultset
        │   └── statement
        ├── metadata
        │   └── JDBCTableMetaDataConnectionManager.java
        └── unsupported
            ├── AbstractUnsupportedDatabaseMetaDataResultSet.java
            ├── AbstractUnsupportedGeneratedKeysResultSet.java
            ├── AbstractUnsupportedOperationConnection.java
            ├── AbstractUnsupportedOperationDataSource.java
            ├── AbstractUnsupportedOperationPreparedStatement.java
            ├── AbstractUnsupportedOperationResultSet.java
            ├── AbstractUnsupportedOperationStatement.java
            └── AbstractUnsupportedUpdateOperationResultSet.java
    
    
    
