使用Java对ElasticSearch进行增删查改操作,分为两个步骤:
1.拼接查询语句(即ES的查询DSL,本文沿用“sql”的叫法)
2.执行增删查改操作
以下提供了一些常用的轮子(工具方法)。
sql拼接
1.最普通的sql拼接
/**
 * Builds a search body containing a single {@code query_string} query.
 *
 * <p>The query text is placed between double quotes exactly as given; nothing is
 * escaped here, so the caller must pass text that is already safe inside a JSON string.</p>
 *
 * @param queryString text inserted as the value of the {@code query} field
 * @return the search body as a JSON string
 */
public String getQueryDSL(String queryString) {
    StringBuilder body = new StringBuilder();
    body.append("{ \"query\": { ");
    body.append(" \"query_string\": { ");
    body.append(" \"query\":\"").append(queryString).append("\" ");
    body.append("}");
    body.append(" } ");
    body.append(" } ");
    return body.toString();
}
2.查询并根据字段排序
/**
 * Builds a search body containing a {@code query_string} query followed by a sort clause.
 *
 * <p>The query text, sort fields and sort directions are inserted verbatim (no escaping).
 * Sort entries are emitted in the iteration order of the map, so pass an ordered map
 * (for example a {@code LinkedHashMap}) when the sort priority matters.</p>
 *
 * @param queryString text inserted as the value of the {@code query} field
 * @param sortByASCOrDesc key: sort field, value: {@code asc} or {@code desc};
 *        {@code null} means "no sort" and produces a body without a {@code sort} clause
 * @return the search body as a JSON string
 */
public String getQueryDSL(String queryString, Map<String, String> sortByASCOrDesc) {
    StringBuilder builder = new StringBuilder();
    builder.append("{ \"query\": {  \"query_string\": {  \"query\":\"")
            .append(queryString)
            .append("\" } } ");
    // A null map used to throw a NullPointerException; treat it as "no sort requested",
    // which is the convention getRangeQueryDSL documents for the same parameter.
    if (sortByASCOrDesc != null) {
        builder.append(", \"sort\": [");
        boolean first = true;
        for (Map.Entry<String, String> sortEntry : sortByASCOrDesc.entrySet()) {
            if (!first) {
                builder.append(",");
            }
            builder.append("{\"").append(sortEntry.getKey()).append("\":\"")
                    .append(sortEntry.getValue()).append("\"}");
            first = false;
        }
        builder.append("]");
    }
    builder.append(" } ");
    return builder.toString();
}
3.查询区间并排序
/**
 * Builds a {@code bool/must} search body made of one {@code query_string} clause plus one
 * {@code range} clause per entry of {@code rangeMap}, optionally followed by a sort clause.
 *
 * <p>All values are inserted verbatim (no escaping). Clauses, bounds and sort entries are
 * emitted in the iteration order of the maps passed in.</p>
 *
 * @param queryString text inserted as the value of the {@code query} field
 * @param rangeMap key: range field; value: map of bound name ({@code gte}, {@code gt},
 *        {@code lte}, {@code lt}, ...) to bound value. {@code null} or empty produces no
 *        range clause
 * @param sortByASCOrDesc key: sort field, value: {@code asc} or {@code desc};
 *        pass {@code null} when no sort is needed
 * @return the search body as a JSON string
 */
public String getRangeQueryDSL(String queryString, Map<String, Map<String, String>> rangeMap, Map<String, String> sortByASCOrDesc) {
    StringBuilder dsl = new StringBuilder();
    dsl.append("{\"query\": {\"bool\": {\"must\": [{\"query_string\": { \"query\":\"")
            .append(queryString)
            .append("\" }}");
    if (rangeMap != null) {
        for (Map.Entry<String, Map<String, String>> rangeEntry : rangeMap.entrySet()) {
            // The comma goes in front of every range clause, so an empty rangeMap
            // no longer leaves a dangling comma inside the "must" array.
            dsl.append(",{\"range\": { \"").append(rangeEntry.getKey()).append("\": { ");
            boolean firstBound = true;
            for (Map.Entry<String, String> bound : rangeEntry.getValue().entrySet()) {
                // Separate every pair of bounds; previously only the first bound was
                // followed by a comma, so three or more bounds gave invalid JSON.
                if (!firstBound) {
                    dsl.append(", ");
                }
                dsl.append(" \"").append(bound.getKey()).append("\": \"")
                        .append(bound.getValue()).append("\" ");
                firstBound = false;
            }
            dsl.append("}}}");
        }
    }
    dsl.append("]}}");
    if (sortByASCOrDesc != null) {
        dsl.append(", \"sort\": [");
        boolean firstSort = true;
        for (Map.Entry<String, String> sortEntry : sortByASCOrDesc.entrySet()) {
            if (!firstSort) {
                dsl.append(",");
            }
            dsl.append("{\"").append(sortEntry.getKey()).append("\":\"")
                    .append(sortEntry.getValue()).append("\"}");
            firstSort = false;
        }
        dsl.append("]");
    }
    dsl.append("}");
    return dsl.toString();
}
4.查所有
/**
 * Returns the fixed search body for a {@code match_all} query.
 *
 * @return the {@code match_all} search body as a JSON string
 */
public String getQueryAllDSL() {
    return "{  \"query\": {  \"match_all\": {}  }  } ";
}
5.根据字段分组
/**
 * Builds a search body with a {@code query_string} query on {@code _all}, {@code size} 0
 * and the given aggregation definition.
 *
 * <p>Both arguments are inserted verbatim (no escaping).</p>
 *
 * @param queryString text inserted as the value of the {@code query} field
 * @param strAggs content placed between the braces of the {@code aggs} object
 * @return the search body as a JSON string
 */
private String getAggsQueryDsl(String queryString, String strAggs) {
    StringBuilder body = new StringBuilder();
    body.append("{\"query\": {\"bool\": {\"must\": [{\"query_string\": {");
    body.append("\"default_field\": \"_all\",");
    body.append("\"query\": \"").append(queryString).append("\"");
    body.append("}}]}},");
    body.append("\"size\": 0,");
    body.append("\"aggs\": {").append(strAggs).append("}");
    body.append("}");
    return body.toString();
}
注意:
1).使用aggs聚合可以根据字段去重分组;如果需要按多个字段去重分组,可以嵌套使用aggs;
2).实际项目中,建议尽量少用aggs,因为它比较消耗性能;可以先将数据查询出来,再在后台代码中进行去重分组。
查询(Query)
1.执行查询操作,返回结果用实体类封装
/**
 * Runs a search described by a query DSL and returns every hit deserialized with Gson.
 *
 * <p>The search is opened as a scroll (2000 hits requested per page, 60000 ms keep-alive)
 * and pages are read until one comes back empty. The scroll is cleared in {@code finally}.</p>
 *
 * @param index index to search
 * @param type document type to search
 * @param dsl search body, e.g. { "query":{ "query_string":{ "query":"field:value" } } }
 * @param classOfT class each hit's source is deserialized into, e.g. xxxVO.class
 * @return one object per hit, in the order the hits were returned
 */
public <T> List<T> getDocListByQueryDSL(String index, String type, String dsl, Class<T> classOfT) {
    String scrollId = "";
    try {
        List<T> list = new ArrayList<T>();
        SearchResponse scrollResp = this.esClient.prepareSearch(index).setTypes(type).setExtraSource(dsl).setFrom(0)
                .setSize(2000).setScroll(new TimeValue(60000)).execute().actionGet();
        scrollId = scrollResp.getScrollId();
        Gson gson = new Gson();
        do {
            for (SearchHit searchHit : scrollResp.getHits().getHits()) {
                list.add(gson.fromJson(searchHit.getSourceAsString(), classOfT));
            }
            scrollResp = this.esClient.prepareSearchScroll(scrollId).setScroll(new TimeValue(60000)).execute()
                    .actionGet();
            // Each scroll response may carry a new scroll id; continue (and later clear)
            // with the most recent one instead of reusing the id from the first page.
            String nextScrollId = scrollResp.getScrollId();
            if (nextScrollId != null) {
                scrollId = nextScrollId;
            }
        } while (scrollResp.getHits().getHits().length > 0);
        return list;
    } finally {
        this.clearESScrollId(scrollId);
    }
}
2.执行查询操作,返回结果用map封装
/**
 * Runs a search described by a query DSL and returns the hits keyed by document id.
 *
 * <p>The search is opened as a scroll (2000 hits requested per page, 60000 ms keep-alive)
 * and pages are read until one comes back empty. The scroll is cleared in {@code finally}.</p>
 *
 * @param index index to search
 * @param type document type to search
 * @param dsl search body, e.g. { "query":{ "query_string":{ "query":"field:value" } } }
 * @param classOfT class each hit's source is deserialized into
 * @return map whose key is the document id and whose value is the deserialized document
 */
public <T> Map<String, T> getDocMapByQueryDSL(String index, String type, String dsl, Class<T> classOfT) {
    String scrollId = "";
    try {
        Map<String, T> retMap = new HashMap<String, T>();
        SearchResponse scrollResp = this.esClient.prepareSearch(index).setTypes(type).setExtraSource(dsl).setFrom(0)
                .setSize(2000).setScroll(new TimeValue(60000)).execute().actionGet();
        scrollId = scrollResp.getScrollId();
        Gson gson = new Gson();
        do {
            for (SearchHit searchHit : scrollResp.getHits().getHits()) {
                retMap.put(searchHit.getId(), gson.fromJson(searchHit.getSourceAsString(), classOfT));
            }
            scrollResp = this.esClient.prepareSearchScroll(scrollId).setScroll(new TimeValue(60000)).execute()
                    .actionGet();
            // Each scroll response may carry a new scroll id; continue (and later clear)
            // with the most recent one instead of reusing the id from the first page.
            String nextScrollId = scrollResp.getScrollId();
            if (nextScrollId != null) {
                scrollId = nextScrollId;
            }
        } while (scrollResp.getHits().getHits().length > 0);
        return retMap;
    } finally {
        this.clearESScrollId(scrollId);
    }
}
3.根据ID查询
/**
 * Fetches one document by its id and deserializes its source with Gson.
 *
 * @param index index holding the document
 * @param type document type
 * @param docID document id, e.g. AAA_BBB_CCC
 * @param classOfT class the source is deserialized into
 * @return the deserialized document.
 *         NOTE(review): for a missing document the source string is presumably null,
 *         in which case Gson returns null — confirm against the client version in use.
 */
public <T> T getDocByDocID(String index, String type, String docID, Class<T> classOfT) {
    GetResponse found = this.esClient.prepareGet(index, type, docID).get();
    String sourceJson = found.getSourceAsString();
    return new Gson().fromJson(sourceJson, classOfT);
}
4.根据ID的List查询
/**
 * Fetches the documents whose ids are listed and deserializes each source with Gson.
 *
 * <p>Uses an ids query opened as a scroll (1000 hits requested per page, 60000 ms
 * keep-alive); pages are read until one comes back empty. The scroll id last used for
 * paging is cleared in {@code finally}.</p>
 *
 * @param index index to search
 * @param type document type to search
 * @param docIDs document ids, e.g. List&lt;AAA_BBB_CCC&gt;
 * @param classOfT class each hit's source is deserialized into
 * @return one object per hit, in the order the hits were returned
 * @throws Exception declared by the signature; nothing is caught or translated here
 */
public <T> List<T> getDocListByDocID(String index, String type, List<String> docIDs, Class<T> classOfT)
        throws Exception {
    String scrollId = "";
    try {
        List<T> docs = new ArrayList<T>();
        Gson gson = new Gson();
        SearchResponse page = this.esClient.prepareSearch(index).setTypes(type)
                .setQuery(QueryBuilders.idsQuery().ids(docIDs)).setScroll(new TimeValue(60000)).setFrom(0)
                .setSize(1000).execute().actionGet();
        boolean morePages = true;
        while (morePages) {
            for (SearchHit hit : page.getHits().getHits()) {
                docs.add(gson.fromJson(hit.getSourceAsString(), classOfT));
            }
            // Remember the id of the page just consumed, then ask for the next page with it.
            scrollId = page.getScrollId();
            page = this.esClient.prepareSearchScroll(scrollId).setScroll(new TimeValue(60000)).execute()
                    .actionGet();
            morePages = page.getHits().getHits().length > 0;
        }
        return docs;
    } finally {
        this.clearESScrollId(scrollId);
    }
}
5.根据ID查询字段值,返回结果用map封装
/**
 * Reads selected source fields of the documents whose ids are listed.
 *
 * <p>Uses an ids query opened as a scroll (1000 hits requested per page, 60000 ms
 * keep-alive); pages are read until one comes back empty. The scroll is cleared in
 * {@code finally} so the search context does not stay open until its keep-alive expires.</p>
 *
 * @param index index to search
 * @param type document type to search
 * @param ids document ids to look up
 * @param fields source fields to fetch for each document
 * @return map whose key is {@code <documentId>_<field>} and whose value is that field's
 *         value in that document, e.g. key {@code AAA_BBB_CCC_field}
 * @throws Exception declared by the signature; nothing is caught or translated here
 */
public Map<String, Object> getExtraMapByDocID(String index, String type, List<String> ids, String... fields)
        throws Exception {
    String scrollId = "";
    try {
        SearchResponse response = this.esClient.prepareSearch(index).setTypes(type)
                .setQuery(QueryBuilders.idsQuery().ids(ids)).setFetchSource(fields, null)
                .setScroll(new TimeValue(60000)).setFrom(0).setSize(1000).execute().actionGet();
        scrollId = response.getScrollId();
        Map<String, Object> retMap = Maps.newHashMap();
        while (true) {
            for (SearchHit hit : response.getHits().getHits()) {
                for (String field : fields) {
                    String key = String.format("%s_%s", hit.getId(), field);
                    retMap.put(key, hit.getSource().get(field));
                }
            }
            response = this.esClient.prepareSearchScroll(response.getScrollId()).setScroll(new TimeValue(60000))
                    .execute().actionGet();
            // Track the most recent scroll id so the finally block clears the right one.
            String nextScrollId = response.getScrollId();
            if (nextScrollId != null) {
                scrollId = nextScrollId;
            }
            // Break condition: No hits are returned
            if (response.getHits().getHits().length == 0) {
                break;
            }
        }
        return retMap;
    } finally {
        // Previously this method never released its scroll, unlike the other query helpers.
        this.clearESScrollId(scrollId);
    }
}
6.根据sql查询字段值
/**
 * Runs a search described by a query DSL and returns selected source fields of every hit.
 *
 * <p>The search is opened as a scroll (1000 hits requested per page, 60000 ms keep-alive);
 * paging and row building are delegated to {@code getHitsResult}. The scroll id of the
 * first response is cleared in {@code finally}.</p>
 *
 * @param index index to search
 * @param type document type to search
 * @param queryDSL search body
 * @param fields source fields to fetch for each document
 * @return one map per hit, each mapping field name to that field's value
 * @throws Exception declared by the signature; nothing is caught or translated here
 */
public List<Map<String, Object>> getExtraMapByQueryDSL(String index, String type, String queryDSL, String... fields)
        throws Exception {
    String scrollId = "";
    List<Map<String, Object>> rows = Lists.newArrayList();
    try {
        SearchResponse firstPage = this.esClient.prepareSearch(index)
                .setTypes(type).setExtraSource(queryDSL)
                .setFetchSource(fields, null).setScroll(new TimeValue(60000)).setFrom(0).setSize(1000).execute()
                .actionGet();
        scrollId = firstPage.getScrollId();
        return this.getHitsResult(firstPage, rows, fields);
    } finally {
        this.clearESScrollId(scrollId);
    }
}
/**
 * Drains a scroll, appending one field-to-value map per hit to the given list.
 *
 * <p>Starts with the hits of {@code response}, then keeps requesting the next scroll page
 * (60000 ms keep-alive) until a page with no hits is returned.</p>
 *
 * @param response first page of the scroll
 * @param rtnLst list the rows are appended to; it is also the return value
 * @param fields source fields copied from each hit
 * @return {@code rtnLst} after all rows were appended
 */
private List<Map<String, Object>> getHitsResult(SearchResponse response, List<Map<String, Object>> rtnLst,
        String... fields) {
    SearchResponse page = response;
    do {
        for (SearchHit hit : page.getHits().getHits()) {
            Map<String, Object> row = Maps.newHashMap();
            for (String field : fields) {
                row.put(field, hit.getSource().get(field));
            }
            rtnLst.add(row);
        }
        page = this.esClient.prepareSearchScroll(page.getScrollId()).setScroll(new TimeValue(60000))
                .execute().actionGet();
    } while (page.getHits().getHits().length != 0);
    return rtnLst;
}
7.查询排序后的第一笔数据
/**
 * Returns the first hit of a search, deserialized with Gson.
 *
 * <p>The search asks for a single hit, so which document counts as "first" is decided by
 * the sort inside the DSL. The scroll opened by the request is cleared in {@code finally}.</p>
 *
 * @param index index to search
 * @param type document type to search
 * @param dsl search body; should contain a sort clause
 * @param classOfT class the hit's source is deserialized into
 * @return the first hit, or {@code null} when the search returned no hits
 */
public <T> T getTop1DocByQueryDSL(String index, String type, String dsl, Class<T> classOfT) {
    String scrollId = "";
    try {
        SearchResponse firstPage = this.esClient.prepareSearch(index).setTypes(type).setExtraSource(dsl).setFrom(0)
                .setSize(1).setScroll(new TimeValue(60000)).execute().actionGet();
        scrollId = firstPage.getScrollId();
        SearchHit[] hits = firstPage.getHits().getHits();
        if (hits.length == 0) {
            return null;
        }
        return new Gson().fromJson(hits[0].getSourceAsString(), classOfT);
    } finally {
        this.clearESScrollId(scrollId);
    }
}
8.通过sql查询数据是否存在
/**
 * Checks whether at least one document matches a query DSL.
 *
 * <p>Runs a plain search for a single hit and looks only at the total hit count. No scroll
 * is opened: the previous version requested one but never read a second page and never
 * cleared it, leaving a search context open until its keep-alive expired.</p>
 *
 * @param esIndex index to search
 * @param esType document type to search
 * @param queryDsl search body
 * @return {@code true} when the total hit count is greater than zero
 * @throws Exception declared by the signature; nothing is caught or translated here
 */
public boolean checkDocExistByQueryDSL(String esIndex, String esType, String queryDsl) throws Exception {
    SearchResponse response = this.esClient.prepareSearch(esIndex).setTypes(esType)
            .setFetchSource(new String[] { "_id" }, null).setExtraSource(queryDsl).setFrom(0).setSize(1)
            .execute().actionGet();
    return response.getHits().getTotalHits() > 0;
}
9.查询数据笔数
/**
 * Returns the total number of documents matching a query DSL.
 *
 * <p>The request asks for zero hits and reads only the total hit count; the scroll it
 * opens is cleared in {@code finally}.</p>
 *
 * @param index index to search
 * @param type document type to search
 * @param dsl search body
 * @return total hit count reported by the search response
 */
protected long getTotalHitsByDsl(String index, String type, String dsl) {
    String scrollId = "";
    try {
        SearchResponse countResponse = this.esClient.prepareSearch(index).setTypes(type).setExtraSource(dsl)
                .setFrom(0).setSize(0).setScroll(new TimeValue(60000)).execute().actionGet();
        scrollId = countResponse.getScrollId();
        long total = countResponse.getHits().getTotalHits();
        return total;
    } finally {
        this.clearESScrollId(scrollId);
    }
}
新增(Upsert)
注:upsert用法:有记录就更新(这里是用新文档整体覆盖旧文档),没有记录就新增
1.通过ID 执行upsert操作
/**
 * Serializes a value object with Gson and indexes it under the given document id,
 * with refresh enabled on the request.
 *
 * @param index target index
 * @param type document type
 * @param docID document id to index under
 * @param vo object serialized to JSON and used as the document source
 * @return the index response returned by the client
 */
public <T> IndexResponse upsert(String index, String type, String docID, T vo) {
    String json = new Gson().toJson(vo);
    return this.esClient.prepareIndex(index, type, docID).setRefresh(true).setSource(json).execute().actionGet();
}
2.批量upsert
/**
 * Indexes a list of value objects in one bulk request (20 minute timeout).
 *
 * <p>The document id of each object is obtained by reflectively calling the named
 * no-argument method on it and taking {@code toString()} of the result.</p>
 *
 * @param index target index
 * @param type document type
 * @param voList objects to index; each is serialized with Gson
 * @param getDocIDMethodInVo name of a public no-argument method on the objects that returns
 *        the document id, e.g.
 *        {@code public String getDocumentID() { return String.format("%s_%s", AAA, BBB); }}
 * @param isAutoRefresh refresh flag passed to the bulk request
 * @throws Exception when the reflective call fails or the bulk response reports failures
 */
public <T> void bulkProcessUpsert(String index, String type, List<T> voList, String getDocIDMethodInVo, boolean isAutoRefresh) throws Exception {
    BulkRequestBuilder bulk = this.esClient.prepareBulk();
    bulk.setTimeout(TimeValue.timeValueMinutes(20));
    Gson gson = new Gson();
    for (T vo : voList) {
        Method idGetter = vo.getClass().getMethod(getDocIDMethodInVo);
        Object docId = idGetter.invoke(vo);
        String json = gson.toJson(vo);
        IndexRequestBuilder indexRequest = this.esClient.prepareIndex(index, type, docId.toString()).setSource(json);
        bulk.add(indexRequest);
    }
    if (bulk.numberOfActions() <= 0) {
        return;
    }
    BulkResponse resp = bulk.setRefresh(isAutoRefresh).execute().actionGet();
    if (resp.hasFailures()) {
        throw new Exception(String.format("Bulk index items failed, message:%s", resp.buildFailureMessage()));
    }
}
更新(Update)
1.根据ID执行update操作
/**
 * Updates selected fields of one document by id, with refresh enabled on the request.
 *
 * @param index index holding the document
 * @param type document type
 * @param id document id
 * @param fieldValMap key: index field, value: new value for that field
 * @return the update response returned by the client
 * @throws Exception when waiting for the update result fails
 */
public UpdateResponse update(String index, String type, String id, Map<String, Object> fieldValMap) throws Exception {
    UpdateRequest request = new UpdateRequest(index, type, id).refresh(true).doc(fieldValMap);
    return this.esClient.update(request).get();
}
2.批量update
/**
 * Updates fields of several documents in one bulk request (20 minute timeout).
 *
 * @param index index holding the documents
 * @param type document type
 * @param mapUpdates key: document id, value: map of field name to new value
 * @param isAutoRefresh refresh flag passed to the bulk request
 * @throws Exception when building a document fails or the bulk response reports failures
 */
protected void bulkProcessUpdate(String index, String type, Map<String, Map<String, Object>> mapUpdates, boolean isAutoRefresh)
        throws Exception {
    BulkRequestBuilder bulk = this.esClient.prepareBulk();
    bulk.setTimeout(TimeValue.timeValueMinutes(20));
    for (Map.Entry<String, Map<String, Object>> update : mapUpdates.entrySet()) {
        XContentBuilder doc = XContentFactory.jsonBuilder().startObject();
        for (Map.Entry<String, Object> fieldValue : update.getValue().entrySet()) {
            doc.field(fieldValue.getKey(), fieldValue.getValue());
        }
        doc.endObject();
        UpdateRequestBuilder updateRequest = this.esClient.prepareUpdate(index, type, update.getKey())
                .setDoc(doc);
        bulk.add(updateRequest);
    }
    if (bulk.numberOfActions() <= 0) {
        return;
    }
    BulkResponse resp = bulk.setRefresh(isAutoRefresh).execute().actionGet();
    if (resp.hasFailures()) {
        throw new Exception(String.format("Bulk update items failed, message:%s", resp.buildFailureMessage()));
    }
}
删除(Delete)
1.根据ID 删除
/**
 * Deletes one document by id, with refresh enabled on the request.
 *
 * @param index index holding the document
 * @param type document type
 * @param id document id
 * @return the {@code isFound()} flag of the delete response
 */
public boolean deleteById(String index, String type, String id) {
    final boolean found = this.esClient.prepareDelete(index, type, id)
            .setRefresh(true)
            .execute()
            .actionGet()
            .isFound();
    return found;
}
2.批量删除
/**
 * Deletes the listed document ids through a {@code BulkProcessor}, then refreshes the index.
 *
 * <p>The processor flushes on {@code BULK_MAX_SIZE} actions, 1 GB of requests or every
 * 5 seconds. In {@code finally} it is closed with a wait of up to one minute and the index
 * is refreshed. Failures reported by the processor are logged, not thrown, because they
 * arrive through the listener callbacks.</p>
 *
 * @param index index holding the documents
 * @param type document type
 * @param lstId ids of the documents to delete
 * @throws Exception when queuing a delete fails or waiting for the processor is interrupted
 */
public void bulkProcessDelete(String index, String type, List<String> lstId) throws Exception {
    final BulkProcessor bulkProcessor = BulkProcessor.builder(this.esClient, new BulkProcessor.Listener() {
        @Override
        public void beforeBulk(long executionId, BulkRequest request) {
            // Nothing to prepare before a batch is sent.
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
            // A batch can complete while individual deletes failed; surface those
            // instead of dropping them silently.
            if (response.hasFailures()) {
                logger.error("ElasticsearchHelper.bulkProcessDelete.Index is {}. error is {}. ", index,
                        response.buildFailureMessage());
            }
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
            // The throwable is passed as the last argument so the logger records the stack
            // trace (assumes an SLF4J-style logger — confirm); replaces printStackTrace().
            logger.error("ElasticsearchHelper.bulkProcessDelete.Index is {}. error is {}. ", index,
                    failure.getMessage(), failure);
        }
    }).setBulkActions(BULK_MAX_SIZE).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
            .setFlushInterval(TimeValue.timeValueSeconds(5)).build();
    try {
        for (String docId : lstId) {
            bulkProcessor.add(new DeleteRequest(index, type, docId));
        }
    } catch (Exception ex) {
        // Fixed: the message named another method and had two placeholders but only the
        // exception as argument, so the index was never logged.
        logger.error("ElasticsearchHelper.bulkProcessDelete.Index is {}. error is {}. ", index,
                ex.getMessage(), ex);
        throw ex;
    } finally {
        boolean flushed = bulkProcessor.awaitClose(1, TimeUnit.MINUTES);
        if (!flushed) {
            logger.error("ElasticsearchHelper.bulkProcessDelete.Index is {}. error is {}. ", index,
                    "bulk processor did not finish within 1 minute");
        }
        this.esClient.admin().indices().prepareRefresh(index).get();
    }
}
3.根据DSL删除
/**
 * Deletes every document matching a query DSL using one bulk request.
 *
 * <p>The matching ids are collected through a scroll (1500 hits requested per page,
 * 60000 ms keep-alive) and added to a single bulk delete (20 minute timeout). The scroll
 * is cleared in {@code finally}; previously it was left open until its keep-alive expired.</p>
 *
 * @param queryDsl search body selecting the documents to delete
 * @param index index to search and delete from
 * @param type document type
 * @param isAutoRefresh refresh flag passed to the bulk request
 * @throws Exception when the bulk response reports failures
 */
public void deleteByQueryDsl(String queryDsl , String index, String type, boolean isAutoRefresh) throws Exception {
    Client client = this.esClient;
    String scrollId = "";
    try {
        SearchResponse scrollResp = client.prepareSearch(index)
                .setTypes(type)
                .setExtraSource(queryDsl).setFrom(0).setSize(1500).setScroll(new TimeValue(60000))
                .execute().actionGet();
        scrollId = scrollResp.getScrollId();
        BulkRequestBuilder bulkRequest = client.prepareBulk();
        bulkRequest.setTimeout(TimeValue.timeValueMinutes(20));
        while (true) {
            for (SearchHit hit : scrollResp.getHits().getHits()) {
                bulkRequest.add(client.prepareDelete(index, type, hit.getId()));
            }
            scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet();
            // Track the most recent scroll id so the finally block clears the right one.
            String nextScrollId = scrollResp.getScrollId();
            if (nextScrollId != null) {
                scrollId = nextScrollId;
            }
            // Break condition: No hits are returned
            if (scrollResp.getHits().getHits().length == 0) {
                break;
            }
        }
        if (bulkRequest.numberOfActions() > 0) {
            BulkResponse response = bulkRequest.setRefresh(isAutoRefresh).execute().actionGet();
            if (response.hasFailures()) {
                throw new Exception(
                        "deleteByFilter failure:" + response.buildFailureMessage());
            }
        }
    } finally {
        this.clearESScrollId(scrollId);
    }
}
/**
 * Deletes every document matching a query DSL by collecting the ids and handing them to
 * {@code bulkProcessDelete}.
 *
 * <p>The ids are collected through a scroll (2000 hits requested per page, 60000 ms
 * keep-alive) that is cleared in {@code finally}.</p>
 *
 * @param index index to search and delete from
 * @param type document type
 * @param dsl search body selecting the documents to delete
 * @throws Exception propagated from {@code bulkProcessDelete}
 */
public void deleteByQueryDsl(String index, String type, String dsl) throws Exception {
    String scrollId = "";
    try {
        List<String> lstId = new ArrayList<String>();
        SearchResponse scrollResp = this.esClient.prepareSearch(index).setTypes(type).setExtraSource(dsl).setFrom(0)
                .setSize(2000).setScroll(new TimeValue(60000)).execute().actionGet();
        scrollId = scrollResp.getScrollId();
        do {
            for (SearchHit searchHit : scrollResp.getHits().getHits()) {
                lstId.add(searchHit.getId());
            }
            scrollResp = this.esClient.prepareSearchScroll(scrollId).setScroll(new TimeValue(60000)).execute()
                    .actionGet();
            // Each scroll response may carry a new scroll id; continue (and later clear)
            // with the most recent one instead of reusing the id from the first page.
            String nextScrollId = scrollResp.getScrollId();
            if (nextScrollId != null) {
                scrollId = nextScrollId;
            }
        } while (scrollResp.getHits().getHits().length > 0);
        this.bulkProcessDelete(index, type, lstId);
    } finally {
        this.clearESScrollId(scrollId);
    }
}
补充
我们在执行ES操作的时候,为了避免一次操作的数据量过大,通常会设置size并配合scroll分批读取,因此会用到scrollId;scroll在服务端会占用资源,所以需要在方法的finally中将scrollId清除。
例如:
/**
 * Sends a clear-scroll request for the given scroll id.
 *
 * <p>Does nothing when {@code StringUtils.isNoneEmpty(scrollId)} is false, which lets
 * callers pass the empty string they start with when no scroll was opened.</p>
 *
 * @param scrollId id of the scroll to clear
 */
public void clearESScrollId(String scrollId) {
    if (!StringUtils.isNoneEmpty(scrollId)) {
        return;
    }
    ClearScrollRequest request = new ClearScrollRequest();
    request.addScrollId(scrollId);
    this.esClient.clearScroll(request).actionGet();
}
网友评论