logstash 介绍
Logstash是一个具有实时管道功能的开源数据收集引擎,Logstash可以动态地将来自不同数据源的数据统一起来,并将数据规范化为你选择的目的地,清理和大众化你的所有数据,用于各种高级下游分析和可视化用例。
虽然Logstash最初推动了日志收集方面的创新,但是它的功能远远超出了这个用例,任何类型的事件都可以通过大量的输入、过滤器和输出插件来丰富和转换,使用许多原生编解码可以进一步简化摄取过程。Logstash通过利用大量和多种数据来提高你的洞察力。
简单来说,logstash 是一个很牛逼的开源数据收集引擎,并能将日志繁乱的格式字段定义为你想要的。
Hillstone 配置
系统版本:Version 5.5
路径:监控-日志-日志管理-你想搜集的日志-日志服务器-日志分发方式-明文日志(注意一定是明文,默认为二进制日志,logstash拿到后不好处理)

由于可选日志条目较多,我们只拿会话日志进行举例,我们勾选让会话日志先传递给logstash。

Logstash配置
- 新建一个test.conf
[root@localhost config]# vim test-hillstone.conf
input{
udp {port => 5010 type => "Hillstone"}
}
output {
stdout { codec=> rubydebug }
}
-
放通防火墙端口,保证防火墙和Logstash的端口连通性,为方便起见,可以先暂时停用Linux的防火墙。具体停用方法可以百度。
-
我们执行一下配置文件(由于Logstash默认的日志文件只能执行一个项,所以需要重新指定路径。)
[root@localhost logstash-6.2.2]# ./bin/logstash -f config/test-hillstone.conf --path.data data/hillstone-log --path.logs logs/hillstone-log
-
我们可以看到,大多数日志都是这两种格式,我们先按照这种格式进行分析
会话日志-3.png
-
利用logstash内置的正则格式进行日志分析,通过Grok Debugger进行日志切割。(到目前的经验来看,凡是grokdebug分析不出来的,logstash肯定分析不出来,而且grokdebug速度快,效率高,建议多尝试)
eg1:
<190>May 10 11:08:48 2812242182000133(root) 44243630 Traffic@FLOW: SESSION: 10.2.20.232:53550->140.143.254.155:443(TCP), application HTTPS, interface ethernet0/1.10, vr trust-vr, policy 10, user -@-, host -, mac 0000.0000.0000, send packets 3, send bytes 176, receive packets 2, receive bytes 120, start time 2019-05-10 11:08:47, close time 2019-05-10 11:08:48, session end, Block\n\u0000
\<%{BASE10NUM:syslog_pri}\>%{SYSLOGTIMESTAMP:timestamp}\ %{BASE10NUM:serial}\(%{WORD:ROOT}\) %{DATA:logid}\ %{DATA:Sort}@%{DATA:Class}\: %{DATA:module}\: %{IPV4:srcip}\:%{BASE10NUM:srcport}->%{IPV4:dstip}:%{WORD:dstport}\(%{DATA:protocol}\), application\ %{DATA:app}\, interface %{DATA:interface}\, vr %{DATA:vr}\, policy %{DATA:policy}\, user %{USERNAME:user}\@%{DATA:AAAserver}\, host %{DATA:HOST}\, mac %{CISCOMAC:mac}\, send packets %{BASE10NUM:sendPackets}\, send bytes %{BASE10NUM:sendBytes}\, receive packets %{BASE10NUM:receivePackets}\, receive bytes %{BASE10NUM:receiveBytes}\, start time %{TIMESTAMP_ISO8601:startTime}\, close time %{TIMESTAMP_ISO8601:closeTime}\, %{GREEDYDATA:reason}

我们将变量都定义出来,将日志定义成自己想要的,后面可以根据字段进行删减。
- 好了,废话不多说,大体思路是这样。下面我将.conf配置文件贴出来,有问题可以多沟通。
[root@localhost config]# cat hillstone.conf
input{
udp {port => 5010 type => "Hillstone"}
# udp {port => 5003 type => "H3C"}
}
# kv{ }
#input { stdin { } }
filter {
grok {
##流量日志
#PBR修改后语句
match => { "message" => "\<%{BASE10NUM:syslog_pri}\>%{SYSLOGTIMESTAMP:timestamp}\ %{BASE10NUM:serial}\(%{WORD:ROOT}\) %{DATA:logid}\ %{DATA:Sort}@%{DATA:Class}\: PBR\: %{IPV4:srcip}\:%{BASE10NUM:srcport}->%{IPV4:dstip}:%{WORD:dstport}\(%{DATA:protocol}\), app %{DATA:app}\, in-interface\ %{DATA:in-interface}\, vr\ %{DATA:vr}\, pbr-policy\ %{DATA:pbrpolicy}\, pbr id\ %{DATA:pbrID}\, out-interface\ %{DATA:out-interface}\, nexthop\ %{IPV4:nexthop}, user\ %{USERNAME:user}\@%{DATA:AAAserver}\, host %{DATA:HOST}\, %{GREEDYDATA:reason}"}
#SESSION会话结束日志
match => { "message" => "\<%{BASE10NUM:syslog_pri}\>%{SYSLOGTIMESTAMP:timestamp}\ %{BASE10NUM:serial}\(%{WORD:ROOT}\) %{DATA:logid}\ %{DATA:Sort}@%{DATA:Class}\: %{DATA:module}\: %{IPV4:srcip}\:%{BASE10NUM:srcport}->%{IPV4:dstip}:%{WORD:dstport}\(%{DATA:protocol}\), application\ %{DATA:app}\, interface %{DATA:interface}\, vr %{DATA:vr}\, policy %{DATA:policy}\, user %{USERNAME:user}\@%{DATA:AAAserver}\, host %{DATA:HOST}\, mac %{CISCOMAC:mac}\, send packets %{BASE10NUM:sendPackets}\, send bytes %{BASE10NUM:sendBytes}\, receive packets %{BASE10NUM:receivePackets}\, receive bytes %{BASE10NUM:receiveBytes}\, start time %{TIMESTAMP_ISO8601:startTime}\, close time %{TIMESTAMP_ISO8601:closeTime}\, %{GREEDYDATA:reason}"}
#SESSION会话开始日志
match => { "message" => "\<%{BASE10NUM:syslog_pri}\>%{SYSLOGTIMESTAMP:timestamp}\ %{BASE10NUM:serial}\(%{WORD:ROOT}\) %{DATA:logid}\ %{DATA:Sort}@%{DATA:Class}\: %{DATA:module}\: %{IPV4:srcip}\:%{BASE10NUM:srcport}->%{IPV4:dstip}:%{WORD:dstport}\(%{DATA:protocol}\), interface %{DATA:interface}\, vr %{DATA:vr}\, policy %{DATA:policy}\, user %{USERNAME:user}\@%{DATA:AAAserver}\, host %{DATA:HOST}\, mac %{CISCOMAC:mac}\, %{GREEDYDATA:reason}"}
#SNAT日志
match => { "message" => "\<%{BASE10NUM:syslog_pri}\>%{SYSLOGTIMESTAMP:timestamp}\ %{BASE10NUM:serial}\(%{WORD:ROOT}\) %{DATA:logid}\ %{DATA:Sort}@%{DATA:Class}\: %{DATA:module}\: %{IPV4:srcip}\:%{BASE10NUM:srcport}->%{IPV4:dstip}:%{WORD:dstport}\(%{DATA:protocol}\), snat to %{IPV4:snatip}\:%{BASE10NUM:snatport}\, vr\ %{DATA:vr}\, user\ %{USERNAME:user}\@%{DATA:AAAserver}\, host\ %{DATA:HOST}\, rule\ %{BASE10NUM:rule}"}
#DNAT日志
match => { "message" => "\<%{BASE10NUM:syslog_pri}\>%{SYSLOGTIMESTAMP:timestamp}\ %{BASE10NUM:serial}\(%{WORD:ROOT}\) %{DATA:logid}\ %{DATA:Sort}@%{DATA:Class}\: %{DATA:module}\: %{IPV4:srcip}\:%{BASE10NUM:srcport}->%{IPV4:dstip}:%{WORD:dstport}\(%{DATA:protocol}\), dnat to %{IPV4:dnatip}\:%{BASE10NUM:dnatport}\, vr\ %{DATA:vr}\, user\ %{USERNAME:user}\@%{DATA:AAAserver}\, host\ %{DATA:HOST}\, rule\ %{BASE10NUM:rule}"}
##内容审计日志
#URL日志
match => { "message" => "\<%{BASE10NUM:syslog_pri}\>%{SYSLOGTIMESTAMP:timestamp}\ %{BASE10NUM:serial}\(%{WORD:ROOT}\) %{DATA:logid}\ %{DATA:Event}@%{DATA:Class}\: %{DATA:module}: IP %{IPV4:srcip}\:%{BASE10NUM:srcport}\(%{IPV4:snatip}\:%{BASE10NUM:snatport}\)->%{IPV4:dstip}\:%{BASE10NUM:dstport}\(%{IPV4:dnatip}\:%{BASE10NUM:dnatport}\), user %{USERNAME:user}, VR %{DATA:vr}\, URL\ %{DATA:url}\, category\ %{DATA:category}\, method\ %{DATA:method}\, action\ %{DATA:action}\, reason\ %{GREEDYDATA:reason}"}
#IM日志
match => { "message" => "\<%{BASE10NUM:syslog_pri}\>%{SYSLOGTIMESTAMP:timestamp}\ %{BASE10NUM:serial}\(%{WORD:ROOT}\) %{DATA:logid}\ %{DATA:Sort}@%{DATA:Class}\: %{DATA:module}: IP %{IPV4:srcip}\:%{BASE10NUM:srcport}\(%{IPV4:snatip}\:%{BASE10NUM:snatport}\)->%{IPV4:dstip}\:%{BASE10NUM:dstport}\(%{IPV4:dnatip}\:%{BASE10NUM:dnatport}\), user %{USERNAME:user}\@%{DATA:AAAserver}\, VR %{DATA:vr}\, %{DATA:app}\, %{WORD: content}\, %{DATA:state}\, user mac\ %{BASE10NUM:mac}"}
##系统日志
match => { "message" => "\<%{BASE10NUM:syslog_pri}\>%{SYSLOGTIMESTAMP:timestamp}\ %{BASE10NUM:serial}\(%{WORD:ROOT}\) %{DATA:logid}\ %{DATA:Sort}@%{DATA:Class}\: %{GREEDYDATA:reason}"}
}
# if [message] !~ "^127\.|^192\.168\.|^172\.1[6-9]\.|^172\.2[0-9]\.|^172\.3[01]\.|^10\."
# {
# geoip {
# source => "dstip"
# target => "geoip"
# database => "/opt/elk/logstash-6.2.2/geoip/GeoLite2-City.mmdb"
# fields => ["country_name", "continent_code", "region_name", "city_name", "location", "latitude", "longitude"]
# remove_field => ["[geoip][latitude]", "[geoip][longitude]"]
# }
#
# geoip {
# source => "dstip"
# target => "geoip"
# database => "/opt/elk/logstash-6.2.2/geoip/GeoLite2-ASN.mmdb"
# remove_field => ["[geoip][ip]"]
# }
# }
mutate {
# add_field => ["logTimestamp", "%{date} %{time}"]
# remove_field => ["logTimestamp", "year", "month", "day", "time", "date"]
remove_field => ["type", "host", "message", "ROOT", "HOST", "serial", "syslog_pri", "timestamp", "mac"]
}
}
output {
elasticsearch {
hosts => "10.2.3.193:9200" #elasticsearch服务地址
index => "logstash-hillstone-%{+YYYY.MM.dd}"
}
# stdout { codec=> rubydebug }
}
网友评论