美文网首页
sharding-jdbc(3.0.0)源码解析-insert执

sharding-jdbc(3.0.0)源码解析-insert执

作者: 李_lifuqing | 来源:发表于2019-04-03 12:23 被阅读0次

1.基础介绍

  • sharing-jdbc是一个在客户端的数据源层面实现分库分表的中间件,对应分析源码首先要找到代码执行的入口,对于一个数据库操作入口当然是Statement的相关接口,所以我们应该抛开各种ORM框架,用原始的Statement来分析sharding-jdbc源码
  • Statement是jdbc中用来执行静态sql,并得到返回的接口的抽象接口,本文主要介绍在sharding-jdbc中来实现Statement的实现类:ShardingStatement

2.总体概括

以下为官方网站提供的流程图


image.png

3.看源码

  1. 调用,基础的调用方式回顾,此时的Statement就是ShardingStatement

        String sql="insert into t_order (name) VALUES  (\"我是4\")";
        Connection connection=dataSource.getConnection();
        // ShardingStatement 
        Statement statement=connection.createStatement();
        statement.executeUpdate(sql);
  1. ShardingStatement.executeUpdate;执行更新的sql;重点的方法为sqlRoute,所以后续会主要分析这个方法
 @Override
    public int executeUpdate(final String sql) throws SQLException {
        try {
           //这一步是调用statementExecutor.clear方法,目的是清空上次执行的语句,参数,结果
            clearPrevious();
            //这个方法就实现了,SQL解析,查询优化,SQL路由,SQL改写任务
            sqlRoute(sql);
            //赋值到StatementExecutor(一个用来执行真正sql的包装类)
            initStatementExecutor();
            //对多个库进行真正的更新操作
            return statementExecutor.executeUpdate();
        } finally {
            refreshTableMetaData();
            currentResultSet = null;
        }
    }
  1. ShardingStatement.sqlRoute;SQL解析,查询优化,SQL路由,SQL改写
private void sqlRoute(final String sql) {
    //获取传递参数的上下文对象
        ShardingContext shardingContext = connection.getShardingContext();
//StatementRoutingEngine;用来封装执行分库分表逻辑和主从逻辑,此时调用此类的route方法
        routeResult = new StatementRoutingEngine(shardingContext.getShardingRule(),
            shardingContext.getMetaData().getTable(), shardingContext.getDatabaseType(), shardingContext.isShowSQL(), shardingContext.getMetaData().getDataSource()).route(sql);
    }
  1. StatementRoutingEngine.route:里面就是调用了ShardingRouter.route方法,并将结果传给ShardingMasterSlaveRouter.route方法
public SQLRouteResult route(final String logicSQL) {
       //解析SQL转化为SQLStatement,表示这个SQL的对象类
        SQLStatement sqlStatement = shardingRouter.parse(logicSQL, false);
      //首先调用ShardingRouter.route,得到SQLRouteResult:逻辑SQL经过优化,路由,改写后的结果对象,如下图
        return masterSlaveRouter.route(shardingRouter.route(logicSQL, Collections.emptyList(), sqlStatement));
    }

SQLRouteResult对象内容:


image.png
  1. ParsingSQLRouter.parse:解析逻辑sql,形成SQL执行对象
public SQLStatement parse(final boolean useCache) {
        // SQL解析缓存,有则返回
        Optional<SQLStatement> cachedSQLStatement = getSQLStatementFromCache(useCache);
        if (cachedSQLStatement.isPresent()) {
            return cachedSQLStatement.get();
        }
        //词法解析器
        LexerEngine lexerEngine = LexerEngineFactory.newInstance(dbType, sql);
        lexerEngine.nextToken();
       //语法解析结果
        SQLStatement result = SQLParserFactory.newInstance(dbType, lexerEngine.getCurrentToken().getType(), shardingRule, lexerEngine, shardingTableMetaData).parse();
       //添加缓存
        if (useCache) {
            ParsingResultCache.getInstance().put(sql, result);
        }
        return result;
    }
  1. ParsingSQLRouter.route(final String logicSQL, final List<Object> parameters, final SQLStatement sqlStatement):执行查询优化,SQL路由,SQL改写,当时insert并且没有手动写入id时,则此时会生成分布式ID
@Override
    public SQLRouteResult route(final String logicSQL, final List<Object> parameters, final SQLStatement sqlStatement) {
        //判断是否是insert,如果是则生成分布式主键ID
        GeneratedKey generatedKey = null;
        if (sqlStatement instanceof InsertStatement) {
            generatedKey = getGenerateKey(shardingRule, (InsertStatement) sqlStatement, parameters);
        }
       //初始化返回结果
        SQLRouteResult result = new SQLRouteResult(sqlStatement, generatedKey);
      //调用优化引擎优化SQL
        ShardingConditions shardingConditions = OptimizeEngineFactory.newInstance(shardingRule, sqlStatement, parameters, generatedKey).optimize();
       //赋值分布式主键
        if (null != generatedKey) {
            setGeneratedKeys(result, generatedKey);
        }
       //根据CRUD调用不同的路由引擎获取应该操作的物理库和物理表,形成路由结果,后续会分析该方法
        RoutingResult routingResult = route(sqlStatement, shardingConditions);
       //初始化重写引擎
        SQLRewriteEngine rewriteEngine = new SQLRewriteEngine(shardingRule, logicSQL, databaseType, sqlStatement, shardingConditions, parameters);
       //判断是否路由到一个物理库中
        boolean isSingleRouting = routingResult.isSingleRouting();
       //处理在路由到多个物理库时处理limit语法
        if (sqlStatement instanceof SelectStatement && null != ((SelectStatement) sqlStatement).getLimit()) {
            processLimit(parameters, (SelectStatement) sqlStatement, isSingleRouting);
        }
        //按照路由结果重写SQL,生成物理库可执行的SQL
        SQLBuilder sqlBuilder = rewriteEngine.rewrite(!isSingleRouting);
        for (TableUnit each : routingResult.getTableUnits().getTableUnits()) {
            result.getRouteUnits().add(new RouteUnit(each.getDataSourceName(), rewriteEngine.generateSQL(each, sqlBuilder, shardingDataSourceMetaData)));
        }
        //是否打印最终的SQL
        if (showSQL) {
            SQLLogger.logSQL(logicSQL, sqlStatement, result.getRouteUnits());
        }
        return result;
    }

7.ParsingSQLRouter.route(final SQLStatement sqlStatement, final ShardingConditions shardingConditions):调用分库分表策略生成分库分表结果

private RoutingResult route(final SQLStatement sqlStatement, final ShardingConditions shardingConditions) {
        Collection<String> tableNames = sqlStatement.getTables().getTableNames();
        RoutingEngine routingEngine;
        if (sqlStatement instanceof UseStatement) {
            routingEngine = new IgnoreRoutingEngine();
        } else if (sqlStatement instanceof DDLStatement || (sqlStatement instanceof DCLStatement && ((DCLStatement) sqlStatement).isGrantForSingleTable())) {
            routingEngine = new TableBroadcastRoutingEngine(shardingRule, sqlStatement);
        } else if (sqlStatement instanceof ShowDatabasesStatement || sqlStatement instanceof ShowTablesStatement) {
            routingEngine = new DatabaseBroadcastRoutingEngine(shardingRule);
        } else if (sqlStatement instanceof DCLStatement) {
            routingEngine = new InstanceBroadcastRoutingEngine(shardingRule, shardingDataSourceMetaData);
        } else if (shardingConditions.isAlwaysFalse()) {
            routingEngine = new UnicastRoutingEngine(shardingRule, tableNames);
        } else if (sqlStatement instanceof DALStatement) {
            routingEngine = new UnicastRoutingEngine(shardingRule, tableNames);
        } else if (tableNames.isEmpty() && sqlStatement instanceof SelectStatement) {
            routingEngine = new UnicastRoutingEngine(shardingRule, tableNames);
        } else if (tableNames.isEmpty()) {
            routingEngine = new DatabaseBroadcastRoutingEngine(shardingRule);
        // CRUD语句会进入下面其中一个
        } else if (1 == tableNames.size() || shardingRule.isAllBindingTables(tableNames) || shardingRule.isAllInDefaultDataSource(tableNames)) {
            routingEngine = new StandardRoutingEngine(shardingRule, tableNames.iterator().next(), shardingConditions);
        } else {
            // TODO config for cartesian set
            routingEngine = new ComplexRoutingEngine(shardingRule, tableNames, shardingConditions);
        }
        return routingEngine.route();
    }

StandardRoutingEngine.route:查看进去会看到,进入到如下源码

private Collection<DataNode> routeByShardingConditions(final TableRule tableRule) {
        Collection<DataNode> result = new LinkedList<>();
        if (shardingConditions.getShardingConditions().isEmpty()) {
            result.addAll(route(tableRule, Collections.<ShardingValue>emptyList(), Collections.<ShardingValue>emptyList()));
        } else {
            //获取配置的分库策略类,里面会调用我们的比如根据Id做hash的分库算法等等
            ShardingStrategy dataBaseShardingStrategy = shardingRule.getDatabaseShardingStrategy(tableRule);
        
            ShardingStrategy tableShardingStrategy = shardingRule.getTableShardingStrategy(tableRule);
            for (ShardingCondition each : shardingConditions.getShardingConditions()) {
                List<ShardingValue> databaseShardingValues = isGettingShardingValuesFromHint(dataBaseShardingStrategy)
                        ? getDatabaseShardingValuesFromHint() : getShardingValues(dataBaseShardingStrategy.getShardingColumns(), each);
                List<ShardingValue> tableShardingValues = isGettingShardingValuesFromHint(tableShardingStrategy)
                        ? getTableShardingValuesFromHint() : getShardingValues(tableShardingStrategy.getShardingColumns(), each);
                Collection<DataNode> dataNodes = route(tableRule, databaseShardingValues, tableShardingValues);
                reviseShardingConditions(each, dataNodes);
                result.addAll(dataNodes);
            }
        }
        return result;
    }

4.总结

此文是一个简单的流程分析,希望帮助大家,对整个流程有个认知,为后续关键节点的实现有个整体的概览

相关文章

网友评论

      本文标题:sharding-jdbc(3.0.0)源码解析-insert执

      本文链接:https://www.haomeiwen.com/subject/tbmabqtx.html