Statistical analysis with multi-search (Java)

Author: 愚公300代 | Published: 2016-09-30 17:18

1. Overall flow

    public Map<String, Long> statOneField(String aggrField) {
        long sBuildQuery = System.currentTimeMillis();
        List<List<String>> stationGroups = splitGroup();
        // Build one search request per station group and bundle them into one multi-search
        MultiSearchRequestBuilder requestBuilder = client.prepareMultiSearch();
        for (List<String> stationGroup : stationGroups) {
            SearchRequestBuilder srb = buildQuery(stationGroup, baseStationField, aggrField);
            requestBuilder.add(srb);
        }
        long eBuildQuery = System.currentTimeMillis();
        System.out.println("Query build time (ms): " + (eBuildQuery - sBuildQuery));

        // Execute all searches and fetch the results
        MultiSearchResponse sr = requestBuilder.execute().actionGet();
        long eExecuteQuery = System.currentTimeMillis();
        System.out.println("Query execution time (ms): " + (eExecuteQuery - eBuildQuery));

        // Merge the per-group aggregation results into one map
        // (nbHits and nTook are not declared here; presumably long fields of the enclosing class)
        Map<String, Long> totalAggr = new TreeMap<>();
        for (MultiSearchResponse.Item item : sr.getResponses()) {
            if (item.isFailure()) {
                // A failed item carries no response; skip it to avoid a NullPointerException
                System.out.println(item.getFailureMessage());
                continue;
            }
            SearchResponse response = item.getResponse();
            nbHits += response.getHits().getTotalHits();
            nTook += response.getTook().getMillis();
            Map<String, Long> perAggr = parseResponse(response);
            add(totalAggr, perAggr);
        }
        long eAggrQuery = System.currentTimeMillis();
        System.out.println("Result merge time (ms): " + (eAggrQuery - eExecuteQuery));

        return totalAggr;
    }
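
The `add(totalAggr, perAggr)` helper called in the loop is not shown in the article. A minimal sketch of what it might look like, assuming it simply sums the counts key by key (the class name `AggregationMerge` and the sample data are made up for illustration):

```java
import java.util.Map;
import java.util.TreeMap;

class AggregationMerge {
    /** Adds every count in perAggr to the running total in totalAggr (hypothetical stand-in for add()). */
    static void add(Map<String, Long> totalAggr, Map<String, Long> perAggr) {
        for (Map.Entry<String, Long> e : perAggr.entrySet()) {
            // Map.merge stores the value if the key is absent, otherwise sums old and new
            totalAggr.merge(e.getKey(), e.getValue(), Long::sum);
        }
    }

    public static void main(String[] args) {
        Map<String, Long> total = new TreeMap<>();

        Map<String, Long> first = new TreeMap<>();
        first.put("a", 3L);
        first.put("b", 1L);

        Map<String, Long> second = new TreeMap<>();
        second.put("a", 2L);
        second.put("c", 5L);

        add(total, first);
        add(total, second);
        System.out.println(total); // prints {a=5, b=1, c=5}
    }
}
```

One thing to keep in mind with this approach: each search returns only its own top buckets, so a term that falls outside the top buckets of one group contributes nothing from that group, and the merged totals can undercount.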

2. Build a single query; each query contains an aggregation

    SearchRequestBuilder buildQuery(List<String> stationGroup, String field, String aggrField) {
        assert stationGroup != null && !stationGroup.isEmpty();

        // Size 0: only the aggregation is needed, not the matching documents
        final int SEARCH_SIZE = 0;
        SearchRequestBuilder srb = client
            .prepareSearch()
            .setQuery(QueryBuilders.termsQuery(field, stationGroup))
            .setSize(SEARCH_SIZE)
            .addAggregation(
                AggregationBuilders.terms(TOP_NAME).field(aggrField).size(TelephoneConf.TOP_SIZE));

        return srb;
    }

3. Process the aggregation result

    Map<String, Long> parseResponse(SearchResponse response) {
        Map<String, Long> ir = new TreeMap<>();
        Terms agg = response.getAggregations().get(TOP_NAME);
        for (Terms.Bucket entry : agg.getBuckets()) {
            Object key = entry.getKey();
            long count = entry.getDocCount();
            ir.put(key.toString(), count);
        }

        return ir;
    }

4. Sort a map by value

    // Returns the topN entries of map with the largest values, in descending order of value
    public static <K, V extends Comparable<? super V>> Map<K, V> sortMapByValue( Map<K, V> map, int topN )
    {
        // ArrayList gives O(1) access by index in the loop below (LinkedList.get(i) is O(n))
        List<Map.Entry<K, V>> list = new ArrayList<Map.Entry<K, V>>( map.entrySet() );
        Collections.sort( list, new Comparator<Map.Entry<K, V>>()
        {
            @Override
            public int compare( Map.Entry<K, V> o1, Map.Entry<K, V> o2 )
            {
                // Operands are swapped so that larger values sort first
                return (o2.getValue()).compareTo( o1.getValue() );
            }
        } );

        int size = Math.min( topN, list.size() );

        // LinkedHashMap keeps the entries in sorted insertion order
        Map<K, V> result = new LinkedHashMap<K, V>();
        for (int i = 0; i < size; i++) {
            Map.Entry<K, V> tmpEntry = list.get(i);
            result.put(tmpEntry.getKey(), tmpEntry.getValue());
        }

        return result;
    }
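
On Java 8 or later, the same top-N selection can also be written with streams. The following is a sketch of an equivalent, not the article's code; the class name `TopNByValue`, the method name `topN`, and the sample data are made up for illustration:

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

class TopNByValue {
    /** Returns the n entries with the largest values, in descending order of value. */
    static <K, V extends Comparable<? super V>> Map<K, V> topN(Map<K, V> map, int n) {
        return map.entrySet().stream()
                // Sort by value, largest first
                .sorted(Map.Entry.<K, V>comparingByValue().reversed())
                // Keep at most n entries (fewer if the map is smaller)
                .limit(n)
                // Collect into a LinkedHashMap so the sorted order is preserved
                .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        Map.Entry::getValue,
                        (a, b) -> a,
                        LinkedHashMap::new));
    }

    public static void main(String[] args) {
        Map<String, Long> counts = new HashMap<>();
        counts.put("a", 5L);
        counts.put("b", 1L);
        counts.put("c", 7L);
        System.out.println(topN(counts, 2)); // prints {c=7, a=5}
    }
}
```

In the flow from section 1, this would presumably be applied to the merged `totalAggr` map to get the overall top entries.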

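Problem:
If a single multi-search request carries too many searches, Elasticsearch throws EsRejectedExecutionException: the searches are executed in parallel, and some are rejected once the search thread pool cannot queue any more of them.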

            SearchResponse response = item.getResponse();
            if (response == null) {
                // The search failed: print the cause and the failure message
                System.out.println(item.getFailure().getCause());
                System.out.println("null *************************");
                System.out.println(item.getFailureMessage());
            } else if (response.getHits() == null) {
                // else-if: response must not be dereferenced when it is null
                System.out.println("null 2 *************************");
                System.out.println(item.getFailureMessage());
            }

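After the exception occurs, the response of the failed item is null, and the totalHits of some search requests is inaccurate (the cause needs further investigation).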

        // To avoid EsRejectedExecutionException, execute the multi-search once every 100 requests
        List<String> stationList = new ArrayList<>();
        MultiSearchRequestBuilder requestBuilder = client.prepareMultiSearch();
        int searchCount = 1;
        for (Set<String> stations: stationSets) {
            for (String station: stations) {
                SearchRequestBuilder srb = buildQuery(station, client);
                stationList.add(station);
                requestBuilder.add(srb);
                if (searchCount % 100 == 0) {
                    MultiSearchResponse msr = requestBuilder.execute().actionGet();
                    handleResponse(msr, stationList);
                    stationList.clear();
                    requestBuilder = client.prepareMultiSearch();
                }
                searchCount ++;
            }
        }

        if (!stationList.isEmpty()) {
            MultiSearchResponse msr = requestBuilder.execute().actionGet();
            handleResponse(msr, stationList);
            stationList.clear();
        }
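
The batching logic above can also be separated from the request handling by first splitting the inputs into fixed-size groups. A minimal sketch in plain Java, assuming the station names are available as one flat list (the class name `Batches` and the `partition` helper are made up for illustration and are not part of the Elasticsearch API):

```java
import java.util.ArrayList;
import java.util.List;

class Batches {
    /** Splits items into consecutive batches of at most batchSize elements each. */
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            int end = Math.min(start + batchSize, items.size());
            // Copy the sublist so each batch is independent of the source list
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> stations = new ArrayList<>();
        for (int i = 0; i < 250; i++) {
            stations.add("station-" + i);
        }
        // 250 items in batches of 100 give three batches: 100, 100 and 50 items
        for (List<String> batch : partition(stations, 100)) {
            System.out.println(batch.size());
        }
    }
}
```

Each batch would then get its own `MultiSearchRequestBuilder`, be executed, and be passed to `handleResponse` before the next one is sent, which keeps the number of searches in flight bounded in the same way as the counter-based loop.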

Original link: https://www.haomeiwen.com/subject/vrfryttx.html