美文网首页Elasticsearch
golang解析csv并导入elastic search

golang解析csv并导入elastic search

作者: bigtom | 来源:发表于2017-02-25 16:35 被阅读243次

    今天我们一起来写一个从csv文件将数据导入elastic search的小程序

    准备工作

    在gopath的src文件夹下创建csv2es文件夹,并创建main.go文件。下载一些csv文件备用

    解析命令行参数

    首先我们需要使用flag package解析命令行参数,代码如下

    func main() {
      // 解析命令行输入
      host := flag.String("host", "http://localhost:9200", "host, e.g. http://localhost:9200")
      file := flag.String("file", "", "file path")
      esIndex := flag.String("index", "", "elastic search index")
      esType := flag.String("type", "", "elastic search type")
      flag.Parse()
      if *file == "" {
        fmt.Println("please set which csv file you want to import clearly")
        return
      }
      if *esIndex == "" {
        fmt.Println("please set elastic search index")
        return
      }
      if *esType == "" {
        fmt.Println("please set elastic search type")
        return
      }
    ...
    }
    

    主要解析的参数有es的地址,待导入的文件的路径,导入到的es的index和type。运行go build,生成可执行文件csv2go,执行

    csv2go -h
    -file string
            file path (default "")
      -host string
            host, e.g. http://localhost:9200 (default "http://localhost:9200")
      -index string
            elastic search index (default "")
      -type string
            elastic search type (default "")
    

    连接es

    elastic这个开源项目可以帮助我们连接elastic

      // 连接es
      ctx := context.Background()
      client, err := elastic.NewClient(
        elastic.SetURL(*host),
        elastic.SetSniff(false))
      if err != nil {
        panic(err)
      }
    
      // 检查index是否存在,如果不存在则创建index
      exists, err := client.IndexExists(*esIndex).Do(ctx)
      if err != nil {
        panic(err)
      }
      if !exists {
        createIndex, err := client.CreateIndex(*esIndex).Do(ctx)
        if err != nil {
          panic(err)
        }
      }
    

    解析csv并导入(index)到elastic search

    这里需要注意几点。第一,Mac上会存在\r结尾的文件的问题,所以我们使用macreader这个包对io.Reader包了一层,有兴趣的同学可以看我之前的文章《mac上的文件有毒》,第二,我们默认csv文件的第一行为column name,后面各行都是合法的记录。

      // 解析csv
      f, _ := os.Open(*file)
      r := csv.NewReader(macreader.New(bufio.NewReader(f)))
      keys, err := r.Read()
      for {                                            //1
        record, err := r.Read()
        if err == io.EOF {
          break
        }
        m := make(map[string]string)
        for i, key := range keys {
          m[key] = record[i]
        }
        jsonStr, err := json.Marshal(m)
        if err != nil {
          panic(err)
        }
        put1, err := client.Index().
                Index(*esIndex).
                Type(*esType).
                BodyString(string(jsonStr)).
                Do(ctx)
        if err != nil {
          // Handle error
          panic(err)
        }
        fmt.Printf("Indexed tweet %s to index %s, type %s\n", put1.Id, put1.Index, put1.Type)
      }                                           //2
    

    ok,一个基本的将csv中的数据导入elastic search的程序完成了,我们来测试一下性能吧。在上面的(1)行代码前面加上

    start := time.Now().Unix()
    

    在上面的(2)行代码后面加上

    end := time.Now().Unix()
    fmt.Println(end, start)
    

    测试了一下跑了一个61567条记录的文件,一共跑了36分钟。够我睡一个午觉了...

    提升效率

    elastic search有一个bulk api,可以将一些操作合并起来,同时传递给elastic search处理并返回

      // 新建一个Bulk
      bulkRequest := client.Bulk()
      for {
        ...
        // 为每一条记录生成一个IndexRequest并加入Bulk
        req := elastic.NewBulkIndexRequest().Index(*esIndex).Type(*esType).Doc(string(jsonStr))
        bulkRequest.Add(req)
      }
      // 一次性完成请求
      bulkResponse, err := bulkRequest.Do(ctx)
      if err != nil {
      }
      indexed := bulkResponse.Indexed()
      fmt.Println("向es导入了",len(indexed),"条数据")
    

    优化过后,插入相同的6万多条记录只需要几秒钟。cool _

    本文的代码已经开源在github,欢迎使用或者提出意见。

    相关文章

      网友评论

        本文标题:golang解析csv并导入elastic search

        本文链接:https://www.haomeiwen.com/subject/fkagwttx.html