Elasticsearch:Java RestHighLevel

Author: xiaogp | Published: 2022-03-06 18:53

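    Summary: Elasticsearch, Java

    Besides HTTP, which works from any language, es provides API clients for Java, Python and other languages. These notes collect Java client code for common es operations.

    • Write operations: single-document write, bulk write, single-document update, bulk update, upsert (update if the document exists, insert otherwise), update by query
    • Read operations: term filter, range filter, pagination, scroll (cursor) query, returning selected fields, sorting
    • Delete operations: deleting a single document, bulk delete, delete by query
    • Utility class code: singleton pattern

    Dependencies

    The official recommendation is the high-level client RestHighLevelClient, which hides the low-level details so that code can focus on business logic. The dependencies are listed below; this example uses es 6.7.2.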

        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>6.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>6.7.2</version>
        </dependency>
    

    Connecting with the Java client and a quick start

    First, look at one document as shown in Kibana

        {
            "_index" : "hotel",
            "_type" : "_doc",
            "_id" : "001",
            "_score" : 1.0,
            "_source" : {
              "name" : "a宾馆",
              "city" : "上海",
              "price" : 3.13
            }
        }
    

    The following test code connects with the Java client and queries that document

    import org.apache.http.HttpHost;
    import org.elasticsearch.action.search.SearchResponse;
    import org.elasticsearch.client.RequestOptions;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.elasticsearch.action.search.SearchRequest;
    import org.elasticsearch.index.query.QueryBuilders;
    import org.elasticsearch.rest.RestStatus;
    import org.elasticsearch.search.SearchHit;
    import org.elasticsearch.search.SearchHits;
    import org.elasticsearch.search.builder.SearchSourceBuilder;
    
    
    import java.io.IOException;
    import java.util.Map;
    
    public class ElasticsearchTest {
        public static void main(String[] args) throws IOException {
            HttpHost[] httpHosts = {new HttpHost("127.0.0.1", 9200, HttpHost.DEFAULT_SCHEME_NAME)};
            RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient.builder(httpHosts));
    
            SearchRequest searchRequest = new SearchRequest("hotel");
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
            sourceBuilder.query(QueryBuilders.matchQuery("city", "上海"));
            searchRequest.source(sourceBuilder);
            SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
            RestStatus restStatus = searchResponse.status();
            System.out.println(restStatus);
            if (restStatus == RestStatus.OK) {
                SearchHits searchHits = searchResponse.getHits();
                for (SearchHit searchHit: searchHits) {
                    System.out.println("id:" + searchHit.getId());
                    System.out.println("index:" + searchHit.getIndex());
                    System.out.println("score:" + searchHit.getScore());
                    Map<String, Object> map = searchHit.getSourceAsMap();
                    System.out.println("name:" + (String) map.get("name"));
                    System.out.println("city:" + (String) map.get("city"));
                    System.out.println("price:" + (Double) map.get("price"));
                }
            }
            restHighLevelClient.close();
        }
    }
    

    The IDEA console prints the following

    OK
    id:001
    index:hotel
    score:0.2876821
    name:a宾馆
    city:上海
    price:3.13
    

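    The connection part uses RestHighLevelClient, RestClient and HttpHost: RestClient.builder takes the HTTP host and builds the low-level client, and RestHighLevelClient wraps it. The search part builds a SearchRequest whose SearchSourceBuilder holds the query DSL. restHighLevelClient.search then returns a SearchResponse, getHits on the response returns SearchHits, and iterating over SearchHits yields each document's fields. Finally, the client connection is closed.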
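
    One detail in the loop above deserves care: getSourceAsMap returns values typed by how the JSON was parsed, so a price stored as 5 rather than 5.12 comes back as an Integer and the (Double) cast would throw a ClassCastException. A minimal sketch of a more tolerant read follows. The class SourceFields and the method asDouble are hypothetical names for illustration, not part of the es client, and the map is a plain stand-in for a hit's source.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the es client: reads numeric source fields
// without assuming which Number subtype the JSON parser produced.
final class SourceFields {

    // Returns the field as a double, or the fallback when it is missing or not numeric.
    static double asDouble(Map<String, Object> source, String field, double fallback) {
        Object value = source.get(field);
        if (value instanceof Number) {
            return ((Number) value).doubleValue();
        }
        return fallback;
    }

    public static void main(String[] args) {
        // Stand-in for searchHit.getSourceAsMap()
        Map<String, Object> source = new HashMap<>();
        source.put("name", "a hotel");
        source.put("price", 5); // an Integer, as a whole-number JSON value would be parsed

        System.out.println(asDouble(source, "price", 0.0));    // 5.0
        System.out.println(asDouble(source, "missing", -1.0)); // -1.0
    }
}
```

    Going through Number keeps the read working whether the stored value is an Integer, a Long or a Double.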


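    Write operations

    (1) Writing a single document by _id

    To write a single document, create an IndexRequest, set the index name, type name and id, and pass the document fields to source as a Map. To execute the write, pass the IndexRequest to the client's index method.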

    public static void main(String[] args) throws IOException {
            HttpHost[] httpHosts = {new HttpHost("127.0.0.1", 9200, HttpHost.DEFAULT_SCHEME_NAME)};
            RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient.builder(httpHosts));
    
            Map<String, Object> map = new HashMap<String, Object>() {{
               put("name", "b宾馆");
               put("city", "扬州");
               put("price", 5.12);
            }};
            IndexRequest indexRequest = new IndexRequest("hotel").type("_doc").id("002").source(map);
            IndexResponse indexResponse = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
    
            restHighLevelClient.close();
        }
    
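    (2) Bulk writing documents

    For bulk writes, create a BulkRequest. Build one IndexRequest per document, setting its index, type, id and data Map, add these IndexRequest objects to the BulkRequest, and execute it with the client's bulk method.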

    public static void main(String[] args) throws IOException {
            HttpHost[] httpHosts = {new HttpHost("127.0.0.1", 9200, HttpHost.DEFAULT_SCHEME_NAME)};
            RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient.builder(httpHosts));
    
            List<Map<String, Object>> list = new ArrayList<>();
            list.add(new HashMap<String, Object>() {{
                put("id", "003");
               put("name", "c宾馆");
               put("city", "绵竹");
               put("price", 7.54);
            }});
            list.add(new HashMap<String, Object>() {{
                put("id", "004");
                put("name", "d宾馆");
                put("city", "石家庄");
                put("price", 17.34);
            }});
            list.add(new HashMap<String, Object>() {{
                put("id", "005");
                put("name", "e宾馆");
                put("city", "梧州");
                put("price", 21.92);
            }});
    
            BulkRequest bulkRequest = new BulkRequest();
            list.forEach(s -> bulkRequest.add(new IndexRequest().index("hotel").type("_doc").id(s.get("id").toString()).source(s)));
            bulkRequest.timeout(TimeValue.timeValueSeconds(5));
            BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
    
            restHighLevelClient.close();
        }
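
    When the list is much larger than three documents, it is usually safer to split it into several bulk requests than to send one very large request. The sketch below shows only the chunking step in plain Java. BulkBatches, partition and the batch size of 2 are illustrative assumptions; each resulting batch would be turned into its own BulkRequest as in the code above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: splits a list into consecutive batches of at most batchSize items.
final class BulkBatches {

    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            int end = Math.min(start + batchSize, items.size());
            // Copy the view so each batch is independent of the original list
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("003", "004", "005");
        // One bulk request would be built and sent per batch
        System.out.println(partition(ids, 2)); // [[003, 004], [005]]
    }
}
```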
    

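    (3) Updating a single document

    To update a single document, create an UpdateRequest, set the index, type, id and the document fields, and execute it with the client's update method.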

    public static void main(String[] args) throws IOException {
            HttpHost[] httpHosts = {new HttpHost("127.0.0.1", 9200, HttpHost.DEFAULT_SCHEME_NAME)};
            RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient.builder(httpHosts));
    
            UpdateRequest updateRequest = new UpdateRequest("hotel", "_doc", "001")
                    .doc(new HashMap<String, Object>() {{
                        put("name", "h宾馆");
                        put("price", 0.0);
                    }});
            UpdateResponse updateResponse = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
            System.out.println(updateResponse.getResult());
            restHighLevelClient.close();
        }
    

    For the update-if-exists, insert-otherwise case, also set upsert on the UpdateRequest after setting doc; everything else stays the same.

    public static void main(String[] args) throws IOException {
            HttpHost[] httpHosts = {new HttpHost("127.0.0.1", 9200, HttpHost.DEFAULT_SCHEME_NAME)};
            RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient.builder(httpHosts));
    
            UpdateRequest updateRequest = new UpdateRequest("hotel", "_doc", "008");
            updateRequest.doc(new HashMap<String, Object>() {{
                put("name", "k宾馆");
                put("price", 3.0);
            }});
            updateRequest.upsert(new HashMap<String, Object>() {{
                put("name", "k宾馆");
                put("price", 3.0);
            }});
            UpdateResponse updateResponse = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
            System.out.println(updateResponse.getResult());
            restHighLevelClient.close();
        }
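
    To make the roles of doc and upsert concrete, here is a small in-memory simulation of how an update with upsert is resolved: if the id exists, the fields of doc are merged into the stored document; if it does not, the upsert body is stored as the new document. This is a plain-Java sketch with hypothetical names (UpsertSemantics, update), not es client code, and it merges only top-level fields.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory illustration of update-with-upsert resolution; not es client code.
final class UpsertSemantics {

    static void update(Map<String, Map<String, Object>> index, String id,
                       Map<String, Object> doc, Map<String, Object> upsert) {
        Map<String, Object> existing = index.get(id);
        if (existing != null) {
            // Document exists: partial update, fields not named in doc are kept
            existing.putAll(doc);
        } else {
            // Document missing: the upsert body becomes the new document
            index.put(id, new HashMap<>(upsert));
        }
    }

    public static void main(String[] args) {
        Map<String, Map<String, Object>> index = new HashMap<>();
        Map<String, Object> stored = new HashMap<>();
        stored.put("name", "a hotel");
        stored.put("city", "Shanghai");
        index.put("001", stored);

        Map<String, Object> doc = new HashMap<>();
        doc.put("name", "k hotel");
        doc.put("price", 3.0);

        update(index, "001", doc, doc); // existing id: merged
        update(index, "008", doc, doc); // new id: inserted

        System.out.println(index.get("001").get("city"));         // Shanghai
        System.out.println(index.get("001").get("name"));         // k hotel
        System.out.println(index.get("008").containsKey("city")); // false
    }
}
```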
    

    (4) Bulk updating documents

    Create a BulkRequest, fill it with UpdateRequest objects, and call the client's bulk method.

    public static void main(String[] args) throws IOException {
            HttpHost[] httpHosts = {new HttpHost("127.0.0.1", 9200, HttpHost.DEFAULT_SCHEME_NAME)};
            RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient.builder(httpHosts));
    
            BulkRequest bulkRequest = new BulkRequest();
    
            List<Map<String, Object>> list = new ArrayList<>();
            list.add(new HashMap<String, Object>() {{
                put("id", "003");
                put("name", "o宾馆");
                put("city", "绵竹");
                put("price", 7.54);
            }});
            list.add(new HashMap<String, Object>() {{
                put("id", "004");
                put("name", "oo宾馆");
                put("city", "石家庄");
                put("price", 17.34);
            }});
            list.add(new HashMap<String, Object>() {{
                put("id", "011");
                put("name", "e宾馆");
                put("city", "梧州");
                put("price", 21.92);
            }});
            list.forEach(s -> bulkRequest.add(new UpdateRequest("hotel", "_doc", s.get("id").toString())
            .doc(s).upsert(s)));
            BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
    
            restHighLevelClient.close();
        }
    

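    (5) Updating documents by query

    Create an UpdateByQueryRequest and call setQuery and setScript, which set the match condition and the update script respectively. Then call the client's updateByQuery to run the update.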

    public static void main(String[] args) throws IOException {
            HttpHost[] httpHosts = {new HttpHost("127.0.0.1", 9200, HttpHost.DEFAULT_SCHEME_NAME)};
            RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient.builder(httpHosts));
    
    
            UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest("hotel");
            updateByQueryRequest.setQuery(new TermQueryBuilder("city", "苏州"));
            updateByQueryRequest.setScript(new Script("ctx._source['city']='杭州'"));
            restHighLevelClient.updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT);
    
            restHighLevelClient.close();
        }
    

    Search operations

    (1) term exact-match search

    Use QueryBuilders.termQuery("_id", "001") to find the document with the given id.

    public static void main(String[] args) throws IOException {
            HttpHost[] httpHosts = {new HttpHost("127.0.0.1", 9200, HttpHost.DEFAULT_SCHEME_NAME)};
            RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient.builder(httpHosts));
    
            SearchRequest searchRequest = new SearchRequest("hotel");
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(QueryBuilders.termQuery("_id", "001"));
            searchRequest.source(searchSourceBuilder);
            SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
            if (searchResponse.status() == RestStatus.OK) {
                SearchHits searchHits = searchResponse.getHits();
                System.out.println(searchHits.getAt(0).getSourceAsMap().get("city"));
            }
    
            restHighLevelClient.close();
        }
    

    (2) range search

    Use QueryBuilders.rangeQuery("price").gte(10.0).lte(100.0) to build the DSL for a range query.

    public static void main(String[] args) throws IOException {
            HttpHost[] httpHosts = {new HttpHost("127.0.0.1", 9200, HttpHost.DEFAULT_SCHEME_NAME)};
            RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient.builder(httpHosts));
    
            SearchRequest searchRequest = new SearchRequest("hotel");
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(QueryBuilders.rangeQuery("price").gte(10.0).lte(100.0));
            searchRequest.source(searchSourceBuilder);
            SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
            if (searchResponse.status() == RestStatus.OK) {
                SearchHits searchHits = searchResponse.getHits();
                for (SearchHit searchHit: searchHits) {
                    System.out.println(searchHit.getSourceAsMap());
                }
            }
    
            restHighLevelClient.close();
        }
    

    The output is as follows

    {city=梧州, price=21.92, name=e宾馆, id=005}
    {city=石家庄, price=17.34, name=oo宾馆, id=004}
    {city=梧州, price=21.92, name=e宾馆, id=011}
    

    (3) Pagination

    Set the from and size parameters on searchSourceBuilder.

    public static void main(String[] args) throws IOException {
            HttpHost[] httpHosts = {new HttpHost("127.0.0.1", 9200, HttpHost.DEFAULT_SCHEME_NAME)};
            RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient.builder(httpHosts));
    
            SearchRequest searchRequest = new SearchRequest("hotel");
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().from(1).size(1).query(QueryBuilders.rangeQuery("price").gte(10.0).lte(100.0));
            searchRequest.source(searchSourceBuilder);
            SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
            if (searchResponse.status() == RestStatus.OK) {
                SearchHits searchHits = searchResponse.getHits();
                for (SearchHit searchHit: searchHits) {
                    System.out.println(searchHit.getSourceAsMap());
                }
            }
            restHighLevelClient.close();
        }
    

    Compared with the run without from and size, this returns 1 result starting at offset 1 (from is zero-based), which is the second hit.

    {city=石家庄, price=17.34, name=oo宾馆, id=004}
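
    Since from is an offset rather than a page number, a one-based page number has to be converted first. The following is a minimal sketch under that assumption; PageOffsets and fromOffset are hypothetical names. By default es rejects searches where from + size exceeds 10000 (the index.max_result_window setting), so the sketch checks that limit too.

```java
// Hypothetical helper: converts a one-based page number into the from offset.
final class PageOffsets {

    // Default value of the index.max_result_window index setting
    static final int DEFAULT_MAX_RESULT_WINDOW = 10_000;

    static int fromOffset(int page, int size) {
        if (page < 1 || size < 1) {
            throw new IllegalArgumentException("page and size must be at least 1");
        }
        long from = (long) (page - 1) * size;
        if (from + size > DEFAULT_MAX_RESULT_WINDOW) {
            throw new IllegalArgumentException("from + size exceeds the result window");
        }
        return (int) from;
    }

    public static void main(String[] args) {
        System.out.println(fromOffset(2, 1));  // 1, the from(1).size(1) case above
        System.out.println(fromOffset(3, 20)); // 40
    }
}
```

    For extractions that go deeper than the result window, the scroll query in the next section is the better fit.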
    

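    (4) Scroll (cursor) query

    This is commonly used to extract all data matching a filter; the scroll API can retrieve a large number of results, or even all of them. Note that an es scroll query works on a snapshot of the data taken when the scroll starts, so changes made afterwards do not affect its results. Scroll queries are most efficient when sorted by _doc, so use that order when the result order does not matter. Example code: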

    public static void main(String[] args) throws IOException {
            HttpHost[] httpHosts = {new HttpHost("127.0.0.1", 9200, HttpHost.DEFAULT_SCHEME_NAME)};
            RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient.builder(httpHosts));
    
            SearchRequest searchRequest = new SearchRequest("hotel");
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(QueryBuilders.rangeQuery("price").gte(5.0).lte(100.0));
            searchSourceBuilder.size(2);
            searchRequest.source(searchSourceBuilder);
            Scroll scroll = new Scroll(TimeValue.timeValueMillis(1L)); // keep-alive of the scroll context
            searchRequest.scroll(scroll);
            SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
            String scrollId = searchResponse.getScrollId();
            SearchHit[] hits = searchResponse.getHits().getHits();
            List<SearchHit> resultSearchHit = new ArrayList<>();
            while (hits != null && hits.length > 0) {
                System.out.println(hits.length);
                System.out.println(scrollId);
                resultSearchHit.addAll(Arrays.asList(hits));
                SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId);
                searchScrollRequest.scroll(scroll);
                SearchResponse searchScrollResponse = restHighLevelClient.scroll(searchScrollRequest, RequestOptions.DEFAULT);
                scrollId = searchScrollResponse.getScrollId();
                hits = searchScrollResponse.getHits().getHits();
            }
            ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
            clearScrollRequest.addScrollId(scrollId);
            restHighLevelClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
            restHighLevelClient.close();
    //        System.out.println(resultSearchHit);
        }
    

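    After the query, searchSourceBuilder.size(2) makes each scroll round fetch only 2 documents, and timeValueMillis sets the scroll keep-alive to 1 millisecond. Raise it as needed so that the scroll context does not expire before the next round is requested. The first round uses the client's search method and every later round uses the scroll method. Before each new round, the code checks whether the current round's hits (getHits().getHits()) are empty to decide whether to continue, and it collects every round's hits to end up with the full data set. The last lines release the scroll resources. The printed output is: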

    2
    DnF1ZXJ5VGhlbkZldGNoBQAAAAAAACkkFl9NNXhWanRTUWRPXzNmVG1BeUthZUEAAAAAAAApIRZfTTV4Vmp0U1FkT18zZlRtQXlLYWVBAAAAAAAAKSIWX001eFZqdFNRZE9fM2ZUbUF5S2FlQQAAAAAAACkjFl9NNXhWanRTUWRPXzNmVG1BeUthZUEAAAAAAAApJRZfTTV4Vmp0U1FkT18zZlRtQXlLYWVB
    2
    DnF1ZXJ5VGhlbkZldGNoBQAAAAAAACkkFl9NNXhWanRTUWRPXzNmVG1BeUthZUEAAAAAAAApIRZfTTV4Vmp0U1FkT18zZlRtQXlLYWVBAAAAAAAAKSIWX001eFZqdFNRZE9fM2ZUbUF5S2FlQQAAAAAAACkjFl9NNXhWanRTUWRPXzNmVG1BeUthZUEAAAAAAAApJRZfTTV4Vmp0U1FkT18zZlRtQXlLYWVB
    1
    DnF1ZXJ5VGhlbkZldGNoBQAAAAAAACkkFl9NNXhWanRTUWRPXzNmVG1BeUthZUEAAAAAAAApIRZfTTV4Vmp0U1FkT18zZlRtQXlLYWVBAAAAAAAAKSIWX001eFZqdFNRZE9fM2ZUbUF5S2FlQQAAAAAAACkjFl9NNXhWanRTUWRPXzNmVG1BeUthZUEAAAAAAAApJRZfTTV4Vmp0U1FkT18zZlRtQXlLYWVB
    

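    There were 3 rounds in total: 2 documents in each of the first two and 1 in the last, so 5 documents matched the filter. In this run the scrollId returned after every round was the same, but es may return either the same or a different id for the next round, so always pass on the most recent one. Even when the id is unchanged, each round returns different data.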
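
    The control flow of the loop can be reproduced without a cluster. The sketch below replaces the server-side scroll context with an in-memory stand-in (FakeScroll, a hypothetical class) and shows why 5 matches with size 2 give batches of 2, 2 and 1, followed by the empty batch that ends the loop.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class ScrollLoopSketch {

    // Stand-in for the server-side scroll context: remembers how far the cursor has advanced.
    static final class FakeScroll {
        private final List<String> matches;
        private final int size;
        private int position = 0;

        FakeScroll(List<String> matches, int size) {
            this.matches = matches;
            this.size = size;
        }

        // Returns the next batch, or an empty list once every match has been handed out.
        List<String> nextBatch() {
            if (position >= matches.size()) {
                return Collections.emptyList();
            }
            int end = Math.min(position + size, matches.size());
            List<String> batch = new ArrayList<>(matches.subList(position, end));
            position = end;
            return batch;
        }
    }

    // Drains the scroll into collected and returns the size of every non-empty batch.
    static List<Integer> drain(FakeScroll scroll, List<String> collected) {
        List<Integer> batchSizes = new ArrayList<>();
        List<String> hits = scroll.nextBatch();      // plays the role of the initial search call
        while (!hits.isEmpty()) {
            batchSizes.add(hits.size());
            collected.addAll(hits);
            hits = scroll.nextBatch();               // plays the role of each scroll call
        }
        return batchSizes;
    }

    public static void main(String[] args) {
        List<String> collected = new ArrayList<>();
        FakeScroll scroll = new FakeScroll(Arrays.asList("002", "003", "004", "005", "011"), 2);
        System.out.println(drain(scroll, collected)); // [2, 2, 1]
        System.out.println(collected.size());         // 5
    }
}
```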


    (5) Returning selected fields

    Call fetchSource with two arrays: the first lists the fields to include and the second the fields to exclude.

    public static void main(String[] args) throws IOException {
            HttpHost[] httpHosts = {new HttpHost("127.0.0.1", 9200, HttpHost.DEFAULT_SCHEME_NAME)};
            RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient.builder(httpHosts));
    
            SearchRequest searchRequest = new SearchRequest("hotel");
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
                    .query(QueryBuilders.termQuery("_id", "001"))
                    .fetchSource(new String[] {"city"}, new String[]{});
            searchRequest.source(searchSourceBuilder);
            SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
            if (searchResponse.status() == RestStatus.OK) {
                SearchHits searchHits = searchResponse.getHits();
                System.out.println(searchHits.getAt(0));
            }
    
            restHighLevelClient.close();
        }
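
    The effect of the two arrays can be illustrated with a plain map. This is a simplified simulation under stated assumptions: it matches exact top-level field names only (es also accepts wildcard patterns, which are not handled here), and SourceFilterSketch and filter are hypothetical names.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Simplified illustration of include/exclude source filtering; not es client code.
final class SourceFilterSketch {

    static Map<String, Object> filter(Map<String, Object> source, String[] includes, String[] excludes) {
        Set<String> in = new HashSet<>(Arrays.asList(includes));
        Set<String> ex = new HashSet<>(Arrays.asList(excludes));
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : source.entrySet()) {
            // An empty include list means every field is a candidate
            boolean included = in.isEmpty() || in.contains(entry.getKey());
            // Excludes are applied after includes
            if (included && !ex.contains(entry.getKey())) {
                result.put(entry.getKey(), entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> source = new LinkedHashMap<>();
        source.put("name", "a hotel");
        source.put("city", "Shanghai");
        source.put("price", 3.13);

        System.out.println(filter(source, new String[] {"city"}, new String[] {}));  // {city=Shanghai}
        System.out.println(filter(source, new String[] {}, new String[] {"price"})); // {name=a hotel, city=Shanghai}
    }
}
```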
    
    (6) Sorting

    Sorting is configured by calling sort on the SearchSourceBuilder, with SortOrder.DESC for descending and SortOrder.ASC for ascending order.

    public static void main(String[] args) throws IOException {
            HttpHost[] httpHosts = {new HttpHost("127.0.0.1", 9200, HttpHost.DEFAULT_SCHEME_NAME)};
            RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient.builder(httpHosts));
    
            SearchRequest searchRequest = new SearchRequest("hotel");
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().sort("price", SortOrder.DESC);
            searchRequest.source(searchSourceBuilder);
            SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
            if (searchResponse.status() == RestStatus.OK) {
                SearchHits searchHits = searchResponse.getHits();
                for (SearchHit searchHit : searchHits) {
                    System.out.println(searchHit);
                }
            }
    
            restHighLevelClient.close();
        }
    

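    Delete operations

    (1) Deleting a single document

    To delete a single document, create a DeleteRequest with the index, type and document id, then call the client's delete method.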

    public static void main(String[] args) throws IOException {
            HttpHost[] httpHosts = {new HttpHost("127.0.0.1", 9200, HttpHost.DEFAULT_SCHEME_NAME)};
            RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient.builder(httpHosts));
    
            DeleteRequest deleteRequest = new DeleteRequest("hotel", "_doc", "005");
            restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
    
            restHighLevelClient.close();
        }
    
    (2) Bulk deleting documents

    For bulk deletes, add DeleteRequest objects to a BulkRequest and submit it with the client's bulk method.

    public static void main(String[] args) throws IOException {
            HttpHost[] httpHosts = {new HttpHost("127.0.0.1", 9200, HttpHost.DEFAULT_SCHEME_NAME)};
            RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient.builder(httpHosts));
    
            BulkRequest bulkRequest = new BulkRequest();
            bulkRequest.add(new DeleteRequest("hotel", "_doc", "004"));
            bulkRequest.add(new DeleteRequest("hotel", "_doc", "003"));
            restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
    
            restHighLevelClient.close();
        }
    
    (3) Deleting documents by query

    Create a DeleteByQueryRequest, set the condition with setQuery, and call the client's deleteByQuery.

    public static void main(String[] args) throws IOException {
            HttpHost[] httpHosts = {new HttpHost("127.0.0.1", 9200, HttpHost.DEFAULT_SCHEME_NAME)};
            RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient.builder(httpHosts));
    
            DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest("hotel");
            deleteByQueryRequest.setQuery(new BoolQueryBuilder().mustNot(new TermQueryBuilder("city", "梧州")));
            restHighLevelClient.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
    
            restHighLevelClient.close();
        }
    

    Code example

    An es utility class built as a singleton

    import org.apache.http.HttpHost;
    import org.elasticsearch.action.bulk.BulkRequest;
    import org.elasticsearch.action.bulk.BulkResponse;
    import org.elasticsearch.action.search.SearchRequest;
    import org.elasticsearch.action.search.SearchResponse;
    import org.elasticsearch.action.update.UpdateRequest;
    import org.elasticsearch.client.RequestOptions;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.elasticsearch.index.query.QueryBuilders;
    import org.elasticsearch.rest.RestStatus;
    import org.elasticsearch.search.builder.SearchSourceBuilder;
    
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    public class ESUtils {
        private RestHighLevelClient client;
        private String index;
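        // volatile is required for double-checked locking to be safe
        private static volatile ESUtils instance;
    
        // private so that instances are only created through getInstance()
        private ESUtils() {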
            HttpHost[] httpHosts = {new HttpHost("127.0.0.1", 9200, HttpHost.DEFAULT_SCHEME_NAME)};
            client = new RestHighLevelClient(RestClient.builder(httpHosts));
            index = "hotel";
        }
    
        public static ESUtils getInstance() {
            if (instance == null) {
                synchronized (ESUtils.class) {
                    if (instance == null) {
                        instance = new ESUtils();
                    }
                }
            }
            return instance;
        }
    
        /**
         * Get a document by id.
         * @param id document id
         * @return the document source, or an empty map if nothing is found
         */
        public Map<String, Object> getDocByID(String id) {
            Map<String, Object> map = new HashMap<>();
            SearchRequest searchRequest = new SearchRequest(index);
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(QueryBuilders.termQuery("_id", id));
            searchRequest.source(searchSourceBuilder);
            try {
                SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
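                // guard against an empty result before reading the first hit
                if (searchResponse.status() == RestStatus.OK && searchResponse.getHits().getHits().length > 0) {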
                    map = searchResponse.getHits().getAt(0).getSourceAsMap();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            return map;
        }
    
        /**
         * Bulk upsert.
         * @param data documents to upsert; each map must contain an "id" entry
         */
        public void bulkUpsert(List<Map<String, Object>> data) {
            BulkRequest bulkRequest = new BulkRequest();
            for (Map<String, Object> map : data) {
                String id = map.get("id").toString();
                map.remove("id");
                bulkRequest.add(new UpdateRequest(index, "_doc", id).doc(map).upsert(map));
            }
            try {
                BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
                if (bulkResponse.hasFailures()) {
                    System.out.println("bulk failed: " + bulkResponse.buildFailureMessage());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        /**
         * terms and range query returning selected fields.
         * @return the name field of each matching document
         */
        public List<String> getQuery() {
            List<String> list = new ArrayList<>();
            SearchRequest searchRequest = new SearchRequest(index);
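            // Both conditions go into one bool query; calling query(...) twice would
            // keep only the last query. Note that the first argument of termsQuery
            // is the field name and the remaining arguments are the values.
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
                    .query(QueryBuilders.boolQuery()
                            .filter(QueryBuilders.termsQuery("jingjiang", "hangzhou", "zhangzhou"))
                            .filter(QueryBuilders.rangeQuery("price").gte(3.12).lte(3.13)))
                    .fetchSource(new String[] {"name"}, new String[] {});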
            searchRequest.source(searchSourceBuilder);
            try {
                SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
                if (searchResponse.status() == RestStatus.OK) {
                    searchResponse.getHits().forEach(s -> list.add(s.getSourceAsMap().get("name").toString()));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            return list;
        }
    
        public void close() {
            if (client != null) {
                try {
                    client.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    
    
        public static void main(String[] args) {
            Map<String, Object> map = ESUtils.getInstance().getDocByID("011");
            System.out.println(map);
            List<Map<String, Object>> list = new ArrayList<>();
            list.add(new HashMap<String, Object>() {{
                put("id", "001");
                put("degree", "low");
            }});
            list.add(new HashMap<String, Object>() {{
                put("id", "002");
                put("degree", "median");
            }});
            list.add(new HashMap<String, Object>() {{
                put("id", "003");
                put("degree", "high");
            }});
            List<String> list1 = ESUtils.getInstance().getQuery();
            System.out.println(list1);
            ESUtils.getInstance().bulkUpsert(list);
            ESUtils.getInstance().close();
        }
    }
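
    As an alternative to the double-checked locking in getInstance, the initialization-on-demand holder idiom gets lazy, thread-safe initialization from the JVM's class-loading guarantees without explicit synchronization. The sketch below uses a hypothetical class LazyClient with a placeholder field instead of a real RestHighLevelClient, so that it runs without an es dependency.

```java
// Hypothetical class illustrating the holder idiom; the index field stands in
// for the RestHighLevelClient that a real utility class would hold.
final class LazyClient {
    private final String index;

    private LazyClient() {
        this.index = "hotel";
    }

    // The nested class is loaded, and INSTANCE created, only on the first getInstance() call
    private static final class Holder {
        static final LazyClient INSTANCE = new LazyClient();
    }

    static LazyClient getInstance() {
        return Holder.INSTANCE;
    }

    String index() {
        return index;
    }

    public static void main(String[] args) throws InterruptedException {
        LazyClient[] seen = new LazyClient[2];
        Thread t1 = new Thread(() -> seen[0] = LazyClient.getInstance());
        Thread t2 = new Thread(() -> seen[1] = LazyClient.getInstance());
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(seen[0] == seen[1]); // true
        System.out.println(seen[0].index());    // hotel
    }
}
```

    Either approach yields a single shared client; the holder idiom simply has fewer moving parts to get wrong.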
    


        Article link: https://www.haomeiwen.com/subject/xsnbrrtx.html