本文集主要是总结自己在项目中使用ES 的经验教训,包括各种实战和调优。
主要借助了es的scroll完成对要更新数据的备份,然后利用bulk完成对数据的批量更新。
自己写的代码(参考官网,bulk的写法和官网的不太一样):
/**
* 批量更新elasticsearch数据,目前仅支持更新int字段或将某一field字段的值赋给另外的字段。
*
* @param index
* @param type
* @param size bulk批量写入一次写入的数据
* @param routing 没有设置routing,为null或""就行
* @param query
* @param updateField 需要更新的字段
* @param updateValue 更新值,可以为int或mapping中的一个字段。
* @return
*/
private String searchElasticsearchDataByScroll(String index, String type, int size, String routing, QueryBuilder query, String updateField, String updateValue) {
Map<String, Object> hitMap;
SearchResponse response = client.prepareSearch(index)
.setTypes(type)
.setQuery(query)
.setScroll(new TimeValue(6000))
.setSize(size).execute().actionGet();
if (size < 500) {
log.warn("bulk size is small,you'd better set size near 1000 ,size:" + size);
}
BulkRequestBuilder bulkRequest = client.prepareBulk();
do {
for (SearchHit searchHit : response.getHits().getHits()) {
hitMap = searchHit.getSource();
if (!hitMap.containsKey(updateField)) {
log.error("updateField is not exist,updateField:" + updateField);
break;
}
if (hitMap.containsKey(updateValue)) {
hitMap.put(updateField, hitMap.get(updateValue));
} else {
try {
hitMap.put(updateField, Integer.parseInt(updateValue));//默认如果在hit中找不到要更新的值,则将updateValue当做int来进行更新
} catch (NumberFormatException e) {
log.error("can not solve updateValue。", e);
break;
}
}
if (routing.equalsIgnoreCase("") || routing == null) {
bulkRequest.add(client.prepareUpdate(index, type, searchHit.getId()).setDoc(hitMap));
} else {
if(hitMap.get(routing)==null){
log.error("error:can not find routing,routing:"+routing);
}
bulkRequest.add(client.prepareUpdate(index, type, searchHit.getId()).setRouting(hitMap.get(routing).toString()).setDoc(hitMap));
}
bulkRequest.add(client.prepareUpdate(index, type, searchHit.getId()).setRouting(hitMap.get(routing).toString()).setDoc(hitMap));
}
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
if (bulkResponse.hasFailures()) {
log.warn("bulk updateElasticsearch has fail, reason is" + bulkResponse.buildFailureMessage());
}
response = client.prepareSearchScroll(response.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet();
} while (response.getHits().getHits().length != 0);
boolean clearScroll = clearScroll(response.getScrollId());
if (!clearScroll){
log.warn("clear scroll id false,scrollId:"+response.getScrollId());
}
return "";
}
关于数据批量导入导出的参考链接:
http://blog.csdn.net/u010585120/article/details/48028255
关于es数据的批量导出和导入,可以作为参考,代码不一定是完全正确的。
https://my.oschina.net/chiyong/blog/552622
可以作为代码的参考,但是存在一定的问题,按照官网的示例可以实现。 关于es的批量导入导出。
关于scroll的使用的一些注意事项:
官网文档:
按照docid进行排序的scroll性能会更好。
Scroll requests have optimizations that make them faster when the sort order is _doc. If you want to iterate over all documents regardless of the order, this is the most efficient option:
Sliced Scroll:
For scroll queries that return a lot of documents it is possible to split the scroll in multiple slices which can be consumed independently。
使用分片scroll的数量不能大于集群的分片数。官网上也介绍了根据uid hash的算法,当然也可以自己选择进行hash的字段,但是有一定的要求。使用sliced scroll可以加快scroll的处理速度。
网友评论