和其他应对高并发,数据量大的方案比,分库分表通常是最实用,最朴素的一个方案,既简单又有效。但是分库分表后怎么可以对业务层代码影响降到最小,是程序员们需要解决的问题,下边看一下一个开源组件 sharding-jdbc,应用程序只需引入jar然后通过编写分片策略方法、和相关的配置,即可实现分库分表。
本篇文章主要解释以下内容,基于 4.0.1 版本
研究源码前,首先思考几个问题
1、sharding-jdbc 为什么可以和 Mybatis\ibaits\hibernate\jpa\spring data template 结合使用,换句话说:为什么不依赖orm
2、内部运行流程分别是怎么实现的
(1)初始化过程
(2)sql 解析
(3)sql 提取
(4)sql 路由
(5)sql 替换
(6)sql 执行
1、sharding-jdbc 为什么不依赖orm
sharding-jdbc 的执行入口是 ShardingDataSource 此类实现了 javax.sql.DataSource ,javax 的 x 是extension 的意思,也就是扩展包,为了使 java 基础包更加通用,在上边加了一层扩展。javax.sql.DataSource 接口只有一个方法 getConnection , 几乎所有的 orm 框架都是用 DataSource 接口获取数据库连接的。 所以才使得 sharding-jdbc 不依赖具体的 orm 层框架成为可能。
其中 ShardingDataSource 构造函数就是,初始化的入口。
其中 getConnection 方法,是执行阶段的入口。
public class ShardingDataSource extends AbstractDataSourceAdapter {
private final ShardingRuntimeContext runtimeContext;
//初始化阶段
public ShardingDataSource(final Map<String, DataSource> dataSourceMap, final ShardingRule shardingRule, final Properties props) throws SQLException {
//匹配数据库类型
super(dataSourceMap);
checkDataSourceType(dataSourceMap);
runtimeContext = new ShardingRuntimeContext(dataSourceMap, shardingRule, props, getDatabaseType());
}
private void checkDataSourceType(final Map<String, DataSource> dataSourceMap) {
for (DataSource each : dataSourceMap.values()) {
Preconditions.checkArgument(!(each instanceof MasterSlaveDataSource), "Initialized data sources can not be master-slave data sources.");
}
}
//执行阶段
@Override
public final ShardingConnection getConnection() {
return new ShardingConnection(getDataSourceMap(), runtimeContext, TransactionTypeHolder.get());
}
}
2、内部运行流程分别是怎么实现的
(1)初始化阶段
涉及的相关类 uml 图
image.png
初始化分为3个步骤:
- 根据配置信息 shardingRuleConfiguration 创建分片规则 ShardingRule
- 匹配数据库类型 databaseType
- 创建 sql 解析引擎 SQLParseEngine 、sql 执行引擎 ShardingExecuteEngine
- 保存数据库、数据表信息。
下边我们一个一个分析,以 spring namespace 方法创建为例。
1、根据配置信息 shardingRuleConfiguration 创建分片规则 ShardingRule
public class SpringShardingDataSource extends ShardingDataSource {
public SpringShardingDataSource(final Map<String, DataSource> dataSourceMap, final ShardingRuleConfiguration shardingRuleConfiguration, final Properties props) throws SQLException {
//new ShardingRule 创建分片规则
super(dataSourceMap, new ShardingRule(shardingRuleConfiguration, dataSourceMap.keySet()), props);
}
}
spring namespace 会根据 xml 的配置创建好用户配置信息类ShardingRuleConfiguration 包括:
- TableRuleConfiguration:逻辑表名 + 可路由到的数据数据节点名(数据源名+表名)支持表达式
- bindingTableGroups 绑定表组:例如order + order_item 表
- broadcastTables 广播表:例如配置信息需要每个数据库都有一张这样的表,需要修改的时候,那么需要广播对吧!
- defaultDataSourceName 默认数据源名称
- defaultDatabaseShardingStrategyConfig 默认分库策略
- defaultTableShardingStrategyConfig 默认分表策略
其中分片策略配置包括:暗示策略Hint基于ThraedLocal
表达式分片策略Inline基于groovy 的 Inline 表达式
复合分片策略 Complex 基于多列 、不分库策略None,标准策略Standard基于单列。
然后根据 shardingRuleConfiguration和数据源集合,创建分片规则 ShardingRule
public ShardingRule(final ShardingRuleConfiguration shardingRuleConfig, final Collection<String> dataSourceNames) {
Preconditions.checkArgument(null != shardingRuleConfig, "ShardingRuleConfig cannot be null.");
Preconditions.checkArgument(null != dataSourceNames && !dataSourceNames.isEmpty(), "Data sources cannot be empty.");
this.ruleConfiguration = shardingRuleConfig;
//根据配置创建所有数据源集合,主从复制需要特殊处理(只是保留一个主从公共数据源)
shardingDataSourceNames = new ShardingDataSourceNames(shardingRuleConfig, dataSourceNames);
//根据分片配置,创建分片规则
tableRules = createTableRules(shardingRuleConfig);
broadcastTables = shardingRuleConfig.getBroadcastTables();
//创建 表分片key与分片规则完全一致 的规则
bindingTableRules = createBindingTableRules(shardingRuleConfig.getBindingTableGroups());
//创建默认的分库策略
defaultDatabaseShardingStrategy = createDefaultShardingStrategy(shardingRuleConfig.getDefaultDatabaseShardingStrategyConfig());
defaultTableShardingStrategy = createDefaultShardingStrategy(shardingRuleConfig.getDefaultTableShardingStrategyConfig());
defaultShardingKeyGenerator = createDefaultKeyGenerator(shardingRuleConfig.getDefaultKeyGeneratorConfig());
//主从规则
masterSlaveRules = createMasterSlaveRules(shardingRuleConfig.getMasterSlaveRuleConfigs());
encryptRule = createEncryptRule(shardingRuleConfig.getEncryptRuleConfig());
}
其中最重要的方法 createTableRules 根据分片配置以逻辑表为单位创建数据节点,通常一个TableRule(一个逻辑表规则) 对应多个分表,分库。
//以每个表为单位创建分片规则
private Collection<TableRule> createTableRules(final ShardingRuleConfiguration shardingRuleConfig) {
Collection<TableRuleConfiguration> tableRuleConfigurations = shardingRuleConfig.getTableRuleConfigs();
Collection<TableRule> result = new ArrayList<>(tableRuleConfigurations.size());
for (TableRuleConfiguration each : tableRuleConfigurations) {
//创建分片规则
result.add(new TableRule(each, shardingDataSourceNames, getDefaultGenerateKeyColumn(shardingRuleConfig)));
}
return result;
}
然后创建初始化 TableRule
1、将逻辑表名转成小写
2、获取用户配置的真实表名,因为可以配置表达式所以这里需要计算一下。
3、将真实 datasource + 表名 (逗号分隔) 封装成 DataNode
如果用户没有配置真实表,那么默认使用逻辑表名充当真实表名
4、根据分库策略配置项,创建分库策略。根据分表策略配置项,创建分表策略
5、非默认策略必须配置一个想要路由到的真实表
//tableRuleConfig 为分片规则配置项
public TableRule(final TableRuleConfiguration tableRuleConfig, final ShardingDataSourceNames shardingDataSourceNames, final String defaultGenerateKeyColumn) {
//将逻辑表名转成小写
logicTable = tableRuleConfig.getLogicTable().toLowerCase();
//将真实表名表达式,转换为多个 datasource +表名
List<String> dataNodes = new InlineExpressionParser(tableRuleConfig.getActualDataNodes()).splitAndEvaluate();
//按照顺序存储 DataNode (datasource +表名)
dataNodeIndexMap = new HashMap<>(dataNodes.size(), 1);
//将真实 datasource + 表名 封装成 DataNode
actualDataNodes = isEmptyDataNodes(dataNodes)
? generateDataNodes(tableRuleConfig.getLogicTable(), shardingDataSourceNames.getDataSourceNames()) : generateDataNodes(dataNodes, shardingDataSourceNames.getDataSourceNames());
//单纯存放真实表名,去重
actualTables = getActualTables();
//根据分库策略配置项,创建分库策略
databaseShardingStrategy = null == tableRuleConfig.getDatabaseShardingStrategyConfig() ? null : ShardingStrategyFactory.newInstance(tableRuleConfig.getDatabaseShardingStrategyConfig());
//根据分表策略配置项,创建分表策略
tableShardingStrategy = null == tableRuleConfig.getTableShardingStrategyConfig() ? null : ShardingStrategyFactory.newInstance(tableRuleConfig.getTableShardingStrategyConfig());
generateKeyColumn = getGenerateKeyColumn(tableRuleConfig.getKeyGeneratorConfig(), defaultGenerateKeyColumn);
shardingKeyGenerator = containsKeyGeneratorConfiguration(tableRuleConfig)
? new ShardingKeyGeneratorServiceLoader().newService(tableRuleConfig.getKeyGeneratorConfig().getType(), tableRuleConfig.getKeyGeneratorConfig().getProperties()) : null;
//非默认策略必须配置一个想要路由到的真实表
checkRule(dataNodes);
}
将真实 datasource + 表名 (逗号分隔) 封装成 DataNode
private List<DataNode> generateDataNodes(final String logicTable, final Collection<String> dataSourceNames) {
List<DataNode> result = new LinkedList<>();
int index = 0;
//为每个数据源创建一套
for (String each : dataSourceNames) {
DataNode dataNode = new DataNode(each, logicTable);
result.add(dataNode);
//保存数据节点的顺序
dataNodeIndexMap.put(dataNode, index);
actualDatasourceNames.add(each);
//按照数据源维度,存放真实数据表
addActualTable(dataNode.getDataSourceName(), dataNode.getTableName());
index++;
}
return result;
}
public DataNode(final String dataNode) {
if (!isValidDataNode(dataNode)) {
throw new ShardingConfigurationException("Invalid format for actual data nodes: '%s'", dataNode);
}
//根据.分隔 前半部分为数据源,后边为真实表
List<String> segments = Splitter.on(DELIMITER).splitToList(dataNode);
dataSourceName = segments.get(0);
tableName = segments.get(1);
}
根据分库策略配置项,创建分库策略。根据分表策略配置项,创建分表策略,这里分表策略和分库策略的创建逻辑都是一样的。包括默认的分库分表策略。
1、标准分片策略
2、Inline表达式分片策略
3、复合分片策略
4、threadLocal 分片策略
public static ShardingStrategy newInstance(final ShardingStrategyConfiguration shardingStrategyConfig) {
//标准分片策略
if (shardingStrategyConfig instanceof StandardShardingStrategyConfiguration) {
return new StandardShardingStrategy((StandardShardingStrategyConfiguration) shardingStrategyConfig);
}
//Inline表达式分片策略
if (shardingStrategyConfig instanceof InlineShardingStrategyConfiguration) {
return new InlineShardingStrategy((InlineShardingStrategyConfiguration) shardingStrategyConfig);
}
//复合分片策略
if (shardingStrategyConfig instanceof ComplexShardingStrategyConfiguration) {
return new ComplexShardingStrategy((ComplexShardingStrategyConfiguration) shardingStrategyConfig);
}
//threadLocal 分片策略
if (shardingStrategyConfig instanceof HintShardingStrategyConfiguration) {
return new HintShardingStrategy((HintShardingStrategyConfiguration) shardingStrategyConfig);
}
return new NoneShardingStrategy();
}
//标准分片策略
public StandardShardingStrategy(final StandardShardingStrategyConfiguration standardShardingStrategyConfig) {
Preconditions.checkNotNull(standardShardingStrategyConfig.getShardingColumn(), "Sharding column cannot be null.");
Preconditions.checkNotNull(standardShardingStrategyConfig.getPreciseShardingAlgorithm(), "precise sharding algorithm cannot be null.");
//用于分片的列
shardingColumn = standardShardingStrategyConfig.getShardingColumn();
//是必选的 用于处理=和IN的分片
preciseShardingAlgorithm = standardShardingStrategyConfig.getPreciseShardingAlgorithm();
//是可选的 用于处理BETWEEN AND分片,如果不配置 RangeShardingAlgorithm,SQL中的 BETWEEN AND 将按照全库路由处理
rangeShardingAlgorithm = standardShardingStrategyConfig.getRangeShardingAlgorithm();
}
/**
* Inline表达式分片策略。使用Groovy的Inline表达式,提供对SQL语句中的=和IN的分片操作支持。
* InlineShardingStrategy只支持单分片键,对于简单的分片算法,可以通过简单的配置使用,从而避免繁琐的Java代码开发,
* 如: tuser${user_id % 8} 表示t_user表按照user_id按8取模分成8个表,表名称为t_user_0 到 t_user_7。
*/
public InlineShardingStrategy(final InlineShardingStrategyConfiguration inlineShardingStrategyConfig) {
Preconditions.checkNotNull(inlineShardingStrategyConfig.getShardingColumn(), "Sharding column cannot be null.");
Preconditions.checkNotNull(inlineShardingStrategyConfig.getAlgorithmExpression(), "Sharding algorithm expression cannot be null.");
shardingColumn = inlineShardingStrategyConfig.getShardingColumn();
String algorithmExpression = InlineExpressionParser.handlePlaceHolder(inlineShardingStrategyConfig.getAlgorithmExpression().trim());
closure = new InlineExpressionParser(algorithmExpression).evaluateClosure();
}
//复合分片策略。提供对SQL语句中的 =, IN 和 BETWEEN AND 的分片操作支持。
public ComplexShardingStrategy(final ComplexShardingStrategyConfiguration complexShardingStrategyConfig) {
Preconditions.checkNotNull(complexShardingStrategyConfig.getShardingColumns(), "Sharding columns cannot be null.");
Preconditions.checkNotNull(complexShardingStrategyConfig.getShardingAlgorithm(), "Sharding algorithm cannot be null.");
shardingColumns = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
shardingColumns.addAll(Splitter.on(",").trimResults().splitToList(complexShardingStrategyConfig.getShardingColumns()));
shardingAlgorithm = complexShardingStrategyConfig.getShardingAlgorithm();
}
//通过Hint而非SQL解析的方式分片的策略
public HintShardingStrategy(final HintShardingStrategyConfiguration hintShardingStrategyConfig) {
Preconditions.checkNotNull(hintShardingStrategyConfig.getShardingAlgorithm(), "Sharding algorithm cannot be null.");
shardingColumns = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
shardingAlgorithm = hintShardingStrategyConfig.getShardingAlgorithm();
}
这里可能有个误区,分片策略不是自定义的吗?怎么会出现在源码中。sharding jdbc 分片策略不能自定义,分片算法是自定义的,上边代码中,getShardingAlgorithm() 就是在获取用户自定义的算法。
2、匹配数据库类型 databaseType
在初始化 ShardingDataSource 时候会调用父类AbstractDataSourceAdapter 构造方法,根据数据库连接的url进行匹配数据库类型
public AbstractDataSourceAdapter(final Map<String, DataSource> dataSourceMap) throws SQLException {
this.dataSourceMap = dataSourceMap;
databaseType = createDatabaseType();
}
//根据数据数据库连接 url 匹配找到数据库类型
private DatabaseType createDatabaseType(final DataSource dataSource) throws SQLException {
if (dataSource instanceof AbstractDataSourceAdapter) {
return ((AbstractDataSourceAdapter) dataSource).databaseType;
}
try (Connection connection = dataSource.getConnection()) {
return DatabaseTypes.getDatabaseTypeByURL(connection.getMetaData().getURL());
}
}
private static boolean matchStandardURL(final String url, final DatabaseType databaseType) {
return url.startsWith(String.format("jdbc:%s:", databaseType.getName().toLowerCase()));
}
3、创建 sql 解析引擎 SQLParseEngine 、sql 执行引擎 ShardingExecuteEngine
1、创建 ShardingRuntimeContext 调用父类 AbstractRuntimeContext 的构造方法
2、创建运行时候需要的一些配置信息,包括是否打印sql,单次执行消耗的数据库连接数、并行执行线程池大小等等
3、创建 sql 执行引擎
4、创建 sql 解析引擎
protected AbstractRuntimeContext(final T rule, final Properties props, final DatabaseType databaseType) {
this.rule = rule;
//运行时候的一些配置信息
this.props = new ShardingProperties(null == props ? new Properties() : props);
this.databaseType = databaseType;
//创建 sql 执行
executeEngine = new ShardingExecuteEngine(this.props.<Integer>getValue(ShardingPropertiesConstant.EXECUTOR_SIZE));
//根据数据库类型初始化,sql 解析引擎
parseEngine = SQLParseEngineFactory.getSQLParseEngine(DatabaseTypes.getTrunkDatabaseTypeName(databaseType));
ConfigurationLogger.log(rule.getRuleConfiguration());
ConfigurationLogger.log(props);
}
创建 sql 执行引擎
sql 执行引擎 ShardingExecuteEngine 复合了sql 执行服务ShardingExecutorService,在创建ShardingExecuteEngine 时同时创建了 ShardingExecutorService 一对一的关系,这里涉及到了guava 的 MoreExecutors 对jdk Executor 的增强
public ShardingExecutorService(final int executorSize, final String nameFormat) {
executorService = MoreExecutors.listeningDecorator(getExecutorService(executorSize, nameFormat));
//java 系统默认创建的线程都是用户线程,也就是说如果 用户线程不结束,main 方法不会结束的。
//在线程 start 之前当调用 setDaemon 时,此线程就是守护线程了,main 方法不会等待此线程运行结束
//非主动结束进程 或调用 System.exit 时无论什么线程都会结束
//MoreExecutors 是 gava 对 jdk 线程池的增强
//加 60 秒的 jvm 关闭钩子, 在 jvm 中已经没有用户线程在运行了,那么 等待 60 秒后关闭线程池
MoreExecutors.addDelayedShutdownHook(executorService, 60, TimeUnit.SECONDS);
}
//创建线程池,如果配置参数 executorSize 为0那么创建 CachedThreadPool 核心线程数量为0 最大线程数量为 Integer.MAX_VALUE
private ExecutorService getExecutorService(final int executorSize, final String nameFormat) {
//线程名字为 ShardingSphere 开头
ThreadFactory shardingThreadFactory = ShardingThreadFactoryBuilder.build(nameFormat);
return 0 == executorSize ? Executors.newCachedThreadPool(shardingThreadFactory) : Executors.newFixedThreadPool(executorSize, shardingThreadFactory);
}
sql 解析引擎
1、根据数据库名称创建 SQLParseEngine
这里还有一个误区,不是每个数据都应该有对应的 SQLParseEngine 吗?其实类都是同一个 SQLParseEngine 只是其中 String databaseTypeName 不同而已,虽然调用了构造函数但是此处的初始化还没有完成,真正完成在第一次解析sql的时候。
2、第一次解析sql。SQLParseEngine.parse0
(1)是否用缓存中已解析好的结果
(2)创建 SQLParseKernel 并且调用了 ParseRuleRegistry.getInstance() 完整初始化过程。
public static SQLParseEngine getSQLParseEngine(final String databaseTypeName) {
if (ENGINES.containsKey(databaseTypeName)) {
return ENGINES.get(databaseTypeName);
}
//这里同步保证每个数据库只有一个 SQLParseEngine 实例
synchronized (ENGINES) {
if (ENGINES.containsKey(databaseTypeName)) {
return ENGINES.get(databaseTypeName);
}
SQLParseEngine result = new SQLParseEngine(databaseTypeName);
ENGINES.put(databaseTypeName, result);
return result;
}
}
第一次解析sql。SQLParseEngine.parse0
因为第一次执行解析 sql 所以虚拟机会加载 ParseRuleRegistry 到内存,并且调用 ParseRuleRegistry 中的静态代码块。
1、利用 java spi 机制加载 SQLParserEntry 的实现类到内存,SQLParserEntry 的实现类包括MySQLParserEntry,OracleParserEntry ,分别在不同的模块中,所以可以按需引入。
2、加载 sql 解析规范例如 sql-statement-rule-definition.xml 文件并且解析存放在Map<String, SQLStatementRuleDefinition> sqlStatementRuleDefinitions 中。
private SQLStatement parse0(final String sql, final boolean useCache) {
//是否用缓存中已解析好的结果
if (useCache) {
Optional<SQLStatement> cachedSQLStatement = cache.getSQLStatement(sql);
if (cachedSQLStatement.isPresent()) {
return cachedSQLStatement.get();
}
}
// ParseRuleRegistry.getInstance() 获取定的一些规范,和配置信息
SQLStatement result = new SQLParseKernel(ParseRuleRegistry.getInstance(), databaseTypeName, sql).parse();
if (useCache) {
cache.put(sql, result);
}
return result;
}
ParseRuleRegistry.getInstance()
static {
//spi 机制加载注册的 sql 语法解析类型,例如 oracl mysql
NewInstanceServiceLoader.register(SQLParserEntry.class);
instance = new ParseRuleRegistry();
}
//key 接口类型, value 实现类集合
private static final Map<Class, Collection<Class<?>>> SERVICE_MAP = new HashMap<>();
/**
* 根据 ServiceLoader java util spi 机制获取相应的 service 目录默认在 META-INF/services/
*/
public static <T> void register(final Class<T> service) {
for (T each : ServiceLoader.load(service)) {
registerServiceClass(service, each);
}
}
加载 sql 解析规范例如 sql-statement-rule-definition.xml 文件并且解析存放在Map<String, SQLStatementRuleDefinition> sqlStatementRuleDefinitions 中,key 为数据库名称,value 为规则类。
/**
* load sql 解析规范 例如sql-statement-rule-definition.xml
*/
private void initParseRuleDefinition() {
ExtractorRuleDefinitionEntity generalExtractorRuleEntity = extractorRuleLoader.load(RuleDefinitionFileConstant.getExtractorRuleDefinitionFile());
FillerRuleDefinitionEntity generalFillerRuleEntity = fillerRuleLoader.load(RuleDefinitionFileConstant.getFillerRuleDefinitionFile());
for (SQLParserEntry each : NewInstanceServiceLoader.newServiceInstances(SQLParserEntry.class)) {
String databaseTypeName = each.getDatabaseTypeName();
fillerRuleDefinitions.put(databaseTypeName, createFillerRuleDefinition(generalFillerRuleEntity, databaseTypeName));
sqlStatementRuleDefinitions.put(databaseTypeName, createSQLStatementRuleDefinition(generalExtractorRuleEntity, databaseTypeName));
}
}
那随后的执行阶段就可以根据数据库名取到对应的解析方法了。
public SQLAST parse() {
//根据数据库名获取,对应的解析类
SQLParser sqlParser = SQLParserFactory.newInstance(databaseTypeName, sql);
....略
//根据数据库名获取,对应的解析规则类
SQLStatementRule rule = parseRuleRegistry.getSQLStatementRule(databaseTypeName, parseTree.getClass().getSimpleName());
if (null == rule) {
throw new SQLParsingException(String.format("Unsupported SQL of `%s`", sql));
}
return new SQLAST((ParserRuleContext) parseTree, getParameterMarkerIndexes((ParserRuleContext) parseTree), rule);
}
至此 sharding jdbc 4.0.1 初始化阶段已经完成,关于
2、内部运行流程分别是怎么实现的
(1)初始化过程
(2)sql 解析
(3)sql 提取
(4)sql 路由
(5)sql 替换
(6)sql 执行
中的 2、3、4、5、6 下次分析。
网友评论