1、前言
之前我们自定义了 mybatis 的插件,实际上 mybatis 的插件功能相当简单,mybatis 插件注入是在 sqlSessionFactory.openSession 的时候,具体可以参照源码。既然他的插件这么好用,我们可以参照使用它实现分库分表的功能。
2、设计
分库分表场景中,将 MyBatis Spring 集成使用,选择具体分库的功能并不是直接 MyBatis 中完成的,而是在 Spring 中配置了多个数据源,并通过 spring 拦截器实现的。具体的分表功能是通过 MyBatis 中添加 个分表插件实现的,在该插件中拦截 Executor update() 方法和 query() 方法,并根据用户传入的用户 ID 计算分表的编号后缀。之后,该插件会将表名与编号后缀组合形成分表名称,解析并修改 SQL 语句,最终得到可以在当前分库中直接执行的 SQL 语句。
对于分库分表的规则:
用户所在数据库 ID = 用户 ID(或者其他 key) % 数据库数量
用户所在数据表 ID = 用户 ID / 数据库数量 % 每个数据库表数量(至于为啥是这样算,不用计较太多,我看过各种版本的,也有直接 id % 表数量的,能唯一找到就行)
3、实现
首先定义好分库策略、分表策略接口。并实现具体的分库策略、分表策略。
package cn.blogxin.sharding.plugin.strategy.database;
/**
* 分库策略
*
*/
public interface ShardingDataBaseStrategy {
/**
* 计算获取对应分库序号
*
* @param sharingDataBaseCount 分库数量
* @param shardingKey 分表key
* @return 分库序号
*/
Integer calculate(int sharingDataBaseCount, String shardingKey);
}
默认分库实现:
package cn.blogxin.sharding.plugin.strategy.database;
/**
* 默认分库策略,将分表从小到大均匀分配至各分库中
* 比如:
* 2个库,10个表
* 0-4表在0库,5-9表在1库
*
*/
public class DefaultShardingDataBaseStrategy implements ShardingDataBaseStrategy {
@Override
public Integer calculate(int sharingDataBaseCount, String shardingKey) {
return Math.abs(shardingKey.hashCode() % sharingDataBaseCount);
}
}
分表接口:
package cn.blogxin.sharding.plugin.strategy;
import cn.blogxin.sharding.plugin.Sharding;
/**
* 分表策略
*
*/
public interface ShardingTableStrategy {
String UNDERLINE = "_";
/**
* 获取分表位的实际表名
*
* @param sharding Sharding信息
* @param shardingKey 分库分表 key
* @return 带分表位的实际表名
*/
String getTargetTableName(Sharding sharding, String shardingKey);
/**
* 计算分表
*
* @param sharding Sharding信息
* @param shardingCount 库数量
* @param shardingKey 分库分表 key
* @return 计算分表
*/
Integer calculateTableSuffix(Sharding sharding, Integer shardingCount, String shardingKey);
}
分表基类:
package cn.blogxin.sharding.plugin.strategy;
import cn.blogxin.sharding.plugin.Sharding;
import cn.blogxin.sharding.plugin.ShardingContext;
import cn.blogxin.sharding.plugin.bean.ShardingDataSourceInfo;
import com.google.common.collect.Maps;
import java.util.Map;
/**
* 带分库的分表策略,使用分库插件时,分表插件必须继承该类
*
*/
public abstract class AbstractShardingStrategyWithDataBase implements ShardingTableStrategy {
private static Map<String, ShardingDataSourceInfo> shardingDataSourceInfoMap = Maps.newHashMap();
public static void setShardingDataSourceInfoMap(Map<String, ShardingDataSourceInfo> shardingDataSourceInfoMap) {
AbstractShardingStrategyWithDataBase.shardingDataSourceInfoMap = shardingDataSourceInfoMap;
}
// 这边做的不是特别好,分库分表职责没有分开
@Override
public String getTargetTableName(Sharding sharding, String shardingKey) {
// 确定库名
ShardingDataSourceInfo shardingDataSourceInfo = shardingDataSourceInfoMap.get(sharding.databaseName());
if (shardingDataSourceInfo != null) {
int databaseNum = shardingDataSourceInfo.getShardingDataBaseStrategy().calculate(shardingDataSourceInfo.getShardingCount(), shardingKey);
// 设置上下文的数据库
ShardingContext.setShardingDatabase(sharding.databaseName() + ShardingContext.getMasterSalve() + databaseNum);
}
// 确定表名
Integer tableSuffix = calculateTableSuffix(sharding, shardingDataSourceInfo.getShardingCount(), shardingKey);
return getTableName(sharding.tableName(), tableSuffix);
}
private String getTableName(String tableName, Integer shardingKey) {
return tableName + UNDERLINE + shardingKey;
}
}
分表默认实现:
package cn.blogxin.sharding.plugin.strategy;
import cn.blogxin.sharding.plugin.Sharding;
/**
* hash 分表策略,key / databaseCount % tableCount
*
*/
public class HashShardingStrategyWithDataBase extends AbstractShardingStrategyWithDataBase {
@Override
public Integer calculateTableSuffix(Sharding sharding, Integer shardingCount, String shardingKey) {
return Math.abs(shardingKey.hashCode()) / shardingCount % sharding.count();
}
}
然后设置好 spring 的动态数据源,key 是什么,value 为数据源,后续进行 mybatis 的自定义拦截器查询时,会将数据源 key 设置到上下文,然后根据 key 获取相应的数据源、获取连接执行 sql 操作。
package cn.blogxin.sharding.plugin;
import cn.blogxin.sharding.plugin.bean.Database;
import cn.blogxin.sharding.plugin.bean.ShardingDataSourceInfo;
import cn.blogxin.sharding.plugin.strategy.AbstractShardingStrategyWithDataBase;
import cn.blogxin.sharding.plugin.strategy.database.ShardingDataBaseStrategy;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.zaxxer.hikari.HikariDataSource;
import org.apache.commons.collections.MapUtils;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.LazyConnectionDataSourceProxy;
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import javax.sql.DataSource;
import java.util.Map;
import java.util.Set;
/**
* 加载分库分表插件
*
*/
@Configuration
@AutoConfigureBefore(DataSourceAutoConfiguration.class)
@ConditionalOnProperty(name = "sharding.databases", havingValue = "enable")
@EnableConfigurationProperties(ShardingProperties.class)
public class ShardingDataSourceConfiguration {
@Resource
private ShardingProperties shardingProperties;
private DataSource shardingDataSource() {
Map<String, Database> databases = shardingProperties.getDatabases();
Preconditions.checkArgument(!CollectionUtils.isEmpty(databases), "不存在分库配置");
Map<String, ShardingDataSourceInfo> shardingDataSourceInfoMap = Maps.newHashMap();
Map<Object, Object> targetDataSources = Maps.newHashMap();
DataSource dataSource = null;
for (Map.Entry<String, Database> entry : databases.entrySet()) {
String dataBaseName = entry.getKey(); // 你说你用啥获取不好,非要在配置文件的 key 中获取,容易造成疑惑
Database database = entry.getValue();
ShardingDataSourceInfo shardingDataSourceInfo = new ShardingDataSourceInfo();
shardingDataSourceInfo.setShardingCount(database.getShardingCount());
shardingDataSourceInfo.setShardingDataBaseStrategy(createShardingDataBaseStrategy(database.getShardingStrategy()));
shardingDataSourceInfoMap.put(dataBaseName, shardingDataSourceInfo);
// 每个库对应的数据源
Set<Map.Entry<String, Map<Integer, DataSourceProperties>>> entries = database.getDataSource().entrySet();
for (Map.Entry<String, Map<Integer, DataSourceProperties>> masterSlave : entries) {
String masterSlaveKey = masterSlave.getKey();
Map<Integer, DataSourceProperties> masterSlaveValue = masterSlave.getValue();
for (Map.Entry<Integer, DataSourceProperties> propertiesEntry : masterSlaveValue.entrySet()) {
// 设置数据源的 key
String shardingDataBaseKey = dataBaseName + masterSlaveKey + propertiesEntry.getKey();
dataSource = createDataSource(propertiesEntry.getValue(), HikariDataSource.class);
// 设置数据源
targetDataSources.put(shardingDataBaseKey, dataSource);
}
}
}
Preconditions.checkArgument(MapUtils.isNotEmpty(targetDataSources), "找不到database配置");
Preconditions.checkNotNull(dataSource, "找不到database配置");
AbstractShardingStrategyWithDataBase.setShardingDataSourceInfoMap(shardingDataSourceInfoMap);
ShardingDataSource shardingDataSource = new ShardingDataSource();
shardingDataSource.setTargetDataSources(targetDataSources);
/**
* 用于创建LazyConnectionDataSourceProxy时获取真实数据库连接,来获取实际数据库的自动提交配置和隔离级别
*/
shardingDataSource.setDefaultTargetDataSource(dataSource);
shardingDataSource.setLenientFallback(false);
shardingDataSource.afterPropertiesSet();
return shardingDataSource;
}
@Bean
public DataSource dataSource() {
LazyConnectionDataSourceProxy dataSourceProxy = new LazyConnectionDataSourceProxy();
dataSourceProxy.setTargetDataSource(shardingDataSource());
return dataSourceProxy;
}
@SuppressWarnings("unchecked")
private <T> T createDataSource(DataSourceProperties properties,
Class<? extends DataSource> type) {
return (T) properties.initializeDataSourceBuilder().type(type).build();
}
private ShardingDataBaseStrategy createShardingDataBaseStrategy(String shardingDataBaseStrategyClassName) {
try {
return (ShardingDataBaseStrategy) Class.forName(shardingDataBaseStrategyClassName).newInstance();
} catch (Exception e) {
throw new RuntimeException("初始化ShardingDataBaseStrategy失败。ShardingDataBaseStrategy=" + shardingDataBaseStrategyClassName);
}
}
/**
* 根据分库上下文路由DataSource
*
* @author kris
*/
public static class ShardingDataSource extends AbstractRoutingDataSource {
/**
* ShardingContext.getShardingDatabase() 为库名+分库序号
*
* 实际上怎么拿数据源进行连接时,就是根据这里的 key 来决定的。实际上整个流程是,这边的类先加载装配给 spring,当用户
* 执行 sql 被 mybatis 拦截器拦截时,会设置数据库的 key,然后执行完拦截器。后面执行到这里拿出数据库的 key 来决定
* 连接哪个库,最后再执行 sql。
*
* 也就是 spring 先把接口定义好暴露给我们,它的流程已经进行了方法调用,后续我们只要实现接口就能实现真正的功能调用,有点类似于
* 模板方法模式
* @return
*/
@Override
protected Object determineCurrentLookupKey() {
return ShardingContext.getShardingDatabase();
}
}
}
最后定义好 mybatis 的拦截器,这边主要替换 sql 语句,将表替换成需要操作的表。设置数据源 key,以便后续执行 sql 时选择对应的连接。
package cn.blogxin.sharding.plugin.interceptor;
import cn.blogxin.sharding.plugin.Sharding;
import cn.blogxin.sharding.plugin.ShardingContext;
import cn.blogxin.sharding.plugin.ShardingTableConfiguration;
import cn.blogxin.sharding.plugin.strategy.DefaultShardingStrategyWithDataBase;
import cn.blogxin.sharding.plugin.strategy.ShardingTableStrategy;
import com.google.common.collect.Maps;
import org.apache.ibatis.executor.statement.StatementHandler;
import org.apache.ibatis.plugin.*;
import org.apache.ibatis.reflection.MetaObject;
import org.apache.ibatis.reflection.SystemMetaObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.util.Map;
import java.util.Properties;
/**
* mybatis分表插件
*
* @see ShardingTableConfiguration
*/
@Intercepts({@Signature(method = "prepare", type = StatementHandler.class, args = {Connection.class, Integer.class})})
public class ShardingInterceptor implements Interceptor {
private final static Logger logger = LoggerFactory.getLogger(ShardingInterceptor.class);
private static final String DELEGATE_BOUND_SQL_SQL = "delegate.boundSql.sql";
private static final String DELEGATE_MAPPED_STATEMENT_ID = "delegate.mappedStatement.id";
private static final String DELEGATE_PARAMETER_HANDLER_PARAMETER_OBJECT = "delegate.parameterHandler.parameterObject";
private static final String PARAM_1 = "param1";
private static final String POINT = ".";
private static final ShardingTableStrategy DEFAULT_SHARDING_STRATEGY = new DefaultShardingStrategyWithDataBase();
private static final Map<String, ShardingTableStrategy> SHARDING_STRATEGY_MAP = Maps.newConcurrentMap();
@Override
public Object intercept(Invocation invocation) throws Throwable {
StatementHandler statementHandler = (StatementHandler) realTarget(invocation.getTarget());
MetaObject metaObject = SystemMetaObject.forObject(statementHandler);
String id = (String) metaObject.getValue(DELEGATE_MAPPED_STATEMENT_ID);
String className = id.substring(0, id.lastIndexOf(POINT));
Sharding sharding = Class.forName(className).getDeclaredAnnotation(Sharding.class);
if (sharding != null && sharding.sharding()) {
String sql = (String) metaObject.getValue(DELEGATE_BOUND_SQL_SQL);
sql = sql.replaceAll(sharding.tableName(), getTargetTableName(metaObject, sharding));
metaObject.setValue(DELEGATE_BOUND_SQL_SQL, sql);
}
return invocation.proceed();
}
private String getTargetTableName(MetaObject metaObject, Sharding sharding) throws Exception {
String shardingKey = getShardingKey(metaObject);
String targetTableName;
if (!StringUtils.isEmpty(shardingKey)) {
targetTableName = getShardingStrategy(sharding).getTargetTableName(sharding, shardingKey);
} else if (StringUtils.isEmpty(shardingKey) && !StringUtils.isEmpty(ShardingContext.getShardingTable())) {
targetTableName = DEFAULT_SHARDING_STRATEGY.getTargetTableName(sharding, ShardingContext.getShardingTable());
} else {
throw new RuntimeException("没有找到分表信息。shardingKey=" + shardingKey + ",ShardingContext=" + ShardingContext.getShardingTable());
}
return targetTableName;
}
private ShardingTableStrategy getShardingStrategy(Sharding sharding) throws Exception {
String strategyClassName = sharding.strategy();
ShardingTableStrategy shardingStrategy = SHARDING_STRATEGY_MAP.get(strategyClassName);
if (shardingStrategy == null) {
ShardingTableStrategy strategy = (ShardingTableStrategy) Class.forName(strategyClassName).newInstance();
SHARDING_STRATEGY_MAP.putIfAbsent(strategyClassName, strategy);
shardingStrategy = SHARDING_STRATEGY_MAP.get(strategyClassName);
}
return shardingStrategy;
}
/**
* 默认取第一个参数作为分表键
* @param metaObject
* @return
*/
private String getShardingKey(MetaObject metaObject) {
String shardingKey = null;
Object parameterObject = metaObject.getValue(DELEGATE_PARAMETER_HANDLER_PARAMETER_OBJECT);
if (parameterObject instanceof String) {
shardingKey = (String) parameterObject;
} else if (parameterObject instanceof Map) {
Map<String, Object> parameterMap = (Map<String, Object>) parameterObject;
Object param1 = parameterMap.get(PARAM_1);
if (param1 instanceof String) {
shardingKey = (String) param1;
}
}
return shardingKey;
}
@Override
public Object plugin(Object target) {
if (target instanceof StatementHandler) {
return Plugin.wrap(target, this);
}
return target;
}
@Override
public void setProperties(Properties properties) {
}
private Object realTarget(Object target) {
if (Proxy.isProxyClass(target.getClass())) {
MetaObject metaObject = SystemMetaObject.forObject(target);
return realTarget(metaObject.getValue("h.target"));
}
return target;
}
}
4、缺点
大家只是考虑了分库分表,有没有想过分库分表后数据的迁移工作。一种策略是新数据库直接走分库分表,老数据还是走原来的逻辑,然后跑 job 把老数据放入新数据中,直到老数据跑完,那么原来代码中的强制逻辑就可以去掉。
还有一点是最烦的,你的数据爆炸了,你又得增加库跟表,但是你根据 hash 来算的,你原来的数据要重新迁移,是不是很崩溃!!!所以,一般一次性考虑好,后面的话直接归档算了,别折腾太多。就算你上一致性 hash 算法,还是得迁移数据,数据多少的问题。
还有,生产上除非你有十足的把我,否则还是乖乖用 sharding-jdbc。因为除了基本的根据 key 进行插入、删除、查询的单个数据情况,还有批量插入、查询、根据时间查询各种,要做的工作量不小。
网友评论