概述
Logstash是一款轻量级的、开源的日志收集处理框架,它可以方便的把分散的、多样化的日志搜集起来,并进行自定义过滤分析处理,然后传输到指定的位置,比如某个服务器或者文件。
Logstash的理念很简单,从功能上来讲,它只做三件事情:
- input:数据收集
- filter:数据加工,如过滤,修改等。
- output:数据输出
Logstash实现的功能主要分为接收数据、解析过滤并转换数据、输出数据三个部分,这三个功能对应的插件依次是input插件、filter插件、output插件,其中,filter插件是可选的,其它两个是必须插件。
Logstash的安装
从elastic官网https://www.elastic.co/downloads/logstash 获取logstash安装包,这里下载的版本是logstash-7.10.2.tar.gz。
将下载下来的安装包直接解压到一个路径下即可完成logstash的安装。
这里我将logstash安装到主机(192.168.2.136)上,将logstash程序安装到/usr/local目录下,基本操作过程如下:
tar -zxvf logstash-7.10.2-linux-x86_64.tar.gz -C /usr/local
mv /usr/local/logstash-7.10.2 /usr/local/logstash
配置文件
logstash的配置文件在安装程序下的config子目录下:
- jvm.options是设置JVM内存资源的配置文件
- logstash.yml是logstash全局属性配置文件,一般无需修改,另外还需要自己创建一个logstash事件配置文件。
1. 命令行方式
Logstash提供了一个shell脚本/usr/local/logstash/bin/logstash,可以方便快速的启动一个logstash进程,在Linux命令行下,运行如下命令启动Logstash进程:
cd /usr/local/logstash/
bin/logstash -e 'input{stdin{}} output{stdout{codec=>rubydebug}}'
[2021-06-21T06:57:35,776][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
Hello world
{
"@version" => "1",
"message" => "Hello world",
"host" => "centos7_9-mod",
"@timestamp" => 2021-06-21T10:58:55.457Z
}
###2.配置文件启动方式
使用-e参数在命令行中指定配置是很常用的方式,但是如果logstash需要配置更多规则的话,就必须把配置固化到文件里,这就是logstash事件配置文件,如果把上面命令行执行的logstash命令,写到一个配置文件logstash-1.conf中,就变成如下内容:
input {
stdin { }
}
output {
stdout { codec => rubydebug }
}
这就是最简单的Logstash事件配置文件。此时,可以使用logstash的-f参数来读取配置文件,然后启动logstash进程,操作如下:
bin/logstash -f config/logstash-1.conf
通过这种方式也可以启动logstash进程,不过这种方式启动的进程是在前台运行的,要放到后台运行,可通过nohup命令实现,操作如下:
nohup bin/logstash -f config/logstash-1 &
tail -f nohup.out
输入插件
1.基本语法组成
input {
输入插件
}
filter {
过滤匹配插件
}
output {
输出插件
}
Logstash配置文件有如下三部分组成,其中input、output部分是必须配置,filter部分是可选配置。
2.从文件读取数据
input {
file {
path => ["/var/log/secure"]
type => "system"
start_position => "beginning"
}
}
output {
stdout{
codec=>rubydebug
}
}
start_position表示按时间戳记录的地方开始读取,如果没有时间戳则从头开始读取,默认情况下,logstash会从文件的结束位置开始读取数据。type用来标记事件类型,通常会在输入区域通过type标记事件类型。
bin/logstash -f config/logstash-file.conf
[2021-06-21T22:16:50,615][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
{
"type" => "system",
"message" => "Jun 21 06:28:22 centos7_9-mod sshd[18648]: Accepted password for root from 192.168.2.233 port 51559 ssh2",
"@timestamp" => 2021-06-22T02:16:50.787Z,
"host" => "centos7_9-mod",
"@version" => "1",
"path" => "/var/log/secure"
}
{
"type" => "system",
"message" => "Jun 21 06:28:22 centos7_9-mod sshd[18648]: pam_unix(sshd:session): session opened for user root by (uid=0)",
"@timestamp" => 2021-06-22T02:16:50.788Z,
"host" => "centos7_9-mod",
"@version" => "1",
"path" => "/var/log/secure"
}
3.从标准输入读取数据
input{
stdin{
add_field=>{"key"=>"ok"}
tags=>["add field"]
type=>"mytype"
}
}
output {
stdout{
codec=>rubydebug
}
}
type和tags是logstash的两个特殊字段, type一般会放在input中标记事件类型, tags主要用于在事件中增加标签,以便在后续的处理流程中使用,主要用于filter或output阶段。
bin/logstash -f config/logstash-stdin.conf
hello
{
"tags" => [
[0] "add field"
],
"@version" => "1",
"key" => "ok",
"message" => "hello",
"@timestamp" => 2021-06-22T02:49:57.721Z,
"host" => "centos7_9-mod",
"type" => "mytype"
}
4.从网络读取TCP数据
input {
tcp {
port => "5044"
}
}
filter {
grok {
match => { "message" => "%{SYSLOGLINE}" }
}
}
output {
stdout{
codec=>rubydebug
}
}
5044端口是logstash启动的tcp监听端口。注意这里用到了日志过滤"LogStash::Filters::Grok"功能.
bin/logstash -f config/logstash-net.conf
[2021-06-21T23:04:32,647][INFO ][logstash.inputs.tcp ][main][b59e9fd08e530a78065b475d52a49eae15a65cead44875d4e7226628ab208d0c] Starting tcp input listener {:address=>"0.0.0.0:5044", :ssl_enable=>"false"}
//在node03上执行
yum install nc //没有nc先安装
nc 192.168.2.136 5044 < /var/log/secure
{
"pid" => "9459",
"message" => [
[0] "Jun 21 23:10:37 node03 sshd[9459]: pam_unix(sshd:session): session opened for user root by (uid=0)",
[1] "pam_unix(sshd:session): session opened for user root by (uid=0)"
],
"@timestamp" => 2021-06-22T03:22:24.082Z,
"host" => "192.168.2.193",
"logsource" => "node03",
"program" => "sshd",
"port" => 44478,
"@version" => "1",
"timestamp" => "Jun 21 23:10:37"
}
编码插件
1、codec插件之plain
input{
stdin {
}
}
output{
stdout {
codec => "plain"
}
}
plain是一个空的解析器,它可以让用户自己指定格式,也就是说输入是什么格式,输出就是什么格式。
bin/logstash -f config/logstash-plain.conf
hello
2021-06-22T03:39:09.687Z centos7_9-mod hello
2、codec插件之json
input {
stdin {
}
}
output {
stdout {
codec => json
}
}
如果发送给logstash的数据内容为json格式,可以在input字段加入codec=>json来进行解析,这样就可以根据具体内容生成字段,方便分析和储存。
如果想让logstash输出为json格式,可以在output字段加入codec=>json
bin/logstash -f config/logstash-json.conf
hello
{"@version":"1","@timestamp":"2021-06-22T03:43:07.588Z","message":"hello","host":"centos7_9-mod"}
输入是什么,输出是什么:
input {
stdin {
}
}
output {
stdout {
codec => line { format => "%{message}"}
}
}
bin/logstash -f config/logstash-json.conf
hello
hello
过滤器插件(Filter)
1、Grok 正则捕获
grok是一个十分强大的logstash filter插件,它可以通过正则解析任意文本,将非结构化日志数据格式为结构化和方便查询的结构。它是目前logstash 中解析非结构化日志数据最好的方式。
[root@centos7_9-mod patterns]# pwd
/usr/local/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-patterns-core-4.1.2/patterns
[root@centos7_9-mod patterns]# cat grok-patterns
USERNAME [a-zA-Z0-9._-]+
USER %{USERNAME}
EMAILLOCALPART [a-zA-Z][a-zA-Z0-9_.+-=:]+
EMAILADDRESS %{EMAILLOCALPART}@%{HOSTNAME}
INT (?:[+-]?(?:[0-9]+))
BASE10NUM (?<![0-9.+-])(?>[+-]?(?:(?:[0-9]+(?:\.[0-9]+)?)|(?:\.[0-9]+)))
NUMBER (?:%{BASE10NUM})
BASE16NUM (?<![0-9A-Fa-f])(?:[+-]?(?:0x)?(?:[0-9A-Fa-f]+))
。。。。。。
Grok 的语法规则是:
%{语法: 语义}
这里的“语法”指的就是匹配的模式,例如使用NUMBER模式可以匹配出数字,IP模式则会匹配出127.0.0.1这样的IP地址
input {
stdin {
}
}
filter {
grok {
match => { "message" => "%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}" }
}
}
output {
stdout{
codec=>rubydebug
}
}
查看执行结果:
bin/logstash -f config/logstash-grok.conf
172.16.213.132 [16/Jun/2020:16:24:19 +0800] "GET / HTTP/1.1" 403 5039
{
"timestamp" => "16/Jun/2020:16:24:19 +0800",
"referrer" => "\"GET / HTTP/1.1\"",
"message" => "172.16.213.132 [16/Jun/2020:16:24:19 +0800] \"GET / HTTP/1.1\" 403 5039",
"@version" => "1",
"host" => "centos7_9-mod",
"@timestamp" => 2021-06-22T08:05:06.358Z,
"bytes" => "5039",
"response" => "403",
"clientip" => "172.16.213.132"
}
删除多余的message,时间格式化,删除字段
input {
stdin {
}
}
filter {
grok {
match => { "message" => "%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}" }
remove_field => [ "message" ]
}
date {
match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
}
mutate {
remove_field => ["timestamp"]
}
}
output {
stdout{
codec=>rubydebug
}
}
2、时间处理(Date)
下面是date插件的一个配置示例(这里仅仅列出filter部分):
filter {
grok {
match => ["message", "%{HTTPDATE:timestamp}"]
}
date {
match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
}
}
date插件是对于排序事件和回填旧数据尤其重要,它可以用来转换日志记录中的时间字段,变成LogStash::Timestamp对象,然后转存到@timestamp字段里.
3、数据修改(Mutate)
mutate插件是Logstash另一个非常重要插件。它提供了丰富的基础类型数据处理能力。包括重命名、删除、替换和修改日志事件中的字段。
filter { mutate { convert => ["filed_name", "integer"] }} #字段类型转换功能
filter { mutate { gsub => ["filed_name_1", "/" , "_"] }} #正则表达式替换匹配字段
filter { mutate { split => {"filed_name_2", "|"} }} #分隔符分割字符串为数组
filter { mutate { rename => {"old_field" => "new_field"} }} #重命名字段
filter { mutate { remove_field => ["timestamp"] }} #删除字段
举个例子:
input {
stdin {
}
}
filter {
grok {
match => { "message" => "%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}" }
remove_field => [ "message" ]
}
date {
match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
}
mutate {
rename => {"response" => "response_new"}
convert => ["bytes", "float"]
gsub => ["referrer", "\"" , ""]
split => ["clientip", "."]
remove_field => ["timestamp"]
}
}
output {
stdout{
codec=>rubydebug
}
}
172.16.213.132 [16/Jun/2020:16:24:19 +0800] "GET / HTTP/1.1" 403 5039
{
"bytes" => 5039.0,
"host" => "centos7_9-mod",
"@timestamp" => 2020-06-16T08:24:19.000Z,
"response_new" => "403",
"clientip" => [
[0] "172",
[1] "16",
[2] "213",
[3] "132"
],
"referrer" => "GET / HTTP/1.1",
"@version" => "1"
}
Logstash输出插件(output)
output { stdout { codec => rubydebug }} #输出到标准输出(stdout)
output { file { path => "/data/log/%{+yyyy-MM-dd}/%{host}_%{+HH}.log" } #保存为文件
output { file { path => “/data/log/%{+yyyy-MM-dd}/%{host}_%{+HH}.log.gz” codec => line { format => “%{message}”} gzip => true } #保存为文件(file),保存日志原始格式
输出到文件:
input {
stdin {
}
}
filter {
grok {
match => { "message" => "%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}" }
}
date {
match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
}
mutate {
remove_field => ["timestamp"]
}
}
output {
file {
path => "/data/log/%{+yyyy-MM-dd}/%{host}_%{+HH}.log"
codec => line { format => "%{message}" }
}
}
参考
Logstash 最佳实践:
http://doc.yonyoucloud.com/doc/logstash-best-practice-cn/index.html
grok调式工具:
http://grokdebug.herokuapp.com/
网友评论