导入elasticsearch方法总结

作者: black3y | 来源:发表于2016-11-10 18:11 被阅读2350次
    1. 所需驱动和工具
    1. 安装mssql和mysql的jdbc驱动请参考上一篇:elasticsearch环境搭建过程
    1. 如果在windows下使用导elasticsearch的命令:推荐使用cygwin
    2. 用kopf插件查看导数据进度,若导入中断或错误,也可使用kopf删除该索引,重新导入

    2. 导elasticsearch参数对应名称
    elasticsearch对应参数:
    elasticsearch服务器IP:192.168.1.1
    索引名称:indexname       #可以将其看成关系型数据库中的数据库名称
    类型名称:typename        #可以将其看成关系型数据库中的表名称
    映射:mapping            #可以将其看成关系型数据库中的表结构,默认分词,可进行设置
    elasticsearch端口号:9200
    who_jdbc_river: 每次导入数据,此处不能相同
    
    关系型数据库对应参数:
      服务器IP:192.168.1.2
      用户:此处分别以 mssql的sa 和 mysql的root举例
      数据库名称:dbname
      表名称/视图名:tbname
      密码:dbpasswd
    

    3. 根据需求创建映射

    视图(mapping)相当于关系型数据库的表结构,在执行之后导入命令前最好先创建索引和mapping,设置好数据类型和是否分词等关键参数,以免以后调用数据或者使用数据时陷入困境,最后不得不重新导入。

    创建索引indexname:
    curl -XPUT 192.168.1.1:9200/indexname -d '{}'
    
    创建映射
    curl -XPUT 192.168.1.1:9200/indexname/typename/_mapping -d '{
      "typename" : {
        "properties" : {
          "domain" : {                #数据字段名称
            "type" : "string",        #指定类型
            "index": "not_analyzed"   #指定不分词
          },
          "record_type" : {
            "type" : "string",
            "index": "not_analyzed"
          },
          "record_value" : {
            "type" : "string",
            "index": "not_analyzed"
          }
        }
      }
    }'
    

    4. 从mysql数据库导入到elasticsearch
    curl -XPUT '192.168.1.1:9200/_river/who_jdbc_river/_meta' -d '{
        "type" : "jdbc",
        "shedule" : null,
        "jdbc" : {
            "url" : "jdbc:mysql://192.168.1.2:3306/dbname",
            "user" : "root",
            "password" : "dbpasswd",
            "sql" : "select * from tbname",
            "index" : "indexname",
            "type" : "typename"
        }
    }'
    

    5. 从mssql数据库导入到elasticsearch
    curl -XPUT '192.168.1.1:9200/_river/who_jdbc_river/_meta' -d '
    {
        "type" : "jdbc",
        "jdbc": {
            "url":"jdbc:sqlserver://192.168.1.2:1433;databaseName=dbname",
            "user":"sa",
            "password":"dbpasswd",
            "sql":"select * from dbo.tbname",
            "index" : "indexname",
            "type" : "typename"
        }
    }'
    

    6. csv或txt格式数据导入到elasticsearch

    注意事项:

    1. 如果文件中的第一行未设置字段名称可用下方命令行添加,或者在导入命令中修改指定参数添加
    sed -i '1 s/^/username,email,password\n/' user.txt   #为user.txt首行添加字段名username,email,password
    
    1. 在导入过程中,要导入的csv文件的文件名 如user.csv 会变成user.csv.processing,导入成功后user.csv.processing.imported
      如果导入过程中出现错误中途断开,在重新导数据前记得先将文件名user.csv.processing.imported 改成user.csv 否则会提示找不到文件
    curl -XPUT 192.168.1.1:9200/_river/who_jdbc_river/_meta -d '
    {
        "type" : "csv",                          #指定文件类型
        "csv_file" : {
            "folder" : "//home//black3y//",      #要导入的文件的文件路径,注意注释斜杠,window下:D://isc//b
            "filename_pattern" : "user.csv",     #待导入数据文件名称(txt或csv),支持同类型所有文件(.*\\.csv$)
            "poll":"5m",
            "fields" : [
                "username",
                "email",  
                "password"    
            ],
            "first_line_is_header" : "false",     #true:将第一行作为字段名,false:将fields中的信息作为字段名
            "field_separator" : "\t",             #tab对应\t,逗号直接写,
            "field_id" : "id",
            "field_id_include" : "false",
            "concurrent_requests" : "1",
            "charset" : "UTF-8",
            "script_before_all": "/path/to/before_all.sh",
            "script_after_all": "/path/to/after_all.sh",
            "script_before_file": "/path/to/before_file.sh",
            "script_after_file": "/path/to/after_file.sh"
        },
        "index" : {
            "index" : "indexname",
            "type" : "typename",
            "bulk_size" : 100,
            "bulk_threshold" : 10
        }
    }'
    

    也可使用head插件执行上面的命令

    使用head插件导入csv数据
    7. json格式数据导入elasticsearch

    赠上现成好用的python脚本 ( json2es.py )

    #!/usr/bin/python
    # -*- coding: UTF-8 -*-
    
    from itertools import islice
    import json , sys
    from elasticsearch import Elasticsearch , helpers
    import threading
    
    _index = 'indexname'   #修改为索引名
    _type = 'typename'     #修改为类型名
    es_url = 'http://192.168.1.1:9200/'  #修改为elasticsearch服务器
    
    reload(sys)
    sys.setdefaultencoding('utf-8')  
    es = Elasticsearch(es_url)
    #es.indices.create(index='webinfo', ignore=400,body = mapping)
    es.indices.create(index=_index, ignore=400)
    chunk_len = 10
    num = 0
    
    def bulk_es(chunk_data):
        bulks=[]
        try:
            for i in xrange(chunk_len):
                bulks.append({
                        "_index": _index,
                        "_type": _type,
                        "_source": chunk_data[i]
                    })
            helpers.bulk(es, bulks)
        except:
            pass
    
    with open(sys.argv[1]) as f:
        while True:
            lines = list(islice(f, chunk_len))
            num =num +chunk_len
            sys.stdout.write('\r' + 'num:'+'%d' % num)
            sys.stdout.flush()
            bulk_es(lines)
            if not lines:
                print "\n"
                print "task has finished"
                break
    
    使用方法:
    json2es.py使用方法

    相关文章

      网友评论

      • 鱼呀_:使用head插件的话,不能加 下划线 是我工具的问题吗?

      本文标题:导入elasticsearch方法总结

      本文链接:https://www.haomeiwen.com/subject/qssxpttx.html