我们使用elasticsearch作为搜索引擎,基本上会把相关的相关的软件和功能都用上了。官网提供了和它相辅相成的一套软件,起名:Elastic Stack
。这里我们就看看Elastic Stack
的功能和简单使用。
1. Elastic Stack
功能
- 先看看官方是怎么定义的:
Elastic Stack
:了解可帮助您构建搜索体验、解决问题并取得成功的搜索平台
核心产品包括Elasticsearch
、Kibana
、Beats
和Logstash
(也称为ELK Stack
)等等。能够安全可靠地从任何来源获取任何格式的数据,然后对数据进行搜索、分析和可视化。
它集成的功能插件可以从这里有一个整体的认识:
image.png - 再看看官方介绍的功能,那叫一个强啊:
Elastic Stack
功能:Elastic Stack
包含各种功能(之前统一称为X-Pack
),从企业级安全性和开发人员友好型 API,到 Machine Learning 和图表分析,非常全面;借助这些功能,您能够对所有类型的数据进行大规模采集、分析、搜索和可视化。
自身
功能
功能有很多,这里截取了一部分,大家可以自己去看看。
2 Logstash
官方介绍
Logstash
是免费且开放的服务器端数据处理管道,能够从多个来源采集数据,转换数据,然后将数据发送到您最喜欢的“存储库”中。
Logstash 能够动态地采集、转换和传输数据,不受格式或复杂度的影响。利用 Grok 从非结构化数据中派生出结构,从 IP 地址解码出地理坐标,匿名化或排除敏感字段,并简化整体处理过程。
它的整个工作流程是三段式的:输入 -> 筛选 -> 输出
2.1 输入
采集各种样式、大小和来源的数据。常见的输入方式有:文件、beats、http、jdbc、kafka等。
数据往往以各种各样的形式,或分散或集中地存在于很多系统中。Logstash 支持各种输入选择,可以同时从众多常用来源捕捉事件。能够以连续的流式传输方式,轻松地从您的日志、指标、Web 应用、数据存储以及各种 AWS 服务采集数据。
2.2 筛选
实时解析和转换数据
数据从源传输到存储库的过程中,Logstash 筛选器能够解析各个事件,识别已命名的字段以构建结构,并将它们转换成通用格式,以便进行更强大的分析和实现商业价值。
Logstash 能够动态地转换和解析数据,不受格式或复杂度的影响:
- 利用 Grok 从非结构化数据中派生出结构
- 从 IP 地址破译出地理坐标
- 将 PII 数据匿名化,完全排除敏感字段
- 简化整体处理,不受数据源、格式或架构的影响。
使用我们丰富的筛选器库和功能多样的 Elastic Common Schema,您可以实现无限丰富的可能。
2.3 输出
选择您的存储库,传输您的数据
尽管 Elasticsearch 是我们的首选输出方向,能够为我们的搜索和分析带来无限可能,但它并非唯一选择。
Logstash 提供了众多输出选择,您可以将数据发送到所需的位置,并且能够灵活地解锁众多下游用例。
3. elk搭建
前面我们已经搭建了elasticsearch
和kibana
,并做了监控、安装了分词器。
这里我们只用docker compose
安装Logstash
就可以了。官网有教程,比较粗糙,可以参考。
3.1 docker compose
文件
docker-compose-logstash.yml
version: '3.9'
services:
beats:
image: 'docker.elastic.co/logstash/logstash:8.14.3'
ports:
- 5044:5044
container_name: logstash
hostname: logstash
restart: always
environment:
- TZ=Asia/Shanghai
- ES_JAVA_OPTS=-Xms512m -Xmx512m
privileged: true
volumes:
- /etc/timezone:/etc/timezone
- /etc/localtime:/etc/localtime:ro
# 映射日志文件,要读取的
- '/logs/test:/logs/test'
# 映射pipeline配置文件目录
- '/opt/soft/logstash/pipeline:/usr/share/logstash/pipeline'
# 映射logstash配置文件
- '/opt/soft/logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml'
# 映射elasticsearch证书目录,连接时候用
- '/opt/soft/elasticsearch/config/certs:/usr/share/logstash/config/certs'
networks:
elastic:
# {}
ipv4_address: 172.18.0.16
networks:
elastic:
external: true
driver: bridge
3.2 配置文件
Logstash 的配置主要分为两部分:Pipeline 配置文件
和 Settings 配置文件
。
-
Pipeline 配置文件(logstash.conf):这是 Logstash 的核心配置,用于定义数据处理的流程,包括输入(input)、过滤(filter)和输出(output)三个部分。每个部分都可以使用多种插件来完成特定的任务。例如,输入部分可以使用 file 插件从文件中读取数据,过滤部分可以使用 grok 插件解析日志,输出部分可以使用 elasticsearch 插件将数据发送到 Elasticsearch。位置在
./pipeline/logstash.conf
-
Settings 配置文件(logstash.yml):这是 Logstash 的全局配置,包括 Logstash 实例的名称、数据存储路径、配置文件路径、自动重载配置、工作线程数量等。位置在:
./config/logstash.yml
在 Logstash 启动时,它会首先读取 Settings 配置文件,然后加载并执行 Pipeline 配置文件。
3.2.1 logstash.conf
注意:证书文件需要处理一下,不支持.crt
格式的,复制一份,后缀改成.pem
input {
# 输入插件支持哪些,可参考https://www.elastic.co/guide/en/logstash/current/plugins-inputs-file.html
# 选择一个即可
file {
path => ["/logs/app1/app1.log", "/logs/app2/app3.log", "/logs/httpd/*"]
# 从文件开头就获取
start_position => "beginning"
exclude => "*.zip"
}
# beats {
# port => 5044
# codec => json {
# charset => "UTF-8"
# }
# }
# jdbc {
# jdbc_driver_library => "/path/to/your/jdbc/driver"
# jdbc_driver_class => "com.mysql.jdbc.Driver"
# jdbc_connection_string => "jdbc:mysql://localhost:3306/yourdatabase"
# jdbc_user => "yourusername"
# jdbc_password => "yourpassword"
# }
# kafka {
# bootstrap_servers => "localhost:9092"
# topics => ["your_topic"]
# }
}
filter {
# grok 过滤器用于解析非结构化的日志数据,将其转换为结构化的数据
# 效果就是输出中添加了一个字段message,按指定格式来的
# 如果就想看 日志中的原格式,这个就不用加。比如java项目的日志,格式就挺好的
#grok {
# match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} \[%{DATA:thread}\] %{LOGLEVEL:loglevel} %{DATA:logger} - %{GREEDYDATA:message}" }
#}
# date 过滤器用于解析日期和时间信息,将其转换为 Logstash 的 @timestamp 字段
#date {
# match => ["timestamp", "yyyy-MM-dd HH:mm:ss.SSS"]
#target => "@timestamp"
# locale => "cn"
#}
# 排除一些没用字段,logstash输出的内容中有一些字段,看需不需要
mutate {
remove_field => ["@version", "tags"]
}
}
# 输出支持选项参考官方文档:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-file.html
# 一般都是接elasticsearch
output {
# elasticsearch的配置项参考这里:https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html#plugins-outputs-elasticsearch-options
# 有的配置项可能会改名字,自己进去确认
# elasticsearch 2种方式:'file文件'、'filebeat'。用一个,另一个注释掉
# 直接从file文件读取时
elasticsearch {
hosts => ["https://es01:9200"]
index => "test-%{+YYYY.MM.dd}"
user => "elastic"
password => "123456"
ssl_enabled => true
ssl_certificate_authorities => "config/certs/ca/ca.pem"
}
# 从filebeats获取数据,'tags'是filebeat传过来数据中的一个字段,在filebeat中配置
if "test-01-server" in [tags] {
elasticsearch {
hosts => ["https://es01:9200"]
index => "test-01-server-%{+YYYY.MM.dd}"
user => "elastic"
password => "123456"
ssl_enabled => true
ssl_certificate_authorities => "config/certs/ca/ca.pem"
}
}
if "test-02-server" in [tags] {
elasticsearch {
hosts => ["https://es01:9200"]
index => "test-02-server-%{+YYYY.MM.dd}"
user => "elastic"
password => "123456"
ssl_enabled => true
ssl_certificate_authorities => "config/certs/ca/ca.pem"
}
}
# 输出到启动logstash的shell页面
stdout {
codec => rubydebug
}
}
3.2.2 logstash.yml
没有配置什么,可以按需设置
#监听地址
http.host: "0.0.0.0"
#节点名称,${index}是脚本变量,两个节点,1和2
#node.name: logstash-node-${index}
#开启logstash指标监测,将指标数据发送到es集群
#xpack.monitoring.enabled: true
#xpack.monitoring.elasticsearch.hosts: ["https://172.18.0.11:9200"]
#xpack.monitoring.elasticsearch.username: "elastic"
#xpack.monitoring.elasticsearch.password: "123456"
#xpack.monitoring.elasticsearch.ssl.certificate_authority: "config/certs/ca.pem"
#设置output或filter插件的工作线程数
#pipeline.workers: 8
#设置批量执行event的最大值
#pipeline.batch.size: 125
#批处理的最大等待值
#pipeline.batch.delay: 5000
#开启持久化
#queue.type: persisted
#队列存储路径;如果队列类型为persisted,则生效
#path.queue: /usr/share/logstash/data
#队列为持久化,单个队列大小
#queue.page_capacity: 250mb
#当启用持久化队列时,队列中未读事件的最大数量,0为不限制
#queue.max_events: 0
#队列最大容量
#queue.max_bytes: 1024mb
#在启用持久队列时强制执行检查点的最大数量,0为不限制
#queue.checkpoint.acks: 1024
#在启用持久队列时强制执行检查点之前的最大数量的写入事件,0为不限制
#queue.checkpoint.writes: 1024
#当启用持久队列时,在头页面上强制一个检查点的时间间隔
#queue.checkpoint.interval: 1000
3.3.3 logstash.conf(API Key
)
注意:很不幸,logstash启动不成功,因为连接不上elasticsearch,证书文件打不开,但是kibana和metricbeat连的时候都是用的这个文件啊!查看官方文档,上面写着支持 .cer
和 .pem
文件,但是当初elasticsearch这个吊毛给我们生成的是.crt
证书,没有.pem
。我也不会转,只怪自己对证书概念了解的不够,客户端和服务端不一样吧。真是坑啊!!!
所以我们输出连接elasticsearch的时候换个方式,用API Key
试试。
input {
# 输入插件支持哪些,可参考https://www.elastic.co/guide/en/logstash/current/plugins-inputs-file.html
# 选择一个即可
file {
path => ["/logs/app1/app1.log", "/logs/app2/app3.log", "/logs/httpd/*"]
exclude => "*.zip"
}
}
filter {
# grok 过滤器用于解析非结构化的日志数据,将其转换为结构化的数据
grok {
match => ["message", "%{TIMESTAMP_ISO8601:logdate}"]
}
# date 过滤器用于解析日期和时间信息,将其转换为 Logstash 的 @timestamp 字段
date {
match => ["logdate", "yyyy-MM-dd HH:mm:ss.SSS"]
target => "@timestamp"
}
}
# 输出支持选项参考官方文档:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-file.html
# 一般都是接elasticsearch
output {
# elasticsearch的配置项参考这里:https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html#plugins-outputs-elasticsearch-options
# 有的配置项可能会改名字,自己进去确认
elasticsearch {
hosts => ["https://es01:9200"]
index => "test-%{+YYYY.MM.dd}"
ssl_enabled => true
ssl_verification_mode => full
api_key => "apiId(自己的):SWtoZ1RwRUIyODlSYU8zX056Sy06TDZBTHF0NHhSckdCQXFaRFNyMDBlQQ=="
}
# 输出到启动logstash的shell页面
stdout {
codec => rubydebug
}
}
好吧,又失败了。容器是起来了,但是连接elasticsearch连不上,还是一直找证书。再换使用ca_trusted_fingerprint连接还是报错:PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
。
没有办法,百度了一下,crt格式后缀可以直接改成pem后缀使用,我就改了,按第一种证书的连接就可以了。
4. 查看日志
当logstash启动成功以后,会把日志文件写到elasticsearch
中,但是此时,我们没法直接在kibana中查看内容。需要将数据在kibana中做一个数据视图,然后才能看数据。
创建视图
然后就可以在discover中查看这个索引了。
image.png
我们排除了2个字段,不排除接收到的数据原来是这样的:
{
"@version" => "1",
"tags" => [
[0] "_grokparsefailure"
],
"log" => {
"file" => {
"path" => "/logs/test/test.log"
}
},
"@timestamp" => 2024-08-14T09:02:24.211705249Z,
"event" => {
"original" => "2024-07-27 17:27:54.453 [http-nio-9001-exec-6] INFO org.springdoc.api.AbstractOpenApiResource.getOpenApi(390) -- Init duration for springdoc-openapi is: 79 ms\r"
},
"host" => {
"name" => "d8a03914b903"
}
}
5. 将logstash添加到metricbeat监控
这个官网给了的,直接参考拿来用就行。
-
logstash.yml
中将自身监控关闭,这个上面已经配置了
-
monitoring.enabled: false
- 在metricbeat的配置文件中加上logstash模块
- module: logstash
metricsets:
- node
- node_stats
period: 10s
hosts: ["http://172.18.0.16:9600"]
xpack.enabled: true
- 官网建议关闭metricbeat的系统监控
system监控
默认是开启的,可以选择关闭。官网说:除非你有其他目的
。我先不不管这个了。
- 官网建议关闭metricbeat的系统监控
-
重启metricbeat,等2分钟,进kibana那会自动添加logstash监控应该能看到。
image.png
image.png
-
网友评论