1. Overall flow
public Map<String, Long> statOneField(String aggrField) {
    long sBuildQuery = System.currentTimeMillis();
    List<List<String>> stationGroups = splitGroup();
    // Bundle one search request per station group into a single multi-search
    MultiSearchRequestBuilder requestBuilder = client.prepareMultiSearch();
    for (List<String> stationGroup : stationGroups) {
        SearchRequestBuilder srb = buildQuery(stationGroup, baseStationField, aggrField);
        requestBuilder.add(srb);
    }
    long eBuildQuery = System.currentTimeMillis();
    System.out.println("Query build time (ms): " + (eBuildQuery - sBuildQuery));
    // Execute the multi-search and fetch the results
    MultiSearchResponse sr = requestBuilder.execute().actionGet();
    long eExecuteQuery = System.currentTimeMillis();
    System.out.println("Query execution time (ms): " + (eExecuteQuery - eBuildQuery));
    Map<String, Long> totalAggr = new TreeMap<>();
    for (MultiSearchResponse.Item item : sr.getResponses()) {
        if (item.isFailure()) {
            // A failed item has a null response; skip it to avoid a NullPointerException
            System.out.println("Search failed: " + item.getFailureMessage());
            continue;
        }
        SearchResponse response = item.getResponse();
        // nbHits and nTook are fields of the enclosing class (not shown here)
        nbHits += response.getHits().getTotalHits();
        nTook += response.getTook().getMillis();
        Map<String, Long> perAggr = parseResponse(response);
        add(totalAggr, perAggr);
    }
    long eAggrQuery = System.currentTimeMillis();
    System.out.println("Result aggregation time (ms): " + (eAggrQuery - eExecuteQuery));
    return totalAggr;
}
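The post does not show the `add(totalAggr, perAggr)` helper used in the loop above. A minimal sketch, assuming it simply sums the per-group bucket counts key by key (the class name `AggrMerge` and the sample keys are made up for illustration):

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the add(totalAggr, perAggr) helper, which the post does not show.
class AggrMerge {

    // Adds every count in perAggr to the running total kept under the same key.
    static void add(Map<String, Long> totalAggr, Map<String, Long> perAggr) {
        for (Map.Entry<String, Long> e : perAggr.entrySet()) {
            // Map.merge stores the value if the key is absent, otherwise sums old and new.
            totalAggr.merge(e.getKey(), e.getValue(), Long::sum);
        }
    }

    public static void main(String[] args) {
        Map<String, Long> total = new TreeMap<>();

        Map<String, Long> groupA = new TreeMap<>();
        groupA.put("x", 5L);
        groupA.put("y", 2L);

        Map<String, Long> groupB = new TreeMap<>();
        groupB.put("y", 3L);
        groupB.put("z", 1L);

        add(total, groupA);
        add(total, groupB);
        System.out.println(total); // prints {x=5, y=5, z=1}
    }
}
```

Note that each group only returns its own top `TelephoneConf.TOP_SIZE` buckets, so a key that falls outside the top buckets of some group is undercounted in the merged total.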
2. Build a single query; each query contains an aggregation
SearchRequestBuilder buildQuery(List<String> stationGroup, String field, String aggrField) {
    assert stationGroup != null && !stationGroup.isEmpty();
    // Size 0: return no hits, only the aggregation result
    final int SEARCH_SIZE = 0;
    SearchRequestBuilder srb = client
            .prepareSearch()
            .setQuery(QueryBuilders.termsQuery(field, stationGroup))
            .setSize(SEARCH_SIZE)
            .addAggregation(AggregationBuilders.terms(TOP_NAME).field(aggrField).size(TelephoneConf.TOP_SIZE));
    return srb;
}
3. Process the aggregation results
Map<String, Long> parseResponse(SearchResponse response) {
    Map<String, Long> ir = new TreeMap<>();
    Terms agg = response.getAggregations().get(TOP_NAME);
    // Copy each bucket's key and document count into the result map
    for (Terms.Bucket entry : agg.getBuckets()) {
        Object key = entry.getKey();
        long count = entry.getDocCount();
        ir.put(key.toString(), count);
    }
    return ir;
}
4. Sort a map by value
public static <K, V extends Comparable<? super V>> Map<K, V> sortMapByValue(Map<K, V> map, int topN) {
    // ArrayList gives O(1) access by index in the copy loop below
    List<Map.Entry<K, V>> list = new ArrayList<Map.Entry<K, V>>(map.entrySet());
    // Sort in descending order of value
    Collections.sort(list, new Comparator<Map.Entry<K, V>>() {
        @Override
        public int compare(Map.Entry<K, V> o1, Map.Entry<K, V> o2) {
            return o2.getValue().compareTo(o1.getValue());
        }
    });
    int size = Math.min(topN, list.size());
    // LinkedHashMap preserves the sorted insertion order
    Map<K, V> result = new LinkedHashMap<K, V>();
    for (int i = 0; i < size; i++) {
        Map.Entry<K, V> tmpEntry = list.get(i);
        result.put(tmpEntry.getKey(), tmpEntry.getValue());
    }
    return result;
}
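On Java 8 or later, the same top-N-by-value selection can also be written with streams. This is only a sketch of an alternative, not the version used in the post; the class name `TopNByValue` and the sample data are made up for illustration:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical stream-based variant of sortMapByValue.
class TopNByValue {

    // Returns the topN entries with the largest values, in descending order of value.
    static <K, V extends Comparable<? super V>> Map<K, V> topN(Map<K, V> map, int topN) {
        return map.entrySet().stream()
                .sorted(Map.Entry.<K, V>comparingByValue().reversed())
                .limit(Math.max(0, topN)) // limit() rejects negative sizes
                .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        Map.Entry::getValue,
                        (a, b) -> a,          // keys are unique, so this merge is never used
                        LinkedHashMap::new)); // keeps the sorted order
    }

    public static void main(String[] args) {
        Map<String, Long> counts = new TreeMap<>();
        counts.put("a", 3L);
        counts.put("b", 10L);
        counts.put("c", 7L);
        System.out.println(topN(counts, 2)); // prints {b=10, c=7}
    }
}
```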
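Problems:
If a single multi-search request carries too many searches, Elasticsearch rejects some of them with EsRejectedExecutionException, which is raised when the node's search thread pool queue is full and cannot accept more parallel searches.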
SearchResponse response = item.getResponse();
if (response == null) {
    // The search failed: print the cause and the failure message
    System.out.println(item.getFailure().getCause());
    System.out.println("null *************************");
    System.out.println(item.getFailureMessage());
} else if (response.getHits() == null) {
    // else-if: calling getHits() on a null response would throw a NullPointerException
    System.out.println("null 2 *************************");
    System.out.println(item.getFailureMessage());
}
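Once the exception occurs, the response of the affected item is null, and the totalHits reported for some search requests is inaccurate (the cause still needs further investigation).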
// To avoid EsRejectedExecutionException, execute one multi-search for every 100 requests
List<String> stationList = new ArrayList<>();
MultiSearchRequestBuilder requestBuilder = client.prepareMultiSearch();
int searchCount = 1;
for (Set<String> stations : stationSets) {
    for (String station : stations) {
        SearchRequestBuilder srb = buildQuery(station, client);
        stationList.add(station);
        requestBuilder.add(srb);
        if (searchCount % 100 == 0) {
            MultiSearchResponse msr = requestBuilder.execute().actionGet();
            handleResponse(msr, stationList);
            stationList.clear();
            // Start a fresh builder for the next batch
            requestBuilder = client.prepareMultiSearch();
        }
        searchCount++;
    }
}
// Execute whatever is left over from the last incomplete batch
if (!stationList.isEmpty()) {
    MultiSearchResponse msr = requestBuilder.execute().actionGet();
    handleResponse(msr, stationList);
    stationList.clear();
}
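The batching logic above can be separated from the Elasticsearch calls. The following is a minimal sketch in plain Java, assuming a hypothetical `BatchRunner.forEachBatch` helper (not part of Elasticsearch or of the original code). In the real code, the handler would build one multi-search for the batch, execute it, and pass the result to `handleResponse`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper that feeds items to a handler in fixed-size batches.
class BatchRunner {

    // Calls handler once per full batch, then once more for any remainder.
    static <T> void forEachBatch(Iterable<T> items, int batchSize, Consumer<List<T>> handler) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<T> batch = new ArrayList<>(batchSize);
        for (T item : items) {
            batch.add(item);
            if (batch.size() == batchSize) {
                handler.accept(batch);
                // Use a new list so the handler may keep a reference to the old one
                batch = new ArrayList<>(batchSize);
            }
        }
        if (!batch.isEmpty()) {
            handler.accept(batch);
        }
    }

    public static void main(String[] args) {
        List<String> stations = new ArrayList<>();
        for (int i = 0; i < 250; i++) {
            stations.add("station-" + i);
        }
        // Prints 100, 100 and 50 on separate lines
        forEachBatch(stations, 100, batch -> System.out.println(batch.size()));
    }
}
```

The batch size of 100 comes from the post; a suitable value depends on the cluster's search thread pool and queue size.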