Elasticsearch: ES Aggregation Operations, Based on Kibana and Ja

Author: xiaogp | Published: 2022-04-09 15:31

    Summary: Elasticsearch, Java

    These are notes on Chapter 7 of 《Elasticsearch搜索引擎构建入门与实战》.

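    Contents

    • Computing field statistics over selected documents (sum/mean/max/count)
    • Statistics with missing-value filling
    • Grouped statistics (groupBy)
    • Sorting aggregation results (order)
    • Top N per group from aggregation results (window function)
    • Post-filtering aggregation results (having)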

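    Computing field statistics over selected documents

    (1) sum, max, min, value_count, stats

    Aggregations in ES use the aggs keyword, a top-level key in the DSL. Taking sum as an example, the DSL for computing a statistic over the selected documents is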

    GET  /hotel/_doc/_search
    {
        "size": 0,
        "aggs": {
            "my_agg": {
                "sum": {
                    "field": "model_1" 
                }
            }
        }
    }
    

    The response is as follows

     "hits" : {
        "total" : 13,
        "max_score" : 0.0,
        "hits" : [ ]
      },
      "aggregations" : {
        "my_agg" : {
          "value" : 2.240000009536743
        }
      }
    

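    The request above is a GET to _search. Setting size: 0 suppresses the document hits in the response, my_agg is the custom name under which the aggregation result is returned, and the sum aggregation writes its result to the value property of my_agg. Note that the returned sum deviates slightly in its decimal places even though every original addend has exactly two decimal places. The other metrics avg, max and min work the same way. For a count, use value_count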

    GET  /hotel/_doc/_search
    {
        "size": 0,
        "aggs": {
            "my_agg": {
                "value_count": {
                    "field": "model_1" 
                }
            }
        }
    }
    

    The response is as follows

      "aggregations" : {
        "my_agg" : {
          "value" : 4
        }
      }
    

    Specifying stats in the aggregation returns a preset group of statistics (count, min, max, avg, sum) in a single request

    GET  /hotel/_doc/_search
    {
        "size": 0,
        "aggs": {
            "my_agg": {
                "stats": {
                    "field": "model_1" 
                }
            }
        }
    }
    

    The response is as follows

      "aggregations" : {
        "my_agg" : {
          "count" : 4,
          "min" : 0.0,
          "max" : 0.8299999833106995,
          "avg" : 0.5600000023841858,
          "sum" : 2.240000009536743
        }
      }
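
    The small deviations in the sum, max and avg values above (for example 0.8299999833106995 rather than 0.83) look like what happens when a single-precision float is widened to a double. That would be consistent with model_1 being mapped as a float, which is an assumption here since the mapping is not shown. The following plain-Java sketch reproduces the effect without any Elasticsearch dependency and rounds the result for display; the class name, helper names and input values are all hypothetical.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

class FloatSumDemo {
    // Widen each float to double before adding. This is assumed to mirror how
    // values of a float-mapped field surface as doubles in the response.
    static double sumWidened(float[] values) {
        double sum = 0.0;
        for (float v : values) {
            sum += (double) v;
        }
        return sum;
    }

    // Round to a fixed number of decimal places for display.
    static double round(double value, int scale) {
        return BigDecimal.valueOf(value)
                .setScale(scale, RoundingMode.HALF_UP)
                .doubleValue();
    }

    public static void main(String[] args) {
        // Hypothetical two-decimal inputs, not the article's actual data.
        float[] scores = {0.83f, 0.71f, 0.70f, 0.0f};
        double raw = sumWidened(scores);
        System.out.println("raw sum     = " + raw);
        System.out.println("rounded sum = " + round(raw, 2));
    }
}
```

    If exact two-decimal results matter, rounding on the client side as above is one option; storing the field in a higher-precision or scaled numeric type is another, at the cost of changing the mapping.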
    

    (2) Combining with query

    Combined with query, the aggregation is computed only over the documents in the specified scope

    GET  /hotel/_doc/_search
    {
        "query": {
            "term": {
                "city": "HK"
            }
        },
        "size": 0,
        "aggs": {
            "my_agg": {
                "sum": {
                    "field": "model_1" 
                }
            }
        }
    }
    

    The aggregation above is computed only over the documents matched by query


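    (3) Using the Java client

    The code below runs a query and then computes a Sum. The aggregation is built at the same level as query, with AggregationBuilders setting the aggregation type and field. When reading the response, first call getAggregations to get the aggregations object, then fetch the metric object by name, and finally read its value. This matches the three levels aggregations -> my_agg -> value in the response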

    public Double getHotelSum() {
            double res = 0.0;
            SearchRequest searchRequest = new SearchRequest(index);
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
                    .query(QueryBuilders.termQuery("city", "HK"))
                    .aggregation(AggregationBuilders.sum("my_agg").field("model_1"));
            searchRequest.source(searchSourceBuilder);
            try {
                SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
                if (searchResponse.status() == RestStatus.OK) {
                    if (searchResponse.getHits().getTotalHits() != 0) {
                        Aggregations aggregations = searchResponse.getAggregations();
                        Sum sum = aggregations.get("my_agg");
                        res = sum.getValue();
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            return res;
        }
    

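    Note that if the query matches no documents, or the aggregated field is empty in all of them, the metric may come back as its initial value rather than a meaningful result: Max returns -inf, Min returns inf, Sum returns 0, and Avg returns a non-finite value (inf as reported here). This is why the code first checks the hit count with searchResponse.getHits().getTotalHits(). Next, a value_count example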

    public long getModel1Count() {
            long res = 0;
            SearchRequest searchRequest = new SearchRequest(index);
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
                    .aggregation(AggregationBuilders.count("my_agg").field("model_1"));
            searchRequest.source(searchSourceBuilder);
            try {
                SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
                if (searchResponse.status() == RestStatus.OK) {
                    Aggregations aggregations = searchResponse.getAggregations();
                    ValueCount count = aggregations.get("my_agg");
                    res = count.getValue();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            return res;
        }
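
    The sentinel values described earlier for empty inputs (Max -inf, Min inf, Sum 0) are what a fold over an empty set naturally produces when it starts from the identity of the operation. The plain-Java sketch below shows this, together with a hypothetical finiteOr helper that guards on the value itself. It is an in-memory illustration under those assumptions and does not call the Elasticsearch client.

```java
import java.util.stream.DoubleStream;

class EmptyAggDemo {
    // Fold starting from the identity of max; an empty input returns the identity.
    static double max(double[] values) {
        return DoubleStream.of(values).reduce(Double.NEGATIVE_INFINITY, Math::max);
    }

    // Same idea for min, whose identity is positive infinity.
    static double min(double[] values) {
        return DoubleStream.of(values).reduce(Double.POSITIVE_INFINITY, Math::min);
    }

    // The identity of addition is 0, so an empty sum is 0.
    static double sum(double[] values) {
        return DoubleStream.of(values).sum();
    }

    // Hypothetical helper: replace a non-finite metric with a caller-chosen default.
    static double finiteOr(double metric, double fallback) {
        return Double.isFinite(metric) ? metric : fallback;
    }

    public static void main(String[] args) {
        double[] none = new double[0];
        System.out.println("max = " + max(none));
        System.out.println("min = " + min(none));
        System.out.println("sum = " + sum(none));
        System.out.println("guarded max = " + finiteOr(max(none), 0.0));
    }
}
```

    Checking the value with Double.isFinite also covers the case where documents are matched but the field is empty in all of them, which a hit-count check alone would not catch.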
    

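    Statistics with missing-value filling

    Missing values are skipped when computing statistics; for example, a mean does not count the missing values in its denominator. ES lets you specify a fill value for missing fields, for example treating every document whose model_1 is empty as 0. First, the average without filling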

    GET  /hotel/_doc/_search
    {
        "size": 0,
        "aggs": {
            "my_agg": {
                "avg": {
                    "field": "model_1"  
                }
            }
        }
    }
    

    Output

      "aggregations" : {
        "my_agg" : {
          "value" : 1.9825000315904617
        }
    

    Now add the fill logic

    GET  /hotel/_doc/_search
    {
        "size": 0,
        "aggs": {
            "my_agg": {
                "avg": {
                    "field": "model_1" ,
                    "missing": 0
                }
            }
        }
    }
    

    The mean drops because the filled zeros are now included

      "aggregations" : {
        "my_agg" : {
          "value" : 0.49562500789761543
        }
      }
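
    The two averages above differ by exactly a factor of four (0.495625… versus 1.9825…), which would be consistent with only a quarter of the documents carrying a value for model_1. The plain-Java sketch below illustrates the two behaviors side by side: skipping missing values versus substituting a fill value. It is an in-memory illustration with hypothetical names and data, not the Elasticsearch implementation.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

class MissingAvgDemo {
    // Default behavior: missing values are dropped from numerator and denominator.
    static double avgSkippingNulls(List<Double> values) {
        return values.stream()
                .filter(Objects::nonNull)
                .mapToDouble(Double::doubleValue)
                .average()
                .orElse(Double.NaN);
    }

    // With a fill value: every missing entry counts as `missing`.
    static double avgWithMissing(List<Double> values, double missing) {
        return values.stream()
                .mapToDouble(v -> v == null ? missing : v)
                .average()
                .orElse(Double.NaN);
    }

    public static void main(String[] args) {
        // Hypothetical field values; null stands for a document without the field.
        List<Double> model1 = Arrays.asList(1.0, null, 3.0, null);
        System.out.println("skip missing : " + avgSkippingNulls(model1));
        System.out.println("fill with 0  : " + avgWithMissing(model1, 0.0));
    }
}
```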
    

    In Java, the fill is added by calling missing on the builder returned by AggregationBuilders

    public Double getHotelAvg() {
            double res = 0.0;
            SearchRequest searchRequest = new SearchRequest(index);
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
                    .aggregation(AggregationBuilders.avg("my_agg").field("model_1").missing(0.0));
            searchRequest.source(searchSourceBuilder);
            try {
                SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
                if (searchResponse.status() == RestStatus.OK) {
                    if (searchResponse.getHits().getTotalHits() != 0) {
                        Aggregations aggregations = searchResponse.getAggregations();
                        Avg avg = aggregations.get("my_agg");
                        res = avg.getValue();
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            return res;
        }
    

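    Grouped statistics (groupBy)

    Besides computing aggregate values directly, ES supports grouped statistics, the equivalent of groupBy on structured data, as well as more complex cross-tabulations (pivot).
    The grouping field must be discrete: keyword fields work directly, while numeric fields must first be binned (range) before they can serve as the grouping column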

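    a. Setting the grouping field with terms

    Put terms, with the field to group on, one level below the custom aggregation name. If no metric is specified, the count per group is returned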

    GET /hotel/_doc/_search
    {
        "size": 0,
        "aggs": {
            "my_agg": {
                "terms": {
                    "field": "city"
                }
            }
        }
    }
    

    The output is as follows. Again, missing values are not counted, and the counts are returned under doc_count

        "doc_count_error_upper_bound" : 0,
          "sum_other_doc_count" : 0,
          "buckets" : [
            {
              "key" : "HK",
              "doc_count" : 2
            },
            {
              "key" : "SZ",
              "doc_count" : 1
            }
          ]
    

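    Note that the doc_count returned by ES is approximate and not necessarily exact, so ES also returns doc_count_error_upper_bound and sum_other_doc_count. The former is the maximum number of documents that may have been missed, and the latter is the total number of documents outside the returned buckets. Boolean fields can be grouped with terms in the same way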
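
    To make sum_other_doc_count concrete, the plain-Java sketch below counts documents per key, keeps only the largest few buckets, and reports how many documents fall outside them. It is a single-node, in-memory illustration with hypothetical names and data; it does not model the per-shard merging that makes the real doc_count approximate.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

class TermsCountDemo {
    // Count documents per key; null keys (missing values) are not counted.
    static Map<String, Long> docCounts(List<String> keys) {
        return keys.stream()
                .filter(Objects::nonNull)
                .collect(Collectors.groupingBy(k -> k, Collectors.counting()));
    }

    // Keep the `size` largest buckets, count descending, ties broken by key.
    static LinkedHashMap<String, Long> topBuckets(Map<String, Long> counts, int size) {
        LinkedHashMap<String, Long> top = new LinkedHashMap<>();
        counts.entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue().reversed()
                        .thenComparing(Map.Entry.<String, Long>comparingByKey()))
                .limit(size)
                .forEach(e -> top.put(e.getKey(), e.getValue()));
        return top;
    }

    // Documents that belong to buckets which were not returned.
    static long sumOtherDocCount(Map<String, Long> counts, Map<String, Long> returned) {
        long all = counts.values().stream().mapToLong(Long::longValue).sum();
        long kept = returned.values().stream().mapToLong(Long::longValue).sum();
        return all - kept;
    }

    public static void main(String[] args) {
        // Hypothetical city values; null stands for a document without the field.
        List<String> cities = Arrays.asList("HK", "HK", "SZ", null, "BJ");
        Map<String, Long> counts = docCounts(cities);
        Map<String, Long> top = topBuckets(counts, 2);
        System.out.println("buckets = " + top);
        System.out.println("sum_other_doc_count = " + sumOtherDocCount(counts, top));
    }
}
```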


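    b. Setting the grouping field with ranges

    Bin a numeric field to form the groups, using from and to for the start and end of each bin. The start is included and the end is excluded (left-closed, right-open)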

    GET /hotel/_doc/_search
    {
      "size": 0,
      "aggs": {
          "my_agg": {
            "range": {
            "field": "degree3",
            "ranges": [
              {
                "from": 1, "to":3
              },
              {
                "from": 3
              }
              ]
          }
        }
      }
    }
    

    The response is as follows. The bins are indeed left-closed and right-open, and the keys are generated automatically

      "aggregations" : {
        "my_agg" : {
          "buckets" : [
            {
              "key" : "1.0-3.0",
              "from" : 1.0,
              "to" : 3.0,
              "doc_count" : 2
            },
            {
              "key" : "3.0-*",
              "from" : 3.0,
              "doc_count" : 3
            }
          ]
        }
      }
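
    The left-closed, right-open rule can be checked with a small in-memory sketch. The plain-Java code below assigns values to bins whose start is inclusive and whose end is exclusive, with a null bound meaning open-ended. The Range class here is a hypothetical stand-in written for this illustration and is not the Elasticsearch RangeAggregator.Range; the degree3 values are made up.

```java
class RangeBucketDemo {
    // One bin covering [from, to); a null bound leaves that side open.
    static final class Range {
        final String key;
        final Double from;
        final Double to;

        Range(String key, Double from, Double to) {
            this.key = key;
            this.from = from;
            this.to = to;
        }

        boolean contains(double value) {
            boolean aboveFrom = from == null || value >= from; // start is inclusive
            boolean belowTo = to == null || value < to;        // end is exclusive
            return aboveFrom && belowTo;
        }
    }

    // Count how many values fall into each range, in range order.
    static long[] docCounts(double[] values, Range[] ranges) {
        long[] counts = new long[ranges.length];
        for (double v : values) {
            for (int i = 0; i < ranges.length; i++) {
                if (ranges[i].contains(v)) {
                    counts[i]++;
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        double[] degree3 = {3, 3, 1, 2, 4}; // hypothetical field values
        Range[] ranges = {
            new Range("1.0-3.0", 1d, 3d),
            new Range("3.0-*", 3d, null)
        };
        long[] counts = docCounts(degree3, ranges);
        for (int i = 0; i < ranges.length; i++) {
            System.out.println(ranges[i].key + ":" + counts[i]);
        }
    }
}
```

    A value of exactly 3 lands in the second bin only, which is the boundary behavior the response above shows.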
    

    The bucket keys can also be customized

    GET /hotel/_doc/_search
    {
      "size": 0,
      "aggs": {
          "my_agg": {
            "range": {
            "field": "degree3",
            "ranges": [
              {
                "from": 1, "to":3, "key": "low_degree"
              },
              {
                "from": 3,"key": "high_degree"
              }
            ]
          }
        }
      }
    }
    

    The response carries the custom keys

      "aggregations" : {
        "my_agg" : {
          "buckets" : [
            {
              "key" : "low_degree",
              "from" : 1.0,
              "to" : 3.0,
              "doc_count" : 2
            },
            {
              "key" : "high_degree",
              "from" : 3.0,
              "doc_count" : 3
            }
          ]
        }
    

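    c. Setting a metric for each group

    The examples above all return doc_count by default. Other metrics can be added by writing another aggs at the same level as the grouping (terms, range), which sets the field, the aggregation type, the missing-value handling and so on. Taking the mean as an example, the DSL is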

    GET /hotel/_doc/_search
    {
      "size": 0,
      "aggs": {
        "my_agg": {
          "range": {
            "field": "degree3",
            "ranges": [
              { "from": 1, "to": 3, "key": "low_degree" },
              { "from": 3, "key": "high_degree" }
            ]
          },
          "aggs": {
            "my_avg": {
              "avg": {
                "field": "price"
              }
            }
          }
        }
      }
    }
    

    The request above groups by degree3 and averages price within each group. The response is as follows

      "aggregations" : {
        "my_agg" : {
          "buckets" : [
            {
              "key" : "low_degree",
              "from" : 1.0,
              "to" : 3.0,
              "doc_count" : 2,
              "my_avg" : {
                "value" : 1514.0
              }
            },
            {
              "key" : "high_degree",
              "from" : 3.0,
              "doc_count" : 3,
              "my_avg" : {
                "value" : 1486990.6666666667
              }
            }
          ]
        }
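
    Conceptually, a metric nested under a grouping is computed once per bucket. The plain-Java sketch below shows the same shape with Collectors.groupingBy and a downstream averagingDouble collector. It is an in-memory illustration only: the Hotel class, the bucketKey helper and the sample prices are hypothetical and are not taken from the article's index.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class BucketAvgDemo {
    // Hypothetical document with just the two fields used here.
    static final class Hotel {
        final double degree3;
        final double price;

        Hotel(double degree3, double price) {
            this.degree3 = degree3;
            this.price = price;
        }
    }

    // Map a degree3 value to its bin: [1, 3) is low, [3, *) is high.
    static String bucketKey(double degree3) {
        if (degree3 >= 1 && degree3 < 3) {
            return "low_degree";
        }
        if (degree3 >= 3) {
            return "high_degree";
        }
        return null; // outside every range, so the document is in no bucket
    }

    // Group by bin, then average price inside each bin (the sub-aggregation).
    static Map<String, Double> avgPriceByBucket(List<Hotel> hotels) {
        return hotels.stream()
                .filter(h -> bucketKey(h.degree3) != null)
                .collect(Collectors.groupingBy(
                        h -> bucketKey(h.degree3),
                        TreeMap::new,
                        Collectors.averagingDouble(h -> h.price)));
    }

    public static void main(String[] args) {
        List<Hotel> hotels = Arrays.asList(
                new Hotel(1, 1000), new Hotel(2, 2000),
                new Hotel(3, 300), new Hotel(4, 500));
        avgPriceByBucket(hotels).forEach((k, v) -> System.out.println(k + ":" + v));
    }
}
```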
    

    d. Java client code for groupBy statistics

    The code below covers terms grouping, range grouping, and grouping with a metric

    public void getTermsBucket() {
            SearchRequest searchRequest = new SearchRequest(index);
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
                    .aggregation(AggregationBuilders.terms("my_agg").field("degree3"));
            searchRequest.source(searchSourceBuilder);
            try {
                SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
                if (searchResponse.status() == RestStatus.OK) {
                    if (searchResponse.getHits().getTotalHits() != 0) {
                        Aggregations aggregations = searchResponse.getAggregations();
                        Terms terms = aggregations.get("my_agg");
                        for (Terms.Bucket bucket : terms.getBuckets()) {
                            String bucketKey = bucket.getKeyAsString();
                            long docCount = bucket.getDocCount();
                            System.out.println(bucketKey + ":" + docCount);
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    

    The output is as follows

    3:2
    1:1
    2:1
    4:1
    

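    For range grouping, use addRange with RangeAggregator.Range to set the start and end of each bin. The three arguments are the key, the start and the end; pass null when a bound is absent, and the bounds must be doubles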

    public void getRangeBucket() {
            SearchRequest searchRequest = new SearchRequest(index);
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
                    .aggregation(AggregationBuilders.range("my_agg").field("degree3")
                    .addRange(new RangeAggregator.Range("low_degree", null, 3d))
                    .addRange(new RangeAggregator.Range("high_degree", 3d, null)));
            searchRequest.source(searchSourceBuilder);
            try {
                SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
                if (searchResponse.status() == RestStatus.OK) {
                    if (searchResponse.getHits().getTotalHits() != 0) {
                        Aggregations aggregations = searchResponse.getAggregations();
                        Range range = aggregations.get("my_agg");
                        for (Range.Bucket bucket : range.getBuckets()) {
                            String bucketKey = bucket.getKeyAsString();
                            long docCount = bucket.getDocCount();
                            System.out.println(bucketKey + ":" + docCount);
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    

    The output is as follows

    low_degree:2
    high_degree:3
    

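    When grouping with a metric, call subAggregation on the grouping builder to create the sub-aggregation, specifying its name, field and aggregation type. When reading the response, call getAggregations on each bucket to get the value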

    public void getRangeBucketAvg() {
            SearchRequest searchRequest = new SearchRequest(index);
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
                    .aggregation(AggregationBuilders.range("my_agg").field("degree3")
                    .addRange(new RangeAggregator.Range("low_degree", null, 3d))
                    .addRange(new RangeAggregator.Range("high_degree", 3d, null))
                    .subAggregation(AggregationBuilders.avg("my_avg").field("price")));
            searchRequest.source(searchSourceBuilder);
            try {
                SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
                if (searchResponse.status() == RestStatus.OK) {
                    if (searchResponse.getHits().getTotalHits() != 0) {
                        Aggregations aggregations = searchResponse.getAggregations();
                        Range range = aggregations.get("my_agg");
                        for (Range.Bucket bucket : range.getBuckets()) {
                            String bucketKey = bucket.getKeyAsString();
                            Avg avg = bucket.getAggregations().get("my_avg");
                            double value = avg.getValue();
                            System.out.println(bucketKey + ":" + value);
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    

    The result is as follows

    low_degree:1514.0
    high_degree:1486990.6666666667
    

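    Truncating and sorting aggregation results (order by)

    After the aggregation is computed, buckets can be sorted by key or by a result value. By default they are sorted by doc_count in descending order, and ordering by _count gives the same result. The order is written inside the terms grouping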

    GET /hotel/_doc/_search
    {
        "size": 0,
        "aggs": {
            "my_agg": {
                "terms": {
                    "field": "city",
                    "order":{
                      "_count": "desc"
                    }
                }
            }
        }
    }
    

    Buckets can also be sorted by _key, the key value of each group

    GET /hotel/_doc/_search
    {
        "size": 0,
        "aggs": {
            "my_agg": {
                "terms": {
                    "field": "city",
                    "order":{
                      "_key": "desc"
                    }
                }
            }
        }
    }
    

    The most common need is sorting by a metric value. To do so, reference the custom sub-aggregation name in order

    GET /hotel/_doc/_search
    {
        "size": 0,
        "aggs": {
            "my_agg": {
                "terms": {
                    "field": "city",
                    "order":{
                      "my_avg": "desc"
                    }
                },
                "aggs": {
                    "my_avg": {
                        "avg": {
                            "field": "price"
                        }
                    }
                }
            }
        }
    }
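
    Ordering buckets by a sub-aggregation amounts to computing the metric per bucket and then sorting the buckets by that metric. The plain-Java sketch below does this for an average in descending order. It is an in-memory illustration with hypothetical names and prices, not the Elasticsearch client API.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class BucketOrderDemo {
    // Average of one bucket's values; NaN when the bucket has no values.
    static double avg(List<Double> values) {
        return values.stream()
                .mapToDouble(Double::doubleValue)
                .average()
                .orElse(Double.NaN);
    }

    // Bucket keys sorted by their average, largest first.
    static List<String> keysByAvgDesc(Map<String, List<Double>> pricesByKey) {
        return pricesByKey.entrySet().stream()
                .sorted(Comparator.comparingDouble(
                        (Map.Entry<String, List<Double>> e) -> avg(e.getValue()))
                        .reversed())
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical prices grouped by city.
        Map<String, List<Double>> pricesByCity = new HashMap<>();
        pricesByCity.put("HK", Arrays.asList(100.0, 300.0));
        pricesByCity.put("SZ", Arrays.asList(500.0));
        pricesByCity.put("BJ", Arrays.asList(50.0));
        System.out.println(keysByAvgDesc(pricesByCity));
    }
}
```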
    

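    Java example: call order on the terms aggregation builder (alongside the sub-aggregation) and sort by the custom aggregation name in descending order with BucketOrder.aggregation("my_avg", false)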

    public void getTermsBucket() {
            SearchRequest searchRequest = new SearchRequest(index);
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
                    .aggregation(AggregationBuilders.terms("my_agg").field("degree3")
                            .subAggregation(AggregationBuilders.avg("my_avg").field("price"))
                            .order(BucketOrder.aggregation("my_avg", false)));
            searchRequest.source(searchSourceBuilder);
            try {
                SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
                if (searchResponse.status() == RestStatus.OK) {
                    if (searchResponse.getHits().getTotalHits() != 0) {
                        Aggregations aggregations = searchResponse.getAggregations();
                        Terms terms = aggregations.get("my_agg");
                        for (Terms.Bucket bucket : terms.getBuckets()) {
                            String bucketKey = bucket.getKeyAsString();
                            Avg avg = bucket.getAggregations().get("my_avg");
                            double value = avg.getValue();
                            System.out.println(bucketKey + ":" + value);
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    

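    Top N per group from aggregation results (window function)

    ES can do something similar to a window function, such as top 1 per group. In this example we want the full record with the latest date for each entity. First insert some data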

    PUT /stock
    POST /stock/_doc/_mapping
    {
        "properties": {
            "security_code": {"type": "keyword"},
            "stock_price": {"type": "double"},
            "date":  {"type": "date",  "format": "yyyy-MM-dd"}
        }
    }
    
    POST /_bulk
    { "index":  { "_index": "stock", "_type": "_doc"}}
    {"security_code":"300124","stock_price":3.14,"date":"2021-01-01"}
    { "index":  { "_index": "stock", "_type": "_doc"}}
    {"security_code":"300124","stock_price":9.14,"date":"2021-01-02"}
    { "index":  { "_index": "stock", "_type": "_doc"}}
    {"security_code":"300124","stock_price":4.14,"date":"2021-01-03"}
    { "index":  { "_index": "stock", "_type": "_doc"}}
    {"security_code":"002334","stock_price":2.97,"date":"2021-01-02"}
    { "index":  { "_index": "stock", "_type": "_doc"}}
    {"security_code":"002334","stock_price":3.54,"date":"2021-01-03"}
    { "index":  { "_index": "stock", "_type": "_doc"}}
    {"security_code":"002334","stock_price":7.84,"date":"2021-01-04"}
    { "index":  { "_index": "stock", "_type": "_doc"}}
    {"security_code":"300198","stock_price":9.26,"date":"2021-01-02"}
    { "index":  { "_index": "stock", "_type": "_doc"}}
    {"security_code":"300198","stock_price":3.14,"date":"2021-01-01"}
    

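    The query below returns the latest price record per security. The top_hits sub-aggregation takes a sort condition and a size for the number of records returned, so each group returns its top 1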

    GET /stock/_doc/_search
    {
      "size": 0,
      "aggs": {
        "type": {
          "terms": {
            "field": "security_code"
          },
          "aggs": {
            "latest_price": {
              "top_hits": {
                "sort": [
                  { "date": { "order": "desc" } }
                ],
                "size": 1
              }
            }
          }
        }
      }
    }
    

    The output is as follows

       "buckets" : [
            {
              "key" : "002334",
              "doc_count" : 3,
              "latest_price" : {
                "hits" : {
                  "total" : 3,
                  "max_score" : null,
                  "hits" : [
                    {
                      "_index" : "stock",
                      "_type" : "_doc",
                      "_id" : "8-d9HIABDkVv6XsnfJUi",
                      "_score" : null,
                      "_source" : {
                        "security_code" : "002334",
                        "stock_price" : 7.84,
                        "date" : "2021-01-04"
                      },
                      "sort" : [
                        1609718400000
                      ]
                    }
                  ]
                }
              }
            },
            {
              "key" : "300124",
              "doc_count" : 3,
              "latest_price" : {
                "hits" : {
                  "total" : 3,
                  "max_score" : null,
                  "hits" : [
                    {
                      "_index" : "stock",
                      "_type" : "_doc",
                      "_id" : "8Od9HIABDkVv6XsnfJUi",
                      "_score" : null,
                      "_source" : {
                        "security_code" : "300124",
                        "stock_price" : 4.14,
                        "date" : "2021-01-03"
                      },
                      "sort" : [
                        1609632000000
                      ]
                    }
                  ]
                }
              }
            },
            {
              "key" : "300198",
              "doc_count" : 2,
              "latest_price" : {
                "hits" : {
                  "total" : 2,
                  "max_score" : null,
                  "hits" : [
                    {
                      "_index" : "stock",
                      "_type" : "_doc",
                      "_id" : "9Od9HIABDkVv6XsnfJUi",
                      "_score" : null,
                      "_source" : {
                        "security_code" : "300198",
                        "stock_price" : 9.26,
                        "date" : "2021-01-02"
                      },
                      "sort" : [
                        1609545600000
                      ]
                    }
                  ]
                }
              }
            }
          ]
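
    The same top-1-per-group logic can be sketched in memory: group the records by security code, sort each group by date in descending order, and keep the first n. The plain-Java code below uses the eight records inserted earlier; the Quote class and method names are hypothetical, and the sketch relies on yyyy-MM-dd strings sorting chronologically when compared as text.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class TopHitsDemo {
    // Hypothetical in-memory version of one stock document.
    static final class Quote {
        final String code;
        final double price;
        final String date; // yyyy-MM-dd, so string order equals date order

        Quote(String code, double price, String date) {
            this.code = code;
            this.price = price;
            this.date = date;
        }
    }

    // The eight records from the bulk request.
    static List<Quote> sample() {
        return Arrays.asList(
                new Quote("300124", 3.14, "2021-01-01"),
                new Quote("300124", 9.14, "2021-01-02"),
                new Quote("300124", 4.14, "2021-01-03"),
                new Quote("002334", 2.97, "2021-01-02"),
                new Quote("002334", 3.54, "2021-01-03"),
                new Quote("002334", 7.84, "2021-01-04"),
                new Quote("300198", 9.26, "2021-01-02"),
                new Quote("300198", 3.14, "2021-01-01"));
    }

    // Group by code, then keep the n most recent quotes of each group.
    static Map<String, List<Quote>> topNByDateDesc(List<Quote> quotes, int n) {
        Map<String, List<Quote>> grouped = quotes.stream()
                .collect(Collectors.groupingBy(q -> q.code, TreeMap::new, Collectors.toList()));
        grouped.replaceAll((code, list) -> list.stream()
                .sorted(Comparator.comparing((Quote q) -> q.date).reversed())
                .limit(n)
                .collect(Collectors.toList()));
        return grouped;
    }

    public static void main(String[] args) {
        topNByDateDesc(sample(), 1).forEach((code, hits) ->
                hits.forEach(q -> System.out.println(code + " " + q.date + " " + q.price)));
    }
}
```

    The descending comparator is what makes this "latest per group"; with the natural ascending order the first element of each group would be the earliest record instead.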
    

    Java client implementation

    public void getLatestStockPrice() {
            SearchRequest searchRequest = new SearchRequest("stock");
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
                    .aggregation(AggregationBuilders.terms("my_agg").field("security_code")
                            .subAggregation(AggregationBuilders.topHits("latest_price").sort("date").size(1)));
            searchRequest.source(searchSourceBuilder);
            try {
                SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
                if (searchResponse.status() == RestStatus.OK) {
                    if (searchResponse.getHits().getTotalHits() != 0) {
                        Aggregations aggregations = searchResponse.getAggregations();
                        Terms terms = aggregations.get("my_agg");
                        for (Terms.Bucket bucket : terms.getBuckets()) {
                            String bucketKey = bucket.getKeyAsString();
                            TopHits topHits = bucket.getAggregations().get("latest_price");
                            topHits.getHits().forEach(s -> System.out.println(s.getSourceAsMap()));
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    

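    The printed output is as follows. Note that sort("date") sorts in ascending order by default, so this code actually prints the earliest record of each group rather than the latest. To match the DSL above, pass the order explicitly with sort("date", SortOrder.DESC)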

    {date=2021-01-02, security_code=002334, stock_price=2.97}
    {date=2021-01-01, security_code=300124, stock_price=3.14}
    {date=2021-01-01, security_code=300198, stock_price=3.14}
    

    Original article: https://www.haomeiwen.com/subject/jhgejrtx.html