ShardingSphere-jdbc sql执行过程
执行过程
- 调用ShardingSphereDataSource的getConnection方法
- 在DriverStateContext中的STATE容器中拿到之前在构造ShardingSphereDataSource时已存入的OKDriverState
- 通过OKDriverState的getConnection方法构造ShardingSphereConnection
- 通过调用ShardingSphereConnection中的createStatement方法,构造ShardingSphereStatement
- 在ShardingSpherePreparedStatement的构造方法中,完成多个变量的初始化
5.1 jdbcExecutor,本质是ShardingSphereDataSource中生成的executorEngine,存放在metaDataContexts
5.2 ShardingSphereSQLParserEngine,sql解析引擎,根据不同的数据库类型
5.3 BatchPreparedStatementExecutor,封装需要执行的sql单元JDBCExecutionUnit - 调用jdbcExecutor中的execute方法将sql的执行封装成任务由executorEngine中的线程池调度执行
- 调用ListenableFuture的get方法获取sql的执行结果
方法解析
在example
模块例子中,调用以下sql
CREATE TABLE IF NOT EXISTS t_order (order_id BIGINT NOT NULL AUTO_INCREMENT, user_id INT NOT NULL, address_id BIGINT NOT NULL, status VARCHAR(50), PRIMARY KEY (order_id))
调用链如下:
public Connection getConnection() {
return DriverStateContext.getConnection(schemaName, getDataSourceMap(), contextManager, TransactionTypeHolder.get());
}
private static final Map<String, DriverState> STATES;
static {
// TODO add singleton cache with TypedSPI init
ShardingSphereServiceLoader.register(DriverState.class);
Collection<DriverState> driverStates = ShardingSphereServiceLoader.getSingletonServiceInstances(DriverState.class);
STATES = new HashMap<>();
for (DriverState each : driverStates) {
STATES.put(each.getType(), each);
}
}
public static Connection getConnection(final String schemaName, final Map<String, DataSource> dataSourceMap, final ContextManager contextManager, final TransactionType transactionType) {
return STATES.get(contextManager.getMetaDataContexts().getStateContext().getCurrentState()).getConnection(schemaName, dataSourceMap, contextManager, transactionType);
}
public final class OKDriverState implements DriverState {
@Override
public Connection getConnection(final String schemaName, final Map<String, DataSource> dataSourceMap, final ContextManager contextManager, final TransactionType transactionType) {
return new ShardingSphereConnection(schemaName, dataSourceMap, contextManager, TransactionTypeHolder.get());
}
@Override
public String getType() {
return "OK";
}
}
DriverStateContext
中的成员变量STATES
,是在TypedSPI
调用init
的时候初始化,也就是在生成ShardingSphereDataSource
的过程中,这里实际是从OKDriverState
中getConnection,构建一个ShardingSphereConnection
public PreparedStatement prepareStatement(final String sql) throws SQLException {
return new ShardingSpherePreparedStatement(this, sql);
}
public ShardingSpherePreparedStatement(final ShardingSphereConnection connection, final String sql) throws SQLException {
this(connection, sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT, false);
}
private ShardingSpherePreparedStatement(final ShardingSphereConnection connection, final String sql,
final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final boolean returnGeneratedKeys) throws SQLException {
if (Strings.isNullOrEmpty(sql)) {
throw new SQLException(SQLExceptionConstant.SQL_STRING_NULL_OR_EMPTY);
}
this.connection = connection;
metaDataContexts = connection.getContextManager().getMetaDataContexts();
this.sql = sql;
statements = new ArrayList<>();
parameterSets = new ArrayList<>();
ShardingSphereSQLParserEngine sqlParserEngine = new ShardingSphereSQLParserEngine(
DatabaseTypeRegistry.getTrunkDatabaseTypeName(metaDataContexts.getMetaData(connection.getSchemaName()).getResource().getDatabaseType()));
sqlStatement = sqlParserEngine.parse(sql, true);
parameterMetaData = new ShardingSphereParameterMetaData(sqlStatement);
statementOption = returnGeneratedKeys ? new StatementOption(true) : new StatementOption(resultSetType, resultSetConcurrency, resultSetHoldability);
JDBCExecutor jdbcExecutor = new JDBCExecutor(metaDataContexts.getExecutorEngine(), connection.isHoldTransaction());
driverJDBCExecutor = new DriverJDBCExecutor(connection.getSchemaName(), metaDataContexts, jdbcExecutor);
rawExecutor = new RawExecutor(metaDataContexts.getExecutorEngine(), connection.isHoldTransaction(), metaDataContexts.getProps());
// TODO Consider FederateRawExecutor
federateExecutor = new FederateJDBCExecutor(connection.getSchemaName(), metaDataContexts.getOptimizeContextFactory(), metaDataContexts.getProps(), jdbcExecutor);
batchPreparedStatementExecutor = new BatchPreparedStatementExecutor(metaDataContexts, jdbcExecutor, connection.getSchemaName());
kernelProcessor = new KernelProcessor();
}
通过调用ShardingSphereConnection
的prepareStatement
方法,构造一个ShardingSpherePreparedStatement
,在ShardingSpherePreparedStatement
的构造方法中初始化各种成员变量,与后续sql的执行有关
public <I, O> List<O> execute(final ExecutionGroupContext<I> executionGroupContext,
final ExecutorCallback<I, O> firstCallback, final ExecutorCallback<I, O> callback, final boolean serial) throws SQLException {
if (executionGroupContext.getInputGroups().isEmpty()) {
return Collections.emptyList();
}
return serial ? serialExecute(executionGroupContext.getInputGroups().iterator(), firstCallback, callback)
: parallelExecute(executionGroupContext.getInputGroups().iterator(), firstCallback, callback);
}
- executionGroupContext--把需要执行的sql封装成JDBCExecutionUnit
- callback--ShardingSphereStatement里通过createExecuteCallback生成的JDBCExecutorCallback
private <I, O> List<O> parallelExecute(final Iterator<ExecutionGroup<I>> executionGroups, final ExecutorCallback<I, O> firstCallback, final ExecutorCallback<I, O> callback) throws SQLException {
ExecutionGroup<I> firstInputs = executionGroups.next();
Collection<ListenableFuture<Collection<O>>> restResultFutures = asyncExecute(executionGroups, callback);
return getGroupResults(syncExecute(firstInputs, null == firstCallback ? callback : firstCallback), restResultFutures);
}
private <I, O> Collection<ListenableFuture<Collection<O>>> asyncExecute(final Iterator<ExecutionGroup<I>> executionGroups, final ExecutorCallback<I, O> callback) {
Collection<ListenableFuture<Collection<O>>> result = new LinkedList<>();
while (executionGroups.hasNext()) {
result.add(asyncExecute(executionGroups.next(), callback));
}
return result;
}
private <I, O> ListenableFuture<Collection<O>> asyncExecute(final ExecutionGroup<I> executionGroup, final ExecutorCallback<I, O> callback) {
Map<String, Object> dataMap = ExecutorDataMap.getValue();
return executorServiceManager.getExecutorService().submit(() -> callback.execute(executionGroup.getInputs(), false, dataMap));
}
private <O> List<O> getGroupResults(final Collection<O> firstResults, final Collection<ListenableFuture<Collection<O>>> restFutures) throws SQLException {
List<O> result = new LinkedList<>(firstResults);
for (ListenableFuture<Collection<O>> each : restFutures) {
try {
result.addAll(each.get());
} catch (final InterruptedException | ExecutionException ex) {
return throwException(ex);
}
}
return result;
}
- 把执行sql封装成线程池任务丢入线程池中调度执行
- 把执行sql任务的调用结果封装成future收集起来
- 通过future,.get获取sql执行结果
private <T> List<T> doExecute(final ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext, final Collection<RouteUnit> routeUnits,
final JDBCExecutorCallback<T> callback, final SQLStatement sqlStatement) throws SQLException {
List<T> results = jdbcExecutor.execute(executionGroupContext, callback);
refreshMetadata(sqlStatement, routeUnits);
return results;
}
- jdbcExecutor.execute方法是真正执行sql的地方,在
JDBCExecutor
中存放了由ShardingSphereDataSource
构造的executorEngine
,所以本质上是由executorEngine
来执行sql -
refreshMetadata
方法中,sqlStatement
存放了sql语句中一些元数据信息(表字段,约束等),routeUnits
存放了路由信息(数据源+字表) - 最后根据
routeUnits
的数量,返回相同数量的results
,相当于在每个routeUnits
中都执行一遍sql
网友评论