美文网首页
ElasticSearch常用的增删查改操作

ElasticSearch常用的增删查改操作

作者: Damon_e889 | 来源:发表于2019-10-17 21:05 被阅读0次

    使用Java对ElasticSearch进行增删查改操作,分为两个步骤:
    1.拼接查询DSL语句(JSON格式的Query DSL,并非SQL)
    2.执行增删查改操作
    以下提供了一些常用的轮子。

    DSL拼接

    1.最普通的DSL拼接(query_string查询)

        /**
         * Builds a minimal {@code query_string} search body.
         *
         * <p>The query text is placed verbatim between JSON double quotes; nothing is escaped
         * here, so quotes or backslashes inside it must already be JSON-escaped by the caller.</p>
         *
         * @param queryString query-string expression, e.g. {@code field:value}
         * @return the query DSL as a JSON string
         */
        public String getQueryDSL(String queryString) {
            StringBuilder json = new StringBuilder("{");
            json.append(" \"query\": { ");
            json.append(" \"query_string\": { ");
            json.append(" \"query\":\"").append(queryString).append("\" ");
            json.append("}").append(" } ").append(" } ");
            return json.toString();
        }
    

    2.查询并根据字段排序

        /**
         * Builds a {@code query_string} search body with an optional {@code sort} section.
         *
         * <p>The query text and the sort keys/values are inserted verbatim (no JSON escaping).</p>
         *
         * @param queryString query-string expression, e.g. {@code field:value}
         * @param sortByASCOrDesc key: sort field, value: {@code asc} or {@code desc}; entries are
         *            emitted in the map's iteration order. Pass {@code null} to omit the
         *            {@code sort} section (an empty map yields {@code "sort": []})
         * @return query DSL with sort, as a JSON string
         */
        public String getQueryDSL(String queryString, Map<String, String> sortByASCOrDesc) {

            StringBuilder builder = new StringBuilder();
            builder.append("{ \"query\": {  \"query_string\": {  \"query\":\"").append(queryString).append("\" } } ");

            // A null sort map used to throw a NullPointerException; treat it as "no sorting",
            // the same way getRangeQueryDSL treats its sort argument.
            if (sortByASCOrDesc != null) {

                builder.append(", \"sort\": [");

                // Empty before the first element, "," before every following one.
                String separator = "";

                for (Map.Entry<String, String> sortField : sortByASCOrDesc.entrySet()) {
                    builder.append(separator)
                            .append(String.format("{\"%s\":\"%s\"}", sortField.getKey(), sortField.getValue()));
                    separator = ",";
                }

                builder.append("]");
            }

            builder.append(" } ");

            return builder.toString();
        }
    

    3.查询区间并排序

        /**
         * Builds a {@code bool}/{@code must} search body that combines a {@code query_string}
         * clause with zero or more {@code range} clauses and an optional {@code sort} section.
         *
         * <p>All strings are inserted verbatim (no JSON escaping); range values are always
         * emitted as JSON strings.</p>
         *
         * @param queryString query-string expression, e.g. {@code field:value}
         * @param rangeMap key = range field, value = map of range key ({@code gte}, {@code gt},
         *            {@code lte}, {@code lt}, ...) to its value; {@code null} or empty means no
         *            range clause is added
         * @param sortByASCOrDesc key: sort field, value: {@code asc} or {@code desc};
         *            {@code null} if no sorting is needed
         * @return query DSL with range, as a JSON string
         */
        public String getRangeQueryDSL(String queryString, Map<String, Map<String, String>> rangeMap, Map<String, String> sortByASCOrDesc) {

            StringBuilder strBuilder = new StringBuilder();

            // The query_string clause is written WITHOUT a trailing comma; every range clause
            // brings its own leading comma. An empty rangeMap therefore no longer produces
            // the invalid "[{...},]" array.
            strBuilder.append("{").append("\"query\": {")
                    .append("\"bool\": {").append("\"must\": [").append("{")
                    .append("\"query_string\": {").append(" \"query\":\"").append(queryString).append("\" ")
                    .append("}").append("}");

            if (rangeMap != null) {

                for (Map.Entry<String, Map<String, String>> pair : rangeMap.entrySet()) {

                    Map<String, String> range = pair.getValue();

                    strBuilder.append(",{").append("\"range\": {").append(" \"").append(pair.getKey()).append("\": { ");

                    // Count down so that every entry except the last is followed by a comma,
                    // whatever the number of range keys (previously only the first one was).
                    int remaining = range.size();

                    for (Map.Entry<String, String> bound : range.entrySet()) {
                        remaining--;
                        strBuilder.append(" \"").append(bound.getKey()).append("\": \"").append(bound.getValue()).append("\"")
                                .append(remaining > 0 ? " , " : "  ");
                    }

                    strBuilder.append("}").append("}").append("}");
                }
            }

            // Close "must", "bool" and "query".
            strBuilder.append("]").append("}").append("}");

            if (sortByASCOrDesc != null) {

                strBuilder.append(", \"sort\": [");

                String separator = "";

                for (Map.Entry<String, String> pars : sortByASCOrDesc.entrySet()) {
                    strBuilder.append(separator)
                            .append(String.format("{\"%s\":\"%s\"}", pars.getKey(), pars.getValue()));
                    separator = ",";
                }

                strBuilder.append("]");
            }

            strBuilder.append("}");

            return strBuilder.toString();

        }
    

    4.查所有

        /**
         * Builds a search body that matches every document ({@code match_all}).
         *
         * @return the match-all query DSL as a JSON string
         */
        public String getQueryAllDSL() {
            return new StringBuilder("{ ")
                    .append(" \"query\": { ")
                    .append(" \"match_all\": {} ")
                    .append(" } ")
                    .append(" } ")
                    .toString();
        }
    

    5.根据字段分组

        /**
         * Builds an aggregation-only search body: a {@code bool}/{@code must} wrapper around a
         * {@code query_string} clause on {@code _all}, {@code size} 0 (no hits requested), and
         * the given aggregations.
         *
         * @param queryString query-string expression, inserted verbatim (no JSON escaping)
         * @param strAggs the content of the {@code aggs} object, WITHOUT its surrounding braces
         * @return the query DSL as a JSON string
         */
        private String getAggsQueryDsl(String queryString, String strAggs) {
            StringBuilder dsl = new StringBuilder();
            dsl.append("{\"query\": {\"bool\": {\"must\": [{\"query_string\": {\"default_field\": \"_all\",\"query\": \"");
            dsl.append(queryString);
            dsl.append("\"}}]}},\"size\": 0,\"aggs\": {");
            dsl.append(strAggs);
            dsl.append("}}");
            return dsl.toString();
        }
    

    注意:
    1).使用aggs聚合可以根据字段去重分组,如果有多个字段需要去重分组,可以嵌套使用aggs;
    2).实际项目中,建议尽量少地使用aggs,因为聚合比较消耗性能;建议将数据查询出来后,在后台代码中进行去重分组。

    查询(Query)

    1.执行查询操作,返回结果用实体类封装

        /**
         * Runs a query DSL against an index/type and returns all matching documents,
         * reading the result page by page through the scroll API.
         *
         * @param index index to search
         * @param type document type to search
         * @param dsl variable sample: { "query":{ "query_string":{ "query":"field:value" } } }
         * @param classOfT sample: xxxVO.class; each hit's {@code _source} JSON is converted to it with Gson
         * @return List<xxxVO>, empty when nothing matches
         */
        public <T> List<T> getDocListByQueryDSL(String index, String type, String dsl, Class<T> classOfT) {
            String scrollId = "";
            List<T> list = new ArrayList<T>();
            try {
                SearchResponse scrollResp = this.esClient.prepareSearch(index).setTypes(type).setExtraSource(dsl).setFrom(0)
                        .setSize(2000).setScroll(new TimeValue(60000)).execute().actionGet();
                scrollId = scrollResp.getScrollId();
                Gson gson = new Gson();
                do {
                    for (SearchHit searchHit : scrollResp.getHits().getHits()) {
                        list.add(gson.fromJson(searchHit.getSourceAsString(), classOfT));
                    }
                    scrollResp = this.esClient.prepareSearchScroll(scrollId).setScroll(new TimeValue(60000)).execute()
                            .actionGet();
                    // The scroll id may change from one page to the next: always continue with
                    // (and finally clear) the id returned by the most recent response.
                    scrollId = scrollResp.getScrollId();

                } while (scrollResp.getHits().getHits().length > 0);
                return list;
            } finally {
                // Release the server-side scroll context even when the search fails midway.
                this.clearESScrollId(scrollId);
            }
        }
    

    2.执行查询操作,返回结果用map封装

        /**
         * Runs a query DSL against an index/type and returns all matching documents keyed by
         * document ID, reading the result page by page through the scroll API.
         *
         * @param index index to search
         * @param type document type to search
         * @param dsl sample: { "query":{ "query_string":{ "query":"field:value" } } }
         * @param classOfT class each hit's {@code _source} JSON is converted to with Gson
         * @return Map<String, T> Key = document ID , value = Document VO
         */
        public <T> Map<String, T> getDocMapByQueryDSL(String index, String type, String dsl, Class<T> classOfT) {

            String scrollId = "";

            Map<String, T> retMap = new HashMap<String, T>();

            try {

                SearchResponse scrollResp = this.esClient.prepareSearch(index).setTypes(type).setExtraSource(dsl).setFrom(0)
                        .setSize(2000).setScroll(new TimeValue(60000)).execute().actionGet();
                scrollId = scrollResp.getScrollId();

                Gson gson = new Gson();

                do {

                    for (SearchHit searchHit : scrollResp.getHits().getHits()) {
                        retMap.put(searchHit.getId(), gson.fromJson(searchHit.getSourceAsString(), classOfT));
                    }
                    scrollResp = this.esClient.prepareSearchScroll(scrollId).setScroll(new TimeValue(60000)).execute()
                            .actionGet();
                    // The scroll id may change from one page to the next: always continue with
                    // (and finally clear) the id returned by the most recent response.
                    scrollId = scrollResp.getScrollId();

                } while (scrollResp.getHits().getHits().length > 0);

                return retMap;

            } finally {
                // Release the server-side scroll context even when the search fails midway.
                this.clearESScrollId(scrollId);
            }
        }
    
    

    3.根据ID查询

        /**
         * Fetches a single document by its ID and converts its {@code _source} JSON to a VO.
         *
         * @param index index that holds the document
         * @param type document type
         * @param docID sample: AAA_BBB_CCC
         * @param classOfT class the source JSON is converted to with Gson
         * @return xxxVO built from the GET response's source string
         */
        public <T> T getDocByDocID(String index, String type, String docID, Class<T> classOfT) {
            String sourceJson = this.esClient.prepareGet(index, type, docID).get().getSourceAsString();
            return new Gson().fromJson(sourceJson, classOfT);
        }
    

    4.根据ID的List查询

        /**
         * Fetches the documents whose IDs are in the given list, paging through the result with
         * the scroll API, and converts each hit's {@code _source} JSON to a VO.
         *
         * @param index index to search
         * @param type document type to search
         * @param docIDs sample:List<AAA_BBB_CCC  >
         * @param classOfT class each source JSON is converted to with Gson
         * @return the converted documents, empty when no ID matches
         * @throws Exception whatever the search or the conversion throws
         */
        public <T> List<T> getDocListByDocID(String index, String type, List<String> docIDs, Class<T> classOfT)
                throws Exception {

            List<T> docs = new ArrayList<T>();
            String scrollId = "";

            try {
                SearchResponse page = this.esClient.prepareSearch(index).setTypes(type)
                        .setQuery(QueryBuilders.idsQuery().ids(docIDs)).setScroll(new TimeValue(60000)).setFrom(0)
                        .setSize(1000).execute().actionGet();

                Gson converter = new Gson();

                while (true) {
                    for (SearchHit hit : page.getHits().getHits()) {
                        docs.add(converter.fromJson(hit.getSourceAsString(), classOfT));
                    }

                    // Remember the id of the page just consumed, then ask for the next page with it.
                    scrollId = page.getScrollId();
                    page = this.esClient.prepareSearchScroll(scrollId).setScroll(new TimeValue(60000)).execute()
                            .actionGet();

                    if (page.getHits().getHits().length <= 0) {
                        break;
                    }
                }
                return docs;
            } finally {
                this.clearESScrollId(scrollId);
            }
        }
    

    5.根据ID查询字段值,返回结果用map封装

        /**
         * Get selected source fields of documents by document IDs.
         *
         * @param index index to search
         * @param type document type to search
         * @param ids document IDs to look up
         * @param fields source fields to fetch and return
         * @return <p>Map<String, Object> key = id_field, value = document id_field's value.
         *            sample:Map<AAA_BBB_CCC_field, value></p>
         * @throws Exception whatever the search throws
         */
        public Map<String, Object> getExtraMapByDocID(String index, String type, List<String> ids, String... fields)
                throws Exception {

            Map<String, Object> retMap = Maps.newHashMap();

            String scrollId = "";

            try {

                SearchResponse response = this.esClient.prepareSearch(index).setTypes(type)
                        .setQuery(QueryBuilders.idsQuery().ids(ids)).setFetchSource(fields, null)
                        .setScroll(new TimeValue(60000)).setFrom(0).setSize(1000).execute().actionGet();

                while (true) {

                    // Track the latest scroll id so the finally block can release it.
                    scrollId = response.getScrollId();

                    for (SearchHit hit : response.getHits().getHits()) {

                        for (String field : fields) {

                            String key = String.format("%s_%s", hit.getId(), field);

                            retMap.put(key, hit.getSource().get(field));
                        }

                    }

                    response = this.esClient.prepareSearchScroll(scrollId).setScroll(new TimeValue(60000))
                            .execute().actionGet();
                    // Break condition: No hits are returned
                    if (response.getHits().getHits().length == 0) {
                        break;
                    }

                }

                return retMap;

            } finally {
                // This method used to leave its scroll context open; clear it like the sibling
                // query methods do.
                this.clearESScrollId(scrollId);
            }

        }
    

    6.根据DSL查询字段值

        /**
         * Runs a query DSL and returns only the requested {@code _source} fields of each hit.
         *
         * @param index index to search
         * @param type document type to search
         * @param queryDSL search body passed to the request as extra source
         * @param fields source fields to fetch and return
         * @return one map per matching document, mapping each requested field name to the
         *         value found in that document's source
         * @throws Exception whatever the search throws
         */
        public List<Map<String, Object>> getExtraMapByQueryDSL(String index, String type, String queryDSL, String... fields)
                throws Exception {

            List<Map<String, Object>> rows = Lists.newArrayList();
            String scrollId = "";

            try {
                SearchResponse firstPage = this.esClient.prepareSearch(index)
                        .setTypes(type)
                        .setExtraSource(queryDSL)
                        .setFetchSource(fields, null)
                        .setScroll(new TimeValue(60000))
                        .setFrom(0)
                        .setSize(1000)
                        .execute()
                        .actionGet();

                // Only the id of the first response is kept for the cleanup below.
                scrollId = firstPage.getScrollId();

                // The helper walks the remaining scroll pages and fills the list.
                return this.getHitsResult(firstPage, rows, fields);
            } finally {
                this.clearESScrollId(scrollId);
            }
        }
    
        /**
         * Collects the requested fields of every hit, starting with the given response and
         * following its scroll until a page comes back without hits.
         *
         * @param response first page of a scrolled search
         * @param rtnLst list the per-document maps are appended to
         * @param fields source fields copied from each hit
         * @return {@code rtnLst}, after one field-name-to-value map per hit has been added
         */
        private List<Map<String, Object>> getHitsResult(SearchResponse response, List<Map<String, Object>> rtnLst,
                String... fields) {
            SearchResponse page = response;
            do {
                for (SearchHit hit : page.getHits().getHits()) {
                    Map<String, Object> row = Maps.newHashMap();
                    for (String field : fields) {
                        row.put(field, hit.getSource().get(field));
                    }
                    rtnLst.add(row);
                }
                page = this.esClient.prepareSearchScroll(page.getScrollId()).setScroll(new TimeValue(60000))
                        .execute().actionGet();
            } while (page.getHits().getHits().length != 0);
            return rtnLst;
        }
    

    7.查询排序后的第一笔数据

        /**
         * Returns the first document of a (sorted) query, converted to a VO.
         *
         * @param index index to search
         * @param type document type to search
         * @param dsl search body; should contain a sort so that "first" is well defined
         * @param classOfT class the hit's source JSON is converted to with Gson
         * @return the first hit as a VO, or {@code null} when the query returns no hit
         */
        public <T> T getTop1DocByQueryDSL(String index, String type, String dsl, Class<T> classOfT) {
            String scrollId = "";
            try {
                SearchResponse firstPage = this.esClient.prepareSearch(index).setTypes(type).setExtraSource(dsl).setFrom(0)
                        .setSize(1).setScroll(new TimeValue(60000)).execute().actionGet();
                scrollId = firstPage.getScrollId();

                SearchHit[] hits = firstPage.getHits().getHits();
                if (hits.length == 0) {
                    return null;
                }
                return new Gson().fromJson(hits[0].getSourceAsString(), classOfT);
            } finally {
                this.clearESScrollId(scrollId);
            }
        }
    

    8.通过DSL查询数据是否存在

        /**
         * Check document exist or not by query DSL.
         *
         * <p>Only the total hit count of the response is inspected, so the request is a plain
         * search of size 1. It no longer opens a scroll context: the previous version opened one
         * and never cleared it.</p>
         *
         * @param esIndex index to search
         * @param esType document type to search
         * @param queryDsl search body passed to the request as extra source
         * @return {@code true} when the query reports at least one hit
         * @throws Exception whatever the search throws
         */
        public boolean checkDocExistByQueryDSL(String esIndex, String esType, String queryDsl) throws Exception {

            SearchRequestBuilder searchBuilder = this.esClient.prepareSearch(esIndex).setTypes(esType)
                    .setFetchSource(new String[] { "_id" }, null).setExtraSource(queryDsl).setFrom(0).setSize(1);

            SearchResponse searchResp = searchBuilder.execute().actionGet();

            return searchResp.getHits().getTotalHits() > 0;

        }
    

    9.查询数据笔数

        /**
         * Counts the documents matching a query DSL by running it with size 0 and reading the
         * total hit count of the response.
         *
         * @param index index to search
         * @param type document type to search
         * @param dsl search body passed to the request as extra source
         * @return total number of hits reported by the search
         */
        protected long getTotalHitsByDsl(String index, String type, String dsl) {
            String scrollId = "";
            try {
                SearchResponse countResp = this.esClient.prepareSearch(index)
                        .setTypes(type)
                        .setExtraSource(dsl)
                        .setFrom(0)
                        .setSize(0)
                        .setScroll(new TimeValue(60000))
                        .execute()
                        .actionGet();
                scrollId = countResp.getScrollId();
                long total = countResp.getHits().getTotalHits();
                return total;
            } finally {
                this.clearESScrollId(scrollId);
            }
        }
    

    新增(Upsert)

    注:upsert用法:有记录就更新(下面的实现使用index API,会用新文档整体覆盖旧文档),没有记录就新增
    1.通过ID 执行upsert操作

        /**
         * Indexes a VO under the given document ID, with refresh enabled on the request.
         *
         * @param index target index
         * @param type document type
         * @param docID ID the document is stored under
         * @param vo object serialized to JSON with Gson and used as the document source
         * @return the response of the index request
         */
        public <T> IndexResponse upsert(String index, String type, String docID, T vo) {
            String source = new Gson().toJson(vo);
            IndexResponse response = this.esClient.prepareIndex(index, type, docID)
                    .setRefresh(true)
                    .setSource(source)
                    .execute()
                    .actionGet();
            return response;
        }
    

    2.批量upsert

        /**
         * Indexes a list of VOs in a single bulk request. The document ID of each VO is obtained
         * by reflectively calling a no-argument method on it.
         *
         * @param index target index
         * @param type document type
         * @param voList objects to index; each is serialized to JSON with Gson
         * @param getDocIDMethodInVo name of the public no-arg method that returns the document ID,
         *                sample : public String getDocumentID() { return
         *               String.format("%s_%s", AAA,BBB); }
         * @param isAutoRefresh refresh flag set on the bulk request
         * @throws Exception when the ID method cannot be found or invoked, or when the bulk
         *             response reports failures
         */
        public <T> void bulkProcessUpsert(String index, String type, List<T> voList, String getDocIDMethodInVo, boolean isAutoRefresh) throws Exception {
            Gson gson = new Gson();

            BulkRequestBuilder bulk = this.esClient.prepareBulk();
            bulk.setTimeout(TimeValue.timeValueMinutes(20));

            for (T vo : voList) {
                // Look the ID method up on the runtime class of each element.
                Method idGetter = vo.getClass().getMethod(getDocIDMethodInVo);
                String docId = idGetter.invoke(vo).toString();
                bulk.add(this.esClient.prepareIndex(index, type, docId).setSource(gson.toJson(vo)));
            }

            // Nothing queued: skip the round trip.
            if (bulk.numberOfActions() <= 0) {
                return;
            }

            BulkResponse resp = bulk.setRefresh(isAutoRefresh).execute().actionGet();
            if (resp.hasFailures()) {
                throw new Exception(String.format("Bulk index items failed, message:%s", resp.buildFailureMessage()));
            }
        }
    

    更新(Update)

    1.根据ID执行update操作

        /**
         * Partially updates one document by ID with the given field values, with refresh
         * enabled on the request.
         *
         * @param index target index
         * @param type document type
         * @param id ID of the document to update
         * @param fieldValMap key = index field, value = value to be updated
         * @return the response of the update request
         * @throws Exception whatever executing the update or waiting for its result throws
         */
        public UpdateResponse update(String index, String type, String id, Map<String, Object> fieldValMap) throws Exception {
            UpdateRequest request = new UpdateRequest(index, type, id).refresh(true).doc(fieldValMap);
            return this.esClient.update(request).get();
        }
    

    2.批量update

        /**
         * Partially updates several documents in a single bulk request.
         *
         * @param index target index
         * @param type document type
         * @param mapUpdates key = document ID, value = map of field name to new value
         * @param isAutoRefresh refresh flag set on the bulk request
         * @throws Exception when building a document fails or the bulk response reports failures
         */
        protected void bulkProcessUpdate(String index, String type, Map<String, Map<String, Object>> mapUpdates, boolean isAutoRefresh)
                throws Exception {

            BulkRequestBuilder bulk = this.esClient.prepareBulk();
            bulk.setTimeout(TimeValue.timeValueMinutes(20));

            for (Map.Entry<String, Map<String, Object>> update : mapUpdates.entrySet()) {
                // Build the partial document { field: value, ... } for this ID.
                XContentBuilder partialDoc = XContentFactory.jsonBuilder().startObject();
                for (Map.Entry<String, Object> fieldValue : update.getValue().entrySet()) {
                    partialDoc.field(fieldValue.getKey(), fieldValue.getValue());
                }
                partialDoc.endObject();

                bulk.add(this.esClient.prepareUpdate(index, type, update.getKey()).setDoc(partialDoc));
            }

            // Nothing queued: skip the round trip.
            if (bulk.numberOfActions() <= 0) {
                return;
            }

            BulkResponse resp = bulk.setRefresh(isAutoRefresh).execute().actionGet();
            if (resp.hasFailures()) {
                throw new Exception(String.format("Bulk update items failed, message:%s", resp.buildFailureMessage()));
            }
        }
    

    删除(Delete)

    1.根据ID 删除

        /**
         * Deletes one document by ID, with refresh enabled on the request.
         *
         * @param index target index
         * @param type document type
         * @param id ID of the document to delete
         * @return the {@code isFound()} flag of the delete response
         */
        public boolean deleteById(String index, String type, String id) {
            boolean found = this.esClient.prepareDelete(index, type, id)
                    .setRefresh(true)
                    .execute()
                    .actionGet()
                    .isFound();
            return found;
        }
    

    2.批量删除

        /**
         * Bulk delete by document IDs.
         *
         * <p>The delete requests are fed to a {@code BulkProcessor}; after all IDs have been
         * added the processor is closed (waiting up to one minute) and the index is refreshed.
         * Failures reported by the processor are logged, not thrown.</p>
         *
         * @param index target index
         * @param type document type
         * @param lstId IDs of the documents to delete
         * @throws Exception when adding a request fails, or when waiting for the processor to
         *             close or refreshing the index throws
         */
        public void bulkProcessDelete(String index, String type, List<String> lstId) throws Exception {

            final BulkProcessor bulkProcessor = BulkProcessor.builder(this.esClient, new BulkProcessor.Listener() {
                public void beforeBulk(long executionId, BulkRequest request) {
                    // Nothing to prepare before a batch is sent.
                }

                public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                    // A batch can complete while individual deletes in it failed; these used
                    // to be dropped silently.
                    if (response.hasFailures()) {
                        logger.error("ElasticsearchHelper.bulkProcessDelete.Index is {}. error is {}. ", index,
                                response.buildFailureMessage());
                    }
                }

                public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                    // Pass the throwable to the logger instead of printStackTrace() so the
                    // stack trace ends up in the application log.
                    logger.error("ElasticsearchHelper.bulkProcessDelete.Index is {}. error is {}. ", index,
                            failure.getMessage(), failure);
                }
            }).setBulkActions(BULK_MAX_SIZE).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
                    .setFlushInterval(TimeValue.timeValueSeconds(5)).build();
            try {
                for (String docId : lstId) {
                    bulkProcessor.add(new DeleteRequest(index, type, docId));
                }
            } catch (Exception ex) {
                // The old message named another method and had two placeholders but no arguments.
                logger.error("ElasticsearchHelper.bulkProcessDelete.Index is {}. error is {}. ", index,
                        ex.getMessage(), ex);
                throw ex;
            } finally {
                // awaitClose flushes what is still buffered; false means the wait timed out
                // with requests still in flight.
                if (!bulkProcessor.awaitClose(1, TimeUnit.MINUTES)) {
                    logger.error("ElasticsearchHelper.bulkProcessDelete.Index is {}. error is {}. ", index,
                            "bulk requests did not complete within 1 minute");
                }
                this.esClient.admin().indices().prepareRefresh(index).get();
            }
        }
    

    3.根据DSL删除

        /**
         * Delete by query: scrolls through every hit of the query, queues one delete per hit in
         * a single bulk request and executes it once all pages have been read.
         *
         * @param queryDsl search body passed to the request as extra source
         * @param index target index
         * @param type document type
         * @param isAutoRefresh refresh flag set on the bulk request
         * @throws Exception when the search fails or the bulk response reports failures
         */
        public void deleteByQueryDsl(String queryDsl , String index, String type, boolean isAutoRefresh) throws Exception {

            String scrollId = "";

            try {

                SearchResponse scrollResp = this.esClient.prepareSearch(index)
                        .setTypes(type)
                        .setExtraSource(queryDsl).setFrom(0).setSize(1500).setScroll(new TimeValue(60000))
                        .execute().actionGet();

                BulkRequestBuilder bulkRequest = this.esClient.prepareBulk();
                bulkRequest.setTimeout(TimeValue.timeValueMinutes(20));

                while (true) {

                    // Track the latest scroll id so the finally block can release it.
                    scrollId = scrollResp.getScrollId();

                    for (SearchHit hit : scrollResp.getHits().getHits()) {
                        bulkRequest.add(this.esClient.prepareDelete(index, type, hit.getId()));
                    }

                    scrollResp = this.esClient.prepareSearchScroll(scrollId).setScroll(new TimeValue(60000)).execute().actionGet();

                    // Break condition: No hits are returned
                    if (scrollResp.getHits().getHits().length == 0) {
                        break;
                    }
                }

                if (bulkRequest.numberOfActions() > 0) {
                    BulkResponse response = bulkRequest.setRefresh(isAutoRefresh).execute().actionGet();
                    if (response.hasFailures()) {
                        throw new Exception(
                                "deleteByFilter failure:" + response.buildFailureMessage());
                    }
                }
            } finally {
                // This overload used to leave its scroll context open; clear it like the
                // three-argument overload does.
                this.clearESScrollId(scrollId);
            }
        }
    
        /**
         * Delete documents by query DSL: collects the IDs of every hit through the scroll API,
         * then hands them to {@code bulkProcessDelete}.
         *
         * @param index target index
         * @param type document type
         * @param dsl search body passed to the request as extra source
         * @throws Exception whatever the search or the bulk delete throws
         */
        public void deleteByQueryDsl(String index, String type, String dsl) throws Exception {

            String scrollId = "";
            List<String> lstId = new ArrayList<String>();
            try {

                SearchResponse scrollResp = this.esClient.prepareSearch(index).setTypes(type).setExtraSource(dsl).setFrom(0)
                        .setSize(2000).setScroll(new TimeValue(60000)).execute().actionGet();
                scrollId = scrollResp.getScrollId();

                do {
                    for (SearchHit searchHit : scrollResp.getHits().getHits()) {
                        lstId.add(searchHit.getId());
                    }
                    scrollResp = this.esClient.prepareSearchScroll(scrollId).setScroll(new TimeValue(60000)).execute()
                            .actionGet();
                    // The scroll id may change from one page to the next: always continue with
                    // (and finally clear) the id returned by the most recent response.
                    scrollId = scrollResp.getScrollId();

                } while (scrollResp.getHits().getHits().length > 0);

                this.bulkProcessDelete(index, type, lstId);

            } finally {
                this.clearESScrollId(scrollId);
            }
        }
    

    补充

    我们在执行ES查询的时候,为了避免一次返回的数据量过大,通常会设置size并通过scroll分批读取,因此会用到scrollId;scroll上下文会占用ES服务端资源,所以需要在方法的finally中将scrollId清除。
    例如:

    /**
     * Clears the scroll context identified by the given scroll ID.
     * Does nothing when {@code StringUtils.isNoneEmpty(scrollId)} is false.
     *
     * @param scrollId scroll ID returned by a scrolled search
     */
    public void clearESScrollId(String scrollId) {
            if (!StringUtils.isNoneEmpty(scrollId)) {
                return;
            }
            ClearScrollRequest request = new ClearScrollRequest();
            request.addScrollId(scrollId);
            this.esClient.clearScroll(request).actionGet();
        }
    

    相关文章

      网友评论

          本文标题:ElasticSearch常用的增删查改操作

          本文链接:https://www.haomeiwen.com/subject/nncemctx.html