美文网首页
sharding-jdbc 执行流程源码分析-初始化

sharding-jdbc 执行流程源码分析-初始化

作者: 田才 | 来源:发表于2020-04-25 16:33 被阅读0次

    和其他应对高并发,数据量大的方案比,分库分表通常是最实用,最朴素的一个方案,既简单又有效。但是分库分表后怎么可以对业务层代码影响降到最小,是程序员们需要解决的问题,下边看一下一个开源组件 sharding-jdbc,应用程序只需引入jar然后通过编写分片策略方法、和相关的配置,即可实现分库分表。
    本篇文章主要解释以下内容,基于 4.0.1 版本

    研究源码前,首先思考几个问题

    1、sharding-jdbc 为什么可以和 Mybatis\ibaits\hibernate\jpa\spring data template 结合使用,换句话说:为什么不依赖orm
    2、内部运行流程分别是怎么实现的
    (1)初始化过程
    (2)sql 解析
    (3)sql 提取
    (4)sql 路由
    (5)sql 替换
    (6)sql 执行

    1、sharding-jdbc 为什么不依赖orm

    sharding-jdbc 的执行入口是 ShardingDataSource 此类实现了 javax.sql.DataSource ,javax 的 x 是extension 的意思,也就是扩展包,为了使 java 基础包更加通用,在上边加了一层扩展。javax.sql.DataSource 接口只有一个方法 getConnection , 几乎所有的 orm 框架都是用 DataSource 接口获取数据库连接的。 所以才使得 sharding-jdbc 不依赖具体的 orm 层框架成为可能。

    其中 ShardingDataSource 构造函数就是,初始化的入口。
    其中 getConnection 方法,是执行阶段的入口。

    public class ShardingDataSource extends AbstractDataSourceAdapter {
        private final ShardingRuntimeContext runtimeContext;
        //初始化阶段
        public ShardingDataSource(final Map<String, DataSource> dataSourceMap, final ShardingRule shardingRule, final Properties props) throws SQLException {
            //匹配数据库类型
            super(dataSourceMap);
            checkDataSourceType(dataSourceMap);
            runtimeContext = new ShardingRuntimeContext(dataSourceMap, shardingRule, props, getDatabaseType());
        }
        private void checkDataSourceType(final Map<String, DataSource> dataSourceMap) {
            for (DataSource each : dataSourceMap.values()) {
                Preconditions.checkArgument(!(each instanceof MasterSlaveDataSource), "Initialized data sources can not be master-slave data sources.");
            }
        }
        //执行阶段
        @Override
        public final ShardingConnection getConnection() {
            return new ShardingConnection(getDataSourceMap(), runtimeContext, TransactionTypeHolder.get());
        }
    }
    

    2、内部运行流程分别是怎么实现的

    (1)初始化阶段

    涉及的相关类 uml 图


    image.png

    初始化分为3个步骤:

    • 根据配置信息 shardingRuleConfiguration 创建分片规则 ShardingRule
    • 匹配数据库类型 databaseType
    • 创建 sql 解析引擎 SQLParseEngine 、sql 执行引擎 ShardingExecuteEngine
    • 保存数据库、数据表信息。

    下边我们一个一个分析,以 spring namespace 方法创建为例。

    1、根据配置信息 shardingRuleConfiguration 创建分片规则 ShardingRule
    public class SpringShardingDataSource extends ShardingDataSource {
        public SpringShardingDataSource(final Map<String, DataSource> dataSourceMap, final ShardingRuleConfiguration shardingRuleConfiguration, final Properties props) throws SQLException {
            //new ShardingRule 创建分片规则
            super(dataSourceMap, new ShardingRule(shardingRuleConfiguration, dataSourceMap.keySet()), props);
        }
    }
    

    spring namespace 会根据 xml 的配置创建好用户配置信息类ShardingRuleConfiguration 包括:

    • TableRuleConfiguration:逻辑表名 + 可路由到的数据数据节点名(数据源名+表名)支持表达式
    • bindingTableGroups 绑定表组:例如order + order_item 表
    • broadcastTables 广播表:例如配置信息需要每个数据库都有一张这样的表,需要修改的时候,那么需要广播对吧!
    • defaultDataSourceName 默认数据源名称
    • defaultDatabaseShardingStrategyConfig 默认分库策略
    • defaultTableShardingStrategyConfig 默认分表策略
      其中分片策略配置包括:暗示策略Hint基于ThraedLocal
      表达式分片策略Inline基于groovy 的 Inline 表达式
      复合分片策略 Complex 基于多列 、不分库策略None,标准策略Standard基于单列。

    然后根据 shardingRuleConfiguration和数据源集合,创建分片规则 ShardingRule

    public ShardingRule(final ShardingRuleConfiguration shardingRuleConfig, final Collection<String> dataSourceNames) {
            Preconditions.checkArgument(null != shardingRuleConfig, "ShardingRuleConfig cannot be null.");
            Preconditions.checkArgument(null != dataSourceNames && !dataSourceNames.isEmpty(), "Data sources cannot be empty.");
            this.ruleConfiguration = shardingRuleConfig;
            //根据配置创建所有数据源集合,主从复制需要特殊处理(只是保留一个主从公共数据源)
            shardingDataSourceNames = new ShardingDataSourceNames(shardingRuleConfig, dataSourceNames);
            //根据分片配置,创建分片规则
            tableRules = createTableRules(shardingRuleConfig);
            broadcastTables = shardingRuleConfig.getBroadcastTables();
            //创建 表分片key与分片规则完全一致 的规则
            bindingTableRules = createBindingTableRules(shardingRuleConfig.getBindingTableGroups());
            //创建默认的分库策略
            defaultDatabaseShardingStrategy = createDefaultShardingStrategy(shardingRuleConfig.getDefaultDatabaseShardingStrategyConfig());
            defaultTableShardingStrategy = createDefaultShardingStrategy(shardingRuleConfig.getDefaultTableShardingStrategyConfig());
            defaultShardingKeyGenerator = createDefaultKeyGenerator(shardingRuleConfig.getDefaultKeyGeneratorConfig());
            //主从规则
            masterSlaveRules = createMasterSlaveRules(shardingRuleConfig.getMasterSlaveRuleConfigs());
            encryptRule = createEncryptRule(shardingRuleConfig.getEncryptRuleConfig());
        }
    

    其中最重要的方法 createTableRules 根据分片配置以逻辑表为单位创建数据节点,通常一个TableRule(一个逻辑表规则) 对应多个分表,分库。

       //以每个表为单位创建分片规则
        private Collection<TableRule> createTableRules(final ShardingRuleConfiguration shardingRuleConfig) {
            Collection<TableRuleConfiguration> tableRuleConfigurations = shardingRuleConfig.getTableRuleConfigs();
            Collection<TableRule> result = new ArrayList<>(tableRuleConfigurations.size());
            for (TableRuleConfiguration each : tableRuleConfigurations) {
                //创建分片规则
                result.add(new TableRule(each, shardingDataSourceNames, getDefaultGenerateKeyColumn(shardingRuleConfig)));
            }
            return result;
        }
    

    然后创建初始化 TableRule

    1、将逻辑表名转成小写
    2、获取用户配置的真实表名,因为可以配置表达式所以这里需要计算一下。
    3、将真实 datasource + 表名 (逗号分隔) 封装成 DataNode
    如果用户没有配置真实表,那么默认使用逻辑表名充当真实表名
    4、根据分库策略配置项,创建分库策略。根据分表策略配置项,创建分表策略
    5、非默认策略必须配置一个想要路由到的真实表

        //tableRuleConfig 为分片规则配置项
        public TableRule(final TableRuleConfiguration tableRuleConfig, final ShardingDataSourceNames shardingDataSourceNames, final String defaultGenerateKeyColumn) {
            //将逻辑表名转成小写
            logicTable = tableRuleConfig.getLogicTable().toLowerCase();
            //将真实表名表达式,转换为多个 datasource +表名
            List<String> dataNodes = new InlineExpressionParser(tableRuleConfig.getActualDataNodes()).splitAndEvaluate();
            //按照顺序存储 DataNode (datasource +表名)
            dataNodeIndexMap = new HashMap<>(dataNodes.size(), 1);
            //将真实 datasource + 表名 封装成 DataNode
            actualDataNodes = isEmptyDataNodes(dataNodes)
                ? generateDataNodes(tableRuleConfig.getLogicTable(), shardingDataSourceNames.getDataSourceNames()) : generateDataNodes(dataNodes, shardingDataSourceNames.getDataSourceNames());
            //单纯存放真实表名,去重
            actualTables = getActualTables();
            //根据分库策略配置项,创建分库策略
            databaseShardingStrategy = null == tableRuleConfig.getDatabaseShardingStrategyConfig() ? null : ShardingStrategyFactory.newInstance(tableRuleConfig.getDatabaseShardingStrategyConfig());
            //根据分表策略配置项,创建分表策略
            tableShardingStrategy = null == tableRuleConfig.getTableShardingStrategyConfig() ? null : ShardingStrategyFactory.newInstance(tableRuleConfig.getTableShardingStrategyConfig());
            generateKeyColumn = getGenerateKeyColumn(tableRuleConfig.getKeyGeneratorConfig(), defaultGenerateKeyColumn);
            shardingKeyGenerator = containsKeyGeneratorConfiguration(tableRuleConfig)
                    ? new ShardingKeyGeneratorServiceLoader().newService(tableRuleConfig.getKeyGeneratorConfig().getType(), tableRuleConfig.getKeyGeneratorConfig().getProperties()) : null;
            //非默认策略必须配置一个想要路由到的真实表
            checkRule(dataNodes);
        }
    

    将真实 datasource + 表名 (逗号分隔) 封装成 DataNode

        private List<DataNode> generateDataNodes(final String logicTable, final Collection<String> dataSourceNames) {
            List<DataNode> result = new LinkedList<>();
            int index = 0;
            //为每个数据源创建一套
            for (String each : dataSourceNames) {
                DataNode dataNode = new DataNode(each, logicTable);
                result.add(dataNode);
                //保存数据节点的顺序
                dataNodeIndexMap.put(dataNode, index);
                actualDatasourceNames.add(each);
                //按照数据源维度,存放真实数据表
                addActualTable(dataNode.getDataSourceName(), dataNode.getTableName());
                index++;
            }
            return result;
        }
        public DataNode(final String dataNode) {
            if (!isValidDataNode(dataNode)) {
                throw new ShardingConfigurationException("Invalid format for actual data nodes: '%s'", dataNode);
            }
            //根据.分隔 前半部分为数据源,后边为真实表
            List<String> segments = Splitter.on(DELIMITER).splitToList(dataNode);
            dataSourceName = segments.get(0);
            tableName = segments.get(1);
        }
    

    根据分库策略配置项,创建分库策略。根据分表策略配置项,创建分表策略,这里分表策略和分库策略的创建逻辑都是一样的。包括默认的分库分表策略。

    1、标准分片策略
    2、Inline表达式分片策略
    3、复合分片策略
    4、threadLocal 分片策略

      public static ShardingStrategy newInstance(final ShardingStrategyConfiguration shardingStrategyConfig) {
            //标准分片策略
            if (shardingStrategyConfig instanceof StandardShardingStrategyConfiguration) {
                return new StandardShardingStrategy((StandardShardingStrategyConfiguration) shardingStrategyConfig);
            }
            //Inline表达式分片策略
            if (shardingStrategyConfig instanceof InlineShardingStrategyConfiguration) {
                return new InlineShardingStrategy((InlineShardingStrategyConfiguration) shardingStrategyConfig);
            }
            //复合分片策略
            if (shardingStrategyConfig instanceof ComplexShardingStrategyConfiguration) {
                return new ComplexShardingStrategy((ComplexShardingStrategyConfiguration) shardingStrategyConfig);
            }
            //threadLocal 分片策略
            if (shardingStrategyConfig instanceof HintShardingStrategyConfiguration) {
                return new HintShardingStrategy((HintShardingStrategyConfiguration) shardingStrategyConfig);
            }
            return new NoneShardingStrategy();
        }
    
      //标准分片策略
      public StandardShardingStrategy(final StandardShardingStrategyConfiguration standardShardingStrategyConfig) {
            Preconditions.checkNotNull(standardShardingStrategyConfig.getShardingColumn(), "Sharding column cannot be null.");
            Preconditions.checkNotNull(standardShardingStrategyConfig.getPreciseShardingAlgorithm(), "precise sharding algorithm cannot be null.");
            //用于分片的列
            shardingColumn = standardShardingStrategyConfig.getShardingColumn();
            //是必选的 用于处理=和IN的分片
            preciseShardingAlgorithm = standardShardingStrategyConfig.getPreciseShardingAlgorithm();
            //是可选的 用于处理BETWEEN AND分片,如果不配置 RangeShardingAlgorithm,SQL中的 BETWEEN AND 将按照全库路由处理
            rangeShardingAlgorithm = standardShardingStrategyConfig.getRangeShardingAlgorithm();
        }
        /**
         * Inline表达式分片策略。使用Groovy的Inline表达式,提供对SQL语句中的=和IN的分片操作支持。
         * InlineShardingStrategy只支持单分片键,对于简单的分片算法,可以通过简单的配置使用,从而避免繁琐的Java代码开发,
         * 如: tuser${user_id % 8} 表示t_user表按照user_id按8取模分成8个表,表名称为t_user_0 到 t_user_7。
         */
        public InlineShardingStrategy(final InlineShardingStrategyConfiguration inlineShardingStrategyConfig) {
            Preconditions.checkNotNull(inlineShardingStrategyConfig.getShardingColumn(), "Sharding column cannot be null.");
            Preconditions.checkNotNull(inlineShardingStrategyConfig.getAlgorithmExpression(), "Sharding algorithm expression cannot be null.");
            shardingColumn = inlineShardingStrategyConfig.getShardingColumn();
            String algorithmExpression = InlineExpressionParser.handlePlaceHolder(inlineShardingStrategyConfig.getAlgorithmExpression().trim());
            closure = new InlineExpressionParser(algorithmExpression).evaluateClosure();
        }
        //复合分片策略。提供对SQL语句中的 =, IN 和 BETWEEN AND 的分片操作支持。
        public ComplexShardingStrategy(final ComplexShardingStrategyConfiguration complexShardingStrategyConfig) {
            Preconditions.checkNotNull(complexShardingStrategyConfig.getShardingColumns(), "Sharding columns cannot be null.");
            Preconditions.checkNotNull(complexShardingStrategyConfig.getShardingAlgorithm(), "Sharding algorithm cannot be null.");
            shardingColumns = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
            shardingColumns.addAll(Splitter.on(",").trimResults().splitToList(complexShardingStrategyConfig.getShardingColumns()));
            shardingAlgorithm = complexShardingStrategyConfig.getShardingAlgorithm();
        }
        //通过Hint而非SQL解析的方式分片的策略
        public HintShardingStrategy(final HintShardingStrategyConfiguration hintShardingStrategyConfig) {
            Preconditions.checkNotNull(hintShardingStrategyConfig.getShardingAlgorithm(), "Sharding algorithm cannot be null.");
            shardingColumns = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
            shardingAlgorithm = hintShardingStrategyConfig.getShardingAlgorithm();
        }
    

    这里可能有个误区,分片策略不是自定义的吗?怎么会出现在源码中。sharding jdbc 分片策略不能自定义,分片算法是自定义的,上边代码中,getShardingAlgorithm() 就是在获取用户自定义的算法。

    2、匹配数据库类型 databaseType

    在初始化 ShardingDataSource 时候会调用父类AbstractDataSourceAdapter 构造方法,根据数据库连接的url进行匹配数据库类型

       public AbstractDataSourceAdapter(final Map<String, DataSource> dataSourceMap) throws SQLException {
            this.dataSourceMap = dataSourceMap;
            databaseType = createDatabaseType();
        }
        //根据数据数据库连接 url 匹配找到数据库类型
        private DatabaseType createDatabaseType(final DataSource dataSource) throws SQLException {
            if (dataSource instanceof AbstractDataSourceAdapter) {
                return ((AbstractDataSourceAdapter) dataSource).databaseType;
            }
            try (Connection connection = dataSource.getConnection()) {
                return DatabaseTypes.getDatabaseTypeByURL(connection.getMetaData().getURL());
            }
        }
        private static boolean matchStandardURL(final String url, final DatabaseType databaseType) {
            return url.startsWith(String.format("jdbc:%s:", databaseType.getName().toLowerCase()));
        }
    
    3、创建 sql 解析引擎 SQLParseEngine 、sql 执行引擎 ShardingExecuteEngine

    1、创建 ShardingRuntimeContext 调用父类 AbstractRuntimeContext 的构造方法
    2、创建运行时候需要的一些配置信息,包括是否打印sql,单次执行消耗的数据库连接数、并行执行线程池大小等等
    3、创建 sql 执行引擎
    4、创建 sql 解析引擎

        protected AbstractRuntimeContext(final T rule, final Properties props, final DatabaseType databaseType) {
            this.rule = rule;
            //运行时候的一些配置信息
            this.props = new ShardingProperties(null == props ? new Properties() : props);
            this.databaseType = databaseType;
            //创建 sql 执行
            executeEngine = new ShardingExecuteEngine(this.props.<Integer>getValue(ShardingPropertiesConstant.EXECUTOR_SIZE));
            //根据数据库类型初始化,sql 解析引擎
            parseEngine = SQLParseEngineFactory.getSQLParseEngine(DatabaseTypes.getTrunkDatabaseTypeName(databaseType));
            ConfigurationLogger.log(rule.getRuleConfiguration());
            ConfigurationLogger.log(props);
        }
    
    创建 sql 执行引擎

    sql 执行引擎 ShardingExecuteEngine 复合了sql 执行服务ShardingExecutorService,在创建ShardingExecuteEngine 时同时创建了 ShardingExecutorService 一对一的关系,这里涉及到了guava 的 MoreExecutors 对jdk Executor 的增强

        public ShardingExecutorService(final int executorSize, final String nameFormat) {
            executorService = MoreExecutors.listeningDecorator(getExecutorService(executorSize, nameFormat));
            //java 系统默认创建的线程都是用户线程,也就是说如果 用户线程不结束,main 方法不会结束的。
            //在线程 start 之前当调用 setDaemon 时,此线程就是守护线程了,main 方法不会等待此线程运行结束
            //非主动结束进程  或调用 System.exit 时无论什么线程都会结束
            //MoreExecutors 是 gava 对 jdk 线程池的增强
            //加 60 秒的 jvm 关闭钩子, 在 jvm 中已经没有用户线程在运行了,那么 等待 60 秒后关闭线程池
            MoreExecutors.addDelayedShutdownHook(executorService, 60, TimeUnit.SECONDS);
        }
        //创建线程池,如果配置参数 executorSize 为0那么创建 CachedThreadPool 核心线程数量为0 最大线程数量为 Integer.MAX_VALUE
        private ExecutorService getExecutorService(final int executorSize, final String nameFormat) {
            //线程名字为 ShardingSphere 开头
            ThreadFactory shardingThreadFactory = ShardingThreadFactoryBuilder.build(nameFormat);
            return 0 == executorSize ? Executors.newCachedThreadPool(shardingThreadFactory) : Executors.newFixedThreadPool(executorSize, shardingThreadFactory);
        }
    
    sql 解析引擎

    1、根据数据库名称创建 SQLParseEngine
    这里还有一个误区,不是每个数据都应该有对应的 SQLParseEngine 吗?其实类都是同一个 SQLParseEngine 只是其中 String databaseTypeName 不同而已,虽然调用了构造函数但是此处的初始化还没有完成,真正完成在第一次解析sql的时候。
    2、第一次解析sql。SQLParseEngine.parse0
    (1)是否用缓存中已解析好的结果
    (2)创建 SQLParseKernel 并且调用了 ParseRuleRegistry.getInstance() 完整初始化过程。

     public static SQLParseEngine getSQLParseEngine(final String databaseTypeName) {
            if (ENGINES.containsKey(databaseTypeName)) {
                return ENGINES.get(databaseTypeName);
            }
            //这里同步保证每个数据库只有一个 SQLParseEngine 实例
            synchronized (ENGINES) {
                if (ENGINES.containsKey(databaseTypeName)) {
                    return ENGINES.get(databaseTypeName);
                }
                SQLParseEngine result = new SQLParseEngine(databaseTypeName);
                ENGINES.put(databaseTypeName, result);
                return result;
            }
        }
    
    第一次解析sql。SQLParseEngine.parse0

    因为第一次执行解析 sql 所以虚拟机会加载 ParseRuleRegistry 到内存,并且调用 ParseRuleRegistry 中的静态代码块。
    1、利用 java spi 机制加载 SQLParserEntry 的实现类到内存,SQLParserEntry 的实现类包括MySQLParserEntry,OracleParserEntry ,分别在不同的模块中,所以可以按需引入。
    2、加载 sql 解析规范例如 sql-statement-rule-definition.xml 文件并且解析存放在Map<String, SQLStatementRuleDefinition> sqlStatementRuleDefinitions 中。

     private SQLStatement parse0(final String sql, final boolean useCache) {
            //是否用缓存中已解析好的结果
            if (useCache) {
                Optional<SQLStatement> cachedSQLStatement = cache.getSQLStatement(sql);
                if (cachedSQLStatement.isPresent()) {
                    return cachedSQLStatement.get();
                }
            }
            // ParseRuleRegistry.getInstance() 获取定的一些规范,和配置信息
            SQLStatement result = new SQLParseKernel(ParseRuleRegistry.getInstance(), databaseTypeName, sql).parse();
            if (useCache) {
                cache.put(sql, result);
            }
            return result;
        }
    

    ParseRuleRegistry.getInstance()

     static {
            //spi 机制加载注册的 sql 语法解析类型,例如 oracl mysql
            NewInstanceServiceLoader.register(SQLParserEntry.class);
            instance = new ParseRuleRegistry();
        }
    
    //key 接口类型, value 实现类集合
    private static final Map<Class, Collection<Class<?>>> SERVICE_MAP = new HashMap<>();
       /**
         * 根据 ServiceLoader java util spi 机制获取相应的 service 目录默认在 META-INF/services/
         */
        public static <T> void register(final Class<T> service) {
            for (T each : ServiceLoader.load(service)) {
                registerServiceClass(service, each);
            }
        }
    

    加载 sql 解析规范例如 sql-statement-rule-definition.xml 文件并且解析存放在Map<String, SQLStatementRuleDefinition> sqlStatementRuleDefinitions 中,key 为数据库名称,value 为规则类。

        /**
         * load sql 解析规范 例如sql-statement-rule-definition.xml
         */
        private void initParseRuleDefinition() {
            ExtractorRuleDefinitionEntity generalExtractorRuleEntity = extractorRuleLoader.load(RuleDefinitionFileConstant.getExtractorRuleDefinitionFile());
            FillerRuleDefinitionEntity generalFillerRuleEntity = fillerRuleLoader.load(RuleDefinitionFileConstant.getFillerRuleDefinitionFile());
            for (SQLParserEntry each : NewInstanceServiceLoader.newServiceInstances(SQLParserEntry.class)) {
                String databaseTypeName = each.getDatabaseTypeName();
                fillerRuleDefinitions.put(databaseTypeName, createFillerRuleDefinition(generalFillerRuleEntity, databaseTypeName));
                sqlStatementRuleDefinitions.put(databaseTypeName, createSQLStatementRuleDefinition(generalExtractorRuleEntity, databaseTypeName));
            }
        }
    
    那随后的执行阶段就可以根据数据库名取到对应的解析方法了。
        public SQLAST parse() {
            //根据数据库名获取,对应的解析类
            SQLParser sqlParser = SQLParserFactory.newInstance(databaseTypeName, sql);
            ....略
            //根据数据库名获取,对应的解析规则类
            SQLStatementRule rule = parseRuleRegistry.getSQLStatementRule(databaseTypeName, parseTree.getClass().getSimpleName());
            if (null == rule) {
                throw new SQLParsingException(String.format("Unsupported SQL of `%s`", sql));
            }
            return new SQLAST((ParserRuleContext) parseTree, getParameterMarkerIndexes((ParserRuleContext) parseTree), rule);
        }
    

    至此 sharding jdbc 4.0.1 初始化阶段已经完成,关于
    2、内部运行流程分别是怎么实现的
    (1)初始化过程
    (2)sql 解析
    (3)sql 提取
    (4)sql 路由
    (5)sql 替换
    (6)sql 执行
    中的 2、3、4、5、6 下次分析。

    相关文章

      网友评论

          本文标题:sharding-jdbc 执行流程源码分析-初始化

          本文链接:https://www.haomeiwen.com/subject/wduhwhtx.html