美文网首页
20.ES的API

20.ES的API

作者: 不怕天黑_0819 | 来源:发表于2021-05-24 19:08 被阅读0次

本文集主要是总结自己在项目中使用ES 的经验教训,包括各种实战和调优。

具体可以查看云笔记上的内容。


本文包括SearchRequestBuilder 常用方法说明、更新文档、新增文档、删除文档、删除索引库、批处理、分组、聚合函数、多级嵌套搜索、获取集群集群状态、Multi Get、批量修改数据、判断某一个字段是否存在

注:版本原因可能导致部分方法有所变化,但原理是不变的


SearchRequestBuilder 常用方法说明:

(1) setIndices(String... indices):上文中描述过,参数可为一个或多个字符串,表示要进行检索的index;
(2) setTypes(String... types):参数可为一个或多个字符串,表示要进行检索的type,当参数为0个或者不调用此方法时,表示查询所有的type; setSearchType(SearchType searchType):执行检索的类别,值为org.elasticsearch.action.search.SearchType的元素,SearchType是一个枚举类型的类,

QUERY_AND_FETCH:

主节点将查询请求分发到所有的分片中,各个分片按照自己的查询规则即词频文档频率进行打分排序,然后将结果返回给主节点,主节点对所有数据进行汇总排序然后再返回给客户端,此种方式只需要和es交互一次。

这种查询方式存在数据量和排序问题,主节点会汇总所有分片返回的数据这样数据量会比较大,二是各个分片上的规则可能不一致。

最原始(也可能是最快的)实现就是简单的在所有相关的shard上执行检索并返回结果。每个shard返回一定尺寸的结果。由于每个shard已经返回了一定尺寸的hit,这种类型实际上是返回多个shard的一定尺寸的结果给调用者。

QUERY_THEN_FETCH:

主节点将请求分发给所有分片,各个分片打分排序后将数据的id和分值返回给主节点,主节点收到后进行汇总排序再根据排序后的id到对应的节点读取对应的数据再返回给客户端,此种方式需要和es交互两次。

这种方式解决了数据量问题但是排序问题依然存在而且是es的默认查询方式。

查询是针对所有的块执行的,但返回的是足够的信息,而不是文档内容(Document)。结果会被排序和分级,基于此,只有相关的块的文档对象会被返回。由于被取到的仅仅是这些,故而返回的hit的大小正好等于指定的size。这对于有许多块的index来说是很便利的(返回结果不会有重复的,因为 块被分组了)。

DFS_QUERY_AND_FETCH和DFS_QUERY_THEN_FETCH:

这两种方式和前面两种的区别在于将各个分片的规则统一起来进行打分。解决了排序问题但是DFS_QUERY_AND_FETCH仍然存在数据量问题,DFS_QUERY_THEN_FETCH两种问题都解决但是效率是最差的。

特点:

一个交互两次,一个交互一次;一个统一打分规则一个不统一;一个分片返回详细数据一个分片返回id。
其值如下所示:
QUERY_THEN_FETCH:查询是针对所有的块执行的,但返回的是足够的信息,而不是文档内容(Document)。结果会被排序和分级,基于此,只有相关的块的文档对象会被返回。由于被取到的仅仅是这些,故而返回的hit的大小正好等于指定的size。这对于有许多块的index来说是很便利的(返回结果不会有重复的,因为块被分组了)
QUERY_AND_FETCH:最原始(也可能是最快的)实现就是简单的在所有相关的shard上执行检索并返回结果。每个shard返回一定尺寸的结果。由于每个shard已经返回了一定尺寸的hit,这种类型实际上是返回多个shard的一定尺寸的结果给调用者。
DFS_QUERY_THEN_FETCH:与QUERY_THEN_FETCH相同,预期一个初始的散射相伴用来为更准确的score计算分配了的term频率。
DFS_QUERY_AND_FETCH:与QUERY_AND_FETCH相同,预期一个初始的散射相伴用来为更准确的score计算分配了的term频率。
SCAN:在执行了没有进行任何排序的检索时执行浏览。此时将会自动的开始滚动结果集。
COUNT:只计算结果的数量,也会执行facet。
(4) setSearchType(String searchType),与setSearchType(SearchType searchType)类似,区别在于其值为字符串型的SearchType,值可为dfs_query_then_fetch、dfsQueryThenFetch、dfs_query_and_fetch、dfsQueryAndFetch、query_then_fetch、queryThenFetch、query_and_fetch或queryAndFetch;
(5) setScroll(Scroll scroll)、setScroll(TimeValue keepAlive)和setScroll(String keepAlive),设置滚动,参数为Scroll时,直接用new Scroll(TimeValue)构造一个Scroll,为TimeValue或String时需要将TimeValue和String转化为Scroll;
(6) setTimeout(TimeValue timeout)和setTimeout(String timeout),设置搜索的超时时间;
(7) setQuery,设置查询使用的Query;
(8) setFilter,设置过滤器;
(9) setMinScore,设置Score的最小数量;
(10) setFrom,从哪一个Score开始查;
(11) setSize,需要查询出多少条结果;


更新文档:

 public void testUpdate() throws IOException  
    {  
        XContentBuilder source = XContentFactory.jsonBuilder()  
            .startObject()  
            .field("name", "will")  
            .endObject();  
          
        UpdateResponse updateResponse = transportClient  
                .prepareUpdate(index, type, "6").setDoc(source).get();  
          
        System.out.println(updateResponse.getVersion());  
    } 

新增文档:通过prepareIndex新增,参数可以为json、map、javaBean、XContentBuilder四种。

新增文档的四种方式,可以参考文集的其他文章。


删除文档:

 public void testDelete()  
    {  
        String id = "9";  
        DeleteResponse deleteResponse = transportClient.prepareDelete(index,  
                type, id).get();  
     }

删除索引库:不可逆,慎用

 public void testDeleteeIndex()     {  
      transportClient.admin().indices().prepareDelete("shb01","shb02").get();  
    }  

批处理:可以将多种request加到bulk中。

public void testBulk() throws IOException  
    {  
        //1:生成bulk  
        BulkRequestBuilder bulk = transportClient.prepareBulk();  
          
        //2:新增  
        IndexRequest add = new IndexRequest(index, type, "10");  
        add.source(XContentFactory.jsonBuilder()  
                    .startObject()  
                    .field("name", "Henrry").field("age", 30)  
                    .endObject());  
          
        //3:删除  
        DeleteRequest del = new DeleteRequest(index, type, "1");  
          
        //4:修改  
        XContentBuilder source = XContentFactory.jsonBuilder().startObject().field("name", "jack_1").field("age", 19).endObject();  
        UpdateRequest update = new UpdateRequest(index, type, "2");  
        update.doc(source);  
          
        bulk.add(del);  
        bulk.add(add);  
        bulk.add(update);  
        //5:执行批处理  
        BulkResponse bulkResponse = bulk.get();  
        if(bulkResponse.hasFailures())  
        {  
            BulkItemResponse[] items = bulkResponse.getItems();  
            for(BulkItemResponse item : items)  
            {  
                System.out.println(item.getFailureMessage());  
            }  
        }  
        else  
        {  
            System.out.println("全部执行成功");  
        }  
    }  

分组、统计:

public void testGroupBy()
{
SearchResponse searchResponse = transportClient.prepareSearch(index).setTypes(type)
.setQuery(QueryBuilders.matchAllQuery())
.setSearchType(SearchType.QUERY_THEN_FETCH)
.addAggregation(AggregationBuilders.terms("group_age")
.field("age").size(0))//根据age分组,默认返回10,size(0)也是10
.get();

    Terms terms = searchResponse.getAggregations().get("group_age");  
    List<Bucket> buckets = terms.getBuckets();  
    for(Bucket bt : buckets)  
    {  
        System.out.println(bt.getKey() + " " + bt.getDocCount());  
    }  
}  //这个方法写的没有层次。见下其他示例

分组、统计:

初始化Bulider:SearchRequestBuilder sbuilder = client.prepareSearch("player").setTypes("player");
group by/count:
例如要计算每个球队的球员数,如果使用SQL语句,应表达如下:
select team, count(*) as player_count from player group by team;
ES的java api:
TermsBuilder teamAgg= AggregationBuilders.terms("player_count ").field("team");
sbuilder.addAggregation(teamAgg);
SearchResponse response = sbuilder.execute().actionGet();
group by 多个field:
例如要计算每个球队每个位置的球员数,如果使用SQL语句,应表达如下:
select team, position, count(*) as pos_count from player group by team, position;
ES的java api:
TermsBuilder teamAgg= AggregationBuilders.terms("player_count").field("team");
TermsBuilder posAgg= AggregationBuilders.terms("pos_count").field("position");
sbuilder.addAggregation(teamAgg.subAggregation(posAgg));
SearchResponse response = sbuilder.execute().actionGet();  

聚合函数:

专门讲聚合的示例:http://blog.csdn.net/xialei199023/article/details/48298635

https://www.jianshu.com/p/68e785acf8fe

http://www.cnblogs.com/xing901022/p/4951603.html

/** 
     * 聚合函数,本例之编写了sum,其他的聚合函数也可以实现 
     *  
     */  
    @Test  
    public void testMethod()  
    {  
        SearchResponse searchResponse = transportClient.prepareSearch(index).setTypes(type)  
                .setQuery(QueryBuilders.matchAllQuery())  
                .setSearchType(SearchType.QUERY_THEN_FETCH)  
                .addAggregation(AggregationBuilders.terms("group_name").field("name")  
                        .subAggregation(AggregationBuilders.sum("sum_age").field("age")))  
                .get();  
          
        Terms terms = searchResponse.getAggregations().get("group_name");  
        List<Bucket> buckets = terms.getBuckets();  
        for(Bucket bt : buckets)  
        {  
            Sum sum = bt.getAggregations().get("sum_age");  
            System.out.println(bt.getKey() + "  " + bt.getDocCount() + " "+ sum.getValue());  
        }  
          
    }  

聚合函数:

max/min/sum/avg

例如要计算每个球队年龄最大/最小/总/平均的球员年龄,如果使用SQL语句,应表达如下:

select team, max(age) as max_age from player group by team;

ES的java api:

TermsBuilder teamAgg= AggregationBuilders.terms("player_count").field("team"); MaxBuilder ageAgg= AggregationBuilders.max("max_age").field("age"); sbuilder.addAggregation(teamAgg.subAggregation(ageAgg)); SearchResponse response = sbuilder.execute().actionGet();

对多个field求max/min/sum/avg

例如要计算每个球队球员的平均年龄,同时又要计算总年薪,如果使用SQL语句,应表达如下:

select team, avg(age)as avg_age, sum(salary) as total_salary from player group by team;

ES的java api:

TermsBuilder teamAgg= AggregationBuilders.terms("team"); AvgBuilder ageAgg= AggregationBuilders.avg("avg_age").field("age"); SumBuilder salaryAgg= AggregationBuilders.avg("total_salary ").field("salary"); sbuilder.addAggregation(teamAgg.subAggregation(ageAgg).subAggregation(salaryAgg)); SearchResponse response = sbuilder.execute().actionGet();

聚合后对Aggregation结果排序

例如要计算每个球队总年薪,并按照总年薪倒序排列,如果使用SQL语句,应表达如下:

select team, sum(salary) as total_salary from player group by team order by total_salary desc;

ES的java api:

TermsBuilder teamAgg= AggregationBuilders.terms("team").order(Order.aggregation("total_salary ", false); SumBuilder salaryAgg= AggregationBuilders.avg("total_salary ").field("salary"); sbuilder.addAggregation(teamAgg.subAggregation(salaryAgg)); SearchResponse response = sbuilder.execute().actionGet();

需要特别注意的是,排序是在TermAggregation处执行的,Order.aggregation函数的第一个参数是aggregation的名字,第二个参数是boolean型,true表示正序,false表示倒序。 

Aggregation结果条数的问题

默认情况下,search执行后,仅返回10条聚合结果,如果想返回更多的结果,需要在构建TermsBuilder 时指定size:

TermsBuilder teamAgg= AggregationBuilders.terms("team").size(15);

Aggregation结果的解析/输出

得到response后:

Map<String, Aggregation> aggMap = response.getAggregations().asMap(); StringTerms teamAgg= (StringTerms) aggMap.get("keywordAgg"); Iterator<Bucket> teamBucketIt = teamAgg.getBuckets().iterator(); while (teamBucketIt .hasNext()) { Bucket buck = teamBucketIt .next(); //球队名 String team = buck.getKey(); //记录数 long count = buck.getDocCount(); //得到所有子聚合 Map subaggmap = buck.getAggregations().asMap(); //avg值获取方法 double avg_age= ((InternalAvg) subaggmap.get("avg_age")).getValue(); //sum值获取方法 double total_salary = ((InternalSum) subaggmap.get("total_salary")).getValue(); //... //max/min以此类推 }


总结:

综上,聚合操作主要是调用了SearchRequestBuilder的addAggregation方法,通常是传入一个TermsBuilder,子聚合调用TermsBuilder的subAggregation方法,可以添加的子聚合有TermsBuilder、SumBuilder、AvgBuilder、MaxBuilder、MinBuilder等常见的聚合操作。

从实现上来讲,SearchRequestBuilder在内部保持了一个私有的 SearchSourceBuilder实例, SearchSourceBuilder内部包含一个List<AbstractAggregationBuilder>,每次调用addAggregation时会调用 SearchSourceBuilder实例,添加一个AggregationBuilder。

同样的,TermsBuilder也在内部保持了一个List<AbstractAggregationBuilder>,调用addAggregation方法(来自父类addAggregation)时会添加一个AggregationBuilder。


多级嵌套搜索:

http://blog.csdn.net/napoay/article/details/52060659


获取集群集群状态:

public static void clusterHealthAdmin(@RequestParam(required = false) String title) {

ClusterHealthResponse healths = client.admin().cluster().prepareHealth().get();

String clusterName = healths.getClusterName();

int numberOfDataNodes = healths.getNumberOfDataNodes();

int numberOfNodes = healths.getNumberOfNodes();

for (ClusterIndexHealth health : healths.getIndices().values()) {

String index = health.getIndex();

int numberOfShards = health.getNumberOfShards();

int numberOfReplicas = health.getNumberOfReplicas();

ClusterHealthStatus status = health.getStatus();

System.out.println(index+"--------"+status);

}

}


Multi Get:

MultiGetResponse multiGetItemResponses = client.prepareMultiGet() .add("twitter", "tweet", "1") .add("twitter", "tweet", "2", "3", "4") .add("another", "type", "foo") .get(); for (MultiGetItemResponse itemResponse : multiGetItemResponses) { GetResponse response = itemResponse.getResponse(); if (response.isExists()) { String json = response.getSourceAsString(); } }


批量修改数据:

private static String searchElasticsearchDataByScroll(String index, String type, int size, String routing, QueryBuilder query, String updateField, String updateValue) {

Map<String, Object> hitMap;

SearchResponse response = client.prepareSearch(index)

.setTypes(type)

.setQuery(query)

.setScroll(new TimeValue(6000))

.setSize(size).execute().actionGet();

BulkRequestBuilder bulkRequest = client.prepareBulk();

do {

for (SearchHit searchHit : response.getHits().getHits()) {

hitMap = searchHit.getSource();

if (hitMap.containsKey(updateValue)) {

hitMap.put(updateField, hitMap.get(updateValue));

} else {

try {

hitMap.put(updateField, Integer.parseInt(updateValue));//默认如果在hit中找不到要更新的值,则将updateValue当做int来进行更新

} catch (NumberFormatException e) {

log.error("can not solve updateValue。", e);

break;

}

}

bulkRequest.add(client.prepareUpdate(index, type, searchHit.getId()).setRouting(hitMap.get(routing).toString()).setDoc(hitMap));

}

BulkResponse bulkResponse = bulkRequest.execute().actionGet();

if (bulkResponse.hasFailures()) {

log.warn("bulk updateElasticsearch has fail, reason is" + bulkResponse.buildFailureMessage());

}

response = client.prepareSearchScroll(response.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet();

} while(response.getHits().getHits().length != 0)

return "";

}


判断某一个字段是否存在

public List<String> searchMediaInfoWithoutTopicId(String createTime) throws JsonProcessingException {

SearchRequestBuilder searchRequestBuilder = elasticSearchService.prepareSearch(SUBSCRIBE_WEMEDIA_INDEX, SUBSCRIBE_WEMEDIA_TYPE);

ExistsQueryBuilder existsQueryBuilder = QueryBuilders.existsQuery("topicId");

BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery().mustNot(existsQueryBuilder);查出来不包含该字段的数据 https://es.xiaoleilu.com/080_Structured_Search/30_existsmissing.html 还可以使用missing方法

if (createTime != null) {

RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("createTime").lt(createTime);

boolQueryBuilder.must(rangeQueryBuilder);

}

searchRequestBuilder.setQuery(boolQueryBuilder);

searchRequestBuilder.setFrom(0).setSize(1000).addSort("createTime", SortOrder.DESC);

SearchResult result = elasticSearchService.getSearchResult(searchRequestBuilder);

List<String> list = new ArrayList<String>();

for (Map<String, Object> element : result.getList()) {

if (element.containsKey("wemediaId")) {

list.add((String)element.get("wemediaId"));

}

}

return list;

}


参考链接:https://elasticsearch.cn/article/102

相关文章

网友评论

      本文标题:20.ES的API

      本文链接:https://www.haomeiwen.com/subject/coxorktx.html