背景
架构为filebeat+Elasticsearch+Kibana
1、filebeat直连Elasticsearch,需要对massage提取一些特定的字段。
2、如果对数据处理比较多建议用logstash,logstash更强大。
pipeline 简介
pipeline 开始了解是使用jenkins的时候,即交付流水线。
平时写的jenkinsfile就是用的pipeline。
我理解的pipeline就是把你在界面上配置的自动化变成代码脚本。即Pipeline as Code。
一、原始massage
原始日志内容如下:
2022-12-22T11:00:00.078+08:00 [INFO ][asset-service][7][1605760032885796865][DubboServerHandler-10.0.93.155:17158-thread-200][] --- com.cmcc.icf.asset.service.support.ProviderLogAspectJ-42 : dubbo服务请求|IDraftDiscountAnswerProvider|queryDiscountAnswerException|[]|
我需要提取字段:
{
"traceId": "1605760032885796865",
"appname": "asset-service",
"log-level": "INFO",
"timestamp": "2022-12-22T11:00:00.078+08:00"
}
二、操作
想要使用filebeat提取massage字段
1,先在es里面创建一个pipeline脚本。
2,配置filebeat使用es的pipeline脚本。
2.1 创建pipeline.json文件
在linux目录创建一个文件,文件名称随意起。
vim extract-traceid-pipeline.json
{
"description" : "extract-traceid-pipeline", # 此pipeline描述
"processors" : [
{
"grok" :{ # grok 语法
"field" : "message", # 需要对采集的信息的哪个字段进行处理。其实就是filebeat采集完一条日志之后执行这个处理。可以对采集的任何字段值进行处理。
"patterns" : ["%{TIMESTAMP_ISO8601:@timestamp} \\[%{LOGLEVEL:log-level}%{SPACE}\\]\\[%{DATA:appname}\\]\\[%{NUMBER}\\]\\[%{DATA:traceId}\\]"] # 这个就是从message信息中提取信息的语法。
}
}
]
}
说明:
[""] 这个是固定写法,里面的才是正则
\[ [需要转义但是es也需要转义一层,所以就是两个\
%{} 这个是固定的写法,就是你想要提取哪个信息然后 使你提取出来的信息成为一个字段key:value。
DATA 是grok的语法,内置字段类型,比如:
TIMESTAMP_ISO8601: 表示国际标准ISO8601日期时间
DATE:表示日期
LOGLEVEL:表示日志级别
NUMBER:表示为数值
DATA:表示字符串
IP:表示ip地址
SPACE:表示空格
trace_id 这个表示你给提取出来的信息定义一个Key的名称,比如
trace_id:1605760032885796865
到时候收集到的这条日志信息里面就会多出这一个字段,在kibana就能搜索到,这个名称随便定义。
2.2 将json文件传到es
在json文件所在的目录执行:
curl -H "Content-Type: application/json" -XPUT 'http://10.16.0.61:9200/_ingest/pipeline/extract-traceid-pipeline' -d@extract-traceid-pipeline.json
# extract-traceid-pipeline 这个名称你可以随便起,但是在filebeat配置的时候需要写这个名称
# extract-traceid-pipeline.json 这个就是你自己的json文件
或到kibana 开发工具控制台
1671681554(1).png
2.3 查看pipeline脚本
查看es里刚上传的pipeline规则:
1、访问:http://10.16.0.61:9200/_ingest/pipeline
2、通过kibana 开发工具控制台查看
GET /_ingest/pipeline/extract-traceid-pipeline
1671681852(1).png
2.4 filebeat配置
# 编辑filebeat.yml配置文件
output.elasticsearch:
hosts: ["110.16.0.61:9200""]
pipelines: # pipelines 就是你下面可以放多个pipeline
- pipeline: "extract-traceid-pipeline" # 这个名称就是你curl put到es里面 你后面写的那个名称
when.contains: # 这里是一个filebeat的配置语法,可以看看filebeat的官方文档,大致的意思就是if 条件判断
log_topics: "applogs" # 就是当log_topics:"applogs" 的时候才用这个pipeline
# when 就是if
# contains 就是 包含的意思 还有其他的比如:and,or,not,equals(必须相等)
2.5 查看数据
kibana查看日志。收集的数据里面就会多出刚刚提取的字段。
1671682319(1).png
网友评论