一、elk 实用知识点总结
codec => plain {
charset =>"GB2312"}
filebeat.prospectors:
- input_type: log
paths:
- c:\Users\Administrator\Desktop\performanceTrace.txt
encoding: GB2312
if([message] =~"^20.*-\ task\ request,.*,start\ time.*") {#用正则需删除的多余行 drop {}
}
2018-03-2010:44:01,523[33]DEBUG Debug - task request,task Id:1cbb72f1-a5ea-4e73-957c-6d20e9e12a7a,start time:2018-03-2010:43:59 #需删除的行-- Request String : {"UserName":"15046699923","Pwd":"ZYjyh727","DeviceType":2,"DeviceId":"PC-20170525SADY","EquipmentNo":null,"SSID":"pc","RegisterPhones":null,"AppKey":"ab09d78e3b2c40b789ddfc81674bc24deac","Version":"2.0.5.3"} -- End-- Response String : {"ErrorCode":0,"Success":true,"ErrorMsg":null,"Result":null,"WaitInterval":30} -- End
2018-03-2010:44:01,523[33]DEBUG Debug - task request,task Id:1cbb72f1-a5ea-4e73-957c-6d20e9e12a7a,start time:2018-03-2010:43:59-- Request String : {"UserName":"15046699923","Pwd":"ZYjyh727","DeviceType":2,"DeviceId":"PC-20170525SADY","EquipmentNo":null,"SSID":"pc","RegisterPhones":null,"AppKey":"ab09d78e3b2c40b789ddfc81674bc24deac","Version":"2.0.5.3"} -- End-- Response String : {"ErrorCode":0,"Success":true,"ErrorMsg":null,"Result":null,"WaitInterval":30} -- End
(2)在logstash filter中grok 分别处理3行
match => {
"message"=>"^20.*-\ task\ request,.*,start\ time\:%{TIMESTAMP_ISO8601:RequestTime}"match => {
"message"=>"^--\ Request\ String\ :\ \{\"UserName\":\"%{NUMBER:UserName:int}\",\"Pwd\":\"(?.*)\",\"DeviceType\":%{NUMBER:DeviceType:int},\"DeviceId\":\"(?.*)\",\"EquipmentNo\":(?.*),\"SSID\":(?.*),\"RegisterPhones\":(?.*),\"AppKey\":\"(?.*)\",\"Version\":\"(?.*)\"\}\ --\ \End.*"
}
match => {
"message"=>"^--\ Response\ String\ :\ \{\"ErrorCode\":%{NUMBER:ErrorCode:int},\"Success\":(?[a-z]*),\"ErrorMsg\":(?.*),\"Result\":(?.*),\"WaitInterval\":%{NUMBER:WaitInterval:int}\}\ --\ \End.*"}
... 等多行
2018-03-2010:44:01,523[33]DEBUG Debug - task request,task Id:1cbb72f1-a5ea-4e73-957c-6d20e9e12a7a,start time:2018-03-2010:43:59-- Request String : {"UserName":"15046699923","Pwd":"ZYjyh727","DeviceType":2,"DeviceId":"PC-20170525SADY","EquipmentNo":null,"SSID":"pc","RegisterPhones":null,"AppKey":"ab09d78e3b2c40b789ddfc81674bc24deac","Version":"2.0.5.3"} -- End-- Response String : {"ErrorCode":0,"Success":true,"ErrorMsg":null,"Result":null,"WaitInterval":30} -- End
② logstash grok 对合并后多行的处理(合并多行后续都一样,如下)
filter {
grok {
match => {
"message"=>"^%{TIMESTAMP_ISO8601:InsertTime}\ .*-\ task\ request,.*,start\ time:%{TIMESTAMP_ISO8601:RequestTime}\n--\ Request\ String\ :\ \{\"UserName\":\"%{NUMBER:UserName:int}\",\"Pwd\":\"(?.*)\",\"DeviceType\":%{NUMBER:DeviceType:int},\"DeviceId\":\"(?.*)\",\"EquipmentNo\":(?.*),\"SSID\":(?.*),\"RegisterPhones\":(?.*),\"AppKey\":\"(?.*)\",\"Version\":\"(?.*)\"\}\ --\ \End\n--\ Response\ String\ :\ \{\"ErrorCode\":%{NUMBER:ErrorCode:int},\"Success\":(?[a-z]*),\"ErrorMsg\":(?.*),\"Result\":(?.*),\"WaitInterval\":%{NUMBER:WaitInterval:int}\}\ --\ \End" }
}
}
(2)在filebeat中使用multiline 插件(推荐)
negate:true/false,匹配到pattern 部分开始合并,还是不配到的合并
after:匹配到pattern 部分后合并,注意:这种情况最后一行日志不会被匹配处理
before:匹配到pattern 部分前合并(推荐)
filebeat.prospectors:
- input_type: log
paths:
- /root/performanceTrace*
fields:
type: zidonghualog
multiline.pattern: '.*\"WaitInterval\":.*--\ End'
multiline.negate: true
multiline.match: before
filebeat.prospectors:
- input_type: log
paths:
- /root/performanceTrace*
input_type: log
multiline:
pattern: '^20.*'
negate: true
match: after
(3)在logstash input中使用multiline 插件(没有filebeat 时推荐)
negate:true/false,匹配到pattern 部分开始合并,还是不配到的合并
previous:相当于filebeat 的after
next:相当于filebeat 的before
input {
file {
path => ["/root/logs/log2"]
start_position =>"beginning" codec => multiline {
pattern =>"^20.*" negate => true
what =>"previous" }
}
}
(4)在logstash filter中使用multiline 插件(不推荐)
① filter设置multiline后,pipline worker会自动将为1
② 5.5 版本官方把multiline 去除了,要使用的话需下载,下载命令如下:
/usr/share/logstash/bin/logstash-plugin install logstash-filter-multiline
filter {
multiline {
pattern =>"^20.*" negate => true
what =>"previous" }
}
2018-03-2010:44:01[33]DEBUG Debug - task request,task Id:1cbb72f1-a5ea-4e73-957c-6d20e9e12a7a,start time:2018-03-2010:43:59
date {
match => ["InsertTime","YYYY-MM-dd HH:mm:ss "]
remove_field =>"InsertTime" }
match => ["timestamp" ,"dd/MMM/YYYY H:m:s Z"]
匹配这个字段,字段的格式为:日日/月月月/年年年年 时/分/秒 时区
也可以写为:match => ["timestamp","ISO8601"](推荐)
就是将匹配日志中时间的key 替换为@timestamp 的时间,因为@timestamp 的时间是日志送到logstash 的时间,并不是日志中真正的时间。
filebeat:
prospectors:
-
paths:
#- /mnt/data/WebApiDebugLog.txt*
- /mnt/data_total/WebApiDebugLog.txt*
fields:
type: WebApiDebugLog_total
-
paths:
- /mnt/data_request/WebApiDebugLog.txt*
#- /mnt/data/WebApiDebugLog.txt*
fields:
type: WebApiDebugLog_request
-
paths:
- /mnt/data_report/WebApiDebugLog.txt*
#- /mnt/data/WebApiDebugLog.txt*
fields:
type: WebApiDebugLog_report
② 在logstash filter中使用if,可进行对不同类进行不同处理
filter {
if[fields][type] =="WebApiDebugLog_request"{#对request 类日志if([message] =~"^20.*-\ task\ report,.*,start\ time.*") {#删除report 行 drop {}
}
grok {
match => {"... ..."}
}
}
if[fields][type] =="WebApiDebugLog_total" {
elasticsearch {
hosts => ["6.6.6.6:9200"]
index =>"logstashl-WebApiDebugLog_total-%{+YYYY.MM.dd}" document_type =>"WebApiDebugLog_total_logs"}
每天每台机器可处理:24h*60min*60sec*3000*250Byte=64,800,000,000Bytes,约64G
③ 瓶颈在logstash 从redis中取数据存入ES,开启一个logstash,每秒约处理6000条数据;开启两个logstash,每秒约处理10000条数据(cpu已基本跑满);
④ logstash的启动过程占用大量系统资源,因为脚本中要检查java、ruby以及其他环境变量,启动后资源占用会恢复到正常状态。
(1)没有原则要求使用filebeat或logstash,两者作为shipper的功能是一样的,区别在于:
① logstash由于集成了众多插件,如grok,ruby,所以相比beat是重量级的;
② logstash启动后占用资源更多,如果硬件资源足够则无需考虑二者差异;
③ logstash基于JVM,支持跨平台;而beat使用golang编写,AIX不支持;
④ AIX 64bit平台上需要安装jdk(jre) 1.7 32bit,64bit的不支持;
⑤ filebeat可以直接输入到ES,但是系统中存在logstash直接输入到ES的情况,这将造成不同的索引类型造成检索复杂,最好统一输入到els 的源。
logstash/filter 总之各有千秋,但是,我推荐选择:在每个需要收集的日志服务器上配置filebeat,因为轻量级,用于收集日志;再统一输出给logstash,做对日志的处理;最后统一由logstash 输出给els。
默认配置 ---> pipeline.workers: 2
可优化为 ---> pipeline.workers: CPU内核数(或几倍cpu内核数)
默认配置 ---> pipeline.output.workers: 1
可优化为 ---> pipeline.output.workers: 不超过pipeline 线程数
默认配置 ---> pipeline.batch.size: 125
可优化为 ---> pipeline.batch.size: 1000
默认配置 ---> pipeline.batch.delay: 5
可优化为 ---> pipeline.batch.size: 10
通过设置-w参数指定pipeline worker数量,也可直接修改配置文件logstash.yml。这会提高filter和output的线程数,如果需要的话,将其设置为cpu核心数的几倍是安全的,线程在I/O上是空闲的。
默认每个输出在一个pipeline worker线程上活动,可以在输出output 中设置workers设置,不要将该值设置大于pipeline worker数。
还可以设置输出的batch_size数,例如ES输出与batch size一致。
filter设置multiline后,pipline worker会自动将为1,如果使用filebeat,建议在beat中就使用multiline,如果使用logstash作为shipper,建议在input 中设置multiline,不要在filter中设置multiline。
Logstash是一个基于Java开发的程序,需要运行在JVM中,可以通过配置jvm.options来针对JVM进行设定。比如内存的最大最小、垃圾清理机制等等。JVM的内存分配不能太大不能太小,太大会拖慢操作系统。太小导致无法启动。默认如下:
(2)Redis可以使用list(最长支持4,294,967,295条)或发布订阅存储模式;
② requirepass ilinux.io #加密码,为了安全运行
③ 只做队列,没必要持久存储,把所有持久化功能关掉:快照(RDB文件)和追加式文件(AOF文件),性能更好
save "" 禁用快照
appendonly no 关闭RDB
maxmemory 0 #maxmemory为0的时候表示我们对Redis的内存使用没有限制
① vm.swappiness =1#ES 推荐将此参数设置为1,大幅降低 swap 分区的大小,强制最大程度的使用内存,注意,这里不要设置为0, 这会很可能会造成 OOM
② net.core.somaxconn =65535 #定义了每个端口最大的监听队列的长度
③ vm.max_map_count=262144 #限制一个进程可以拥有的VMA(虚拟内存区域)的数量。虚拟内存区域是一个连续的虚拟地址空间区域。当VMA 的数量超过这个值,OOM
④ fs.file-max =518144#设置 Linux 内核分配的文件句柄的最大数量
[root@elasticsearch]# sysctl -p 生效一下
elasticsearch soft nofile65535elasticsearch hard nofile 65535elasticsearch soft memlock unlimited
elasticsearch hard memlock unlimited
vim /etc/pam.d/common-session-noninteractive
session required pam_limits.so
① 将最小堆大小(Xms)和最大堆大小(Xmx)设置为彼此相等。
② Elasticsearch可用的堆越多,可用于缓存的内存就越多。但请注意,太多的堆可能会使您长时间垃圾收集暂停。
③ 设置Xmx为不超过物理RAM的50%,以确保有足够的物理内存留给内核文件系统缓存。
④ 不要设置Xmx为JVM用于压缩对象指针的临界值以上;确切的截止值有所不同,但接近32 GB。不要超过32G,如果空间大,多跑几个实例,不要让一个实例太大内存
bootstrap.memory_lock:true #锁住内存,不使用swap
#缓存、线程等优化如下
bootstrap.mlockall: truetransport.tcp.compress: trueindices.fielddata.cache.size: 40%indices.cache.filter.size: 30%indices.cache.filter.terms.size: 1024mb
threadpool:
search:
type: cached
size: 100 queue_size: 2000
vim /etc/profile.d/elasticsearch.sh export ES_HEAP_SIZE=2g #Heap Size不超过物理内存的一半,且小于32G
① ES是分布式存储,当设置同样的cluster.name后会自动发现并加入集群;
② 集群会自动选举一个master,当master宕机后重新选举;
Logstash和其连接的服务运行速度一致,它可以和输入、输出的速度一样快。
注意CPU是否过载。在Linux/Unix系统中可以使用top -H查看进程参数以及总计。
如果CPU使用过高,直接跳到检查JVM堆的章节并检查Logstash worker设置。
注意Logstash是运行在Java虚拟机中的,所以它只会用到你分配给它的最大内存。
检查其他应用使用大量内存的情况,这将造成Logstash使用硬盘swap,这种情况会在应用占用内存超出物理内存范围时。
使用Logstash plugin(例如使用文件输出)磁盘会发生饱和。
当发生大量错误,Logstash生成大量错误日志时磁盘也会发生饱和。
在Linux中,可使用iostat,dstat或者其他命令监控磁盘I/O
当使用大量网络操作的input、output时,会导致网络饱和。
heap设置太小会导致CPU使用率过高,这是因为JVM的垃圾回收机制导致的。
一个快速检查该设置的方法是将heap设置为两倍大小然后检测性能改进。不要将heap设置超过物理内存大小,保留至少1G内存给操作系统和其他进程。
你可以使用类似jmap命令行或VisualVM更加精确的计算JVM heap
网友评论