本例按 trace_id 分组,然后在每个分组内按 log_time 升序(正序)排列,取第一条记录,即每个 trace 中 log_time 最早的一条日志。
ES 查询 DSL 写法:
{
"query": {
"bool": {
"must": [
{
"query_string": {
"query": "log_level:ERROR",
"fields": [],
"type": "best_fields",
"default_operator": "or",
"max_determinized_states": 10000,
"enable_position_increments": true,
"fuzziness": "AUTO",
"fuzzy_prefix_length": 0,
"fuzzy_max_expansions": 50,
"phrase_slop": 0,
"escape": false,
"auto_generate_synonyms_phrase_query": true,
"fuzzy_transpositions": true,
"boost": 1
}
},
{
"range": {
"log_time": {
"from": "2021-06-02 18:00:44.727",
"to": "2021-06-02 18:05:44.727",
"include_lower": true,
"include_upper": false,
"format": "yyyy-MM-dd HH:mm:ss.SSS",
"boost": 1
}
}
}
],
"adjust_pure_negative": true,
"boost": 1
}
},
"aggs": {
"group_by_trace_id": {
"terms": {
"field": "trace_id",
"order": {
"top_hit": "asc"
}
},
"aggs": {
"min_trace": {
"min": {
"field": "log_time"
}
},
"top_test": {
"top_hits": {
"sort": {
"log_time": "asc"
},
"size":1
}
},
"top_hit": {
"min": {
"script": "_score"
}
}
}
}
}
}
Java 写法(基于 Elasticsearch RestHighLevelClient):
// Builds a MetricData for the configured query over the half-open window [start, end):
// first counts matching documents, then groups matches by trace_id and collects, per
// trace, the earliest document by log_time (the Java counterpart of the ES DSL above).
// NOTE(review): this is a fragment of a larger method; start, end, dateTimeFormatter,
// metricContract, elasticsearchSourceManager, log, attachAggregation and
// findAggregationValue are defined outside this view.
MetricData elasticsearchMetric = new MetricData();
ElasticsearchInfo elasticsearchInfo = new ElasticsearchInfo(metricContract.getDataSourceContract());
EsRestClientContainer esRestClientContainer = elasticsearchSourceManager.findEsRestClientContainer(elasticsearchInfo);
String timestampField = metricContract.getDataNameContract().getTimestampField();
// User query string AND timestamp in [start, end) (lower bound inclusive, upper exclusive).
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery()
        .must(new QueryStringQueryBuilder(metricContract.getQueryString()))
        .must(QueryBuilders.rangeQuery(timestampField)
                .from(start.toDateTimeISO().toString(dateTimeFormatter))
                .to(end.toDateTimeISO().toString(dateTimeFormatter))
                .includeLower(true)
                .includeUpper(false)
                .format(dateTimeFormatter));
Map<String, String> dataNameProperties = metricContract.getDataNameContract().getSettings();
String indexPrefix = dataNameProperties.get("indexPrefix");
String datePattern = dataNameProperties.get("timePattern");
String[] indices = esRestClientContainer.buildIndices(start, end, indexPrefix, datePattern);
Long count;
try {
    count = esRestClientContainer.totalCount(boolQueryBuilder, indices);
} catch (Exception ex) {
    log.error("queryElasticsearchMetricValue 发生异常:", ex);
    throw new RuntimeException("error when totalCount", ex);
}
boolean countAggregation = metricContract.getAggregationType().equalsIgnoreCase(SymbolExpr.COUNT);
if (countAggregation) {
    elasticsearchMetric.setMetricValue(count);
}
// Nothing matched: short-circuit with a zero metric and skip the aggregation query.
if (count == 0) {
    elasticsearchMetric.setMetricValue(0);
    return elasticsearchMetric;
}
SearchRequest searchRequest = new SearchRequest(indices);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.trackScores(false);
searchSourceBuilder.trackTotalHits(true);
searchSourceBuilder.query(boolQueryBuilder).from(0).size(10)
        .sort(timestampField, SortOrder.DESC);
attachAggregation(metricContract, searchSourceBuilder);

// Aggregation: group by trace_id, ordering buckets by the "top_hit" sub-aggregation
// ascending, matching "order": {"top_hit": "asc"} in the DSL.
// NOTE(review): no size is set on the terms aggregation, so Elasticsearch returns only its
// default number of buckets; buckets.size() used as the COUNT value below is capped by
// that. Confirm whether an explicit size (or a cardinality aggregation) is intended.
TermsAggregationBuilder termsBuilder = AggregationBuilders.terms("group_by_trace_id")
        .field("trace_id")
        .order(org.elasticsearch.search.aggregations.BucketOrder.aggregation("top_hit", true));
// Minimum log_time within each trace_id bucket.
MinAggregationBuilder minAggregationBuilder = AggregationBuilders.min("min_trace").field("log_time");
// The single earliest document (ascending log_time) within each trace_id bucket.
TopHitsAggregationBuilder topHitsAggregationBuilder = AggregationBuilders.topHits("top_detail")
        .sort("log_time", SortOrder.ASC)
        .size(1);
// _score is the relevance score, not a document field, so it has to be read through a
// script (as the DSL does with "script": "_score"); .field("_score") aggregates nothing.
MinAggregationBuilder minAggregationBuilderTopHit = AggregationBuilders.min("top_hit")
        .script(new org.elasticsearch.script.Script("_score"));
termsBuilder.subAggregation(minAggregationBuilder);
termsBuilder.subAggregation(topHitsAggregationBuilder);
termsBuilder.subAggregation(minAggregationBuilderTopHit);
searchSourceBuilder.aggregation(termsBuilder);

// Execute the search.
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = esRestClientContainer.fetchHighLevelClient().search(searchRequest, RequestOptions.DEFAULT);
// Read through the Terms interface so keyword and numeric trace_id mappings both work.
Terms traceIdTerms = searchResponse.getAggregations().get("group_by_trace_id");
List<? extends Terms.Bucket> buckets = traceIdTerms.getBuckets();
if (countAggregation) {
    // For COUNT, the metric becomes the number of distinct trace_id buckets returned.
    if (!buckets.isEmpty()) {
        elasticsearchMetric.setMetricValue(buckets.size());
    }
} else {
    Double numericValue = findAggregationValue(metricContract, searchResponse);
    elasticsearchMetric.setMetricValue(numericValue);
}
List<Map<String, Object>> latestDocumentList = new ArrayList<>();
for (Terms.Bucket bucket : buckets) {
    ParsedTopHits topDetail = bucket.getAggregations().get("top_detail");
    for (SearchHit hit : topDetail.getHits().getHits()) {
        Map<String, Object> latestDocument = hit.getSourceAsMap();
        // Expose the source's "id" value under the key "esDataId".
        latestDocument.put("esDataId", latestDocument.get("id"));
        latestDocumentList.add(latestDocument);
    }
}
// Attach the collected documents once; leave the list unset when no hits were found.
if (!latestDocumentList.isEmpty()) {
    elasticsearchMetric.setLatestDocumentList(latestDocumentList);
}
return elasticsearchMetric;
网友评论