美文网首页
WideTableMultiDimSQLParser 解析说明

作者: 光剑书架上的书 | 来源:发表于2022-03-09 14:03 被阅读0次

    WideTableMultiDimSQLParser 解析说明

    1.ClickHouse 数组交并差运算

    --交 t[1] ∩ t[2] : arrayIntersect(t[1], t[2])
    select length(arrayDistinct(arrayFilter(x->x is not null, t.res))) as cnt
    from (
             select arrayIntersect(t[3], arrayIntersect(t[1], t[2])) as res,
                    array(
                                (select groupUniqArray(UserID) from hits_v1 where Sex = 1),
                                (select groupUniqArray(UserID) from hits_v1 where Age > 18),
                                (select groupUniqArray(UserID) from hits_v1 where RequestNum > 0)
                        )                                               t
             ) t;
    
    --并 t[1] ∪ t[2]: arrayConcat(t[1], t[2])
    select length(arrayDistinct(arrayFilter(x->x is not null, t.res))) as cnt
    from (
             select arrayConcat(t[3], arrayConcat(t[1], t[2])) as res,
                    array(
                                (select groupUniqArray(UserID) from hits_v1 where Sex = 1),
                                (select groupUniqArray(UserID) from hits_v1 where Age > 18),
                                (select groupUniqArray(UserID) from hits_v1 where RequestNum > 0)
                        )                                         t
             ) t;
    
    --差 t[1]-t[2] : arrayMap(x->multiIf(x not in arrayIntersect(t[1], t[2]), x, NULL), t[1])
    select length(arrayDistinct(arrayFilter(x->x is not null, t.res))) as cnt
    from (
             select arrayIntersect(t[3], arrayMap(x->multiIf(x not in arrayIntersect(t[1], t[2]), x, NULL), t[1])) as res,
                    array(
                                (select groupUniqArray(UserID) from hits_v1 where Sex = 1),
                                (select groupUniqArray(UserID) from hits_v1 where Age > 18),
                                (select groupUniqArray(UserID) from hits_v1 where RequestNum > 0)
                        )                                                                                             t
             ) t;
    
    --并（简化写法：groupUniqArray 的结果不含 NULL，因此可省略 arrayFilter 的 NULL 过滤）
    select length(arrayDistinct(t.res))
    from (
             select arrayConcat(t[3], arrayConcat(t[1], t[2])) as res,
                    array(
                                (select groupUniqArray(UserID) from hits_v1 where Sex = 1),
                                (select groupUniqArray(UserID) from hits_v1 where Age > 18),
                                (select groupUniqArray(UserID) from hits_v1 where RequestNum > 0)
                        )                                         t
             ) t;
    
    
    

    ClickHouse 解析结果示例（注：示例中的 collect_set 是 Hive 函数，ClickHouse 中对应的聚合函数是上文示例使用的 groupUniqArray）：

    (arrayMap(x->multiIf(x not in arrayIntersect(t[1],t[2],(arrayIntersect(t[4],t[5],t[6]))), x, NULL), t[1]))
    (select collect_set(user_id) from db1.table1 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f1     = '1'   )),
    (select collect_set(user_id) from db2.table2 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f2     = '22'   )),
    (select collect_set(user_id) from db2.table2 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f3     = 333   )),
    (select collect_set(user_id) from db3.table3 where (  1=1  ) and (  f4     = '4'   )),
    (select collect_set(user_id) from db3.table3 where (  1=1  ) and (  f5     = 5   )),
    (select collect_set(user_id) from db3.table3 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f6     = 6   ))
    

    2.Hive 数组交并差运算:

    select
        array_intersect(array(1, 2), array(2, 3)) i,
        array_union(array(1, 2), array(2, 3)) u,
        array_except(array(1, 2), array(2, 3)) e;
    

    Hive 解析结果示例（注：Hive 的 array_intersect / array_union / array_except 只接受两个数组参数，多于两个运算对象时需要嵌套调用）：

    (array_except(t[1],t[2],(array_intersect(t[4],t[5],t[6]))))
    (select collect_set(user_id) from db1.table1 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f1     = '1'   )),
    (select collect_set(user_id) from db2.table2 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f2     = '22'   )),
    (select collect_set(user_id) from db2.table2 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f3     = 333   )),
    (select collect_set(user_id) from db3.table3 where (  1=1  ) and (  f4     = '4'   )),
    (select collect_set(user_id) from db3.table3 where (  1=1  ) and (  f5     = 5   )),
    (select collect_set(user_id) from db3.table3 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f6     = 6   ))  
    

    附源码

    
    /**
     * One flattened tag condition of a rule expression.
     *
     * @property kexprId id (`tfId`) of the expression node the condition was taken from; used to find the entry again
     * @property tagCode tag (table) code of the condition
     * @property tagOptionCode tag option (field) code of the condition
     * @property conditionExpr sub-query that selects the ids matching the condition
     * @property index 1-based slot in the generated `array(...)` (the `i` of `t[i]`); -1 until it is assigned
     */
    data class TagIdx(
        var kexprId: Int,
        var tagCode: String,
        var tagOptionCode: String,
        var conditionExpr: String,
        var index: Int,
    )
    
    /** True when [e] has no sub-expressions (null or empty list), i.e. it is a leaf of the rule tree. */
    fun isLeafNode(e: KunLunExpression): Boolean {
        val children = e.subExpression
        return children == null || children.isEmpty()
    }
    
    /**
     * Flattens the rule expression of [requestDTO] into one [TagIdx] per node that carries a field condition
     * (pre-order, via [parseTagIdx]) and numbers them 1..n in that order.
     *
     * The number becomes the `i` of the `t[i]` array slot the generated SQL refers to.
     */
    fun tagOptionConditions(requestDTO: SQLQueryReqDTO, tableMappingMap: Map<String, List<KTableMapping>>): List<TagIdx> {
        val root: KunLunExpression = requestDTO.expression
        val collected = mutableListOf<TagIdx>()
        parseTagIdx(root, collected, tableMappingMap)

        // Slots are 1-based, matching the t[1], t[2], ... references in the generated SQL.
        for ((position, item) in collected.withIndex()) {
            item.index = position + 1
        }
        return collected
    }
    
    
    /**
     * Recursively flattens a KunLun rule expression into [TagIdx] entries.
     *
     * There is one entry per node that carries a field condition. A node is visited before its children (pre-order).
     * Each entry's `conditionExpr` has the form
     * `select collect_set(<target>) from <db>.<table> where ( <dimension filter> ) and ( <tag filter> )`.
     * Its `index` is left at -1; the caller assigns the final 1-based slots.
     *
     * Dimension filters (the condition's `dimConditionList`) are rendered with the same operator handling as the tag
     * filter ([genFilterConditionClause]) and joined with `and`. Previously only `=` on the first value was supported.
     * A dimension whose operator is null keeps that legacy equality form. No dimensions renders as ` 1=1 `.
     *
     * @param kexpr node to process; its children are processed recursively
     * @param tagIdxList accumulator the entries are appended to
     * @param tableMappingMap tag (table) code -> mappings; only the first mapping of each code is used
     * @throws IllegalArgumentException if a tag/dimension code has no mapping, or a field code is not mapped for it
     */
    fun parseTagIdx(kexpr: KunLunExpression, tagIdxList: MutableList<TagIdx>, tableMappingMap: Map<String, List<KTableMapping>>) {
        val fieldCondition = kexpr.fieldCondition
        if (null != fieldCondition) {
            // Each tag table has its own dimensions (e.g. cate_id / shop_id); they narrow the rows the tag
            // condition is evaluated on.
            val dimConditionList = fieldCondition.dimConditionList
            val dimFilter = if (dimConditionList.isNullOrEmpty()) {
                " 1=1 "
            } else {
                dimConditionList.joinToString(separator = " and ") { dimField ->
                    val dimMapping = firstTableMapping(tableMappingMap, dimField.tableCode)
                    val dimPhysicalField = physicalFieldOf(dimMapping, dimField.fieldCode)
                    if (dimField.operator == null) {
                        // Legacy form for dimensions without an operator: equality on the first value.
                        val firstValue = parseFieldValue(dimField, dimPhysicalField.fieldType)[0]?.sqlCondition
                        " ${dimPhysicalField.columnCode} = $firstValue "
                    } else {
                        genFilterConditionClause(dimField, dimPhysicalField.columnCode, dimPhysicalField.fieldType)
                    }
                }
            }

            val tagCode = fieldCondition.tableCode
            val fieldCode = fieldCondition.fieldCode
            val kTableMapping = firstTableMapping(tableMappingMap, tagCode)
            val physicalField = physicalFieldOf(kTableMapping, fieldCode)
            val filterConditionClause = genFilterConditionClause(fieldCondition, physicalField.columnCode, physicalField.fieldType)

            val targetFieldCode = kTableMapping.targetField.columnCode
            val dbName = kTableMapping.physicDBName
            val tableName = kTableMapping.getkTableCode()

            val line = "select collect_set($targetFieldCode) from $dbName.$tableName where ( $dimFilter ) and ( $filterConditionClause )"
            // index stays -1 here; tagOptionConditions assigns the final slot numbers.
            tagIdxList.add(TagIdx(kexprId = kexpr.tfId, tagCode = tagCode, tagOptionCode = fieldCode, conditionExpr = line, index = -1))
        }
        // Recurse into the children (also for nodes that carry no condition of their own).
        kexpr.subExpression?.forEach {
            parseTagIdx(it, tagIdxList, tableMappingMap)
        }
    }

    /** Returns the first table mapping registered for [tagCode], or fails with a descriptive message. */
    private fun firstTableMapping(tableMappingMap: Map<String, List<KTableMapping>>, tagCode: String) =
        tableMappingMap[tagCode]?.firstOrNull()
            ?: throw IllegalArgumentException("no table mapping found for tag code [$tagCode]")

    /** Returns the physical (destination) field that [fieldCode] of [kTableMapping] maps to, or fails with a descriptive message. */
    private fun physicalFieldOf(kTableMapping: KTableMapping, fieldCode: String) =
        kTableMapping.fields.firstOrNull { it.srcField.columnCode == fieldCode }?.dstField
            ?: throw IllegalArgumentException("field code [$fieldCode] is not mapped for tag code [${kTableMapping.tableCode}]")
    
    /**
     * Renders the SQL predicate of one field condition as ` <physicalField> <operator> <value(s)> `.
     *
     * Values come from [parseFieldValue]. Their `sqlCondition` form is used for comparisons, and the first value's
     * `qlCondition` form is used inside the LIKE pattern.
     *
     * @param fieldCondition condition providing the operator and the raw values
     * @param physicalField physical column name the predicate applies to
     * @param fieldValueType value type used to parse/quote the values
     * @return the predicate, padded with spaces
     * @throws IllegalArgumentException if there are no values, or BETWEEN has fewer than two values
     * @throws IllegalStateException if the operator is not supported
     */
    fun genFilterConditionClause(fieldCondition: FieldCondition, physicalField: String, fieldValueType: KFieldValueType): String {
        val fv = parseFieldValue(fieldCondition, fieldValueType)
        if (CollectionUtils.isEmpty(fv)) {
            throw IllegalArgumentException("fieldCondition must have fieldValue!")
        }
        val sqlValues = fv.map { it?.sqlCondition }
        // Single-value operators use the first value.
        val singleValue = sqlValues[0]
        // Multi-value operators use the list form: (v1,v2,...)
        val listValue = sqlValues.joinToString(separator = ",", prefix = "(", postfix = ")")

        val conditionExpr = when (fieldCondition.operator) {
            // NOTE(review): the value is interpolated without escaping quote/%/_ characters; confirm qlCondition
            // sanitizes user input, otherwise this is an injection vector.
            ArithmeticOperatorEnum.LIKE -> "  like '%${fv[0]?.qlCondition}%' "
            ArithmeticOperatorEnum.EQUAL -> "    = ${singleValue} "
            ArithmeticOperatorEnum.GREATER_EQUAL_THAN -> "    >= ${singleValue} "
            ArithmeticOperatorEnum.LESS_THAN -> "    < ${singleValue} "
            ArithmeticOperatorEnum.LESS_EQUAL_THAN -> "    <= ${singleValue} "
            ArithmeticOperatorEnum.GREATER_THAN -> "    > ${singleValue} "
            ArithmeticOperatorEnum.BETWEEN -> {
                require(sqlValues.size >= 2) { "BETWEEN requires two values, got ${sqlValues.size}" }
                "    between ${sqlValues[0]} and ${sqlValues[1]} "
            }
            ArithmeticOperatorEnum.IN -> "    in ${listValue} "
            ArithmeticOperatorEnum.NOT_IN -> "    not in ${listValue} "

            else -> throw IllegalStateException("${fieldCondition.operator} not supported yet")
        }

        return " $physicalField $conditionExpr "
    }
    
    /**
     * Parses the raw values of [fieldCondition] into typed [FieldValue]s according to [fieldValueType].
     *
     * Only STRING, LONG and DOUBLE are supported.
     *
     * @throws the exception produced by `ExceptionHelper.bizError` if there are no values or the type is unsupported
     */
    fun parseFieldValue(fieldCondition: FieldCondition, fieldValueType: KFieldValueType): List<FieldValue<*>?> {
        val values = fieldCondition.values
        if (values == null || values.isEmpty()) {
            // bizError's result is thrown explicitly (as elsewhere in this file), so validation can never fall through.
            throw ExceptionHelper.bizError("illegal value size,values length must greater than 0.")
        }

        // Concrete FieldValue class for the field's value type.
        val clazz: Class<out FieldValue<*>> = when (fieldValueType) {
            KFieldValueType.STRING -> StringFieldValue::class.java
            KFieldValueType.LONG -> LongFieldValue::class.java
            KFieldValueType.DOUBLE -> DoubleFieldValue::class.java
            else -> throw ExceptionHelper.bizError("$fieldValueType fieldValueType not supported!")
        }
        return FieldValue.create(clazz, *values.toTypedArray())
    }
    
    
    
    
    
    /**
     * Walks a KunLun expression tree depth-first and collects what its leaves reference.
     *
     * A leaf whose condition has a non-empty `objectSetId` contributes that id to [objectSetList]. Every other leaf
     * contributes a [TagBaseField] (tableCode + fieldCode) to [tagBaseFieldList]. Conditions attached to non-leaf
     * nodes are not collected.
     */
    fun recurExtractTagCodeAndObjectSet(expression: KunLunExpression, tagBaseFieldList: MutableList<TagBaseField>, objectSetList: MutableList<String>) {
        // Inner node: only descend, its own condition (if any) is not collected.
        if (!isLeafNode(expression)) {
            expression.subExpression.forEach { child ->
                recurExtractTagCodeAndObjectSet(child, tagBaseFieldList, objectSetList)
            }
            return
        }

        val condition = expression.fieldCondition
        val objectSetId = condition.objectSetId
        if (StringUtils.isNotEmpty(objectSetId)) {
            // Leaf that references a saved object set (segment).
            objectSetList.add(objectSetId)
        } else {
            // Leaf that references a tag option.
            tagBaseFieldList.add(TagBaseField().apply {
                tableCode = condition.tableCode
                fieldCode = condition.fieldCode
            })
        }
    }
    
    @Service
    class CommonParseUtils {

        /**
         * Builds the tag (table) code -> table mappings lookup for the tags referenced by the request's expression.
         *
         * NOTE(review): only leaf conditions are collected here. parseTagIdx also looks up dimension tag codes and
         * conditions on non-leaf nodes in the returned map; confirm those codes are always covered.
         */
        fun getTableMappingMap(tenant: Tenant, requestDTO: SQLQueryReqDTO): Map<String, List<KTableMapping>> {
            // Tags & object sets (segments) referenced by the expression.
            val tagBaseFieldList = mutableListOf<TagBaseField>()
            val objectSetList = mutableListOf<String>()
            recurExtractTagCodeAndObjectSet(requestDTO.expression, tagBaseFieldList, objectSetList)
            // Metadata lookup.
            val tableMappingList: List<KTableMapping> = getTagCodeTableMapping(tenant.id, tagBaseFieldList, requestDTO.getDriverType())
            return tableMappingList.groupBy { it.tableCode }
        }

        /**
         * Returns the physical table mappings for the tags in the KunLun expression. Each tag must exist, and its
         * field must be mapped.
         *
         * The check uses the first mapping of a tag code, as parseTagIdx does. Duplicate table codes no longer throw
         * the `Collectors.toMap` duplicate-key error.
         *
         * @throws the exception produced by `ExceptionHelper.bizError` when a tag code or tag option does not exist
         */
        fun getTagCodeTableMapping(tenantId: Long, tagBaseFieldList: List<TagBaseField>, driverType: DriverType): List<KTableMapping> {
            if (CollectionUtils.isEmpty(tagBaseFieldList)) {
                return emptyList()
            }

            // TODO metadata: load the real kTableMappings. While this list is empty, any non-empty
            // tagBaseFieldList fails the "tag code ... is non-exists" check below.
            val kTableMappings: List<KTableMapping> = ArrayList()

            val mappingsByTagCode = kTableMappings.groupBy { it.tableCode }

            for (tagBaseField in tagBaseFieldList) {
                val kTableMapping = mappingsByTagCode[tagBaseField.tableCode]?.firstOrNull()
                    ?: throw ExceptionHelper.bizError("tag code [${tagBaseField.tableCode}] is non-exists")
                // The tag option exists when some mapped source field carries its code. The previous
                // `noneMatch` made this check inverted.
                val existsTagOption = kTableMapping.fields.any { it.srcField.columnCode == tagBaseField.fieldCode }
                if (!existsTagOption) {
                    throw ExceptionHelper.bizError("tag option [${tagBaseField.fieldCode}] is non-exists")
                }
            }
            return kTableMappings
        }

    }
    
    
    
    
    /**
     * ClickHouse SQL parser for wide-table, multi-dimension tags.
     *
     * Every tag condition of the rule becomes one sub-query inside an `array(...)` literal aliased `t`. The rule tree
     * is then rendered as ClickHouse array set operations over the slots `t[1]`, `t[2]`, ...
     *
     * @author chenguangjian.jk
     * @date 2022-03-09 02:28:48
     */
    @Service
    class WideTableMultiDimCHSQLParser {
        val log = LoggerFactory.getLogger(WideTableMultiDimCHSQLParser::class.java)

        @Resource
        lateinit var commonParseUtils: CommonParseUtils

        /**
         * Estimate SQL: counts the distinct non-null ids matched by the rule.
         */
        fun parseCount(tenant: Tenant, requestDTO: SQLQueryReqDTO): String {
            val tableMappingMap = commonParseUtils.getTableMappingMap(tenant, requestDTO)
            return WIDE_TABLE_COUNT_SQL_TEMPLATE(
                expr = expr(requestDTO, tableMappingMap),
                arrayLines = arrayLines(requestDTO, tableMappingMap),
            )
        }


        /**
         * Selection ("circle") SQL: exports the distinct non-null ids matched by the rule as CSV.
         */
        fun parseCircle(tenant: Tenant, requestDTO: SQLQueryReqDTO): String {
            val tableMappingMap = commonParseUtils.getTableMappingMap(tenant, requestDTO)
            // Blank -> the template falls back to its default export target.
            val csvFile = ""
            return WIDE_TABLE_CIRCLE_SQL_TEMPLATE(
                expr = expr(requestDTO, tableMappingMap),
                arrayLines = arrayLines(requestDTO, tableMappingMap),
                csvFile = csvFile,
            )
        }


        /** Renders the rule expression of [requestDTO] as a ClickHouse array expression over the `t[i]` slots. */
        fun expr(requestDTO: SQLQueryReqDTO, tableMappingMap: Map<String, List<KTableMapping>>): String {
            val tagIdxs: List<TagIdx> = tagOptionConditions(requestDTO, tableMappingMap)
            val exprMap = tagIdxs.groupBy { it.kexprId }
            return genWhereClause(exprMap, requestDTO.expression)
        }


        /**
         * Recursively renders [kunLunExpression].
         *
         * - AND    -> `(arrayIntersect(a,b,...))`
         * - OR     -> `(arrayConcat(a,b,...))`; duplicates are removed by the outer arrayDistinct.
         * - EXCEPT -> `(arrayMap(x->multiIf(x not in arrayIntersect(a,b,...), x, NULL), a))`. Elements of the first
         *   operand that are in the intersection become NULL, and the outer query filters them out. For two operands
         *   this is a − b. NOTE(review): for 3+ operands it is a − (b ∩ c ∩ …), not a − b − c; confirm this is intended.
         *
         * Operands are leaf slots `t[i]` or nested clauses, always comma-separated. A leaf root renders as its own
         * slot. A leaf root with no registered condition renders as "".
         *
         * @throws IllegalArgumentException for an unsupported logic operator
         * @throws IllegalStateException if a leaf has no registered condition
         */
        private fun genWhereClause(exprMap: Map<Int, List<TagIdx>>, kunLunExpression: KunLunExpression): String {
            if (isLeafNode(kunLunExpression)) {
                return exprMap[kunLunExpression.tfId]?.firstOrNull()?.let { "t[${it.index}]" } ?: ""
            }

            val logic = kunLunExpression.logic
            val operands = kunLunExpression.subExpression.map { e ->
                if (isLeafNode(e)) leafArrayRef(exprMap, e) else genWhereClause(exprMap, e)
            }
            val args = operands.joinToString(",")

            return when (logic) {
                LogicOperatorEnum.AND -> "(arrayIntersect($args))"
                LogicOperatorEnum.OR -> "(arrayConcat($args))"
                // Map over the first operand itself (a slot or a nested clause), not a fixed t[1].
                LogicOperatorEnum.EXCEPT -> "(arrayMap(x->multiIf(x not in arrayIntersect($args), x, NULL), ${operands.first()}))"
                else -> throw IllegalArgumentException("logic $logic not supported!")
            }
        }

        /** Returns the `t[i]` reference of a leaf node, i being the 1-based index registered for it. */
        private fun leafArrayRef(exprMap: Map<Int, List<TagIdx>>, leaf: KunLunExpression): String {
            val tagIdx = exprMap[leaf.tfId]?.firstOrNull()
                ?: throw IllegalStateException("no condition registered for leaf expression ${leaf.tfId}")
            return "t[${tagIdx.index}]"
        }


        /**
         * Builds the sub-query lines of the `array(...)` literal: one `(<conditionExpr>)` per condition, in slot order.
         * Every line except the last ends with a comma.
         *
         * NOTE(review): the sub-queries come from tagOptionConditions and use `collect_set` (Hive). The ClickHouse
         * equivalent is `groupUniqArray`; confirm the target cluster accepts `collect_set`.
         */
        fun arrayLines(requestDTO: SQLQueryReqDTO, tableMappingMap: Map<String, List<KTableMapping>>): String {
            val tagIdxs: List<TagIdx> = tagOptionConditions(requestDTO, tableMappingMap)
            val size = tagIdxs.size
            val arrayLines = StringBuffer()

            tagIdxs.forEachIndexed { index, tagIdx ->
                if (index != size - 1) {
                    arrayLines.append("(${tagIdx.conditionExpr}), \n")
                } else {
                    arrayLines.append("(${tagIdx.conditionExpr})  \n")
                }
            }
            return arrayLines.toString()
        }


        /**
         * Count template, e.g.:
        select length(arrayDistinct(arrayFilter(x->x is not null, t.res))) as cnt
        from (
        select arrayIntersect(t[3], arrayIntersect(t[1], t[2])) as res,
        array(
        (select groupUniqArray(UserID) from hits_v1 where Sex = 1),
        (select groupUniqArray(UserID) from hits_v1 where Age > 18),
        (select groupUniqArray(UserID) from hits_v1 where RequestNum > 0)
        ) t
        ) t
         */
        private fun WIDE_TABLE_COUNT_SQL_TEMPLATE(
            expr: String,
            arrayLines: String,
        ) = """
    select length(arrayDistinct(arrayFilter(x->x is not null, t.res))) as cnt
    from (
        select $expr as res,
        array(
        $arrayLines
        ) t
    ) t
    """


        /**
         * Export template; a blank [csvFile] keeps the default target 'tos:///xxx'. Example:
        select arrayJoin(arrayDistinct(arrayFilter(x->x is not null, t.res)))
        from (
        select arrayIntersect(t[3], arrayIntersect(t[1], t[2])) as res,
        array(
        (select groupUniqArray(UserID) from hits_v1 where Sex = 1),
        (select groupUniqArray(UserID) from hits_v1 where Age > 18),
        (select groupUniqArray(UserID) from hits_v1 where RequestNum > 0)
        ) t
        ) t
        INTO OUTFILE 'tos:///xxx' FORMAT CSV
        settings distributed_perfect_shard=1,max_execution_time = 600
         */
        private fun WIDE_TABLE_CIRCLE_SQL_TEMPLATE(
            expr: String,
            arrayLines: String,
            csvFile: String,
        ) = """
    select arrayJoin(arrayDistinct(arrayFilter(x->x is not null, t.res)))
    from (
        select $expr as res,
        array(
        $arrayLines
        ) t
    ) t
    INTO OUTFILE '${csvFile.ifBlank { "tos:///xxx" }}' FORMAT CSV
    settings distributed_perfect_shard=1,max_execution_time = 600
    """


    }
    
    
    /**
     * Manual smoke test for [WideTableMultiDimCHSQLParser.expr] and [WideTableMultiDimCHSQLParser.arrayLines].
     * It prints both results.
     *
     * Rule tree built here (all operators EQUAL; "dims" = cate_id = 10001 and shop_id = 798322, resolved via tag t1):
     * - root: EXCEPT over e1, e2, e3
     *   - e1: t1.f1 = 1 (dims)
     *   - e2: t2.f2 = 22 (dims)
     *   - e3: AND node that also carries its own condition t2.f3 = 333 (dims)
     *     - e31: t3.f4 = 4, e32: t3.f5 = 5 (no dims), e33: t3.f6 = 6 (dims)
     *
     * Conditions are numbered in pre-order (e1=1, e2=2, e3=3, e31..e33=4..6). Assuming every node has a distinct
     * tfId, the printed expression is
     * `(arrayMap(x->multiIf(x not in arrayIntersect(t[1],t[2],(arrayIntersect(t[4],t[5],t[6]))), x, NULL), t[1]))`.
     * Slot t[3] (e3's own condition) gets a sub-query line, but the expression does not reference it.
     */
    fun main() {
        // KFieldMapping(KField srcField, KField dstField) where source and physical column share code and type.
        // KField(String columnCode, String fieldCode, KFieldValueType fieldType, String description)
        fun sameColumn(columnCode: String, type: KFieldValueType) =
            KFieldMapping(KField(columnCode, "", type, ""), KField(columnCode, "", type, ""))

        // KTableMapping(tableCode, kTableCode, physicDBName, targetField, source, fields), keyed by user_id.
        // KSource(Long tagSrcTaskId, String tagSrcDb, String tagSrcTable, String tagSrcTableJoinField)
        fun tableMapping(tagCode: String, table: String, db: String, fields: List<KFieldMapping>) =
            KTableMapping(
                tagCode,
                table,
                db,
                KField("user_id", "", KFieldValueType.STRING, ""),
                KSource(0, db, table, "user_id"),
                fields,
            )

        // Expression node carrying the given condition.
        fun node(condition: FieldCondition) = KunLunExpression().also { it.fieldCondition = condition }

        val dimList = listOf(
            FieldCondition("", "t1", "cate_id", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("10001")),
            FieldCondition("", "t1", "shop_id", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("798322")),
        )

        val e1 = node(FieldCondition("", "t1", "f1", dimList, ArithmeticOperatorEnum.EQUAL, listOf("1")))
        val e2 = node(FieldCondition("", "t2", "f2", dimList, ArithmeticOperatorEnum.EQUAL, listOf("22")))
        val e3 = node(FieldCondition("", "t2", "f3", dimList, ArithmeticOperatorEnum.EQUAL, listOf("333")))
        e3.logic = LogicOperatorEnum.AND

        val e31 = node(FieldCondition("", "t3", "f4", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("4")))
        val e32 = node(FieldCondition("", "t3", "f5", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("5")))
        val e33 = node(FieldCondition("", "t3", "f6", dimList, ArithmeticOperatorEnum.EQUAL, listOf("6")))
        e3.subExpression = arrayListOf(e31, e32, e33)

        val expression = KunLunExpression()
        expression.logic = LogicOperatorEnum.EXCEPT
        expression.subExpression = arrayListOf(e1, e2, e3)

        val requestDTO = SQLQueryReqDTO()
        requestDTO.expression = expression

        val tableMappingMap: HashMap<String, List<KTableMapping>> = hashMapOf(
            "t1" to listOf(
                tableMapping(
                    "t1", "table1", "db1",
                    listOf(
                        sameColumn("f1", KFieldValueType.STRING),
                        sameColumn("cate_id", KFieldValueType.STRING),
                        sameColumn("shop_id", KFieldValueType.LONG),
                    ),
                ),
            ),
            "t2" to listOf(
                tableMapping(
                    "t2", "table2", "db2",
                    listOf(
                        sameColumn("f2", KFieldValueType.STRING),
                        sameColumn("f3", KFieldValueType.LONG),
                        sameColumn("cate_id", KFieldValueType.STRING),
                        sameColumn("shop_id", KFieldValueType.LONG),
                    ),
                ),
            ),
            "t3" to listOf(
                tableMapping(
                    "t3", "table3", "db3",
                    listOf(
                        sameColumn("f4", KFieldValueType.STRING),
                        sameColumn("f5", KFieldValueType.LONG),
                        sameColumn("f6", KFieldValueType.LONG),
                        sameColumn("cate_id", KFieldValueType.STRING),
                        sameColumn("shop_id", KFieldValueType.LONG),
                    ),
                ),
            ),
        )

        val parser = WideTableMultiDimCHSQLParser()
        val renderedExpr = parser.expr(requestDTO, tableMappingMap)
        val renderedLines = parser.arrayLines(requestDTO, tableMappingMap)

        println(renderedExpr)
        println(renderedLines)
    }
    
    
    
    
    
    
    
    
    
    /**
     * 宽表多维标签 HIVE SQL 解析器
     * @author chenguangjian.jk
     * @date 2022-03-09 02:28:48
     */
    @Service
    class WideTableMultiDimHiveSQLParser {
    
        val log = LoggerFactory.getLogger(WideTableMultiDimCHSQLParser::class.java)
        @Resource
        lateinit var commonParseUtils: CommonParseUtils
        /**
         * 宽表多维标签预估 SQL
         */
        fun parseCount(tenant: Tenant, requestDTO: SQLQueryReqDTO): String {
            val tableMappingMap = commonParseUtils.getTableMappingMap(tenant, requestDTO)
            // Parse KunLunExpression
            return WIDE_TABLE_COUNT_SQL_TEMPLATE(
                expr = expr(requestDTO, tableMappingMap),
                arrayLines = arrayLines(requestDTO, tableMappingMap)
            )
        }
    
    
        /**
         * 宽表多维标签圈选 SQL
         */
        fun parseCircle(tenant: Tenant, requestDTO: SQLQueryReqDTO): String {
            val tableMappingMap = commonParseUtils.getTableMappingMap(tenant, requestDTO)
           
            val csvFile = ""
            // Parse KunLunExpression
            return WIDE_TABLE_CIRCLE_SQL_TEMPLATE(
                expr = expr(requestDTO, tableMappingMap),
                arrayLines = arrayLines(requestDTO, tableMappingMap),
                csvFile = csvFile,
            )
        }
    
    
        fun expr(requestDTO: SQLQueryReqDTO, tableMappingMap: Map<String, List<KTableMapping>>): String {
            val tagIdxs: List<TagIdx> = tagOptionConditions(requestDTO, tableMappingMap)
            val exprMap = tagIdxs.groupBy { it.kexprId }
            return genWhereClause(exprMap, requestDTO.expression)
        }
    
    
        private fun genWhereClause(exprMap: Map<Int, List<TagIdx>>, kunLunExpression: KunLunExpression): String {
            val subExpression = kunLunExpression.subExpression
            if (CollectionUtils.isEmpty(subExpression)) { // 叶子节点
                return ""
            }
    
            val w = StringBuffer()
            val size = subExpression.size
            val logic = kunLunExpression.logic
    
            w.append("(")
    
            if (logic == LogicOperatorEnum.AND) {
                w.append("array_intersect(")
            } else if (logic == LogicOperatorEnum.OR) {
                w.append("array_union(")
            } else if (logic == LogicOperatorEnum.EXCEPT) {
                w.append("array_except(")
            } else {
                throw IllegalArgumentException("logic $logic not supported!")
            }
    
            var firstTagIdx: Int = 1
            subExpression.forEachIndexed { index, e ->
                // 最叶子节点
                if (isLeafNode(e)) {
                    val targetTagIdx = exprMap[e.tfId]?.get(0)
                    val tagIdx = targetTagIdx!!.index
    
                    // 计算差集使用
                    if (index == 0) {
                        firstTagIdx = tagIdx
                    }
    
                    if (index != size - 1) {
                        w.append("t[$tagIdx],")
                    } else {
                        w.append("t[$tagIdx]")
                    }
                }
                // 递归非叶子节点
                else {
                    w.append(genWhereClause(exprMap, e))
                }
            }
    
            w.append("))")
            return w.toString()
        }
    
    
        /**
         * Renders the subquery lines that fill the `array(...)` literal of the wide-table SQL templates.
         *
         * Every condition returned by `tagOptionConditions` becomes one `(<conditionExpr>)` line.
         * All lines except the last end with `", \n"`; the last one ends with `"  \n"` (no comma), e.g.
         * ```
         * (<conditionExpr 1>),
         * (<conditionExpr 2>),
         * (<conditionExpr n>)
         * ```
         *
         * @param requestDTO request whose expression is turned into tag conditions.
         * @param tableMappingMap table mappings forwarded to `tagOptionConditions`.
         * @return the rendered lines, or an empty string when there are no conditions.
         */
        fun arrayLines(requestDTO: SQLQueryReqDTO, tableMappingMap: Map<String, List<KTableMapping>>): String {
            val conditions: List<TagIdx> = tagOptionConditions(requestDTO, tableMappingMap)
            // joinToString would still emit the postfix for an empty list, so short-circuit first.
            if (conditions.isEmpty()) {
                return ""
            }
            return conditions.joinToString(separator = ", \n", postfix = "  \n") { condition ->
                "(${condition.conditionExpr})"
            }
        }
    
    
    
        /**
         * Builds the SQL that counts the ids produced by a set-operation expression.
         *
         * Shape of the generated query (an example expression and subqueries shown):
         * ```
         * select size(t.res) as cnt
         * from (
         *     select array_intersect(t[3], array_intersect(t[1], t[2])) as res,
         *     array(
         *     (select collect_set(UserID) from hits_v1 where Sex = 1),
         *     (select collect_set(UserID) from hits_v1 where Age > 18),
         *     (select collect_set(UserID) from hits_v1 where RequestNum > 0)
         *     ) t
         * ) t
         * ```
         *
         * NOTE(review): `expr` refers to the column alias `t` that is defined in the same select list,
         * and Hive/Spark `arr[i]` indexing is 0-based. Confirm both work as intended on the target engine.
         *
         * @param expr set-operation expression over `t[i]`, spliced into the SQL verbatim.
         * @param arrayLines subquery lines that populate the array `t`, spliced into the SQL verbatim.
         * @return the SQL text, including its leading newline and indentation exactly as written below.
         */
        private fun WIDE_TABLE_COUNT_SQL_TEMPLATE(
            expr: String,
            arrayLines: String,
        ): String {
            val sql = """
    select size(t.res) as cnt
    from (
        select $expr as res,
        array(
        $arrayLines
        ) t
    ) t
    """
            return sql
        }
    
    
        /**
         * Builds the SQL that lists the ids produced by a set-operation expression, exploding the
         * resulting array into one row per element.
         *
         * Shape of the generated query (an example expression and subqueries shown):
         * ```
         * select explode(t.res) as ids
         * from (
         *     select array_intersect(t[3], array_intersect(t[1], t[2])) as res,
         *     array(
         *     (select collect_set(UserID) from hits_v1 where Sex = 1),
         *     (select collect_set(UserID) from hits_v1 where Age > 18),
         *     (select collect_set(UserID) from hits_v1 where RequestNum > 0)
         *     ) t
         * ) t
         * ```
         *
         * NOTE(review): `csvFile` is accepted but never referenced in the generated SQL. Confirm whether
         * the result was meant to be written to that path.
         *
         * @param expr set-operation expression over `t[i]`, spliced into the SQL verbatim.
         * @param arrayLines subquery lines that populate the array `t`, spliced into the SQL verbatim.
         * @param csvFile currently unused.
         * @return the SQL text, including its leading newline and indentation exactly as written below.
         */
        private fun WIDE_TABLE_CIRCLE_SQL_TEMPLATE(
            expr: String,
            arrayLines: String,
            csvFile: String,
        ): String {
            return """
    select explode(t.res) as ids
    from (
        select $expr as res,
        array(
        $arrayLines
        ) t
    ) t
    """
        }
    
    
    
    
    
    
    }
    
    
    /**
     * Manual smoke test for [WideTableMultiDimHiveSQLParser]. It builds a small expression tree plus
     * table mappings, then prints the generated set expression and the `array(...)` subquery lines.
     *
     * Expression tree (EXCEPT at the root):
     * - t1.f1 = "1"   (with the cate_id / shop_id dimension filters)
     * - t2.f2 = "22"  (with the dimension filters)
     * - AND node over t3.f4 = "4", t3.f5 = "5" and t3.f6 = "6" (only f6 has the dimension filters).
     *   This node also carries its own condition t2.f3 = "333". In the sample output below, that
     *   condition still gets an array slot (t[3]) but does not appear in the set expression.
     *
     * Sample output of a previous run. The first line is not printed by this function; presumably it
     * is logged by the parser.
     * ```
     * WideTableMultiDimCHSQLParser - tagIdxList=[{"conditionExpr":"select collect_set(user_id) from db1.table1 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f1     = '1'   )","index":1,"kexprId":-316732738,"tagCode":"t1","tagOptionCode":"f1"},{"conditionExpr":"select collect_set(user_id) from db2.table2 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f2     = '22'   )","index":2,"kexprId":-316653905,"tagCode":"t2","tagOptionCode":"f2"},{"conditionExpr":"select collect_set(user_id) from db2.table2 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f3     = 333   )","index":3,"kexprId":-315132611,"tagCode":"t2","tagOptionCode":"f3"},{"conditionExpr":"select collect_set(user_id) from db3.table3 where (  1=1  ) and (  f4     = '4'   )","index":4,"kexprId":127438862,"tagCode":"t3","tagOptionCode":"f4"},{"conditionExpr":"select collect_set(user_id) from db3.table3 where (  1=1  ) and (  f5     = 5   )","index":5,"kexprId":127439854,"tagCode":"t3","tagOptionCode":"f5"},{"conditionExpr":"select collect_set(user_id) from db3.table3 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f6     = 6   )","index":6,"kexprId":-316668196,"tagCode":"t3","tagOptionCode":"f6"}]
     * (array_except(t[1],t[2],(array_intersect(t[4],t[5],t[6]))))
     * (select collect_set(user_id) from db1.table1 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f1     = '1'   )),
     * (select collect_set(user_id) from db2.table2 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f2     = '22'   )),
     * (select collect_set(user_id) from db2.table2 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f3     = 333   )),
     * (select collect_set(user_id) from db3.table3 where (  1=1  ) and (  f4     = '4'   )),
     * (select collect_set(user_id) from db3.table3 where (  1=1  ) and (  f5     = 5   )),
     * (select collect_set(user_id) from db3.table3 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f6     = 6   ))
     * ```
     */
    fun main() {
        // Equality condition on one tag field. `dims` are extra dimension filters; in the sample output
        // they appear as a separate parenthesised group ANDed into the where clause.
        fun eq(tagCode: String, field: String, dims: List<FieldCondition>, value: String): FieldCondition =
            FieldCondition("", tagCode, field, dims, ArithmeticOperatorEnum.EQUAL, listOf(value))

        // Expression node that carries only a field condition (no logic operator, no children).
        fun leaf(condition: FieldCondition): KunLunExpression =
            KunLunExpression().apply { fieldCondition = condition }

        // Field mapping whose source and destination fields share the same code and value type.
        fun sameField(code: String, type: KFieldValueType): KFieldMapping = KFieldMapping(
            KField(code, "", type, ""), // srcField
            KField(code, "", type, ""), // dstField
        )

        // Table mapping whose target field and source join field are both `user_id`.
        // NOTE(review): the positional args are assumed to be
        // (tableCode, kTableCode, physicDBName, targetField, source, fields). An older signature note also
        // listed a leading `rowMapping` flag, which is not passed here. Confirm against KTableMapping.
        fun userIdTable(tagCode: String, table: String, db: String, fields: List<KFieldMapping>): KTableMapping =
            KTableMapping(
                tagCode,
                table,
                db,
                KField("user_id", "", KFieldValueType.STRING, ""),
                KSource(0, db, table, "user_id"),
                fields,
            )

        // Dimension filters shared by f1, f2, f3 and f6.
        val dimList = listOf(
            eq("t1", "cate_id", emptyList(), "10001"),
            eq("t1", "shop_id", emptyList(), "798322"),
        )

        val andNode = KunLunExpression().apply {
            fieldCondition = eq("t2", "f3", dimList, "333")
            logic = LogicOperatorEnum.AND
            subExpression = arrayListOf(
                leaf(eq("t3", "f4", emptyList(), "4")),
                leaf(eq("t3", "f5", emptyList(), "5")),
                leaf(eq("t3", "f6", dimList, "6")),
            )
        }

        val root = KunLunExpression().apply {
            logic = LogicOperatorEnum.EXCEPT
            subExpression = arrayListOf(
                leaf(eq("t1", "f1", dimList, "1")),
                leaf(eq("t2", "f2", dimList, "22")),
                andNode,
            )
        }
        val requestDTO = SQLQueryReqDTO().apply { expression = root }

        val tableMappingMap: HashMap<String, List<KTableMapping>> = hashMapOf(
            "t1" to listOf(
                userIdTable(
                    tagCode = "t1",
                    table = "table1",
                    db = "db1",
                    fields = listOf(
                        sameField("f1", KFieldValueType.STRING),
                        sameField("cate_id", KFieldValueType.STRING),
                        sameField("shop_id", KFieldValueType.LONG),
                    ),
                ),
            ),
            "t2" to listOf(
                userIdTable(
                    tagCode = "t2",
                    table = "table2",
                    db = "db2",
                    fields = listOf(
                        sameField("f2", KFieldValueType.STRING),
                        sameField("f3", KFieldValueType.LONG),
                    ),
                ),
            ),
            "t3" to listOf(
                userIdTable(
                    tagCode = "t3",
                    table = "table3",
                    db = "db3",
                    fields = listOf(
                        sameField("f4", KFieldValueType.STRING),
                        sameField("f5", KFieldValueType.LONG),
                        sameField("f6", KFieldValueType.LONG),
                    ),
                ),
            ),
        )

        val parser = WideTableMultiDimHiveSQLParser()
        val expr = parser.expr(requestDTO, tableMappingMap)
        val arrayLines = parser.arrayLines(requestDTO, tableMappingMap)

        println(expr)
        println(arrayLines)
    }
    

    相关文章

      网友评论

          本文标题:WideTableMultiDimSQLParser 解析说明:

          本文链接:https://www.haomeiwen.com/subject/qrutdrtx.html