美文网首页
ElasticsearchRepository 查询、聚合、过滤

ElasticsearchRepository 查询、聚合、过滤

作者:圣瓦伦 | 发表于 2020-04-01 23:05 | 被阅读 0 次
/**
 * Queries per-user viewing statistics from Elasticsearch.
 *
 * <p>Documents are filtered by company, video ids, user ids and the not-deleted flag, then
 * grouped by {@code userId}. For each user bucket the query collects:
 * <ul>
 *   <li>the earliest {@code startTime};</li>
 *   <li>the latest {@code endTime};</li>
 *   <li>the total {@code viewDuration};</li>
 *   <li>the {@code courseId} terms;</li>
 *   <li>the summed {@code viewDuration} per {@code vodId}.</li>
 * </ul>
 *
 * <p>User buckets are optionally filtered on those earliest/latest times (bucket_selector).
 * They are then sorted by latest end time descending and paged (bucket_sort).
 *
 * @param queryDto filter and paging parameters; {@code pageNo} is treated as 1-based (values
 *     below 1 are clamped to the first page)
 * @return one DTO per user bucket on the requested page
 */
public List<StatisticPageDTO> getStatisticList(StatisticQueryDTO queryDto) {
        log.info("getStatisticList param:{}", JSON.toJSONString(queryDto));
        // NOTE(review): the total computed further down is stored in this model, but the model is
        // never returned, so callers cannot read it. Return a PageModel if the total is needed.
        PageModel<List<StatisticPageDTO>> model = new PageModel<>();

        NativeSearchQueryBuilder queryBuilder = new NativeSearchQueryBuilder();

        // Document-level filters; optional criteria are only added when supplied.
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
        if (queryDto.getCompanyId() != null) {
            boolQuery.must(QueryBuilders.termQuery("companyId", queryDto.getCompanyId().toString()));
        }
        if (CollectionUtils.isNotEmpty(queryDto.getVodIds())) {
            boolQuery.must(QueryBuilders.termsQuery("vodId", queryDto.getVodIds()));
        }
        if (CollectionUtils.isNotEmpty(queryDto.getUserIds())) {
            boolQuery.must(QueryBuilders.termsQuery("userId", queryDto.getUserIds()));
        }
        boolQuery.must(QueryBuilders.termQuery("isDelete", IsDeleteEnum.NOT.getCode().toString()));

        // One bucket per user (at most EsConstant.MAX_PAGE_SIZE users), with per-user metrics
        // and per-video summed durations.
        // NOTE(review): aggCourseId and aggVodId use the terms-aggregation default size (10), so a
        // user with more than 10 courses/videos gets a truncated courseIds list / vodMap. Set an
        // explicit size if complete lists are required (mind the search.max_buckets limit).
        TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms("aggUserId").field("userId").size(EsConstant.MAX_PAGE_SIZE)
                .subAggregation(AggregationBuilders.min("minStartTime").field("startTime"))
                .subAggregation(AggregationBuilders.max("maxEndTime").field("endTime"))
                .subAggregation(AggregationBuilders.sum("sumViewDuration").field("viewDuration"))
                .subAggregation(AggregationBuilders.terms("aggCourseId").field("courseId"))
                .subAggregation(AggregationBuilders.terms("aggVodId").field("vodId")
                        .subAggregation(AggregationBuilders.sum("sum_vod_ViewDuration").field("viewDuration")));

        // Bucket-level time filter on the aggregation results. A range is applied only when both
        // of its bounds are present. The script reads the metrics through the buckets_path
        // variables registered in bucketMap (script variable name -> metric aggregation name).
        Map<String, String> bucketMap = Maps.newHashMap();
        String code = "";
        boolean startTimeNotNull = queryDto.getBeginTime() != null && queryDto.getEndTime() != null;
        boolean endTimeNotNull = queryDto.getBeginLastTime() != null && queryDto.getEndLastTime() != null;
        if (startTimeNotNull) {
            code += "params.min_start_time >= " + queryDto.getBeginTime().getTime() + "L";
            code += " && params.min_start_time <= " + queryDto.getEndTime().getTime() + "L";
            bucketMap.put("min_start_time", "minStartTime");
        }
        if (endTimeNotNull) {
            if (startTimeNotNull) {
                code += " && ";
            }
            code += " params.max_end_time >= " + queryDto.getBeginLastTime().getTime() + "L";
            code += " && params.max_end_time <= " + queryDto.getEndLastTime().getTime() + "L";
            bucketMap.put("max_end_time", "maxEndTime");
        }

        if (StringUtils.isNotEmpty(code)) {
            Script script = new Script(code);
            BucketSelectorPipelineAggregationBuilder bs =
                    PipelineAggregatorBuilders.bucketSelector("time_filter", bucketMap, script);
            aggregationBuilder.subAggregation(bs);
        }

        // Sort user buckets by latest end time (newest first), then page them.
        // bucket_sort "from" is a bucket offset, not a page index, so the 1-based page number is
        // converted to (pageNo - 1) * pageSize. Page numbers below 1 are clamped to the first page
        // (a negative offset would be rejected by Elasticsearch).
        List<FieldSortBuilder> fieldSorts = new ArrayList<>();
        fieldSorts.add(new FieldSortBuilder("maxEndTime").order(SortOrder.DESC));
        int pageNo = Math.max(1, queryDto.getPageNo());
        int pageSize = queryDto.getPageSize();
        aggregationBuilder.subAggregation(PipelineAggregatorBuilders.bucketSort("bucket_field", fieldSorts)
                .from((pageNo - 1) * pageSize).size(pageSize));

        // Distinct-user count. Cardinality is an approximate algorithm. precisionThreshold is the
        // distinct-value count below which the result is expected to be near-exact.
        // NOTE(review): this counts every matching user and ignores the bucket_selector time filter
        // above, so it over-counts when a time range is supplied.
        CardinalityAggregationBuilder cardinality = AggregationBuilders
                .cardinality("total_count").field("userId")
                .precisionThreshold(EsConstant.MAX_PAGE_SIZE);

        // Only aggregations are used. The page size cannot be 0, so fetch at most one hit.
        Pageable pageable = PageRequest.of(0, 1);

        SearchQuery query = queryBuilder
                .withQuery(boolQuery)
                .addAggregation(aggregationBuilder)
                .addAggregation(cardinality)
                .withPageable(pageable)
                .build();
        log.info("getStatisticList boolQuery:{}", boolQuery.toString());
        log.info("getStatisticList aggregationBuilder:{}", aggregationBuilder.toString());

        AggregatedPage<VideoViewReocrdDocument> aggPage =
                (AggregatedPage<VideoViewReocrdDocument>) vodViewRecordESRepository.search(query);

        // Total number of distinct users (see NOTE above: not returned to the caller).
        Cardinality totalCountAgg = (Cardinality) aggPage.getAggregation("total_count");
        long total = totalCountAgg.getValue();
        model.setTotal(total);

        // NOTE(review): the cast assumes userId is mapped as a string/keyword field. A numeric
        // mapping would yield LongTerms here instead.
        StringTerms agg = (StringTerms) aggPage.getAggregation("aggUserId");
        List<StringTerms.Bucket> buckets = agg.getBuckets();
        List<StatisticPageDTO> resultList = Lists.newArrayList();
        for (StringTerms.Bucket bucket : buckets) {
            StatisticPageDTO dto = new StatisticPageDTO();
            dto.setUserId(bucket.getKeyAsNumber().longValue());
            Aggregations aggregations = bucket.getAggregations();
            if (aggregations != null) {
                // Earliest study start time.
                InternalMin minStartTime = aggregations.get("minStartTime");
                if (minStartTime != null) {
                    Date startTime = DateUtil.parseEsDate(minStartTime.getValueAsString());
                    dto.setStartTime(DateUtil.formatDate(startTime));
                }
                // Most recent viewing time.
                InternalMax maxEndTime = aggregations.get("maxEndTime");
                if (maxEndTime != null) {
                    Date lastTime = DateUtil.parseEsDate(maxEndTime.getValueAsString());
                    dto.setLastTime(DateUtil.formatDate(lastTime));
                }
                // Total viewing duration (fraction truncated toward zero).
                InternalSum sumViewDuration = aggregations.get("sumViewDuration");
                if (sumViewDuration != null) {
                    dto.setTotalDuration((long) sumViewDuration.getValue());
                }

                // Course ids the user has viewed.
                StringTerms courseAgg = aggregations.get("aggCourseId");
                if (courseAgg != null) {
                    List<Long> courseIds = courseAgg.getBuckets().stream()
                            .map(StringTerms.Bucket::getKeyAsNumber)
                            .map(Number::longValue)
                            .collect(Collectors.toList());
                    dto.setCourseIds(courseIds);
                }

                // Viewing duration per video id (fraction truncated toward zero).
                StringTerms vodAgg = aggregations.get("aggVodId");
                if (vodAgg != null) {
                    Map<String, Integer> vodMap = Maps.newHashMap();
                    for (StringTerms.Bucket vodBucket : vodAgg.getBuckets()) {
                        Aggregations vodAggregations = vodBucket.getAggregations();
                        if (vodAggregations == null) {
                            continue;
                        }
                        InternalSum sumVodViewDuration = vodAggregations.get("sum_vod_ViewDuration");
                        if (sumVodViewDuration != null) {
                            vodMap.put(vodBucket.getKeyAsString(), (int) sumVodViewDuration.getValue());
                        }
                    }
                    dto.setVodMap(vodMap);
                }
            }
            resultList.add(dto);
        }
        return resultList;
    }

参考文章:https://www.jianshu.com/p/930c803a4ebd

相关文章

网友评论

      本文标题:ElasticsearchRepository 查询、聚合、过滤

      本文链接:https://www.haomeiwen.com/subject/wtnzuhtx.html