Index Migration

Author: YG_9013 | Published 2017-11-01 20:00

The esm index migration tool

Download: https://github.com/medcl/esm
In my tests, running with both --copy_settings and --copy_mappings failed, and --copy_mappings on its own did not work either.

/bin/esm -s=http://192.168.3.206:9200 -d=http://localhost:9200 --copy_settings --copy_mappings -x=bestbuykaggle  

So I created the index by hand and set the mapping and settings myself. The data export/import itself worked fine, but it was slow, about 3,000 docs/s, possibly because my individual documents are fairly large.

Testing reindex

Since ES 5, Elasticsearch ships a reindex feature that can also move data between clusters. For details see https://www.elastic.co/guide/en/elasticsearch/reference/5.6/docs-reindex.html

curl -XPOST 'localhost:9200/_reindex?pretty' -H 'Content-Type: application/json' -d'
{
  "source": {
    "remote": {
      "host": "http://xxx:9200",
    },
    "index": "source",
  },
  "dest": {
    "index": "dest"
  }
}
'

Here too the index has to be created manually with its mapping and settings. In testing, the transfer worked fine, at about 4,200 docs/s. By default reindex runs as a single process, which is still too slow for tens of millions, let alone hundreds of millions, of documents. My next idea was to parallelize with reindex + slice, but per the docs, "Reindexing from remote clusters does not support manual or automatic slicing." In other words, when pulling data from another cluster, neither manual nor automatic slicing is available, so this path is a dead end.
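For contrast, here is a minimal sketch of what manual slicing looks like when the source index lives in the same cluster (no "remote" block), following the 5.6 docs; each slice id from 0 to max-1 gets its own request, run in parallel:

curl -XPOST 'localhost:9200/_reindex?pretty' -H 'Content-Type: application/json' -d'
{
  "source": {
    "index": "source",
    "slice": {
      "id": 0,
      "max": 2
    }
  },
  "dest": {
    "index": "dest"
  }
}
'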

Manual scroll + slice + bulk

The idea: use scroll with slicing to read from the source cluster in parallel, then write into the destination cluster in parallel via bulk.

package com.dump.core;

import org.elasticsearch.action.bulk.*;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.slice.SliceBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

import java.net.InetAddress;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;

public class DumpIndex {

    public static String index="";
    public static String dest_cluster_name="";
    public static String source_cluster_name="";
    public static String dest_ips[]=null;
    public static String source_ips[]=null;
    public static String type="";
    public static TransportClient source_client = null;
    public static TransportClient dest_client = null;
    public static ExecutorService executor;
    public static int threadSize;

    public static void setConfES() throws Exception {
        Settings settings = Settings.builder().put("cluster.name", source_cluster_name).build();
        int port = 9300;
        source_client = new PreBuiltTransportClient(settings);

        for (int i=0;i<source_ips.length;i++){
            source_client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(source_ips[i]), port));
        }

        settings = Settings.builder().put("cluster.name", dest_cluster_name).build();
        dest_client = new PreBuiltTransportClient(settings);
        for (int i=0;i<dest_ips.length;i++){
            dest_client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(dest_ips[i]), port));
        }
    }

    // One BulkProcessor per slice thread, writing into the destination cluster.
    public static BulkProcessor getBulker(final int id){
        BulkProcessor bulkProcessor = BulkProcessor.builder(
                dest_client,
                new BulkProcessor.Listener() {
                    public void beforeBulk(long executionId,
                                           BulkRequest request) {
                    }

                    public void afterBulk(long executionId,
                                          BulkRequest request,
                                          BulkResponse response) {
                        if (!response.hasFailures()) {
                            System.out.println("Thread " + id + " indexed " + response.getItems().length + " documents");
                        } else {
                            System.out.println("Thread " + id + " hit failures during indexing: " + response.buildFailureMessage());
                        }
                    }

                    public void afterBulk(long executionId,
                                          BulkRequest request,
                                          Throwable failure) {
                        failure.printStackTrace();
                    }
                })
                .setBulkActions(10000)
                .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
                .setFlushInterval(TimeValue.timeValueSeconds(5))
                .setConcurrentRequests(0)
                .setBackoffPolicy(
                        BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(1000), 3))
                .build();
        return bulkProcessor;
    }

    // Launch one consumer per thread; each thread scrolls its own slice of the index.
    public static void parallel(){
        for(int i=0;i<threadSize;i++){
            QueryBuilder qb = matchAllQuery();
            SliceBuilder sliceBuilder = new SliceBuilder(i, threadSize);
            SearchResponse scrollResp = source_client.prepareSearch(index)
                    .setScroll(new TimeValue(60000))
                    .addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC)
                    .setQuery(qb)
                    .slice(sliceBuilder)
                    .setSize(5000).get();
            BulkProcessor bulkProcessor = getBulker(i);
            SliceCustomer customer =new SliceCustomer(scrollResp,bulkProcessor,source_client,index,type);
            executor.execute(customer);
        }
        executor.shutdown();
    }
    public static void main(String []args) throws Exception {
        try {
            source_cluster_name= args[0];
            System.out.println("source_cluster_name is "+ source_cluster_name);
            source_ips = args[1].split(",");
            System.out.println("source_ips is "+args[1]);
            dest_cluster_name = args[2];
            System.out.println("dest_cluster_name is " + dest_cluster_name);
            dest_ips = args[3].split(",");
            System.out.println("dest_ips is " + args[3]);
            index = args[4];
            System.out.println("index is " + index);
            type = index.split("-")[0];
            System.out.println("type is " + type);
            threadSize = Integer.parseInt(args[5]);
            System.out.println("threadSize is " + threadSize);
            setConfES();
        } catch (Exception e) {
            System.out.println("Wrong arguments. Expected: source_cluster_name source_ips dest_cluster_name dest_ips index threadSize");
            return;
        }
        executor = Executors.newFixedThreadPool(threadSize);
        parallel();
    }
}

class SliceCustomer implements Runnable{
    public SearchResponse scrollResp;
    public BulkProcessor bulkProcessor;
    public TransportClient source_client;
    public String index;
    public String type;

    public SliceCustomer(SearchResponse scrollResp, BulkProcessor bulkProcessor, TransportClient source_client, String index, String type) {
        this.scrollResp = scrollResp;
        this.bulkProcessor = bulkProcessor;
        this.source_client = source_client;
        this.index = index;
        this.type = type;
    }

    @Override
    public void run() {
        String vals ="";
        do {
            for (SearchHit hit : scrollResp.getHits().getHits()) {
                vals = hit.sourceAsString();
                // Forward the raw _source to the destination cluster through the BulkProcessor.
                bulkProcessor.add(new IndexRequest(index, type).source(vals));
            }
            scrollResp = source_client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet();
        } while(scrollResp.getHits().getHits().length != 0); // Zero hits mark the end of the scroll and the while loop.

        bulkProcessor.flush();
        try {
            bulkProcessor.awaitClose(100, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
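A hypothetical launch command (the jar name and hosts are placeholders). The six positional arguments are source_cluster_name, source_ips, dest_cluster_name, dest_ips, index, and threadSize; note that the type is derived from the part of the index name before the first "-":

java -cp dump-index.jar com.dump.core.DumpIndex source-cluster 192.168.3.206 dest-cluster localhost bestbuykaggle-v1 4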

Sorting on _doc (addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC)) lets ES skip scoring and use a low-level optimization to fetch the data faster; the hits are then written to the destination ES in parallel via bulk.
But then another problem appeared: scroll + slice can drive node memory usage very high:

If the number of slices is bigger than the number of shards the slice filter is very slow on the first calls, it has a complexity of O(N) and a memory cost equals to N bits per slice where N is the total number of documents in the shard. After few calls the filter should be cached and subsequent calls should be faster but you should limit the number of sliced query you perform in parallel to avoid the memory explosion.

This can be worked around by slicing on a doc_values field instead. For more detail see https://www.elastic.co/guide/en/elasticsearch/reference/5.6/search-request-scroll.html#sliced-scroll
In my tests with 4 threads, excluding ES startup time, indexing ran at roughly 8,000 docs/s. For several indices of hundreds of millions of documents each, that still isn't enough.
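A minimal sketch of such a request, assuming a hypothetical index myindex and a hypothetical numeric/date field timestamp with doc_values enabled; in the Java code above, the equivalent is the SliceBuilder(field, id, max) constructor:

curl -XGET 'localhost:9200/myindex/_search?scroll=1m&pretty' -H 'Content-Type: application/json' -d'
{
  "slice": {
    "field": "timestamp",
    "id": 0,
    "max": 4
  },
  "query": { "match_all": {} }
}
'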

The final approach

Re-import the data through Hive...

A script to copy the source index's settings and mappings

import os
import json
import sys

def get_json(cmd):
    val = os.popen(cmd).read()
    result = json.loads(val)
    return result

def set_mappings(index,source,dest):
    # Pull the mappings and settings from the source index, then recreate the index on the destination.
    cmd = "curl %s/%s/_mapping" % (source,index)
    print cmd
    result = get_json(cmd)
    mappings = json.dumps(result[index]["mappings"])
    cmd = "curl %s/%s/_settings" % (source,index)
    print cmd
    result = get_json(cmd)
    print result
    settings = result[index]["settings"]["index"]
    pri_num = settings["number_of_shards"]
    rep_num = settings["number_of_replicas"]
    codec = ''
    routing =''
    try:
        codec = settings["codec"]
    except:
        codec = ''
    try:
        routing = json.dumps(settings["routing"])
    except:
        routing =''
    settings = "{\"number_of_shards\":%s,\"number_of_replicas\":%s," % (pri_num,rep_num)
    if len(codec)>0:
        tmp = "\"codec\":\"%s\"," % codec
        settings = settings + tmp 
    if len(routing)>0:
        tmp = "\"routing\":%s," % routing
        settings = settings + tmp
    settings = settings[:-1] +"}"
    
    print mappings
    print "--------------------"
    print settings
    print "--------------------"
    cmd = "curl -XPUT %s/%s -d '{\"settings\":%s,\"mappings\":%s}'" % (dest,index,settings,mappings)
    result = get_json(cmd)
    print cmd
    print "--------------------"
    print result
    if "error" in result.keys():
        return 0
    if "acknowledged" in result.keys():
        if result["acknowledged"] == 'false' or result["shards_acknowledged"] == 'false':
            return 0
    return 1

def run():
    args = sys.argv
    source = args[1]
    dest = args[2]
    index = args[3]
    print "stat to move %s,create mapping..." % index
    set_mappings(index,source,dest)

run()
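A hypothetical invocation (the script name copy_index_meta.py is a placeholder); the three positional arguments are the source URL, the destination URL, and the index name:

python copy_index_meta.py http://192.168.3.206:9200 http://localhost:9200 bestbuykaggle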


