elasticsearch 聚合统计
创建普通聚合器
aggregation := elastic.NewSamplerAggregation()
agg_sum := elastic.NewSumAggregation().Field("聚合求和字段")
aggregation.SubAggregation("agg_sum",agg_sum)
字段分组聚合
# 分组聚合
group_by_schId := elastic.NewTermsAggregation().Field("SchId")
group_by_schId下可接子聚合
时间聚合
//需要时间聚合分组的字段名称, 类型需要为date, 格式没有要求
aggField := "Time"
//时间分组间隔;默认5分钟平均值
aggInterval := "5m"//一天 1d,一个月 1M;5分钟 5m
// 返回值格式化,HH大写,不然不能区分上午、下午
aggFormat := "yyyy-MM-dd HH:mm:ss"
// 为空的话则填充0
aggMinDocCount := int64(0)
//设置时区, 这样就相当于东八区的时间
aggTimeZone := "+08:00"
aggs := elastic.NewDateHistogramAggregation().
Field(aggField).
Interval(aggInterval).
Format(aggFormat).
MinDocCount(aggMinDocCount).
TimeZone(aggTimeZone)
//.Offset("+6h")
//sub Aggregation
sumAggInbound := elastic.NewAvgAggregation().Field("Inbound")
aggs.SubAggregation("Inbound", sumAggInbound)
聚合统计并且显示统计的原始数据
//按SchId分组统计
group_by_schId := elastic.NewTermsAggregation().Field("SchId")
//每个组内显示第一条原始数据
topHis := elastic.NewTopHitsAggregation().Size(1)
aggs.SubAggregation("top_source", topHis)
//聚合结果解析
//基础数据
if topVal, ok := result["top_source"]; ok {
if topVal != nil {
//topMap := make(map[string]interface{}, 0)
var res *elastic.SearchResult
topValBytes, _ := json.Marshal(topVal)
_ = json.Unmarshal(topValBytes, &res)
if res != nil || res.Hits != nil {
hit := res.Hits.Hits[0]
var item = make(map[string]interface{})
jsonByte, marErr := hit.Source.MarshalJSON()
if marErr == nil {
unmarErr := json.Unmarshal(jsonByte, &item)
if unmarErr == nil && item != nil {
//TODO 根据字段提取值
//演示提取字段LinkId
if link, ok := item["LinkId"]; ok {
if link != nil {
listLinkModel.LinkId = int(link.(float64))
}
}
}
}
}
}
}
//es 分组聚合统计显;组内当size==1时;此方法可解析数据
func TPFunAggsHitSourceUnmarshal(source interface{}) (sourceMap map[string]interface{}, err error) {
if source != nil {
//topMap := make(map[string]interface{}, 0)
var res *elastic.SearchResult
topValBytes, _ := json.Marshal(source)
_ = json.Unmarshal(topValBytes, &res)
if res != nil && res.Hits != nil && len(res.Hits.Hits)>0 {
hit := res.Hits.Hits[0]
jsonByte, marErr := hit.Source.MarshalJSON()
if marErr == nil {
err = json.Unmarshal(jsonByte, &sourceMap)
} else {
err = marErr
}
}
}
return sourceMap, err
}
网友评论