0x05. Importing CSV Data with Logstash

Author: 0x70e8 | Published 2018-07-05 18:05

    Installing and Configuring Logstash

    • Download Logstash; unpacking the archive is all the installation required
    • Check the JDK version
    • The logical structure of Logstash data flow: input → filter → output

    (Pipeline diagram from the official documentation)
    • Verify that Logstash works
    # change into the install directory
    cd /home/test/dev/logstash-6.2.4
    # check that stdin input is echoed to stdout
    bin/logstash -e 'input { stdin { } } output { stdout {} }'
    

    Wait for output similar to the following:

    $ bin/logstash -e 'input { stdin { } } output { stdout {} }'
    Sending Logstash's logs to /home/test/dev/logstash-6.2.4/logs which is now configured via log4j2.properties
    [2018-07-05T18:50:53,944][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/home/test/dev/logstash-6.2.4/modules/fb_apache/configuration"}
    [2018-07-05T18:50:53,966][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/home/test/dev/logstash-6.2.4/modules/netflow/configuration"}
    [2018-07-05T18:50:54,724][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
    [2018-07-05T18:50:55,545][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"6.2.4"}
    [2018-07-05T18:50:56,171][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
    [2018-07-05T18:51:00,195][INFO ][logstash.pipeline        ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
    [2018-07-05T18:51:00,622][INFO ][logstash.pipeline        ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x4c49822d sleep>"}
    The stdin plugin is now waiting for input:
    [2018-07-05T18:51:00,756][INFO ][logstash.agent           ] Pipelines running {:count=>1, :pipelines=>["main"]}
    

    Type hellologstash:

    hellologstash
    {
          "@version" => "1",
        "@timestamp" => 2018-07-05T09:51:55.280Z,
              "host" => "host01",
           "message" => "hellologstash"
    }
    

    Output like the above means Logstash is working correctly.
    Press Ctrl+D to exit.

    Preparing the Data File

    • user.csv
    $ cat user.csv
    "1","李","俊明","0","88","1"
    "2","高","励","1","54","2"
    "3","郑","景诚","0","55","3"
    "4","杨","景诚","1","98","4"
    "5","阮","俊依","0","41","5"
    "6","江","明","0","53","6"
    "7","赵","十莉","1","87","7"
    "8","彭","十莉","0","28","8"
    "9","王","恬","0","54","9"
    "10","林","恬","1","63","10"
    

    Rename the .csv extension to .txt (for some reason Logstash never picked up the .csv files).
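For reference, a file in the same shape as user.csv can be produced with a short script; the two rows below are placeholders mirroring the original data:

```python
import csv

# placeholder rows in the user.csv layout:
# id, first_name, last_name, gender, score, no
rows = [
    ("1", "李", "俊明", "0", "88", "1"),
    ("2", "高", "励", "1", "54", "2"),
]

with open("user.txt", "w", newline="", encoding="utf-8") as f:
    # QUOTE_ALL reproduces the fully quoted style of the sample file
    csv.writer(f, quoting=csv.QUOTE_ALL).writerows(rows)
```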

    Configuring Logstash Input and Output

    This works like the earlier 'input { stdin { } } output { stdout {} }', but since this configuration is longer, it goes into a file:

    • Create a file named csv.conf under Logstash's config directory
    input{
        file{
            path => ["/tmp/data/*.txt"]
        start_position => "beginning"
        # look for new files every 5s
        discover_interval => 5
        # only scan files modified within the last 15s; ignore older ones
        ignore_older => 15
        close_older => 1
        # maximum number of files held open
        max_open_files => 32
        # interval between checks of file state, default 1s
        stat_interval => 5
            }
    }
    filter{
        csv{
            separator => ","
            columns => ["id","first_name","last_name","gender","score","no"]
            }
    }
    output{
        # stdout{}
        elasticsearch{
            index => "user"
        # document_id left unset so Elasticsearch generates ids automatically
        # document_id => "%{id}"
            hosts => ["127.0.0.1:9200"]
        }
    }
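The csv filter splits each line on the separator and assigns the values to the listed columns; a rough Python equivalent of that parsing step (an illustration, not Logstash's actual implementation):

```python
import csv
import io

columns = ["id", "first_name", "last_name", "gender", "score", "no"]
line = '"3","郑","景诚","0","55","3"'

# the csv module strips the surrounding quotes, as the Logstash csv filter does
values = next(csv.reader(io.StringIO(line)))
event = dict(zip(columns, values))
print(event["last_name"])  # 景诚
```

Note that every value stays a string; when the event is indexed, Elasticsearch coerces numeric strings into the integer/short/float types declared in the mapping below.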
    

    Simulating Incoming Files

    Use a Python script to copy the data file into the directory Logstash watches, once every 10 seconds

    • autoGen.py
    #!/usr/bin/python
    import shutil
    import time

    if __name__ == '__main__':
        index = 0
        datafile = "/tmp/user.txt"
        dst = "/tmp/data/{file}.txt"
        while True:
            index += 1
            target = dst.format(file="data" + str(index))
            print("cp {src} -> {dst}".format(src=datafile, dst=target))
            # copy under a new name so the file input discovers a fresh file
            shutil.copy(datafile, target)
            time.sleep(10)
    

    Creating the Index

    PUT user_dev

    Configuring the Mapping

    PUT user_dev/_mapping/doc
    {
      "properties":{
        "id":{
          "type":"integer"
        },
        "first_name":{
          "type":"text",
          "fields": {
              "keyword":{
                "type": "keyword",
                "ignore_above": 256
            }
          }
        },
        "last_name":{
          "type":"text",
          "fields": {
            "keyword":{
            "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "gender":{
          "type":"short"
        },
        "score":{
        "type":"float"
        },
        "no":{
        "type":"integer"
        }
      }
    }
    # view the mapping
    GET user_dev/_mapping
    # set an alias
    PUT user_dev/_alias/user
    # view aliases
    GET user_dev/_alias
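The Dev Tools commands above can also be issued as plain HTTP requests; a minimal sketch with the standard library (the abbreviated mapping body here is illustrative — use the full one from above):

```python
import json
import urllib.request

ES = "http://127.0.0.1:9200"
mapping = {"properties": {"id": {"type": "integer"}, "score": {"type": "float"}}}

def put(url, body=None):
    # issue a PUT with an optional JSON body
    data = json.dumps(body).encode() if body is not None else None
    req = urllib.request.Request(url, data=data, method="PUT",
                                 headers={"Content-Type": "application/json"})
    return urllib.request.urlopen(req)

# requires a running Elasticsearch, hence commented out:
# put(ES + "/user_dev")                          # create the index
# put(ES + "/user_dev/_mapping/doc", mapping)    # apply the mapping
# put(ES + "/user_dev/_alias/user")              # set the alias
```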
    

    Starting Everything

    • Make sure Elasticsearch is running: $ ps -ef | grep elastic
    • Start Logstash: $ bin/logstash -f config/csv.conf
    • Start the Python script: ./autoGen.py

    Checking the Results

    $ curl "http://localhost:9200/user/_search?pretty&_source=false&size=0"
    {
      "took" : 6,
      "timed_out" : false,
      "_shards" : {
        "total" : 5,
        "successful" : 5,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : 180290,
        "max_score" : 0.0,
        "hits" : [ ]
      }
    }
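hits.total keeps growing as long as the generator script runs, since every copied file is indexed again. To break the count down by a field, a terms aggregation body like the following can be POSTed to /user/_search (field name taken from the mapping above):

```python
import json

# size=0: return only aggregation buckets, no individual hits
query = {
    "size": 0,
    "aggs": {"by_gender": {"terms": {"field": "gender"}}},
}
print(json.dumps(query))
```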
    
