1、系统架构
流量回溯系统捕获和分析数据流程,一般由以下几个步骤组成:
1.数据包捕获-记录网络上的数据包流量。
2.协议解析-解析不同的网络协议和字段。
3.搜索和可视化-详细或汇总浏览数据。
从Wireshark 3.0.0rc1开始,TShark可以使用-G elastic-mapping
选项,生成Elasticsearch映射文件,存储在Elasticsearch中并进行浏览,使得TShark解析结果可以在在Kibana中进行搜索和可视化。
下面,我将展示如何使用Wireshark和Elastic Stack来建立流量回溯系统:
2、数据捕获和解析
2.1 使用packetbeat
Packetbeat可以配置为捕获实时网络数据包,并使用-I
选项从捕获文件中读取数据包。它可以识别和解析许多应用程序级协议,例如HTTP,MySQL和DNS,以及常规流信息。但packetbeat对报文采用会话方式分析,不能对一个个报文单独分析,倾向于应用层,对网络层面分析不足,并且支持的协议有限,tshark的2000多种存在明显差距。
2.2 使用Wireshark / Tshark
Wireshark是最受欢迎的数据包捕获和分析软件,它可以识别2,000多个协议,其中包含200,000多个字段。除GUI界面操作外,它还提供命令行实用程序tshark
来捕获实时流量以及读取和解析捕获文件。作为其输出,tshark
可以生成报告和统计信息,还可以分析不同文本格式的数据包数据。
自2.2版(2016年9月发布)以来,tshark
支持的Elasticsearch Bulk API的JSON格式输出。
tshark -i eth0 -T ek > packets.json
将在eth0
网络接口上实时捕获数据包,并以Elasticsearch Bulk API格式输出到文件packets.json
。
tshark -r capture.pcap -T ek > packets.json
将从捕获文件capture.pcap
中读取数据包,并作为JSON格式输出为Elasticsearch Bulk API格式到文件packets.json
。
tshark -r capture.pcap -Y http -e ip.src -e ip.dst -e tcp.srcport -e tcp.dstport -T ek > packets.json
将从捕获文件capture.pcap
中读取数据包,过滤HTTP协议指定字段,并作为JSON格式输出为Elasticsearch Bulk API格式到文件packets.json
。
tshark -r capture.pcap -Y http -e ip.src -e ip.dst -e tcp.srcport -e tcp.dstport -Tfileds -t ad -E separator="|" > packets.txt
将从捕获文件capture.pcap
中读取数据包,过滤HTTP协议指定字段,并增加"|"分隔符以txt格式输出到文件packets.txt
。
3、数据格式转换
3.1 使用Ingest Pipeline
从5.0版开始,Elasticsearch提出了Ingest Pipeline的概念。一个pipeline 由一系列线程组成,这些可以对数据进行许多不同的更改。
Pipeline示例如下:
PUT _ingest/pipeline/packets
{
"description": "Import Tshark Elasticsearch output",
"processors" : [
{
"date_index_name" : {
"field" : "timestamp",
"index_name_prefix" : "packets-webserver01-",
"date_formats": [ "UNIX_MS" ],
"date_rounding" : "d"
}
}
]
}
该Pipeline只是简单更改数据包将被写入的Elasticsearch索引(默认值为packets-YYYY-MM-DD
)。要在导入数据时使用此管道,请在URL中指定它:
curl -s -H "Content-Type: application/x-ndjson" -XPOST "localhost:9200/_bulk?pipeline=packets" --data-binary "@packets.json"
有关更多信息,请参见Ingest模块文档和新的摄取方式-第1部分,第2部分。
Filebeat和Logstash都有等效的配置选项,用于在将数据发送到Elasticsearch时指定接收Pipeline。
3.2 使用Logstash
Logstash是Elastic Stack的一部分,并用作数据处理线程。它可以用来读取Elasticsearch Bulk API格式的数据,并在将数据发送到Elasticsearch之前对数据执行更复杂的转换和充实。
配置示例为:
logstash.conf
input {
file {
path => "/path/to/packets.json"
start_position => "beginning"
}
}
filter {
# Drop Elasticsearch Bulk API control lines
if ([message] =~ "{\"index") {
drop {}
}
json {
source => "message"
remove_field => "message"
}
# Extract innermost network protocol
grok {
match => {
"[layers][frame][frame_frame_protocols]" => "%{WORD:protocolRead More
grok过滤器从,格式为“ protocol:protocol:protocol ”的frame_frame_protocols
字段(例如“ eth:ethertype:ip:tcp:http ” )中提取最内层的网络协议名称,到顶级“ protocol ”字段中。
4、解析结果导入Elasticsearch
4.1 设置Elasticsearch映射
Wireshark解析的原始数据包含大量字段,这些字段种绝大多数不会搜索或汇总,因此在所有这些字段上创建索引通常不是正确的选择。
字段太多会降低索引和查询速度,Elasticsearch 5.5开始,默认会将索引中的字段数限制为1000。另外,tshark -T ek
无论原始数据字段值是文本还是数字,包括时间戳和IP地址,均输出为字符串。
如果要指定正确的数据类型,将数字索引为数字,将时间戳作为时间戳等,防止索引字段激增,应显式指定Elasticsearch映射。
Elasticsearch映射例子如下:
PUT _template/packets
{
"template": "packets-*",
"mappings": {
"pcap_file": {
"dynamic": "false",
"properties": {
"timestamp": {
"type": "date"
},
"layers": {
"properties": {
"frame": {
"propertiesRead More
“ template”:“ packets- ”指定该模板应用于与该模式匹配的所有新索引。
*“ dynamic”:“ false” 指定不应在映射中未明确指定的字段建立索引。
未编入索引的字段仍将存储在Elasticsearch中,可以在搜索结果中看到它们,但是无法搜索或汇总它们。
4.2 写入Elasticsearch映射
要将tshark -T ek
的输出数据导入到Elasticsearch中,有以下几种方式。
1、 使用curl
命令
curl -s -H "Content-Type: application/x-ndjson" -XPOST "localhost:9200/_bulk" --data-binary "@packets.json"
注意:如果您的JSON文件包含数千个文档,则可能必须将其拆分为较小的块,然后分别将其发送到Bulk API,例如使用脚本。如果系统上有
split
程序,您可以用来执行该操作。
2、使用Filebeat
Filebeat非常轻巧,可以监视一组文件或目录中是否有任何新文件,并自动对其进行处理。读取Tshark输出文件并将数据包数据发送到Elasticsearch。
示例配置如下所示:
filebeat.yml
filebeat.prospectors:
- input_type: log
paths:
- "/path/to/packets.json"
document_type: "pcap_file"
json.keys_under_root: true
processors:
- drop_event:
when:
equals:
index._typeRead More
json.keysunderroot: true 将数据解析为JSON。
index.type: "pcapfile" 删除用于Elasticsearch Bulk API的控制行。
template.enabled: false 使用现有模板上传Elasticsearch。
3、使用Logstash
与Filebeat一样,Logstash可以监视目录中的新文件并自动对其进行处理。与Filebeat相比,它比Elasticsearch Ingest Pipeline可以更广泛地转换数据。
有关Logstash配置的示例,请参见3.1转换数据的部分。
5、Kibana查询和展示
在Kibana中,您现在可以浏览数据包并在它们之上构建仪表板。
5.1 图表展示
*网络数据详细视图,包括一个表格,该表格在可扩展的行中显示原始数据包数据
*显示网络协议分布的饼图
5.2 Kibana设置
由于来自Wireshark的网络数据具有与syslog等不同的数据格式,因此更改Kibana中的某些设置(“管理”选项卡->“ Kibana高级设置”)很有意义。
Kibana配置如下:
shortDots:enable = true
format:number:defaultPattern = 0.[000] (2)
timepicker:timeDefaults = {
"from": "now-30d",
"to": "now",
"mode": "quick"
}
shortDots:enable = true 缩短长嵌套字段的名称,例如,修改
layer.frame.frame_frame_number
为l.f.frame_frame_number
。
format:number:defaultPattern = 0.000 改变显示格式编号,不显示千位分隔符。
timepicker:timeDefaults 设置Kibana显示最近30天内的数据。
参考
网友评论