美文网首页
Mycat路由

Mycat路由

作者: tony_0c73 | 来源:发表于2018-05-06 18:39 被阅读0次

    路由接口

    io.mycat.route.RouteService
    方法:

    RouteResultset route(SystemConfig sysconf, SchemaConfig schema,
                int sqlType, String stmt, String charset, ServerConnection sc)
    

    计算流程概述

    image.png image.png

    conditions 为<表名,字段名,字段值> 3元组

    单表计算流程


    image.png

    代码分析

    AbstractRouteStrategy

        @Override
        public RouteResultset route(SystemConfig sysConfig, SchemaConfig schema, int sqlType, String origSQL,
                String charset, ServerConnection sc, LayerCachePool cachePool) throws SQLNonTransientException {
            //对应schema标签checkSQLschema属性,把表示schema的字符去掉
            if (schema.isCheckSQLSchema()) {
                origSQL = RouterUtil.removeSchema(origSQL, schema.getName());
            }
    
            /**
         * 处理一些路由之前的逻辑
         * 全局序列号,父子表插入
         */
            if ( beforeRouteProcess(schema, sqlType, origSQL, sc) ) {
                return null;
            }
    
            /**
             * SQL 语句拦截
             */
            String stmt = MycatServer.getInstance().getSqlInterceptor().interceptSQL(origSQL, sqlType);
            if (!origSQL.equals(stmt) && LOGGER.isDebugEnabled()) {
                LOGGER.debug("sql intercepted to " + stmt + " from " + origSQL);
            }
            RouteResultset rrs = new RouteResultset(stmt, sqlType);
    
            /**
             * 优化debug loaddata输出cache的日志会极大降低性能
             */
            if (LOGGER.isDebugEnabled() && origSQL.startsWith(LoadData.loadDataHint)) {
                rrs.setCacheAble(false);
            }
            /**
             * rrs携带ServerConnection的autocommit状态用于在sql解析的时候遇到
             * select ... for update的时候动态设定RouteResultsetNode的canRunInReadDB属性
             */
            if (sc != null ) {
                rrs.setAutocommit(sc.isAutocommit());
            }
            /**
             * DDL 语句的路由
             */
            if (ServerParse.DDL == sqlType) {
                return RouterUtil.routeToDDLNode(rrs, sqlType, stmt, schema);
            }
    
            /**
             * 检查是否有分片
             */
            if (schema.isNoSharding() && ServerParse.SHOW != sqlType) {
                rrs = RouterUtil.routeToSingleNode(rrs, schema.getDataNode(), stmt);
            } else {
                RouteResultset returnedSet = routeSystemInfo(schema, sqlType, stmt, rrs);
                if (returnedSet == null) {
                    rrs = routeNormalSqlWithAST(schema, stmt, rrs, charset, cachePool,sqlType,sc);
                }
            }
            return rrs;
        }
    

    从上面代码获知路由计算是在方法 routeNormalSqlWithAST中,这个是个抽象方法由子类来实现,目前是DruidMycatRouteStrategy

    DruidMycatRouteStrategy.routeNormalSqlWithAST 路由是通过以下方法进行的

    /**
         *  直接结果路由
         */
        private RouteResultset directRoute(RouteResultset rrs,DruidShardingParseInfo ctx,SchemaConfig schema,
                                            DruidParser druidParser,SQLStatement statement,LayerCachePool cachePool) throws SQLNonTransientException{
            
            //改写sql:如insert语句主键自增长, 在直接结果路由的情况下,进行sql 改写处理
            druidParser.changeSql(schema, rrs, statement,cachePool);
            /**
             * DruidParser 解析过程中已完成了路由的直接返回
             */
            if ( rrs.isFinishedRoute() ) {
                return rrs;
            }
            /**
             * 没有from的select语句或其他
             */
            if((ctx.getTables() == null || ctx.getTables().size() == 0)&&(ctx.getTableAliasMap()==null||ctx.getTableAliasMap().isEmpty()))
            {
                return RouterUtil.routeToSingleNode(rrs, schema.getRandomDataNode(), druidParser.getCtx().getSql());
            }
            //如果没有路由计算单元,设置一个
            if(druidParser.getCtx().getRouteCalculateUnits().size() == 0) {
                RouteCalculateUnit routeCalculateUnit = new RouteCalculateUnit();
                druidParser.getCtx().addRouteCalculateUnit(routeCalculateUnit);
            }
            
            SortedSet<RouteResultsetNode> nodeSet = new TreeSet<RouteResultsetNode>();
            boolean isAllGlobalTable = RouterUtil.isAllGlobalTable(ctx, schema);
            //对sql解析出来的每个路由计算单元进行路由计算,计算结果合并存在nodeSet
            //nodeset是一个set会自动去掉重复元素
            for(RouteCalculateUnit unit: druidParser.getCtx().getRouteCalculateUnits()) {
                RouteResultset rrsTmp = RouterUtil.tryRouteForTables(schema, druidParser.getCtx(), unit, rrs, isSelect(statement), cachePool);
                if(rrsTmp != null&&rrsTmp.getNodes()!=null) {
                    for(RouteResultsetNode node :rrsTmp.getNodes()) {
                        nodeSet.add(node);
                    }
                }
                if(isAllGlobalTable) {//都是全局表时只计算一遍路由
                    break;
                }
            }
            RouteResultsetNode[] nodes = new RouteResultsetNode[nodeSet.size()];
            int i = 0;
            for (RouteResultsetNode aNodeSet : nodeSet) {
                nodes[i] = aNodeSet;
                 //如果是insert语句,并且只是单表,是注册过的表,并且是slot的sql,修改语句
                  if(statement instanceof MySqlInsertStatement &&ctx.getTables().size()==1&&schema.getTables().containsKey(ctx.getTables().get(0))) {
                      RuleConfig rule = schema.getTables().get(ctx.getTables().get(0)).getRule();
                      if(rule!=null&&  rule.getRuleAlgorithm() instanceof SlotFunction){
                         //修改语句
                         aNodeSet.setStatement(ParseUtil.changeInsertAddSlot(aNodeSet.getStatement(),aNodeSet.getSlot()));
                      }
                  }
                i++;
            }       
            rrs.setNodes(nodes);
            //分表
            /**
             *  subTables="t_order$1-2,t_order3"
             *目前分表 1.6 开始支持 幵丏 dataNode 在分表条件下只能配置一个,分表条件下不支持join。
             */
            if(rrs.isDistTable()){
                return this.routeDisTable(statement,rrs);
            }
            return rrs;
        }
    

    路由通过tryRouteForTables 计算获取,代码如下

    /**
         * 多表路由
         */
        public static RouteResultset tryRouteForTables(SchemaConfig schema, DruidShardingParseInfo ctx,
                RouteCalculateUnit routeUnit, RouteResultset rrs, boolean isSelect, LayerCachePool cachePool)
                throws SQLNonTransientException {
            List<String> tables = ctx.getTables();
            if(schema.isNoSharding()||(tables.size() >= 1&&isNoSharding(schema,tables.get(0)))) {
                return routeToSingleNode(rrs, schema.getDataNode(), ctx.getSql());
            }
            //只有一个表的
            if(tables.size() == 1) {
                return RouterUtil.tryRouteForOneTable(schema, ctx, routeUnit, tables.get(0), rrs, isSelect, cachePool);
            }
            Set<String> retNodesSet = new HashSet<String>();
            //每个表对应的路由映射
            Map<String,Set<String>> tablesRouteMap = new HashMap<String,Set<String>>();
    
            //分库解析信息不为空
            Map<String, Map<String, Set<ColumnRoutePair>>> tablesAndConditions = routeUnit.getTablesAndConditions();
            if(tablesAndConditions != null && tablesAndConditions.size() > 0) {
                //为分库表找路由
                RouterUtil.findRouteWithcConditionsForTables(schema, rrs, tablesAndConditions, tablesRouteMap, ctx.getSql(), cachePool, isSelect);
                if(rrs.isFinishedRoute()) {
                    return rrs;
                }
            }
            //为全局表和单库表找路由
            for(String tableName : tables) {
                TableConfig tableConfig = schema.getTables().get(tableName.toUpperCase());
                if(tableConfig == null) {
                    //add 如果表读取不到则先将表名从别名中读取转化后再读取
                    String alias = ctx.getTableAliasMap().get(tableName);
                    if(!StringUtil.isEmpty(alias)){
                        tableConfig = schema.getTables().get(alias.toUpperCase());
                    }
                    if(tableConfig == null){
                        String msg = "can't find table define in schema "+ tableName + " schema:" + schema.getName();
                        LOGGER.warn(msg);
                        throw new SQLNonTransientException(msg);
                    }
                    
                }
                if(tableConfig.isGlobalTable()) {//全局表
                    if(tablesRouteMap.get(tableName) == null) {
                        tablesRouteMap.put(tableName, new HashSet<String>());
                    }
                    tablesRouteMap.get(tableName).addAll(tableConfig.getDataNodes());
                } else if(tablesRouteMap.get(tableName) == null) { //余下的表都是单库表
                    tablesRouteMap.put(tableName, new HashSet<String>());
                    tablesRouteMap.get(tableName).addAll(tableConfig.getDataNodes());
                }
            }
    
            boolean isFirstAdd = true;
            for(Map.Entry<String, Set<String>> entry : tablesRouteMap.entrySet()) {
                if(entry.getValue() == null || entry.getValue().size() == 0) {
                    throw new SQLNonTransientException("parent key can't find any valid datanode ");
                } else {
                    if(isFirstAdd) {
                        retNodesSet.addAll(entry.getValue());
                        isFirstAdd = false;
                    } else {
                        retNodesSet.retainAll(entry.getValue());
                        if(retNodesSet.size() == 0) {//两个表的路由无交集
                            String errMsg = "invalid route in sql, multi tables found but datanode has no intersection "
                                    + " sql:" + ctx.getSql();
                            LOGGER.warn(errMsg);
                            throw new SQLNonTransientException(errMsg);
                        }
                    }
                }
            }
    
            if(retNodesSet != null && retNodesSet.size() > 0) {
                String tableName = tables.get(0);
                TableConfig tableConfig = schema.getTables().get(tableName.toUpperCase());
                if(tableConfig.isDistTable()){
                    routeToDistTableNode(tableName,schema, rrs, ctx.getSql(), tablesAndConditions, cachePool, isSelect);
                    return rrs;
                }
    
                if(retNodesSet.size() > 1 && isAllGlobalTable(ctx, schema)) {
                    // mulit routes ,not cache route result
                    if (isSelect) {
                        rrs.setCacheAble(false);
                        routeToSingleNode(rrs, retNodesSet.iterator().next(), ctx.getSql());
                    }
                    else {//delete 删除全局表的记录
                        routeToMultiNode(isSelect, rrs, retNodesSet, ctx.getSql(),true);
                    }
    
                } else {
                    routeToMultiNode(isSelect, rrs, retNodesSet, ctx.getSql());
                }
            }
            return rrs;
        }
    
    

    对sql解析出来的每个路由计算单元进行路由计算

    路由计算单元

    类:DefaultDruidParser

    /**
    conditionList  是从sql中提取的查询条件包含信息<表名,字段名,值,操作符>, 如果values为多值 操作符为between,建构RangeValue。
    一个sql语句中是用or来分割的,每个List<Condition>为一个or分割的条件list
    
    **/
    private List<RouteCalculateUnit> buildRouteCalculateUnits(SchemaStatVisitor visitor, List<List<Condition>> conditionList) {
            List<RouteCalculateUnit> retList = new ArrayList<RouteCalculateUnit>();
            //遍历condition ,找分片字段
            for(int i = 0; i < conditionList.size(); i++) {
                RouteCalculateUnit routeCalculateUnit = new RouteCalculateUnit();
                for(Condition condition : conditionList.get(i)) {
                    List<Object> values = condition.getValues();
                    if(values.size() == 0) {
                        continue;  
                    }
                    if(checkConditionValues(values)) {
                        String columnName = StringUtil.removeBackquote(condition.getColumn().getName().toUpperCase());
                        String tableName = StringUtil.removeBackquote(condition.getColumn().getTable().toUpperCase());
                        
                        if(visitor.getAliasMap() != null && visitor.getAliasMap().get(tableName) != null 
                                && !visitor.getAliasMap().get(tableName).equals(tableName)) {
                            tableName = visitor.getAliasMap().get(tableName);
                        }
    
                        if(visitor.getAliasMap() != null && visitor.getAliasMap().get(StringUtil.removeBackquote(condition.getColumn().getTable().toUpperCase())) == null) {//子查询的别名条件忽略掉,不参数路由计算,否则后面找不到表
                            continue;
                        }
                        
                        String operator = condition.getOperator();
                        
                        //只处理between ,in和=3中操作符
                        if(operator.equals("between")) {
                            RangeValue rv = new RangeValue(values.get(0), values.get(1), RangeValue.EE);
                                    routeCalculateUnit.addShardingExpr(tableName.toUpperCase(), columnName, rv);
                        } else if(operator.equals("=") || operator.toLowerCase().equals("in")){ //只处理=号和in操作符,其他忽略
                                    routeCalculateUnit.addShardingExpr(tableName.toUpperCase(), columnName, values.toArray());
                        }
                    }
                }
                retList.add(routeCalculateUnit);
            }
            return retList;
        }
    

    相关文章

      网友评论

          本文标题:Mycat路由

          本文链接:https://www.haomeiwen.com/subject/jewurftx.html