Notes
- Command-line flags take precedence over settings in the configuration file
- Environment variable substitution is supported in the settings file, e.g.:
node.name: ${NODENAME}
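As a quick illustration, the launch below both overrides pipeline.workers from logstash.yml and supplies the NODENAME variable referenced above (the install path and the values are illustrative):
NODENAME=node-1 logstash-6.3.1/bin/logstash --pipeline.workers 8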
File categories
- pipeline configuration files
  Define the processing stages of a Logstash pipeline (see config-examples)
- setting files
  Control the runtime options of a Logstash installation (see logstash6.3-settings-file)
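Both kinds live side by side in a stock install; the config directory of a 6.3 tarball looks like this (settings files only; the pipeline .conf files go wherever path.config points):
# ls logstash-6.3.1/config/
jvm.options  log4j2.properties  logstash.yml  pipelines.yml  startup.options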
setting files
Monitoring settings
Monitoring relies on the UUID stored under each component's data directory to uniquely identify a node; each component node reports its own metrics to Elasticsearch.
xpack.monitoring.enabled: true
xpack.monitoring.elasticsearch.url: ["http://es-prod-node-1:9200", "http://es-prod-node-2:9200"]
xpack.monitoring.elasticsearch.username: "logstash_system"
xpack.monitoring.elasticsearch.password: "changeme"
xpack.monitoring.elasticsearch.sniffing: false
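Once enabled, metrics are written to the .monitoring-logstash-* indices on the target cluster; a quick query confirms data is flowing (user and password here are illustrative; use any account with read access to the monitoring indices):
curl -u elastic:changeme "http://es-prod-node-1:9200/.monitoring-logstash-*/_search?size=1"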
Performance tuning
# Number of workers that run the filter and output stages in parallel; raise it to improve CPU utilization
pipeline.workers: 8
# Maximum number of events an individual worker thread collects from inputs per batch; default 125. Larger batches are more efficient but consume more memory (adjust jvm.options to match)
pipeline.batch.size: 3000
# How long to wait for each event while assembling a batch before dispatching an undersized batch to the workers; default 50ms
pipeline.batch.delay: 50
pipeline.output.workers: 8
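Because a larger pipeline.batch.size keeps more in-flight events in memory (roughly batch size × workers), the JVM heap usually has to grow with it; a typical adjustment in config/jvm.options looks like this (heap sizes are illustrative):
# vim logstash-6.3.1/config/jvm.options
-Xms4g
-Xmx4g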
Common parameters
# Periodically check the config files and reload the pipeline when they change (a SIGHUP also forces a reload); default false
config.reload.automatic: true
# Log level: trace, debug, info, warn, error, fatal; default info
log.level: info
# Log format: json or plain; default plain
log.format: plain
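When automatic reload is left off, a running Logstash can still be told to reload its pipeline configuration by sending SIGHUP (the process lookup shown here is illustrative):
kill -SIGHUP $(pgrep -f logstash)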
pipeline configuration files
Setting the pipeline config file location
# vim logstash-6.3.1/config/pipelines.yml
- pipeline.id: to_es
  path.config: "logstash-6.3.1/config/pipeline"
  # Event queue type: memory or persisted; default memory
  queue.type: memory
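pipelines.yml can declare several pipelines, each overriding logstash.yml settings independently, which keeps unrelated flows isolated; a sketch with a hypothetical second pipeline:
- pipeline.id: to_es
  path.config: "logstash-6.3.1/config/pipeline"
  queue.type: memory
- pipeline.id: beats_ingest            # hypothetical second pipeline
  path.config: "logstash-6.3.1/config/beats/*.conf"
  pipeline.workers: 2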
Configuring input-filter-output
input {
  # standard input
  stdin { }
  # beats
  beats {
    port => 5044
  }
  # files; the /var/log/**/*.log pattern recurses into subdirectories
  file {
    path => ["/var/log/*", "/var/log/**/*.log"]
    exclude => ["*.gz"]
  }
  # kafka
  kafka {
    bootstrap_servers => "xxx:9092"
    topics => ["__TOPIC__"]
    group_id => "to_es"
    consumer_threads => 8
    # Tuning options; all are unset by default and left commented out here so the block stays valid
    # Heartbeat interval to Kafka; must be lower than session_timeout_ms, typically one third of it.
    # Lowering it tightens the timing of normal rebalances
    # heartbeat_interval_ms =>
    # Timeout after which, if no poll happens within poll_timeout_ms, the consumer is marked dead and a rebalance is triggered
    # session_timeout_ms =>
    # Maximum number of records returned by a single poll
    # max_poll_records =>
    # Maximum delay before the next poll must be issued; must be larger than request_timeout_ms
    # max_poll_interval_ms =>
    # Maximum time to wait for a response from Kafka
    # request_timeout_ms =>
  }
}
filter {
  # parse Apache-style access logs
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
  # use the log's own timestamp as the event timestamp
  date {
    match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
  }
  # mutate applies its operations in a fixed internal order (copy runs after split),
  # so copy and split go in separate blocks to preserve the intended sequence
  mutate {
    copy => { "source" => "sourcetmp" }
    copy => { "[host][ip][0]" => "[host][ip]" }
  }
  mutate {
    split => ["sourcetmp", "/"]
    add_field => {
      "[fields][app]" => "%{[sourcetmp][3]}"
    }
    remove_field => [ "[beat][hostname]", "prospector", "timestamp1", "offset" ]
  }
}
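To see what the mutate sequence produces, walk a hypothetical event whose source is /var/log/myapp/app.log through it:
# "source" => "/var/log/myapp/app.log"
# after split: "sourcetmp" => ["", "var", "log", "myapp", "app.log"]
# (the leading "/" yields an empty first element)
# so %{[sourcetmp][3]} resolves to "myapp" and add_field produces:
# "fields" => { "app" => "myapp" }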
output {
  # standard output
  stdout { codec => rubydebug }
  # output to Elasticsearch
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "%{[fields][project]}-%{[fields][app]}-%{+YYYY.MM.dd}"
    user => "elastic"
    password => "xxx"
  }
}
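Before wiring this into production, the pipeline config can be syntax-checked and then started in the foreground; the paths follow the install layout used above:
# validate the pipeline config, then exit
logstash-6.3.1/bin/logstash -f logstash-6.3.1/config/pipeline --config.test_and_exit
# started without -f, Logstash picks up the pipelines defined in pipelines.yml
logstash-6.3.1/bin/logstash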