/**
 * Returns one page of per-user viewing statistics aggregated from the view-record index.
 *
 * <p>Records are filtered by company id, video ids and user ids (each only when supplied) and by
 * the not-deleted flag, then grouped by {@code userId}. For every user the earliest
 * {@code startTime}, latest {@code endTime}, summed {@code viewDuration}, the course ids and the
 * summed {@code viewDuration} per video are collected. User buckets are ordered by latest
 * {@code endTime} descending and paged with {@code pageNo} (1-based) and {@code pageSize}.
 *
 * <p>NOTE(review): the {@code total_count} cardinality is computed over every document matching
 * the filter and is not narrowed by the optional {@code time_filter} bucket selector; it is only
 * logged here because this method returns the page content alone.
 *
 * @param queryDto filter, time-range and paging criteria
 * @return the statistics of the users on the requested page; empty when no bucket is returned
 */
public List<StatisticPageDTO> getStatisticList(StatisticQueryDTO queryDto) {
    log.info("getStatisticList param:{}", JSON.toJSONString(queryDto));

    BoolQueryBuilder boolQuery = buildStatisticFilter(queryDto);
    TermsAggregationBuilder aggregationBuilder = buildUserAggregation(queryDto);

    // Cardinality is an approximate metric; precisionThreshold sets the distinct-value count
    // below which the result is expected to be close to exact.
    CardinalityAggregationBuilder cardinality = AggregationBuilders
            .cardinality("total_count").field("userId")
            .precisionThreshold(EsConstant.MAX_PAGE_SIZE);

    // The hit page size cannot be set to 0, so request the minimum of one hit;
    // only the aggregations of the response are read below.
    Pageable pageable = PageRequest.of(0, 1);
    SearchQuery query = new NativeSearchQueryBuilder()
            .withQuery(boolQuery)
            .addAggregation(aggregationBuilder)
            .addAggregation(cardinality)
            .withPageable(pageable)
            .build();
    log.info("getStatisticList boolQuery:{}", boolQuery.toString());
    log.info("getStatisticList aggregationBuilder:{}", aggregationBuilder.toString());

    AggregatedPage<VideoViewReocrdDocument> aggPage =
            (AggregatedPage<VideoViewReocrdDocument>) vodViewRecordESRepository.search(query);

    // Approximate number of distinct users matching the filter.
    Cardinality totalCountAgg = (Cardinality) aggPage.getAggregation("total_count");
    log.info("getStatisticList total:{}", totalCountAgg.getValue());

    StringTerms agg = (StringTerms) aggPage.getAggregation("aggUserId");
    List<StatisticPageDTO> resultList = new ArrayList<>();
    for (StringTerms.Bucket bucket : agg.getBuckets()) {
        resultList.add(toStatisticPageDTO(bucket));
    }
    return resultList;
}

/** Builds the document filter: optional company / video / user restrictions plus not-deleted. */
private BoolQueryBuilder buildStatisticFilter(StatisticQueryDTO queryDto) {
    BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
    if (queryDto.getCompanyId() != null) {
        boolQuery.must(QueryBuilders.termsQuery("companyId", queryDto.getCompanyId().toString()));
    }
    if (CollectionUtils.isNotEmpty(queryDto.getVodIds())) {
        boolQuery.must(QueryBuilders.termsQuery("vodId", queryDto.getVodIds()));
    }
    if (CollectionUtils.isNotEmpty(queryDto.getUserIds())) {
        boolQuery.must(QueryBuilders.termsQuery("userId", queryDto.getUserIds()));
    }
    boolQuery.must(QueryBuilders.termsQuery("isDelete", IsDeleteEnum.NOT.getCode().toString()));
    return boolQuery;
}

/** Builds the per-user terms aggregation with its metrics, optional time filter and paging. */
private TermsAggregationBuilder buildUserAggregation(StatisticQueryDTO queryDto) {
    // NOTE(review): aggCourseId and aggVodId set no explicit size, so the terms aggregation's
    // default bucket limit applies to them - confirm this is large enough per user.
    TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms("aggUserId")
            .field("userId")
            .size(EsConstant.MAX_PAGE_SIZE)
            .subAggregation(AggregationBuilders.min("minStartTime").field("startTime"))
            .subAggregation(AggregationBuilders.max("maxEndTime").field("endTime"))
            .subAggregation(AggregationBuilders.sum("sumViewDuration").field("viewDuration"))
            .subAggregation(AggregationBuilders.terms("aggCourseId").field("courseId"))
            .subAggregation(AggregationBuilders.terms("aggVodId").field("vodId")
                    .subAggregation(AggregationBuilders.sum("sum_vod_ViewDuration").field("viewDuration")));

    // Filter user buckets by their aggregated times (must be added before the bucket sort).
    addTimeFilter(aggregationBuilder, queryDto);

    // Sort user buckets by most recent view and cut out the requested page.
    List<FieldSortBuilder> fieldSorts = new ArrayList<>();
    fieldSorts.add(new FieldSortBuilder("maxEndTime").order(SortOrder.DESC));
    // bucket_sort's "from" is a bucket offset, not a page index, so the zero-based page index
    // has to be multiplied by the page size. A page number below 1 is treated as the first page.
    int pageIndex = Math.max(queryDto.getPageNo() - 1, 0);
    Integer pageSize = queryDto.getPageSize();
    int offset = pageSize == null ? 0 : pageIndex * pageSize;
    aggregationBuilder.subAggregation(PipelineAggregatorBuilders.bucketSort("bucket_field", fieldSorts)
            .from(offset).size(pageSize));
    return aggregationBuilder;
}

/**
 * Adds a bucket selector that keeps only user buckets whose earliest start time and/or latest
 * end time fall inside the requested ranges. A range is applied only when both of its bounds
 * are present; nothing is added when neither range is complete.
 */
private void addTimeFilter(TermsAggregationBuilder aggregationBuilder, StatisticQueryDTO queryDto) {
    Map<String, String> bucketMap = Maps.newHashMap();
    StringBuilder code = new StringBuilder();
    boolean startTimeNotNull = queryDto.getBeginTime() != null && queryDto.getEndTime() != null;
    boolean endTimeNotNull = queryDto.getBeginLastTime() != null && queryDto.getEndLastTime() != null;
    if (startTimeNotNull) {
        code.append("params.min_start_time >= ").append(queryDto.getBeginTime().getTime()).append("L");
        code.append(" && params.min_start_time <= ").append(queryDto.getEndTime().getTime()).append("L");
        bucketMap.put("min_start_time", "minStartTime");
    }
    if (endTimeNotNull) {
        if (startTimeNotNull) {
            code.append(" && ");
        }
        code.append(" params.max_end_time >= ").append(queryDto.getBeginLastTime().getTime()).append("L");
        code.append(" && params.max_end_time <= ").append(queryDto.getEndLastTime().getTime()).append("L");
        bucketMap.put("max_end_time", "maxEndTime");
    }
    if (code.length() > 0) {
        Script script = new Script(code.toString());
        BucketSelectorPipelineAggregationBuilder bs =
                PipelineAggregatorBuilders.bucketSelector("time_filter", bucketMap, script);
        aggregationBuilder.subAggregation(bs);
    }
}

/** Maps one user bucket and its sub-aggregations onto a {@link StatisticPageDTO}. */
private StatisticPageDTO toStatisticPageDTO(StringTerms.Bucket bucket) {
    StatisticPageDTO dto = new StatisticPageDTO();
    dto.setUserId(bucket.getKeyAsNumber().longValue());

    Aggregations aggregations = bucket.getAggregations();
    if (aggregations == null) {
        return dto;
    }

    // Time the user first started watching.
    InternalMin minStartTime = aggregations.get("minStartTime");
    if (minStartTime != null) {
        Date startTime = DateUtil.parseEsDate(minStartTime.getValueAsString());
        dto.setStartTime(DateUtil.formatDate(startTime));
    }

    // Time of the most recent view.
    InternalMax maxEndTime = aggregations.get("maxEndTime");
    if (maxEndTime != null) {
        Date lastTime = DateUtil.parseEsDate(maxEndTime.getValueAsString());
        dto.setLastTime(DateUtil.formatDate(lastTime));
    }

    // Total watch duration (fractional part of the sum is dropped).
    InternalSum sumViewDuration = aggregations.get("sumViewDuration");
    if (sumViewDuration != null) {
        dto.setTotalDuration((long) sumViewDuration.getValue());
    }

    // Ids of the courses the user watched.
    StringTerms courseAgg = aggregations.get("aggCourseId");
    if (courseAgg != null) {
        List<Long> courseIds = courseAgg.getBuckets().stream()
                .map(StringTerms.Bucket::getKeyAsNumber)
                .map(Number::longValue)
                .collect(Collectors.toList());
        dto.setCourseIds(courseIds);
    }

    // Watch duration per video id.
    StringTerms vodAgg = aggregations.get("aggVodId");
    if (vodAgg != null) {
        Map<String, Integer> vodMap = Maps.newHashMap();
        for (StringTerms.Bucket vodBucket : vodAgg.getBuckets()) {
            Aggregations vodAggregations = vodBucket.getAggregations();
            if (vodAggregations == null) {
                continue;
            }
            InternalSum sumVodViewDuration = vodAggregations.get("sum_vod_ViewDuration");
            if (sumVodViewDuration != null) {
                vodMap.put(vodBucket.getKeyAsString(), (int) sumVodViewDuration.getValue());
            }
        }
        dto.setVodMap(vodMap);
    }
    return dto;
}
参考文章:https://www.jianshu.com/p/930c803a4ebd
网友评论