一、logstash概述
1.1 logstash简介
Logstash 项目诞生于 2009 年 8 月 2 日。其作者是世界著名的运维工程师乔丹西塞(JordanSissel)。
2013 年,Logstash 被 Elasticsearch 公司收购,ELK Stack 正式成为官方用语(随着 beats 的加入改名为 Elastic Stack)。Elasticsearch 本身 也是近两年最受关注的大数据项目之一,三次融资已经超过一亿美元。在 Elasticsearch 开发人员的共同努力下,Logstash 的发布机制,插件架构也愈发科学和合理。
Logstash 是一个具有实时渠道能力的数据收集引擎。使用 JRuby 语言编写。logstash就像是一根具备实时数据传输能力的管道,负责将数据信息从管道的输入端传输到管道的输出端;与此同时这根管道还可以让根据需求在中间加上滤网,Logstash提供里很多功能强大的滤网以满足各种应用场景。
Logstash常用于日志关系系统中做日志采集设备,最常用于ELK(elasticsearch + logstash + kibane)中作为日志收集器使用;
1.2 logstash优势
主要有两个优势,一个是在支持各类插件的前提下提供统一的管道进行日志处理(就是 input-filter-output 这一套);二个是灵活且性能不错。
Logstash收集日志基本流程:
input-->fillter-->output
input:从哪里收集日志
filter:对日志进行过滤
output:输出哪里
1.3 logstash功能
主要是用来日志的搜集、分析、过滤日志的工具。用于管理日志和事件的工具,你可以用它去收集日志、转换日志、解析日志并将他们作为数据提供给其它模块调用,例如搜索、存储等。
1.4 logstash架构
Logstash 是由 JRuby 编写的,使用基于消息的简单架构,在 JVM 上运行。理念非常简单,如果说 MapReduce 框架分为 Mapper 和 Reducer 两大模块,那么 Logstash 有:
Collect: 数据输入。对应 input
Enrich: 数据处理。对应 filter
Transport: 数据输出。对应 output
Logstash的事件(logstash将数据流中等每一条数据称之为一个event)处理流水线有三个主要角色完成。
虽然模块仅仅比 MapReduce 框架多了一个,但是无三不成几,通过不同的拓扑结构,可以完成各类数据处理应用。不过这里我们主要还是以日志汇总处理系统的思路来进行介绍,一个典型的架构为:inpust:必须,负责产生事件(Inputs generate events),常用:File、syslog、redis、beats(如:Filebeats)
filters:可选,负责数据处理与转换(filters modify them),常用:grok、mutate、drop、clone、geoip
outpus:必须,负责数据输出(outputs ship them elsewhere),常用:elasticsearch、file、graphite、statsd

1.5 logstash基础了解
1.5.1 标准输出输入
input {
stdin {} #标准输入(屏幕输入)
}
output {
stdout { #标准输出(输出屏幕)
codec => rubydebug
}
}

1.5.2 logstash的file输入文件
input {
file {
path => ["/var/log/*.log","/var/log/message"]
type => "system"
start_position => "beginning"
}
}
参数
• discover_interval
logstash每隔多久去检查一次被监听的目录下是否有新文件,默认是15秒
• exclude
可以排除不想监听的文件
• sincedb_write_interval
logstash 每隔多久写一次 sincedb 文件,默认是 15 秒。
• stat_interval
logstash 每隔多久检查一次被监听文件状态(是否有更新),默认是 1 秒。
• start_position
logstash 从什么位置开始读取文件数据,默认是结束位置,也就是说 logstash 进程会以类似 tail -F 的形式运行。如果你是要导入原有数据,把这个设定改成 "beginning",logstash 进程就从头开始读取,有点类似 cat,但是读到最后一行不会终止,而是继续变成 tail -F。
start_position 仅在该文件从未被监听过的时候起作用。如果 sincedb 文件中已经有这个文件的 inode 记录了,那么 logstash 依然会从记录过的 pos 开始读取数据。所以重复测试的时候每回需要删除 sincedb 文件。
1.5.3 logstash读取网络数据
input {
tcp {
port => 8888
}
}
output {
stdout {
codec => rubydebug
}
}
配合 nc 127.0.0.1 8888< a.txt
{
"host" => "localhost",
"@version" => "1",
"message" => "haha",
"port" => 52405,
"@timestamp" => 2020-02-24-T01:25:32.158Z
}
根据这个特性,可以通过配置filebeat传的数据
1.6 logstash过滤器
1.6.1 grok正则匹配
grok模式的语法如下:
%{SYNTAX:SEMANTIC}
SYNTAX: 代表匹配值的类型。例如3.44可以用NUMBER类型所匹配,127.0.0.1可以 使用IP类型匹配
SEMANTIC:代表存储该值的一个变量名称。例如 3.44 可能是一个事件的持续时间,127.0.0.1可能是请求的client地址。
所以这两个值可以用 %{NUMBER:duration} %{IP:client} 来匹配。
begin 123.456 end
%{WORD} %{NUMBER:request_time:float} %{WORD}
1.6.2 mutate数据修改及处理
---mutate数据修改---
filters/mutate插件是 Logstash另一个重要插件。
它提供了丰富的基础类型数据处理能力。
包括类型转换,字符串处理和字段处理等。
类型转换
可以设置的转换类型包括:"integer","float" 和 "string"。
示例如下:
filter {
mutate {
convert => ["request_time", "float"]
}
}
---mutate数据处理---
1.字符串处理
---split
filter {
mutate {
split => ["message", "|"]
}
}
---join
对数组类型字段有效
filter {
mutate {
split => ["message", "|"]
}
mutate {
join => ["message", ","]
}
}
2.字段处理
-rename
重命名某个字段,如果目的字段已经存在,会被覆盖掉:
filter {
mutate {
rename => ["syslog_host", "host"]
}
}
-update
更新某个字段的内容。如果字段不存在,不会新建。
-replace
作用和 update 类似,但是当字段不存在的时候,它会起到 add_field 参数一样的效果,自动添加新的字段。
1.6.3 GeoIP地址查询
GeoIP 是最常见的免费 IP 地址归类查询库,同时也有收费版可以采购。GeoIP 库可以根据 IP 地址提供对应的地域信息,包括国别,省市,经纬度等,对于可视化地图和区域统计非常有用。
filter {
geoip {
source => “message”
}
}
GeoIP 库数据较多,如果不需要这么多内容,可以通过 fields 选项指定自己所需要的。
filter {
geoip {
fields => ["city_name", "continent_code", "country_code2", "country_code3", "country_name", "dma_code", "ip", "latitude", "longitude", "postal_code", "region_name", "timezone"]
}
}
1.6.4 useragent匹配归类
以获取浏览器版本、型号以及系统版本
filter {
useragent {
target => "ua"
source => "useragent"
}
}
1.6.5 split拆分事件
split 拆分事件,可以把一行数据,拆分成多个事件
filter {
split {
field => "message"
terminator => "#"
}
}
网友评论