美文网首页
ElasticSearch常用的增删查改操作

ElasticSearch常用的增删查改操作

作者: Damon_e889 | 来源:发表于2019-10-17 21:05 被阅读0次

使用Java对ElasticSearch增删查改操作,分为两个步骤:
1.拼接查询DSL语句(JSON格式,并非SQL)
2.执行增删查改操作
以下提供了一些常用的轮子。

DSL拼接

1.最普通的DSL拼接

    /**
     * Builds a minimal search body that wraps the given expression in a {@code query_string} query.
     *
     * <p>The expression is placed between double quotes exactly as passed in; no JSON escaping
     * is applied here.</p>
     *
     * @param queryString query_string expression, e.g. {@code field:value}
     * @return the query DSL as a JSON string
     */
    public String getQueryDSL(String queryString) {
        StringBuilder body = new StringBuilder();
        body.append("{ \"query\": { ");
        body.append(" \"query_string\": { ");
        body.append(" \"query\":\"").append(queryString).append("\" ");
        body.append("} } ");
        body.append(" } ");
        return body.toString();
    }

2.查询并根据字段排序

    /**
     * Builds a {@code query_string} search body with an optional {@code sort} clause.
     *
     * <p>The expression, sort fields and sort directions are inserted verbatim; no JSON escaping
     * is applied here. Sort entries are emitted in the iteration order of the map.</p>
     *
     * @param queryString query_string expression, e.g. {@code field:value}
     * @param sortByASCOrDesc key: sort field, value: asc or desc; {@code null} omits the sort
     *            clause (same convention as {@code getRangeQueryDSL})
     * @return query DSL with sort
     */
    public String getQueryDSL(String queryString, Map<String, String> sortByASCOrDesc) {

        StringBuilder builder = new StringBuilder();
        builder.append("{ \"query\": {  \"query_string\": {  \"query\":\"").append(queryString).append("\" } } ");

        // A null map used to fail with a NullPointerException; treat it as "no sorting requested".
        if (sortByASCOrDesc != null) {

            builder.append(", \"sort\": [");

            String separator = "";
            for (Map.Entry<String, String> sortEntry : sortByASCOrDesc.entrySet()) {
                builder.append(separator).append("{\"").append(sortEntry.getKey()).append("\":\"")
                        .append(sortEntry.getValue()).append("\"}");
                separator = ",";
            }

            builder.append("]");
        }

        builder.append(" } ");

        return builder.toString();
    }

3.查询区间并排序

    /**
     * Builds a {@code bool/must} search body that combines a {@code query_string} clause with
     * zero or more {@code range} clauses and an optional {@code sort} clause.
     *
     * <p>All values are inserted verbatim (no JSON escaping) and every range bound is emitted as
     * a quoted string. Clauses follow the iteration order of the supplied maps.</p>
     *
     * @param queryString query_string expression, e.g. {@code field:value}
     * @param rangeMap key = range field, value = map of operator ({@code gte}, {@code gt},
     *            {@code lte} or {@code lt}) to bound value; {@code null} or empty adds no range clause
     * @param sortByASCOrDesc key: sort field, value: asc or desc; {@code null} if no sort is needed
     * @return query DSL with range
     */
    public String getRangeQueryDSL(String queryString, Map<String, Map<String, String>> rangeMap, Map<String, String> sortByASCOrDesc) {

        StringBuilder dsl = new StringBuilder();

        dsl.append("{\"query\": {\"bool\": {\"must\": [{\"query_string\": { \"query\":\"")
                .append(queryString).append("\" }}");

        if (rangeMap != null) {

            for (Map.Entry<String, Map<String, String>> rangeEntry : rangeMap.entrySet()) {

                // Each range clause follows the query_string clause, so it always carries the
                // leading comma; this avoids a dangling comma when there are no ranges at all.
                dsl.append(",{\"range\": { \"").append(rangeEntry.getKey()).append("\": { ");

                Map<String, String> bounds = rangeEntry.getValue();
                int remaining = bounds.size();

                for (Map.Entry<String, String> bound : bounds.entrySet()) {
                    remaining--;
                    dsl.append(" \"").append(bound.getKey()).append("\": \"").append(bound.getValue()).append("\"");
                    // Separate every bound but the last one (previously only the first got a comma).
                    dsl.append(remaining > 0 ? " , " : "  ");
                }

                dsl.append("}}}");
            }
        }

        dsl.append("]}}");

        if (sortByASCOrDesc != null) {

            dsl.append(", \"sort\": [");

            String separator = "";
            for (Map.Entry<String, String> sortEntry : sortByASCOrDesc.entrySet()) {
                dsl.append(separator).append("{\"").append(sortEntry.getKey()).append("\":\"")
                        .append(sortEntry.getValue()).append("\"}");
                separator = ",";
            }

            dsl.append("]");
        }

        dsl.append("}");

        return dsl.toString();
    }

4.查所有

    /**
     * Builds a search body that matches every document.
     *
     * @return the {@code match_all} query DSL as a JSON string
     */
    public String getQueryAllDSL() {
        StringBuilder body = new StringBuilder("{ ");
        body.append(" \"query\": { ");
        body.append(" \"match_all\": {} ");
        body.append(" } ").append(" } ");
        return body.toString();
    }

5.根据字段分组

    /**
     * Builds an aggregation search body: a {@code query_string} filter on the {@code _all}
     * field, {@code size} 0, and the caller-supplied aggregation definitions.
     *
     * @param queryString query_string expression, inserted verbatim
     * @param strAggs content placed inside the {@code "aggs": { ... }} object, inserted verbatim
     * @return the query DSL as a JSON string
     */
    private String getAggsQueryDsl(String queryString, String strAggs) {
        StringBuilder body = new StringBuilder();
        body.append("{\"query\": {\"bool\": {\"must\": [");
        body.append("{\"query_string\": {\"default_field\": \"_all\",\"query\": \"").append(queryString).append("\"}}");
        body.append("]}},");
        body.append("\"size\": 0,");
        body.append("\"aggs\": {").append(strAggs).append("}");
        body.append("}");
        return body.toString();
    }

注意:
1).使用aggs聚合函数可以根据字段去重分组,如果有多字段需要去重分组,可以嵌套使用aggs;
2).实际项目中,建议尽量少的使用aggs,会比较吃性能,建议将数据查询后,在后台代码中进行去重分组

查询(Query)

1.执行查询操作,返回结果用实体类封装

    /**
     * Runs the given query DSL as a scrolling search and deserializes every hit's
     * {@code _source} into {@code classOfT} with Gson.
     *
     * <p>Pages of up to 2000 hits are requested with a scroll keep-alive of
     * {@code new TimeValue(60000)}; the scroll is cleared in {@code finally}.</p>
     *
     * @param index index to search
     * @param type document type to search
     * @param dsl variable sample: { "query":{ "query_string":{ "query":"field:value" } } }
     * @param classOfT sample: xxxVO.class
     * @return List&lt;xxxVO&gt;, empty when nothing matches
     */
    public <T> List<T> getDocListByQueryDSL(String index, String type, String dsl, Class<T> classOfT) {
        String scrollId = "";
        List<T> list = new ArrayList<T>();
        try {
            SearchResponse scrollResp = this.esClient.prepareSearch(index).setTypes(type).setExtraSource(dsl).setFrom(0)
                    .setSize(2000).setScroll(new TimeValue(60000)).execute().actionGet();
            scrollId = scrollResp.getScrollId();
            Gson gson = new Gson();
            do {
                for (SearchHit searchHit : scrollResp.getHits().getHits()) {
                    list.add(gson.fromJson(searchHit.getSourceAsString(), classOfT));
                }
                scrollResp = this.esClient.prepareSearchScroll(scrollId).setScroll(new TimeValue(60000)).execute()
                        .actionGet();
                // The scroll id may change between requests; always continue (and later clear)
                // with the most recently returned one.
                scrollId = scrollResp.getScrollId();

            } while (scrollResp.getHits().getHits().length > 0);
            return list;
        } finally {
            this.clearESScrollId(scrollId);
        }
    }

2.执行查询操作,返回结果用map封装

    /**
     * Runs the given query DSL as a scrolling search and returns the hits keyed by document ID,
     * each {@code _source} deserialized into {@code classOfT} with Gson.
     *
     * <p>Pages of up to 2000 hits are requested with a scroll keep-alive of
     * {@code new TimeValue(60000)}; the scroll is cleared in {@code finally}.</p>
     *
     * @param index index to search
     * @param type document type to search
     * @param dsl sample: { "query":{ "query_string":{ "query":"field:value" } } }
     * @param classOfT class of the value objects
     * @return Map&lt;String, T&gt; Key = document ID , value = Document VO
     */
    public <T> Map<String, T> getDocMapByQueryDSL(String index, String type, String dsl, Class<T> classOfT) {

        String scrollId = "";

        Map<String, T> retMap = new HashMap<String, T>();

        try {

            SearchResponse scrollResp = this.esClient.prepareSearch(index).setTypes(type).setExtraSource(dsl).setFrom(0)
                    .setSize(2000).setScroll(new TimeValue(60000)).execute().actionGet();
            scrollId = scrollResp.getScrollId();

            Gson gson = new Gson();

            do {

                for (SearchHit searchHit : scrollResp.getHits().getHits()) {
                    retMap.put(searchHit.getId(), gson.fromJson(searchHit.getSourceAsString(), classOfT));
                }
                scrollResp = this.esClient.prepareSearchScroll(scrollId).setScroll(new TimeValue(60000)).execute()
                        .actionGet();
                // The scroll id may change between requests; always continue (and later clear)
                // with the most recently returned one.
                scrollId = scrollResp.getScrollId();

            } while (scrollResp.getHits().getHits().length > 0);

            return retMap;

        } finally {
            this.clearESScrollId(scrollId);
        }
    }

3.根据ID查询

    /**
     * Fetches a single document by ID and deserializes its {@code _source} with Gson.
     *
     * @param index index holding the document
     * @param type document type
     * @param docID sample: AAA_BBB_CCC
     * @param classOfT class to deserialize into
     * @return xxxVO built from the document source
     */
    public <T> T getDocByDocID(String index, String type, String docID, Class<T> classOfT) {
        String sourceJson = this.esClient.prepareGet(index, type, docID).get().getSourceAsString();
        return new Gson().fromJson(sourceJson, classOfT);
    }

4.根据ID的List查询

    /**
     * Fetches the documents with the given IDs through a scrolling {@code ids} query and
     * deserializes each {@code _source} into {@code classOfT} with Gson.
     *
     * <p>Pages of up to 1000 hits are requested; the scroll is cleared in {@code finally}.</p>
     *
     * @param index index to search
     * @param type document type to search
     * @param docIDs sample: List of AAA_BBB_CCC
     * @param classOfT class to deserialize into
     * @return the matching documents
     * @throws Exception if the search or the deserialization fails
     */
    public <T> List<T> getDocListByDocID(String index, String type, List<String> docIDs, Class<T> classOfT)
            throws Exception {

        String scrollId = "";
        List<T> docs = new ArrayList<T>();

        try {
            SearchResponse page = this.esClient.prepareSearch(index).setTypes(type)
                    .setQuery(QueryBuilders.idsQuery().ids(docIDs)).setScroll(new TimeValue(60000)).setFrom(0)
                    .setSize(1000).execute().actionGet();

            Gson gson = new Gson();

            while (true) {
                for (SearchHit hit : page.getHits().getHits()) {
                    docs.add(gson.fromJson(hit.getSourceAsString(), classOfT));
                }

                // Remember the id of the page just consumed, then ask for the next page.
                scrollId = page.getScrollId();
                page = this.esClient.prepareSearchScroll(scrollId).setScroll(new TimeValue(60000)).execute()
                        .actionGet();

                if (page.getHits().getHits().length == 0) {
                    break;
                }
            }
            return docs;
        } finally {
            this.clearESScrollId(scrollId);
        }
    }

5.根据ID查询字段值,返回结果用map封装

    /**
     * Reads selected {@code _source} fields of the documents with the given IDs.
     *
     * <p>Uses a scrolling {@code ids} query with pages of up to 1000 hits. The scroll is cleared
     * in {@code finally} so the search context is released as soon as the method returns.</p>
     *
     * @param index index to search
     * @param type document type to search
     * @param ids document IDs to look up
     * @param fields source fields to fetch
     * @return <p>Map&lt;String, Object&gt; key = id_field, value = document id_field's value.
     *            sample:Map&lt;AAA_BBB_CCC_field, value&gt;</p>
     * @throws Exception if the search fails
     */
    public Map<String, Object> getExtraMapByDocID(String index, String type, List<String> ids, String... fields)
            throws Exception {

        String scrollId = "";

        Map<String, Object> retMap = Maps.newHashMap();

        try {

            SearchResponse response = this.esClient.prepareSearch(index).setTypes(type)
                    .setQuery(QueryBuilders.idsQuery().ids(ids)).setFetchSource(fields, null)
                    .setScroll(new TimeValue(60000)).setFrom(0).setSize(1000).execute().actionGet();
            scrollId = response.getScrollId();

            do {

                for (SearchHit hit : response.getHits().getHits()) {

                    for (String field : fields) {

                        String key = String.format("%s_%s", hit.getId(), field);

                        retMap.put(key, hit.getSource().get(field));
                    }

                }

                response = this.esClient.prepareSearchScroll(scrollId).setScroll(new TimeValue(60000))
                        .execute().actionGet();
                // Keep the most recent scroll id for the next request and for the final cleanup.
                scrollId = response.getScrollId();

                // Stop once a scroll request returns no hits.
            } while (response.getHits().getHits().length > 0);

            return retMap;

        } finally {
            // Previously the scroll was left open until its keep-alive expired.
            this.clearESScrollId(scrollId);
        }

    }

6.根据DSL查询字段值

    /**
     * Runs the given query DSL as a scrolling search and returns the requested {@code _source}
     * fields of every hit.
     *
     * <p>The first page (up to 1000 hits) is fetched here; the remaining pages are collected by
     * {@code getHitsResult}. The scroll id of the first response is cleared in {@code finally}.</p>
     *
     * @param index index to search
     * @param type document type to search
     * @param queryDSL search body as a JSON string
     * @param fields source fields to fetch
     * @return one map per document, keyed by field name
     * @throws Exception if the search fails
     */
    public List<Map<String, Object>> getExtraMapByQueryDSL(String index, String type, String queryDSL, String... fields)
            throws Exception {

        List<Map<String, Object>> collected = Lists.newArrayList();
        String scrollId = "";

        try {
            SearchResponse firstPage = this.esClient.prepareSearch(index)
                    .setTypes(type)
                    .setExtraSource(queryDSL)
                    .setFetchSource(fields, null)
                    .setScroll(new TimeValue(60000))
                    .setFrom(0)
                    .setSize(1000)
                    .execute()
                    .actionGet();

            scrollId = firstPage.getScrollId();

            return this.getHitsResult(firstPage, collected, fields);
        } finally {
            this.clearESScrollId(scrollId);
        }
    }

    /**
     * Drains a scrolling search: copies the requested fields of every hit into
     * {@code rtnLst}, page by page, until a scroll request returns no hits.
     *
     * @param response first page of the scrolling search
     * @param rtnLst list that receives one field map per hit
     * @param fields source fields to copy from each hit
     * @return {@code rtnLst}, after all pages have been appended
     */
    private List<Map<String, Object>> getHitsResult(SearchResponse response, List<Map<String, Object>> rtnLst,
            String... fields) {
        SearchResponse page = response;
        do {
            for (SearchHit hit : page.getHits().getHits()) {
                Map<String, Object> row = Maps.newHashMap();
                for (String field : fields) {
                    row.put(field, hit.getSource().get(field));
                }
                rtnLst.add(row);
            }
            // Continue with the scroll id of the page just consumed.
            page = this.esClient.prepareSearchScroll(page.getScrollId()).setScroll(new TimeValue(60000))
                    .execute().actionGet();
        } while (page.getHits().getHits().length != 0);
        return rtnLst;
    }

7.查询排序后的第一笔数据

    /**
     * Returns the first hit of the given query DSL, deserialized with Gson.
     *
     * <p>The search requests a single hit; the scroll it opens is cleared in {@code finally}.</p>
     *
     * @param index index to search
     * @param type document type to search
     * @param dsl search body; should contain a sort so that "first" is well defined
     * @param classOfT class to deserialize into
     * @return the first hit, or {@code null} when nothing matches
     */
    public <T> T getTop1DocByQueryDSL(String index, String type, String dsl, Class<T> classOfT) {
        String scrollId = "";
        try {
            SearchResponse topResp = this.esClient.prepareSearch(index).setTypes(type).setExtraSource(dsl).setFrom(0)
                    .setSize(1).setScroll(new TimeValue(60000)).execute().actionGet();
            scrollId = topResp.getScrollId();

            SearchHit[] hits = topResp.getHits().getHits();
            if (hits.length == 0) {
                return null;
            }
            return new Gson().fromJson(hits[0].getSourceAsString(), classOfT);
        } finally {
            this.clearESScrollId(scrollId);
        }
    }

8.通过DSL查询数据是否存在

    /**
     * Checks whether at least one document matches the given query DSL.
     *
     * <p>Only the total hit count of a one-hit search is inspected, so a plain search is issued
     * rather than a scroll. The earlier version opened a scroll that was neither read nor
     * cleared.</p>
     *
     * @param esIndex index to search
     * @param esType document type to search
     * @param queryDsl search body as a JSON string
     * @return {@code true} if the total hit count is greater than zero
     * @throws Exception if the search fails
     */
    public boolean checkDocExistByQueryDSL(String esIndex, String esType, String queryDsl) throws Exception {

        SearchRequestBuilder searchBuilder = this.esClient.prepareSearch(esIndex).setTypes(esType)
                .setFetchSource(new String[] { "_id" }, null).setExtraSource(queryDsl).setFrom(0).setSize(1);

        SearchResponse searchResp = searchBuilder.execute().actionGet();

        return searchResp.getHits().getTotalHits() > 0;

    }

9.查询数据笔数

    /**
     * Returns the total number of documents matching the given query DSL.
     *
     * <p>The search asks for zero hits and reads only the total; the scroll it opens is cleared
     * in {@code finally}.</p>
     *
     * @param index index to search
     * @param type document type to search
     * @param dsl search body as a JSON string
     * @return total hit count reported by the search response
     */
    protected long getTotalHitsByDsl(String index, String type, String dsl) {
        String scrollId = "";
        try {
            SearchResponse countResp = this.esClient.prepareSearch(index)
                    .setTypes(type)
                    .setExtraSource(dsl)
                    .setFrom(0)
                    .setSize(0)
                    .setScroll(new TimeValue(60000))
                    .execute()
                    .actionGet();
            scrollId = countResp.getScrollId();
            long total = countResp.getHits().getTotalHits();
            return total;
        } finally {
            this.clearESScrollId(scrollId);
        }
    }

新增(Upsert)

注:upsert用法:有记录就更新,没有记录就新增
1.通过ID 执行upsert操作

    /**
     * Indexes the given value object under the given document ID, with refresh enabled.
     * The object is serialized to JSON with Gson.
     *
     * @param index target index
     * @param type document type
     * @param docID document ID to index under
     * @param vo value object to serialize and store
     * @return the index response
     */
    public <T> IndexResponse upsert(String index, String type, String docID, T vo) {
        String json = new Gson().toJson(vo);
        return this.esClient.prepareIndex(index, type, docID)
                .setRefresh(true)
                .setSource(json)
                .execute()
                .actionGet();
    }

2.批量upsert

    /**
     * Indexes a list of value objects in one bulk request.
     *
     * <p>Each document ID is obtained by reflectively calling the named no-argument method on
     * the value object and taking {@code toString()} of its result. Objects are serialized
     * with Gson. Nothing is sent when the list produces no actions.</p>
     *
     * @param index target index
     * @param type document type
     * @param voList value objects to index
     * @param getDocIDMethodInVo name of the public no-argument method that yields the document ID,
     *                sample : public String getDocumentID() { return
     *               String.format("%s_%s", AAA,BBB); }
     * @param isAutoRefresh refresh flag passed to the bulk request
     * @throws Exception if reflection fails or the bulk response reports failures
     */
    public <T> void bulkProcessUpsert(String index, String type, List<T> voList, String getDocIDMethodInVo, boolean isAutoRefresh) throws Exception {
        Gson gson = new Gson();

        BulkRequestBuilder bulk = this.esClient.prepareBulk();
        bulk.setTimeout(TimeValue.timeValueMinutes(20));

        for (T vo : voList) {
            Method idGetter = vo.getClass().getMethod(getDocIDMethodInVo);
            String docId = idGetter.invoke(vo).toString();
            bulk.add(this.esClient.prepareIndex(index, type, docId).setSource(gson.toJson(vo)));
        }

        if (bulk.numberOfActions() <= 0) {
            return;
        }

        BulkResponse resp = bulk.setRefresh(isAutoRefresh).execute().actionGet();
        if (resp.hasFailures()) {
            throw new Exception(String.format("Bulk index items failed, message:%s", resp.buildFailureMessage()));
        }
    }

更新(Update)

1.根据ID执行update操作

    /**
     * Partially updates one document by ID, with refresh enabled.
     *
     * @param index index holding the document
     * @param type document type
     * @param id document ID
     * @param fieldValMap key = index field, value = value to be updated
     * @return the update response
     * @throws Exception if the update fails or waiting for its result is interrupted
     */
    public UpdateResponse update(String index, String type, String id, Map<String, Object> fieldValMap) throws Exception {
        UpdateRequest request = new UpdateRequest(index, type, id).refresh(true).doc(fieldValMap);
        return this.esClient.update(request).get();
    }

2.批量update

    /**
     * Partially updates many documents in one bulk request.
     *
     * @param index index holding the documents
     * @param type document type
     * @param mapUpdates key = document ID, value = map of field name to new value
     * @param isAutoRefresh refresh flag passed to the bulk request
     * @throws Exception if building a document fails or the bulk response reports failures
     */
    protected void bulkProcessUpdate(String index, String type, Map<String, Map<String, Object>> mapUpdates, boolean isAutoRefresh)
            throws Exception {

        BulkRequestBuilder bulk = this.esClient.prepareBulk();
        bulk.setTimeout(TimeValue.timeValueMinutes(20));

        for (Map.Entry<String, Map<String, Object>> update : mapUpdates.entrySet()) {
            // Build the partial document for this ID field by field.
            XContentBuilder doc = XContentFactory.jsonBuilder().startObject();
            for (Map.Entry<String, Object> fieldValue : update.getValue().entrySet()) {
                doc.field(fieldValue.getKey(), fieldValue.getValue());
            }
            doc.endObject();

            UpdateRequestBuilder updateBuilder = this.esClient.prepareUpdate(index, type, update.getKey())
                    .setDoc(doc);
            bulk.add(updateBuilder);
        }

        if (bulk.numberOfActions() <= 0) {
            return;
        }

        BulkResponse resp = bulk.setRefresh(isAutoRefresh).execute().actionGet();
        if (resp.hasFailures()) {
            throw new Exception(String.format("Bulk update items failed, message:%s", resp.buildFailureMessage()));
        }
    }

删除(Delete)

1.根据ID 删除

    /**
     * Deletes one document by ID, with refresh enabled.
     *
     * @param index index holding the document
     * @param type document type
     * @param id document ID
     * @return the {@code isFound()} flag of the delete response
     */
    public boolean deleteById(String index, String type, String id) {
        boolean found = this.esClient.prepareDelete(index, type, id)
                .setRefresh(true)
                .execute()
                .actionGet()
                .isFound();
        return found;
    }

2.批量删除

    /**
     * Bulk delete by document IDs.
     *
     * <p>Delete requests are fed to a {@code BulkProcessor} that flushes every
     * {@code BULK_MAX_SIZE} actions, at 1 GB, or every 5 seconds. The processor is then closed
     * (waiting up to one minute) and the index is refreshed. Failures of individual bulk
     * executions are logged by the listener; they are not thrown to the caller.</p>
     *
     * @param index index holding the documents
     * @param type document type
     * @param lstId IDs of the documents to delete
     * @throws Exception if queuing the requests fails or waiting for the processor is interrupted
     */
    public void bulkProcessDelete(String index, String type, List<String> lstId) throws Exception {

        final BulkProcessor bulkProcessor = BulkProcessor.builder(this.esClient, new BulkProcessor.Listener() {
            public void beforeBulk(long executionId, BulkRequest request) {
                // nothing to prepare before a bulk is sent
            }

            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                // Item-level failures arrive here; previously they were dropped silently.
                if (response.hasFailures()) {
                    logger.error("ElasticsearchHelper.bulkProcessDelete.Index is {}. error is {}. ", index,
                            response.buildFailureMessage());
                }
            }

            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                // Pass the throwable to the logger instead of printing the stack trace to stderr.
                logger.error("ElasticsearchHelper.bulkProcessDelete.Index is {}. error is {}. ", index,
                        failure.getMessage(), failure);
            }
        }).setBulkActions(BULK_MAX_SIZE).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
                .setFlushInterval(TimeValue.timeValueSeconds(5)).build();
        try {
            for (String docId : lstId) {
                bulkProcessor.add(new DeleteRequest(index, type, docId));
            }
        } catch (Exception ex) {
            // The message used to name another method and had no arguments for its placeholders.
            logger.error("ElasticsearchHelper.bulkProcessDelete.Index is {}. error is {}. ", index,
                    ex.getMessage(), ex);
            throw ex;
        } finally {
            // awaitClose returns false when pending bulks did not finish within the timeout.
            if (!bulkProcessor.awaitClose(1, TimeUnit.MINUTES)) {
                logger.error("ElasticsearchHelper.bulkProcessDelete.Index is {}. error is {}. ", index,
                        "pending bulk requests did not complete within 1 minute");
            }
            this.esClient.admin().indices().prepareRefresh(index).get();
        }
    }

3.根据DSL删除

    /**
     * Deletes every document matching the given query DSL using a single bulk request.
     *
     * <p>Matching IDs are collected through a scrolling search (pages of up to 1500 hits), one
     * delete action is queued per hit, and the bulk request is executed once at the end. The
     * scroll is cleared in {@code finally}.</p>
     *
     * <p>Note the parameter order: the DSL comes first here, unlike the three-argument overload
     * {@code deleteByQueryDsl(index, type, dsl)}.</p>
     *
     * @param queryDsl search body selecting the documents to delete
     * @param index index holding the documents
     * @param type document type
     * @param isAutoRefresh refresh flag passed to the bulk request
     * @throws Exception if the search fails or the bulk response reports failures
     */
    public void deleteByQueryDsl(String queryDsl , String index, String type, boolean isAutoRefresh) throws Exception {

        String scrollId = "";

        try {

            SearchResponse scrollResp = this.esClient.prepareSearch(index)
                    .setTypes(type)
                    .setExtraSource(queryDsl).setFrom(0).setSize(1500).setScroll(new TimeValue(60000))
                    .execute().actionGet();
            scrollId = scrollResp.getScrollId();

            BulkRequestBuilder bulkRequest = this.esClient.prepareBulk();
            bulkRequest.setTimeout(TimeValue.timeValueMinutes(20));

            do {
                for (SearchHit hit : scrollResp.getHits().getHits()) {
                    bulkRequest.add(this.esClient.prepareDelete(index, type, hit.getId()));
                }

                scrollResp = this.esClient.prepareSearchScroll(scrollId).setScroll(new TimeValue(60000)).execute().actionGet();
                // Keep the most recent scroll id for the next request and for the final cleanup.
                scrollId = scrollResp.getScrollId();

                // Stop once a scroll request returns no hits.
            } while (scrollResp.getHits().getHits().length > 0);

            if (bulkRequest.numberOfActions() > 0) {
                BulkResponse response = bulkRequest.setRefresh(isAutoRefresh).execute().actionGet();
                if (response.hasFailures()) {
                    throw new Exception(
                            "deleteByFilter failure:" + response.buildFailureMessage());
                }
            }
        } finally {
            // Previously the scroll was left open until its keep-alive expired.
            this.clearESScrollId(scrollId);
        }
    }
    /**
     * Deletes every document matching the given query DSL.
     *
     * <p>Matching IDs are collected through a scrolling search (pages of up to 2000 hits) and
     * then handed to {@code bulkProcessDelete}. The scroll is cleared in {@code finally}.</p>
     *
     * @param index index holding the documents
     * @param type document type
     * @param dsl search body selecting the documents to delete
     * @throws Exception if the search or the bulk delete fails
     */
    public void deleteByQueryDsl(String index, String type, String dsl) throws Exception {

        String scrollId = "";
        List<String> lstId = new ArrayList<String>();
        try {

            SearchResponse scrollResp = this.esClient.prepareSearch(index).setTypes(type).setExtraSource(dsl).setFrom(0)
                    .setSize(2000).setScroll(new TimeValue(60000)).execute().actionGet();
            scrollId = scrollResp.getScrollId();

            do {
                for (SearchHit searchHit : scrollResp.getHits().getHits()) {
                    lstId.add(searchHit.getId());
                }
                scrollResp = this.esClient.prepareSearchScroll(scrollId).setScroll(new TimeValue(60000)).execute()
                        .actionGet();
                // The scroll id may change between requests; always continue (and later clear)
                // with the most recently returned one.
                scrollId = scrollResp.getScrollId();

            } while (scrollResp.getHits().getHits().length > 0);

            this.bulkProcessDelete(index, type, lstId);

        } finally {
            this.clearESScrollId(scrollId);
        }
    }

补充

我们在执行ES操作的时候,为了避免一次操作的数据量过大,通常会设置size,并通过scroll分批读取结果;每次scroll查询都会返回一个scrollId,对应服务端保留的搜索上下文。因此,在方法的finally中,需要用scrollId将该上下文清除。
例如:

/**
 * Clears the scroll identified by the given ID; does nothing when the ID is null or empty.
 *
 * @param scrollId scroll ID returned by a scrolling search
 */
public void clearESScrollId(String scrollId) {
        if (!StringUtils.isNoneEmpty(scrollId)) {
            return;
        }
        ClearScrollRequest request = new ClearScrollRequest();
        request.addScrollId(scrollId);
        this.esClient.clearScroll(request).actionGet();
    }

相关文章

网友评论

      本文标题:ElasticSearch常用的增删查改操作

      本文链接:https://www.haomeiwen.com/subject/nncemctx.html