美文网首页
StarRocks Elasticsearch Catalog原

StarRocks Elasticsearch Catalog原

作者: LittleMagic | 来源:发表于2024-09-25 20:34 被阅读0次

    前言

    Elasticsearch不仅是强大的全文搜索引擎,在很多场景下(特别是TiDB、ShardingSphere等框架成熟之前)也被当做分布式HTAP数据库使用,在存储、更新海量数据的同时,提供高效的点查和部分聚合查询能力。StarRocks从3.1版本开始支持Elasticsearch Catalog,极大方便了ES数据的联邦查询。本文简要分析其原理,并提出一个小问题和对应的临时解决方法。

    元数据获取阶段

    当用户创建一个ES Catalog时,本质是创建了ElasticsearchConnector和它对应的ElasticsearchMetadata,后者持有该Catalog的全部配置信息和访问ES集群的EsRestClient。这点和2.x版本中旧有的ES外表不同,每张ES外表都会对应一个EsRestClient,会导致目标ES集群的HTTP连接数比正常偏多,ES Catalog则基本不存在这个问题。

    每个ES Catalog只有一个默认数据库default_db,以下则是ES实例中的索引,在FE中称为EsTable,相当于复用了原ES外表的实现(当然ES Catalog会自动获取并推断字段,无需自己建表)。每个EsTable对象都持有一个EsMetaStateTracker用于同步元数据,其中又分为3个阶段(phase),按顺序分别为:

    • VersionPhase:通过GET /请求获取ES集群的版本号;
    • MappingPhase:通过GET /indexName/_mapping请求获取索引的Mapping信息,同时解析keyword类型字段(包括text内嵌的keyword)和存在doc_values的字段(即允许排序、聚合的字段),并存入上下文;
    • PartitionPhase:通过GET /indexName/_search_shards请求获取索引的分片信息,再通过GET /_nodes/http请求获取ES集群数据节点的地址,将分片ID和所在节点的映射关系存入EsShardPartitions容器。

    FE计划阶段

    ES Catalog查询对应的物理节点是EsScanNode,在生成Fragment的过程中除了维护Catalog的信息外,还会负责计算ScanRangeLocation,即每个BE节点负责请求的ES分片的对应关系,同时会尽量做colocate分配,使得BE节点和请求的ES分片所在节点是同一个(当然实际部署中这种情况不多见)。另外执行EXPLAIN语句时,会打印查询谓词翻译出来的ES DSL,如下所示。注意这个DSL只是示意作用,实际执行时BE会重新生成一次。

    MySQL [default_db]> EXPLAIN SELECT id,waybillCode,orderTime FROM realtimewaybillmonitor_202409 WHERE yn <= 0 AND orderTime >= hours_sub(now(), 1) AND waybillCode LIKE 'JDX%' AND length(sku) > 3 LIMIT 1000;
    +-------------------------------------------------------------------------------------------------------------------------------------------------------------+
    | Explain String                                                                                                                                              |
    +-------------------------------------------------------------------------------------------------------------------------------------------------------------+
    | PLAN FRAGMENT 0                                                                                                                                             |
    |  OUTPUT EXPRS:13: id | 130: waybillCode | 71: orderTime                                                                                                     |
    |   PARTITION: UNPARTITIONED                                                                                                                                  |
    |                                                                                                                                                             |
    |   RESULT SINK                                                                                                                                               |
    |                                                                                                                                                             |
    |   2:EXCHANGE                                                                                                                                                |
    |      limit: 1000                                                                                                                                            |
    |                                                                                                                                                             |
    | PLAN FRAGMENT 1                                                                                                                                             |
    |  OUTPUT EXPRS:                                                                                                                                              |
    |   PARTITION: RANDOM                                                                                                                                         |
    |                                                                                                                                                             |
    |   STREAM DATA SINK                                                                                                                                          |
    |     EXCHANGE ID: 02                                                                                                                                         |
    |     UNPARTITIONED                                                                                                                                           |
    |                                                                                                                                                             |
    |   1:Project                                                                                                                                                 |
    |   |  <slot 13> : 13: id                                                                                                                                     |
    |   |  <slot 71> : 71: orderTime                                                                                                                              |
    |   |  <slot 130> : 130: waybillCode                                                                                                                          |
    |   |  limit: 1000                                                                                                                                            |
    |   |                                                                                                                                                         |
    |   0:EsScanNode                                                                                                                                              |
    |      TABLE: realtimewaybillmonitor_202409                                                                                                                   |
    |      PREDICATES: 9: yn <= 0, 71: orderTime >= '2024-09-26 15:46:17', 130: waybillCode LIKE 'JDX%', length(14: sku) > 3                                      |
    |      LOCAL_PREDICATES: length(14: sku) > 3                                                                                                                  |
    |      REMOTE_PREDICATES: 9: yn <= 0, 71: orderTime >= '2024-09-26 15:46:17', 130: waybillCode LIKE 'JDX%'                                                    |
    |      ES_QUERY_DSL: {"bool":{"must":[{"range":{"yn":{"lte":0}}},{"range":{"orderTime":{"gte":"2024-09-26 15:46:17"}}},{"wildcard":{"waybillCode":"JDX*"}}]}} |
    |      ES index/type: realtimewaybillmonitor_202409/realtimewaybillmonitor                                                                                    |
    |      limit: 1000                                                                                                                                            |
    +-------------------------------------------------------------------------------------------------------------------------------------------------------------+
    

    可见上述查询的前三个谓词都可以下推到ES,但是第四个谓词无法下推,需要将结果拉取到SR端再进行过滤。

    BE执行阶段

    BE接收到前述EsScanNode后,将能够下推到ES的谓词封装为EsPredicate,分为几种情况:

    • 二元谓词,且一侧需为字面量,形如yn <= 0orderTime >= hours_sub(now(), 1)(右侧可以做常量折叠)都符合条件;
    • 函数调用谓词,支持esquery()(直接透传DSL的SR内置函数)、IS NULLIS NOT NULLLIKE,其他的均无法下推。即如果把上述示例的waybillCode LIKE 'JDX%'改成starts_with(waybillCode, 'JDX') = 1,这个条件就不能下推了;
    • INNOT IN谓词,对应terms query,简单直接;
    • 复合的AND谓词,实际上是对以上三种情况的组合做分解。

    下推到ES的谓词会从谓词列表中删除。接下来每个BE会分别创建ESScanReader以扫描ES数据,这里需要注意,如果不是所有谓词都下推到了ES(即谓词列表中还有剩余),那么为了保证结果准确,原始查询中的LIMIT子句也不能下推。

    上一节中的查询实际生成的DSL JSON如下所示。如果无法命中doc_values,则会改用source查询。

    {
        "query": {
            "bool": {
                "filter": [{
                    "bool": {
                        "should": [{
                            "range": {
                                "yn": {
                                    "lte": "0"
                                }
                            }
                        }]
                    }
                }, {
                    "bool": {
                        "should": [{
                            "range": {
                                "orderTime": {
                                    "gte": "1727336859000"
                                }
                            }
                        }]
                    }
                }, {
                    "bool": {
                        "should": [{
                            "wildcard": {
                                "waybillCode": "JDX*"
                            }
                        }]
                    }
                }]
            }
        },
        "stored_fields": "_none_",
        "docvalue_fields": ["waybillCode", "orderTime", "yn", "id", "sku"],
        "sort": ["_doc"],
        "size": 4096
    }
    

    正式执行查询时,又分为两种情况。

    • LIMIT子句下推到了ES,那么BE会认为这是一个"exactly-once"的查询(代码中如此),可以类比流式处理引擎中exactly-once的含义,即“只查询一次就可以了”。此时组装的搜索请求URL形如{target}/{index}/{type}/_search?terminate_after={limit}&preference=_shards:{shards}&{filter_path}
    • 若没有LIMIT子句下推到ES,则需要执行Scroll查询,分页获取结果。此时组装的搜索请求URL形如{target}/{index}/{type}/_search?scroll={keep_alive}&preference=_shards:{shards}&{filter_path}。Scroll上下文的TTL由BE参数es_scroll_keepalive设定,默认是5m

    接下来ESScanReader每次请求上述URL获取一批数据,调用超时由BE参数es_http_timeout_ms设定,默认是5000(即5秒),在网络环境欠佳时,应适当调大。获取到的数据经过JSON解析,获取到doc_values或者_source,逐行填充到Chunk中(没有值的则填充默认值)。这里实际上可以优化为按列填充,代码中也有相应的TODO标记。

    ES数组类型的问题

    ES没有显式的数组类型,当某字段插入了多个值时,它会自然地变为数组类型,但在索引Mapping中无法直接区分该字段是否为数组。在我们的历史ES集群中,有大量ES索引含有实际为数组的字段,使用SR ES Catalog查询时则会抛出异常或只返回第一个值,影响体验。这里提出一个不优雅的临时解决方案,在Catalog参数中增加array_fields配置项,让用户创建ES Catalog时手动指定数组字段。

    // Fields that should be treated as arrays when building Elasticsearch external table.                      
    // Since Elasticsearch makes no distinction between scalar and array types, we should manually specify them.
    // The format is: `field1,index2:field2...`                                                                 
    // which means `field1` in all indices and `field2` in `index2` are arrays.                                 
    @Config(key = KEY_ARRAY_FIELDS,                                                                             
            desc = "Fields that should be treated as arrays when building Elasticsearch external table. " +     
                    "The format is: `field1,index2:field2,...`.",                                               
            defaultValue = "")                                                                                  
    private String arrayFields;                                                                                 
    

    然后在ElasticsearchMetadata中获取并缓存每个索引中的数组字段名。

    private Map<String, Set<String>> indicesWithArrayFields;                                                     
                                                                                                                 
    public ElasticsearchMetadata(EsRestClient esRestClient, Map<String, String> properties, String catalogName) {
        this.esRestClient = esRestClient;                                                                        
        this.properties = properties;                                                                            
        this.catalogName = catalogName;                                                                          
                                                                                                                 
        this.indicesWithArrayFields = Arrays.stream(StringUtils.split(properties.get(KEY_ARRAY_FIELDS), ","))     
                .map(s -> StringUtils.split(s, ":"))                                                              
                .filter(kv -> kv.length <= 2)                                                                    
                .collect(                                                                                        
                        Collectors.toMap(                                                                        
                                kv -> kv.length == 2 ? kv[0] : "",                                               
                                kv -> new HashSet<>(Collections.singletonList(kv.length == 2 ? kv[1] : kv[0])),  
                                (v1, v2) -> {                                                                    
                                    v1.addAll(v2);                                                               
                                    return v1;                                                                   
                                }                                                                                
                        )                                                                                        
                );                                                                                               
    }                                                                                                            
    

    构建EsTable时,会调用EsUtil.convertColumnSchema()方法创建ES表的Schema,将对应索引的arrayFields参数传递给它,并将数组字段重新用ArrayType包装起来即可。

    public static List<Column> convertColumnSchema(EsRestClient client, String index, Set<String> arrayFields)
            throws AnalysisException {                                                                        
        List<Column> columns = new ArrayList<>();                                                             
        String mappings = client.getMapping(index);                                                           
        JSONObject properties = parseProperties(index, mappings);                                             
        if (null == properties) {                                                                             
            return columns;                                                                                   
        }                                                                                                     
        for (String columnName : properties.keySet()) {                                                       
            JSONObject columnAttr = (JSONObject) properties.get(columnName);                                  
            // default set json.                                                                              
            Type type = Type.JSON;                                                                            
            if (columnAttr.has("type")) {                                                                     
                type = convertType(columnAttr.get("type").toString());                                        
                if (arrayFields.contains(columnName)) {                                                       
                    type = new ArrayType(type);                                                               
                }                                                                                             
            }                                                                                                 
            Column column = new Column(columnName, type, true);                                               
            columns.add(column);                                                                              
        }                                                                                                     
        return columns;                                                                                       
    }                                                                                                         
    

    The End

    大家晚安。

    相关文章

      网友评论

          本文标题:StarRocks Elasticsearch Catalog原

          本文链接:https://www.haomeiwen.com/subject/qmsxljtx.html