sharding-sphere: How SQL Gets Executed

Author: 一滴水的坚持 | Published: 2018-07-09 21:57

    This walkthrough debugs the sharding-sphere code, starting from the following official example:

        public static void main(final String[] args) throws SQLException {
            DataSource dataSource = getShardingDataSource();
            dropTable(dataSource);
            createTable(dataSource);
            insert(dataSource);
            updateFailure(dataSource);
        }
    

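    As shown, the example first obtains the data source (the connection pool), then drops the tables, creates them, inserts data, and finally runs an update. Obtaining the data source is where sharding-sphere's own data source gets initialized.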

        private static DataSource getShardingDataSource() throws SQLException {
            ShardingRuleConfiguration shardingRuleConfig = new ShardingRuleConfiguration();
            TableRuleConfiguration orderTableRuleConfig = new TableRuleConfiguration();
            orderTableRuleConfig.setLogicTable("t_order");
            orderTableRuleConfig.setActualDataNodes("ds_trans_${0..1}.t_order_${0..1}");
            shardingRuleConfig.getTableRuleConfigs().add(orderTableRuleConfig);
            
            TableRuleConfiguration orderItemTableRuleConfig = new TableRuleConfiguration();
            orderItemTableRuleConfig.setLogicTable("t_order_item");
            orderItemTableRuleConfig.setActualDataNodes("ds_trans_${0..1}.t_order_item_${0..1}");
            shardingRuleConfig.getTableRuleConfigs().add(orderItemTableRuleConfig);
            
            shardingRuleConfig.getBindingTableGroups().add("t_order, t_order_item");
            shardingRuleConfig.setDefaultDatabaseShardingStrategyConfig(new StandardShardingStrategyConfiguration("user_id", new ModuloShardingAlgorithm()));
            shardingRuleConfig.setDefaultTableShardingStrategyConfig(new StandardShardingStrategyConfiguration("order_id", new ModuloShardingAlgorithm()));
            return ShardingDataSourceFactory.createDataSource(createDataSourceMap(), shardingRuleConfig, new HashMap<String, Object>(), new Properties());
        }
    
    public final class ShardingDataSourceFactory {
        public static DataSource createDataSource(Map<String, DataSource> dataSourceMap, ShardingRuleConfiguration shardingRuleConfig, Map<String, Object> configMap, Properties props) throws SQLException {
            return new ShardingDataSource(dataSourceMap, new ShardingRule(shardingRuleConfig, dataSourceMap.keySet()), configMap, props);
        }
    
        private ShardingDataSourceFactory() {
        }
    }
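
To make the rule configuration above more concrete, here is a minimal sketch of how a logic table could resolve to an actual data node. It does not use sharding-sphere at all. The article does not show the source of `ModuloShardingAlgorithm`, so the sketch assumes it simply takes the sharding value modulo 2, which matches the `${0..1}` ranges in the data node expressions. The class and method names are hypothetical.

```java
/** Hypothetical stand-in for the example's modulo rule; this is not sharding-sphere API. */
final class ModuloRoutingSketch {

    // Assumption: two databases and two tables per database, as in ds_trans_${0..1}.t_order_${0..1}.
    private static final long SHARD_COUNT = 2L;

    /** Resolves the actual data node for a logic table, for example "ds_trans_1.t_order_0". */
    static String resolveDataNode(final String logicTable, final long userId, final long orderId) {
        // user_id picks the database, order_id picks the table (see the two default strategies above).
        long databaseIndex = Math.floorMod(userId, SHARD_COUNT);
        long tableIndex = Math.floorMod(orderId, SHARD_COUNT);
        return "ds_trans_" + databaseIndex + "." + logicTable + "_" + tableIndex;
    }

    public static void main(final String[] args) {
        System.out.println(resolveDataNode("t_order", 11L, 1000L));      // prints ds_trans_1.t_order_0
        System.out.println(resolveDataNode("t_order_item", 11L, 1000L)); // prints ds_trans_1.t_order_item_0
    }
}
```

Because `t_order` and `t_order_item` are declared as binding tables and share the same sharding columns, rows with the same `user_id` and `order_id` end up with the same database and table suffix in this sketch.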
    

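    As shown, what finally gets initialized is a ShardingDataSource, which implements the JDBC DataSource interface. This is how the execution logic, SQL lexical analysis, and SQL syntax parsing all end up hooked in behind plain JDBC. See the diagrams: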

    datasource.png
    sharding-sphere.png
    Now look at the drop statements: they are actually executed through executeUpdate.
        private static void dropTable(final DataSource dataSource) throws SQLException {
            executeUpdate(dataSource, "DROP TABLE IF EXISTS t_order_item");
            executeUpdate(dataSource, "DROP TABLE IF EXISTS t_order");
        }
    
    ![statement.png](https://img.haomeiwen.com/i3397380/7132d7299fd9ef5b.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
    private static void executeUpdate(final DataSource dataSource, final String sql) throws SQLException {
            try (
                    Connection conn = dataSource.getConnection();
                    PreparedStatement preparedStatement = conn.prepareStatement(sql)) {
                preparedStatement.executeUpdate();
            }
    }
    
    public ShardingConnection getConnection() {
        return new ShardingConnection(this.shardingContext);
    }
    

    The Connection obtained here is actually a ShardingConnection object.
    Likewise, the PreparedStatement object is a ShardingPreparedStatement.

    public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) {
       return new ShardingPreparedStatement(this, sql, autoGeneratedKeys);
    }
    
    statement.png
    connnection.png
    The class diagrams show that sharding-sphere provides its own implementations of the JDBC interfaces, including DataSource, Connection, and PreparedStatement.
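
The point of implementing the JDBC interfaces is that calling code keeps using `Connection` and `PreparedStatement` and never sees the fan-out behind them. The sketch below illustrates only that idea. It is an assumption-laden toy: it uses `java.lang.reflect.Proxy` to keep the example short, whereas sharding-sphere uses concrete classes such as ShardingConnection, and the "one affected row per shard" behavior is invented for the demonstration. All names in it are hypothetical.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

/** Illustrative only: a "logical" JDBC connection that hides how many shards sit behind it. */
final class LogicalJdbcSketch {

    /** Builds a Connection whose prepared statements pretend to run on shardCount shards. */
    static Connection logicalConnection(final int shardCount) {
        InvocationHandler handler = (proxy, method, args) -> {
            switch (method.getName()) {
                case "prepareStatement":
                    return logicalStatement(shardCount);
                case "close":
                    return null;
                default:
                    throw new UnsupportedOperationException(method.getName());
            }
        };
        return (Connection) Proxy.newProxyInstance(
                LogicalJdbcSketch.class.getClassLoader(), new Class<?>[] {Connection.class}, handler);
    }

    private static PreparedStatement logicalStatement(final int shardCount) {
        InvocationHandler handler = (proxy, method, args) -> {
            switch (method.getName()) {
                case "executeUpdate":
                    // Assumption for the demo: every shard reports one affected row.
                    return Integer.valueOf(shardCount);
                case "close":
                    return null;
                default:
                    throw new UnsupportedOperationException(method.getName());
            }
        };
        return (PreparedStatement) Proxy.newProxyInstance(
                LogicalJdbcSketch.class.getClassLoader(), new Class<?>[] {PreparedStatement.class}, handler);
    }

    /** Caller code: plain JDBC, identical to the executeUpdate helper in the article. */
    static int runUpdate(final int shardCount, final String sql) {
        try (Connection connection = logicalConnection(shardCount);
             PreparedStatement statement = connection.prepareStatement(sql)) {
            return statement.executeUpdate();
        } catch (final SQLException ex) {
            throw new IllegalStateException(ex);
        }
    }

    public static void main(final String[] args) {
        System.out.println(runUpdate(4, "DROP TABLE IF EXISTS t_order"));
    }
}
```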
    When the SQL is executed, the call lands in the executeUpdate method of ShardingPreparedStatement, shown below:
    @Override
    public int executeUpdate() throws SQLException {
        try {
            Collection<PreparedStatementUnit> preparedStatementUnits = route();
            return new PreparedStatementExecutor(
                    getConnection().getShardingContext().getExecutorEngine(), routeResult.getSqlStatement().getType(), preparedStatementUnits).executeUpdate();
        } finally {
            if (routeResult != null && connection != null) {
                JDBCShardingRefreshHandler.build(routeResult, connection).execute();
            }
            clearBatch();
        }
    }
    

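    As shown, the SQL is routed first to obtain the execution units, and then a new executor is created to run them. To build the units, the SQL routing engine routes the statement and returns the SQL execution units. The code then iterates over them, creates a PreparedStatement for each, replays the recorded parameters onto it to fill the placeholders, and collects the resulting PreparedStatementUnit objects, which are handed to the SQL executor.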

    private Collection<PreparedStatementUnit> route() throws SQLException {
        Collection<PreparedStatementUnit> result = new LinkedList<>();
        routeResult = routingEngine.route(getParameters());
        for (SQLExecutionUnit each : routeResult.getExecutionUnits()) {
            PreparedStatement preparedStatement = generatePreparedStatement(each);
            routedStatements.add(preparedStatement);
            replaySetParameter(preparedStatement, each.getSqlUnit().getParameterSets().get(0));
            result.add(new PreparedStatementUnit(each, preparedStatement));
        }
        return result;
    }
    
    public int executeUpdate() throws SQLException {
        List<Integer> results = executorEngine.execute(sqlType, preparedStatementUnits, new ExecuteCallback<Integer>() {
            
            @Override
            public Integer execute(final BaseStatementUnit baseStatementUnit) throws Exception {
                return ((PreparedStatement) baseStatementUnit.getStatement()).executeUpdate();
            }
        });
        return accumulate(results);
    }
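
The "record once, replay on every routed statement" step in `route()` can be sketched without any JDBC or sharding-sphere classes. The version below is a simplification under stated assumptions: `RoutedStatement` is a hypothetical stand-in for a real PreparedStatement, and every routed statement receives the same parameters, whereas the real code takes them from each unit's own parameter set.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of recording parameters on a logical statement and replaying them. */
final class ReplayParametersSketch {

    /** Stand-in for a routed PreparedStatement: it only stores what was bound to it. */
    static final class RoutedStatement {
        final String actualSql;
        final Map<Integer, Object> boundParameters = new TreeMap<>();

        RoutedStatement(final String actualSql) {
            this.actualSql = actualSql;
        }

        void setObject(final int parameterIndex, final Object value) {
            boundParameters.put(parameterIndex, value);
        }
    }

    /** Parameters captured on the logical statement, stored in placeholder order. */
    private final List<Object> recordedParameters = new ArrayList<>();

    /** Records a parameter; indexes are 1-based, as in JDBC. */
    void setObject(final int parameterIndex, final Object value) {
        while (recordedParameters.size() < parameterIndex) {
            recordedParameters.add(null);
        }
        recordedParameters.set(parameterIndex - 1, value);
    }

    /** Creates one statement per actual SQL and replays the recorded parameters onto it. */
    List<RoutedStatement> route(final List<String> actualSqls) {
        List<RoutedStatement> result = new ArrayList<>();
        for (String each : actualSqls) {
            RoutedStatement statement = new RoutedStatement(each);
            for (int i = 0; i < recordedParameters.size(); i++) {
                statement.setObject(i + 1, recordedParameters.get(i));
            }
            result.add(statement);
        }
        return result;
    }

    public static void main(final String[] args) {
        ReplayParametersSketch logical = new ReplayParametersSketch();
        logical.setObject(1, "paid");
        logical.setObject(2, 1000L);
        for (RoutedStatement each : logical.route(Arrays.asList(
                "UPDATE t_order_0 SET status = ? WHERE order_id = ?",
                "UPDATE t_order_1 SET status = ? WHERE order_id = ?"))) {
            System.out.println(each.actualSql + " " + each.boundParameters);
        }
    }
}
```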
    

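    While executing, the SQL execution engine iterates over the execution units, runs each one against its own database, and finally merges the per-unit results and returns them.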

    public <T> List<T> execute(
            final SQLType sqlType, final Collection<? extends BaseStatementUnit> baseStatementUnits, final ExecuteCallback<T> executeCallback) throws SQLException {
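        // Execute the remaining units asynchronously.
        // Note: iterator, firstInput and event are set up in lines omitted from this excerpt.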
        ListenableFuture<List<T>> restFutures = asyncExecute(sqlType, Lists.newArrayList(iterator), executeCallback);
        T firstOutput;
        List<T> restOutputs;
        try {
            firstOutput = syncExecute(sqlType, firstInput, executeCallback);
            restOutputs = restFutures.get();
            // CHECKSTYLE:OFF
        } catch (final Exception ex) {
            // CHECKSTYLE:ON
            event.setException(ex);
            event.setEventExecutionType(EventExecutionType.EXECUTE_FAILURE);
            EventBusInstance.getInstance().post(event);
            ExecutorExceptionHandler.handleException(ex);
            return null;
        }
        event.setEventExecutionType(EventExecutionType.EXECUTE_SUCCESS);
        EventBusInstance.getInstance().post(event);
        List<T> result = Lists.newLinkedList(restOutputs);
        result.add(0, firstOutput);
        return result;
    }
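
The shape of `execute` above is: run the first unit on the calling thread, submit the rest to a thread pool, then put the first result back at the front. Here is a minimal sketch of that pattern using only `java.util.concurrent`. It leaves out the event posting and exception handling of the real method, and the names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

/** Hypothetical sketch: first input runs on the caller thread, the rest on a pool. */
final class FirstSyncRestAsyncSketch {

    static <I, O> List<O> execute(final List<I> inputs, final Function<I, O> callback,
                                  final ExecutorService executorService)
            throws InterruptedException, ExecutionException {
        List<O> result = new ArrayList<>(inputs.size());
        if (inputs.isEmpty()) {
            return result;
        }
        Iterator<I> iterator = inputs.iterator();
        final I firstInput = iterator.next();
        // Submit everything after the first input to the pool.
        List<Future<O>> restFutures = new ArrayList<>();
        while (iterator.hasNext()) {
            final I each = iterator.next();
            Callable<O> task = () -> callback.apply(each);
            restFutures.add(executorService.submit(task));
        }
        // Run the first input synchronously while the others are in flight.
        result.add(callback.apply(firstInput));
        // Wait for the rest, preserving input order.
        for (Future<O> each : restFutures) {
            result.add(each.get());
        }
        return result;
    }

    /** Small driver that doubles each input; wraps checked exceptions for convenience. */
    static List<Integer> demo() {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        try {
            return execute(Arrays.asList(1, 2, 3), value -> value * 2, executorService);
        } catch (final InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } catch (final ExecutionException ex) {
            throw new IllegalStateException(ex.getCause());
        } finally {
            executorService.shutdown();
        }
    }

    public static void main(final String[] args) {
        System.out.println(demo()); // prints [2, 4, 6]
    }
}
```

Running the first unit on the caller thread means a statement that routes to a single shard does not need the thread pool at all.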
    

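    The asynchronous path is ordinary multithreading: each unit is submitted to a thread pool, the futures are awaited, and the results are combined at the end.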

    private <T> ListenableFuture<List<T>> asyncExecute(
            final SQLType sqlType, final Collection<BaseStatementUnit> baseStatementUnits, final ExecuteCallback<T> executeCallback) {
        List<ListenableFuture<T>> result = new ArrayList<>(baseStatementUnits.size());
        final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
        final Map<String, Object> dataMap = ExecutorDataMap.getDataMap();
        for (final BaseStatementUnit each : baseStatementUnits) {
            result.add(executorService.submit(new Callable<T>() {
                
                @Override
                public T call() throws Exception {
                    return executeInternal(sqlType, each, executeCallback, isExceptionThrown, dataMap);
                }
            }));
        }
        return Futures.allAsList(result);
    }
    
    private <T> T executeInternal(final SQLType sqlType, final BaseStatementUnit baseStatementUnit, final ExecuteCallback<T> executeCallback,
                                  final boolean isExceptionThrown, final Map<String, Object> dataMap) throws Exception {
        synchronized (baseStatementUnit.getStatement().getConnection()) {
            T result;
            ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
            ExecutorDataMap.setDataMap(dataMap);
            List<AbstractExecutionEvent> events = new LinkedList<>();
            for (List<Object> each : baseStatementUnit.getSqlExecutionUnit().getSqlUnit().getParameterSets()) {
                events.add(getExecutionEvent(sqlType, baseStatementUnit, each));
            }
            for (AbstractExecutionEvent event : events) {
                EventBusInstance.getInstance().post(event);
            }
            try {
                result = executeCallback.execute(baseStatementUnit);
            } catch (final SQLException ex) {
                for (AbstractExecutionEvent each : events) {
                    each.setEventExecutionType(EventExecutionType.EXECUTE_FAILURE);
                    each.setException(ex);
                    EventBusInstance.getInstance().post(each);
                    ExecutorExceptionHandler.handleException(ex);
                }
                return null;
            }
            for (AbstractExecutionEvent each : events) {
                each.setEventExecutionType(EventExecutionType.EXECUTE_SUCCESS);
                EventBusInstance.getInstance().post(each);
            }
            return result;
        }
    }
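
`asyncExecute` relies on Guava's `Futures.allAsList` to turn a list of futures into one future of a list. For readers without Guava at hand, a rough JDK-only equivalent can be sketched with `CompletableFuture`. This is an approximation, not a drop-in replacement: `CompletableFuture.allOf` waits for every input to complete before it reports a failure. The per-shard update counts and the summing step (mirroring the article's `accumulate(results)` call) are made up for the demonstration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical JDK-only approximation of combining many futures into one. */
final class AllAsListSketch {

    /** Returns a future that completes with all results, in the order of the input list. */
    static <T> CompletableFuture<List<T>> allAsList(final List<CompletableFuture<T>> futures) {
        CompletableFuture<Void> allDone = CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
        return allDone.thenApply(ignored -> {
            List<T> result = new ArrayList<>(futures.size());
            for (CompletableFuture<T> each : futures) {
                // Every future is already complete here, so join() does not block.
                result.add(each.join());
            }
            return result;
        });
    }

    /** Submits one task per shard and sums the (made-up) update counts they return. */
    static int sumUpdateCounts(final List<Integer> perShardCounts) {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        try {
            List<CompletableFuture<Integer>> futures = new ArrayList<>();
            for (final Integer each : perShardCounts) {
                futures.add(CompletableFuture.supplyAsync(() -> each, executorService));
            }
            int total = 0;
            for (int count : allAsList(futures).join()) {
                total += count;
            }
            return total;
        } finally {
            executorService.shutdown();
        }
    }

    public static void main(final String[] args) {
        System.out.println(sumUpdateCounts(Arrays.asList(1, 0, 2))); // prints 3
    }
}
```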
    

    fyi

    Article link: https://www.haomeiwen.com/subject/evtxpftx.html