美文网首页数据库ELK
[9]es数据批量更新及数据导入导出

[9]es数据批量更新及数据导入导出

作者: 不怕天黑_0819 | 来源:发表于2022-03-25 14:19 被阅读0次

    本文集主要是总结自己在项目中使用ES 的经验教训,包括各种实战和调优。


    主要借助了es的scroll完成对要更新数据的备份,然后利用bulk完成对数据的批量更新。

    自己写的代码(参考官网,bulk的写法和官网的不太一样):

    
    /**
    
    * 批量更新elasticsearch数据,目前仅支持更新int字段或将某一field字段的值赋给另外的字段。
    
    *
    
    * @param index
    
    * @param type
    
    * @param size bulk批量写入一次写入的数据
    
    * @param routing 没有设置routing,为null或""就行
    
    * @param query
    
    * @param updateField 需要更新的字段
    
    * @param updateValue 更新值,可以为int或mapping中的一个字段。
    
    * @return
    
    */
    
    private String searchElasticsearchDataByScroll(String index, String type, int size, String routing, QueryBuilder query, String updateField, String updateValue) {
    
    Map<String, Object> hitMap;
    
    SearchResponse response = client.prepareSearch(index)
    
    .setTypes(type)
    
    .setQuery(query)
    
    .setScroll(new TimeValue(6000))
    
    .setSize(size).execute().actionGet();
    
    if (size < 500) {
    
    log.warn("bulk size is small,you'd better set size near 1000 ,size:" + size);
    
    }
    
    BulkRequestBuilder bulkRequest = client.prepareBulk();
    
    do {
    
    for (SearchHit searchHit : response.getHits().getHits()) {
    
    hitMap = searchHit.getSource();
    
    if (!hitMap.containsKey(updateField)) {
    
    log.error("updateField is not exist,updateField:" + updateField);
    
    break;
    
    }
    
    if (hitMap.containsKey(updateValue)) {
    
    hitMap.put(updateField, hitMap.get(updateValue));
    
    } else {
    
    try {
    
    hitMap.put(updateField, Integer.parseInt(updateValue));//默认如果在hit中找不到要更新的值,则将updateValue当做int来进行更新
    
    } catch (NumberFormatException e) {
    
    log.error("can not solve updateValue。", e);
    
    break;
    
    }
    
    }
    
    if (routing.equalsIgnoreCase("") || routing == null) {
    
    bulkRequest.add(client.prepareUpdate(index, type, searchHit.getId()).setDoc(hitMap));
    
    } else {
    
    if(hitMap.get(routing)==null){
    
    log.error("error:can not find routing,routing:"+routing);
    
    }
    
    bulkRequest.add(client.prepareUpdate(index, type, searchHit.getId()).setRouting(hitMap.get(routing).toString()).setDoc(hitMap));
    
    }
    
    bulkRequest.add(client.prepareUpdate(index, type, searchHit.getId()).setRouting(hitMap.get(routing).toString()).setDoc(hitMap));
    
    }
    
    BulkResponse bulkResponse = bulkRequest.execute().actionGet();
    
    if (bulkResponse.hasFailures()) {
    
    log.warn("bulk updateElasticsearch has fail, reason is" + bulkResponse.buildFailureMessage());
    
    }
    
    response = client.prepareSearchScroll(response.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet();
    
    } while (response.getHits().getHits().length != 0);
    
    boolean clearScroll = clearScroll(response.getScrollId());
    
    if (!clearScroll){
    
    log.warn("clear scroll id false,scrollId:"+response.getScrollId());
    
    }
    
    return "";
    
    }
    

    关于数据批量导入导出的参考链接:
    http://blog.csdn.net/u010585120/article/details/48028255

    关于es数据的批量导出和导入,可以作为参考,代码不一定是完全正确的。
    https://my.oschina.net/chiyong/blog/552622

    可以作为代码的参考,但是存在一定的问题,按照官网的示例可以实现。 关于es的批量导入导出。


    关于scroll的使用的一些注意事项:

    官网文档:

    https://www.elastic.co/guide/en/elasticsearch/reference/5.3/search-request-scroll.html#scroll-search-context

    按照docid进行排序的scroll性能会更好。

    Scroll requests have optimizations that make them faster when the sort order is _doc. If you want to iterate over all documents regardless of the order, this is the most efficient option:

    Sliced Scroll:

    For scroll queries that return a lot of documents it is possible to split the scroll in multiple slices which can be consumed independently。

    使用分片scroll的数量不能大于集群的分片数。官网上也介绍了根据uid hash的算法,当然也可以自己选择进行hash的字段,但是有一定的要求。使用sliced scroll可以加快scroll的处理速度。

    相关文章

      网友评论

        本文标题:[9]es数据批量更新及数据导入导出

        本文链接:https://www.haomeiwen.com/subject/teiirktx.html