Installing and Configuring Logstash
- Download Logstash; unpacking it is all that is needed
- Check the JDK version
- The logical structure of Logstash data flow
(Diagram from the official documentation.)
- Check that Logstash works
# change into the Logstash directory
cd /home/test/dev/logstash-6.2.4
# test whether stdin input is echoed to stdout
bin/logstash -e 'input { stdin { } } output { stdout {} }'
Wait for output similar to the following:
$ bin/logstash -e 'input { stdin { } } output { stdout {} }'
Sending Logstash's logs to /home/test/dev/logstash-6.2.4/logs which is now configured via log4j2.properties
[2018-07-05T18:50:53,944][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/home/test/dev/logstash-6.2.4/modules/fb_apache/configuration"}
[2018-07-05T18:50:53,966][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/home/test/dev/logstash-6.2.4/modules/netflow/configuration"}
[2018-07-05T18:50:54,724][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2018-07-05T18:50:55,545][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"6.2.4"}
[2018-07-05T18:50:56,171][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2018-07-05T18:51:00,195][INFO ][logstash.pipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2018-07-05T18:51:00,622][INFO ][logstash.pipeline ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x4c49822d sleep>"}
The stdin plugin is now waiting for input:
[2018-07-05T18:51:00,756][INFO ][logstash.agent ] Pipelines running {:count=>1, :pipelines=>["main"]}
Type hellologstash:
hellologstash
{
      "@version" => "1",
    "@timestamp" => 2018-07-05T09:51:55.280Z,
          "host" => "host01",
       "message" => "hellologstash"
}
Output like the above means Logstash is working normally.
Press Ctrl+D to exit the process.
Preparing the data file
- user.csv
$ cat user.csv
"1","李","俊明","0","88","1"
"2","高","励","1","54","2"
"3","郑","景诚","0","55","3"
"4","杨","景诚","1","98","4"
"5","阮","俊依","0","41","5"
"6","江","明","0","53","6"
"7","赵","十莉","1","87","7"
"8","彭","十莉","0","28","8"
"9","王","恬","0","54","9"
"10","林","恬","1","63","10"
Rename the .csv extension to .txt (for some reason the .csv file was never picked up).
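If you would rather generate the file than hand-edit it, here is a small sketch (the script is mine, not from the original setup) that writes rows in the same fully quoted format:

```python
# Write sample rows to /tmp/user.txt in the same "…","…" format as above.
import csv

rows = [
    ["1", "李", "俊明", "0", "88", "1"],
    ["2", "高", "励", "1", "54", "2"],
    ["3", "郑", "景诚", "0", "55", "3"],
]

with open("/tmp/user.txt", "w", encoding="utf-8", newline="") as f:
    # QUOTE_ALL reproduces the fully quoted fields shown above
    csv.writer(f, quoting=csv.QUOTE_ALL).writerows(rows)
```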
Configuring the Logstash input and output
This is similar to the earlier 'input { stdin { } } output { stdout {} }', but since the configuration is longer this time, it is written to a file:
- Create a file named csv.conf in the config directory under the Logstash installation
input {
  file {
    path => ["/tmp/data/*.txt"]
    start_position => "beginning"
    # discover new files every 5s
    discover_interval => 5
    # only scan files modified within the last 15s; ignore the rest
    ignore_older => 15
    # close a file 1s after it was last read
    close_older => 1
    # maximum number of open files
    max_open_files => 32
    # interval between checks of each file's state (default 1s)
    stat_interval => 5
  }
}
filter {
  csv {
    separator => ","
    columns => ["id","first_name","last_name","gender","score","no"]
  }
}
output {
  # stdout {}
  elasticsearch {
    index => "user"
    # document_id => "%{id}"  # left commented out so ES auto-generates ids
    hosts => ["127.0.0.1:9200"]
  }
}
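As a sanity check of the csv filter settings above, the split a row goes through can be simulated in plain Python (a local illustration, not Logstash itself):

```python
# Simulate what the csv filter does to one line of user.txt:
# parse the quoted fields and zip them with the configured columns.
import csv
import io

columns = ["id", "first_name", "last_name", "gender", "score", "no"]
line = '"1","李","俊明","0","88","1"\n'

row = next(csv.reader(io.StringIO(line)))  # handles the quoted fields
event = dict(zip(columns, row))
print(event["first_name"], event["score"])  # -> 李 88
```

Note that every field comes out as a string; the typed interpretation happens later, via the index mapping in Elasticsearch.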
Simulating incoming files
Use a Python script to cp the data file into the directory Logstash watches, once every 10 seconds.
- autoGen.py
#!/usr/bin/python
# Python 2 script; the commands module was removed in Python 3.
import time, commands

if __name__ == '__main__':
    index = 0
    datafile = "/tmp/user.txt"
    dst = "/tmp/data/{file}.txt"
    while 1:
        index += 1
        # copy the data file to a new name, e.g. /tmp/data/data1.txt
        command = "cp {src} {dst}".format(src=datafile, dst=dst.format(file="data" + str(index)))
        print(command)
        commands.getoutput(command)
        time.sleep(10)
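For Python 3, where the commands module no longer exists, a rough equivalent using shutil might look like this (same paths and 10s interval as above; the helper names are mine):

```python
# Python 3 variant of autoGen.py: copy the sample file into the
# watched directory at a fixed interval.
import shutil
import time

def next_target(dst_template, index):
    # Destination file name for iteration `index`
    return dst_template.format(file="data" + str(index))

def run(datafile="/tmp/user.txt", dst="/tmp/data/{file}.txt", interval=10):
    index = 0
    while True:
        index += 1
        target = next_target(dst, index)
        print(target)
        shutil.copy(datafile, target)
        time.sleep(interval)

# run()  # uncomment to start copying every 10 seconds
print(next_target("/tmp/data/{file}.txt", 1))  # -> /tmp/data/data1.txt
```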
Creating the index
PUT user_dev
Configuring the mapping
PUT user_dev/_mapping/doc
{
  "properties": {
    "id": {
      "type": "integer"
    },
    "first_name": {
      "type": "text",
      "fields": {
        "keyword": {
          "type": "keyword",
          "ignore_above": 256
        }
      }
    },
    "last_name": {
      "type": "text",
      "fields": {
        "keyword": {
          "type": "keyword",
          "ignore_above": 256
        }
      }
    },
    "gender": {
      "type": "short"
    },
    "score": {
      "type": "float"
    },
    "no": {
      "type": "integer"
    }
  }
}
# view the mapping
GET user_dev/_mapping
# set the alias
PUT user_dev/_alias/user
# view the alias
GET user_dev/_alias
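The console requests above can also be issued without Kibana. A stdlib-only Python sketch (it assumes Elasticsearch on localhost:9200, and the put helper is mine):

```python
# PUT the index, mapping, and alias via the Elasticsearch REST API.
import json
import urllib.request

ES = "http://localhost:9200"

def put(path, body=None):
    # Issue a PUT request and return the parsed JSON reply.
    data = json.dumps(body).encode() if body is not None else None
    req = urllib.request.Request(
        ES + path, data=data, method="PUT",
        headers={"Content-Type": "application/json"})
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read())

mapping = {
    "properties": {
        "id": {"type": "integer"},
        "first_name": {"type": "text",
                       "fields": {"keyword": {"type": "keyword",
                                              "ignore_above": 256}}},
        "last_name": {"type": "text",
                      "fields": {"keyword": {"type": "keyword",
                                             "ignore_above": 256}}},
        "gender": {"type": "short"},
        "score": {"type": "float"},
        "no": {"type": "integer"},
    }
}

# Uncomment with Elasticsearch running:
# put("/user_dev")                         # PUT user_dev
# put("/user_dev/_mapping/doc", mapping)   # apply the mapping
# put("/user_dev/_alias/user")             # PUT user_dev/_alias/user
```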
Starting everything
- Make sure Elasticsearch is running
$ ps -ef | grep elastic
- Start Logstash
$ bin/logstash -f config/csv.conf
- Start the Python script
$ ./autoGen.py
Checking the results
$ curl "http://localhost:9200/user/_search?pretty&_source=false&size=0"
{
"took" : 6,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : 180290,
"max_score" : 0.0,
"hits" : [ ]
}
}