美文网首页
ES分组取每组第一条的ES写法和Java写法

ES分组取每组第一条的ES写法和Java写法

作者: 王月亮17 | 来源:发表于2022-09-09 09:37 被阅读0次

    本例先按 trace_id 分组,再在每个分组内按 log_time 升序排序并只取第一条(即每组最早的一条日志);实现方式是 terms 聚合加上 size 为 1 的 top_hits 子聚合。
    ES写法:

    {
      "query": {
        "bool": {
          "must": [
            {
              "query_string": {
                "query": "log_level:ERROR",
                "fields": [],
                "type": "best_fields",
                "default_operator": "or",
                "max_determinized_states": 10000,
                "enable_position_increments": true,
                "fuzziness": "AUTO",
                "fuzzy_prefix_length": 0,
                "fuzzy_max_expansions": 50,
                "phrase_slop": 0,
                "escape": false,
                "auto_generate_synonyms_phrase_query": true,
                "fuzzy_transpositions": true,
                "boost": 1
              }
            },
            {
              "range": {
                "log_time": {
                  "from": "2021-06-02 18:00:44.727",
                  "to": "2021-06-02 18:05:44.727",
                  "include_lower": true,
                  "include_upper": false,
                  "format": "yyyy-MM-dd HH:mm:ss.SSS",
                  "boost": 1
                }
              }
            }
          ],
          "adjust_pure_negative": true,
          "boost": 1
        }
      },
      "aggs": {
        "group_by_trace_id": {
          "terms": {
            "field": "trace_id",
            "order": {
              "top_hit": "asc"
            }
          },
          "aggs": {
            "min_trace": {
              "min": {
                "field": "log_time"
              }
            },
            "top_test": {
              "top_hits": {
                "sort": {
                  "log_time": "asc"
                },
                "size": 1
              }
            },
            "top_hit": {
              "min": {
                "script": "_score"
              }
            }
          }
        }
      }
    }
    

    Java写法:

    // Builds the metric for the half-open time window [start, end): first counts the matching
    // documents, then groups them by trace_id and keeps, for every group, only the single
    // document with the smallest log_time (terms aggregation + top_hits of size 1).
    MetricData elasticsearchMetric = new MetricData();
    ElasticsearchInfo elasticsearchInfo = new ElasticsearchInfo(metricContract.getDataSourceContract());
    EsRestClientContainer esRestClientContainer = elasticsearchSourceManager.findEsRestClientContainer(elasticsearchInfo);
    String timestampField = metricContract.getDataNameContract().getTimestampField();

    // Filter: user supplied query string AND timestamp inside [start, end).
    BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery()
            .must(new QueryStringQueryBuilder(metricContract.getQueryString()))
            .must(QueryBuilders.rangeQuery(timestampField)
                    .from(start.toDateTimeISO().toString(dateTimeFormatter))
                    .to(end.toDateTimeISO().toString(dateTimeFormatter))
                    .includeLower(true)
                    .includeUpper(false)
                    .format(dateTimeFormatter));

    // Resolve the concrete indices that cover the requested window.
    Map<String, String> dataNameProperties = metricContract.getDataNameContract().getSettings();
    String indexPrefix = dataNameProperties.get("indexPrefix");
    String datePattern = dataNameProperties.get("timePattern");
    String[] indices = esRestClientContainer.buildIndices(start, end, indexPrefix, datePattern);

    Long count;
    try {
        count = esRestClientContainer.totalCount(boolQueryBuilder, indices);
    } catch (Exception ex) {
        log.error("queryElasticsearchMetricValue 发生异常:", ex);
        throw new RuntimeException("error when totalCount", ex);
    }

    boolean countMetric = metricContract.getAggregationType().equalsIgnoreCase(SymbolExpr.COUNT);
    if (countMetric) {
        elasticsearchMetric.setMetricValue(count);
    }
    // Nothing matched: skip the aggregation request entirely. The null check avoids an
    // unboxing NullPointerException should totalCount ever return null.
    if (count == null || count == 0) {
        elasticsearchMetric.setMetricValue(0);
        return elasticsearchMetric;
    }

    SearchRequest searchRequest = new SearchRequest(indices);
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.trackScores(false);
    searchSourceBuilder.trackTotalHits(true);
    searchSourceBuilder.query(boolQueryBuilder).from(0).size(10)
            .sort(timestampField, SortOrder.DESC);
    attachAggregation(metricContract, searchSourceBuilder);

    // Aggregation: one bucket per trace_id, ordered by the "top_hit" sub-aggregation ascending,
    // exactly as in the query DSL version. Fully qualified names are used because no import
    // section is visible in this snippet.
    // NOTE(review): a terms aggregation returns only its top `size` buckets (10 by default),
    // so buckets.size() below is capped by that limit - confirm this is intended.
    TermsAggregationBuilder termsBuilder = AggregationBuilders.terms("group_by_trace_id")
            .field("trace_id")
            .order(org.elasticsearch.search.aggregations.BucketOrder.aggregation("top_hit", true));
    MinAggregationBuilder minAggregationBuilder = AggregationBuilders.min("min_trace").field("log_time");
    // Earliest document of the group: sort ascending by log_time and keep one hit.
    TopHitsAggregationBuilder topHitsAggregationBuilder = AggregationBuilders.topHits("top_detail")
            .sort("log_time", SortOrder.ASC)
            .size(1);
    // _score is not a document field; it has to be read through a script, as the DSL version does.
    MinAggregationBuilder minAggregationBuilderTopHit = AggregationBuilders.min("top_hit")
            .script(new org.elasticsearch.script.Script("_score"));
    termsBuilder.subAggregation(minAggregationBuilder);
    termsBuilder.subAggregation(topHitsAggregationBuilder);
    termsBuilder.subAggregation(minAggregationBuilderTopHit);
    searchSourceBuilder.aggregation(termsBuilder);

    // Execute the search.
    searchRequest.source(searchSourceBuilder);
    SearchResponse searchResponse = esRestClientContainer.fetchHighLevelClient().search(searchRequest, RequestOptions.DEFAULT);
    ParsedStringTerms stringTerms = searchResponse.getAggregations().get("group_by_trace_id");
    List<? extends Terms.Bucket> buckets = stringTerms.getBuckets();

    if (countMetric) {
        // For COUNT the metric becomes the number of returned groups; when no bucket came
        // back the document count set earlier is kept.
        if (!buckets.isEmpty()) {
            elasticsearchMetric.setMetricValue(buckets.size());
        }
    } else {
        Double numericValue = findAggregationValue(metricContract, searchResponse);
        elasticsearchMetric.setMetricValue(numericValue);
    }

    // Collect the first document of every group.
    List<Map<String, Object>> latestDocumentList = new ArrayList<>();
    for (Terms.Bucket bucket : buckets) {
        ParsedTopHits topDetail = bucket.getAggregations().get("top_detail");
        for (SearchHit hit : topDetail.getHits().getHits()) {
            Map<String, Object> latestDocument = hit.getSourceAsMap();
            latestDocument.put("esDataId", latestDocument.get("id"));
            latestDocumentList.add(latestDocument);
        }
    }
    // Publish the list once, and only when at least one document was collected, which is
    // when the original per-hit call would have set it.
    if (!latestDocumentList.isEmpty()) {
        elasticsearchMetric.setLatestDocumentList(latestDocumentList);
    }
    return elasticsearchMetric;
    

    相关文章

      网友评论

          本文标题:ES分组取每组第一条的ES写法和Java写法

          本文链接:https://www.haomeiwen.com/subject/iflvbrtx.html