美文网首页
ELK部署使用

ELK部署使用

作者: shaipxiang | 来源:发表于2021-12-22 08:54 被阅读0次

    一、ELK介绍

    官网简介

    官网地址

    ELK是三个开源软件的缩写,分别表示:Elasticsearch , Logstash, Kibana , 它们都是开源软件。新增了一个FileBeat,它是一个轻量级的日志收集处理工具(Agent),Filebeat占用资源少,适合于在各个服务器上搜集日志后传输给Logstash,官方也推荐此工具。

    Elasticsearch是个开源分布式搜索引擎,提供搜集、分析、存储数据三大功能。它的特点有:分布式,零配置,自动发现,索引自动分片,索引副本机制,restful风格接口,多数据源,自动搜索负载等。

    Logstash 主要是用来日志的搜集、分析、过滤日志的工具,支持大量的数据获取方式。一般工作方式为c/s架构,client端安装在需要收集日志的主机上,server端负责将收到的各节点日志进行过滤、修改等操作在一并发往elasticsearch上去。

    Kibana 也是一个开源和免费的工具,Kibana可以为 Logstash 和 ElasticSearch 提供的日志分析友好的 Web 界面,可以帮助汇总、分析和搜索重要数据日志。

    Filebeat隶属于Beats。目前Beats包含四种工具:

    1. Packetbeat(搜集网络流量数据)
    2. Topbeat(搜集系统、进程和文件系统级别的 CPU 和内存使用情况等数据)
    3. Filebeat(搜集文件数据),公司经常使用这个工具来收集文件日志内容
    4. Winlogbeat(搜集 Windows 事件日志数据)
      系统日志的搜集现在主流采用 FileBeat的方式进行搜集。

    官方文档

    Filebeat
    https://www.elastic.co/cn/products/beats/filebeat
    https://www.elastic.co/guide/en/beats/filebeat/5.6/index.html

    Logstash
    https://www.elastic.co/cn/products/logstash
    https://www.elastic.co/guide/en/logstash/5.6/index.html

    Kibana
    https://www.elastic.co/cn/products/kibana
    https://www.elastic.co/guide/en/kibana/5.5/index.html

    Elasticsearch
    https://www.elastic.co/cn/products/elasticsearch
    https://www.elastic.co/guide/en/elasticsearch/reference/5.6/index.html

    elasticsearch中文社区
    https://elasticsearch.cn/

    架构图.png

    各服务器部署filebeat收集系统日志,将收集的日志放到redis里面缓存,然后logstash收集redis推送的日志内容,过滤后存入es,然后在kibana web上面展示查看。logstash也可以将数据直接推送到数据库中存储。

    二、各组件的工作原理

    采摘自官网

    Filebeat工作原理

    Filebeat由两个主要组件组成:prospectors 和 harvesters。这两个组件协同工作将文件变动发送到指定的输出中。

    prospectors.png

    1. Harvester(收割机)

    负责读取单个文件内容。每个文件会启动一个Harvester,每个Harvester会逐行读取各个文件,并将文件内容发送到制定输出中。Harvester负责打开和关闭文件,意味在Harvester运行的时候,文件描述符处于打开状态,如果文件在收集中被重命名或者被删除,Filebeat会继续读取此文件。所以在Harvester关闭之前,磁盘不会被释放。默认情况filebeat会保持文件打开的状态,直到达到close_inactive(如果此选项开启,filebeat会在指定时间内将不再更新的文件句柄关闭,时间从harvester读取最后一行的时间开始计时。若文件句柄被关闭后,文件发生变化,则会启动一个新的harvester。关闭文件句柄的时间不取决于文件的修改时间,若此参数配置不当,则可能发生日志不实时的情况,由scan_frequency参数决定,默认10s。Harvester使用内部时间戳来记录文件最后被收集的时间。例如:设置5m,则在Harvester读取文件的最后一行之后,开始倒计时5分钟,若5分钟内文件无变化,则关闭文件句柄。默认5m)。

    2. Prospector(勘测者)

    负责管理Harvester并找到所有读取源。

    filebeat.prospectors:
    - input_type: log
      paths:
        - /apps/logs/*/info.log
    

    Prospector会找到/apps/logs/*目录下的所有info.log文件,并为每个文件启动一个Harvester。Prospector会检查每个文件,看Harvester是否已经启动,是否需要启动,或者文件是否可以忽略。若Harvester关闭,只有在文件大小发生变化的时候Prospector才会执行检查。只能检测本地的文件。

    Filebeat如何记录文件状态

    将文件状态记录在文件中(默认在/var/lib/filebeat/registry)。此状态可以记住Harvester收集文件的偏移量。若连接不上输出设备,如ES等,filebeat会记录发送前的最后一行,并再可以连接的时候继续发送。Filebeat在运行的时候,Prospector状态会被记录在内存中。Filebeat重启的时候,利用registry记录的状态来进行重建,用来还原到重启之前的状态。每个Prospector会为每个找到的文件记录一个状态,对于每个文件,Filebeat存储唯一标识符以检测文件是否先前被收集。

    Filebeat如何保证事件至少被输出一次

    Filebeat之所以能保证事件至少被传递到配置的输出一次,没有数据丢失,是因为filebeat将每个事件的传递状态保存在文件中。在未得到输出方确认时,filebeat会尝试一直发送,直到得到回应。若filebeat在传输过程中被关闭,则不会再关闭之前确认所有时事件。任何在filebeat关闭之前为确认的时间,都会在filebeat重启之后重新发送。这可确保至少发送一次,但有可能会重复。可通过设置shutdown_timeout 参数来设置关闭之前的等待事件回应的时间(默认禁用)。

    Logstash工作原理

    Logstash事件处理有三个阶段:inputs → filters → outputs。是一个接收,处理,转发日志的工具。支持系统日志,webserver日志,错误日志,应用日志,总之包括所有可以抛出来的日志类型。

    Logstash.png

    1. Input:输入数据到logstash

    一些常用的输入为:

    • file:从文件系统的文件中读取,类似于tial -f命令
    • syslog:在514端口上监听系统日志消息,并根据RFC3164标准进行解析
    • redis:从redis service中读取
    • beats:从filebeat中读取

    Filters:数据中间处理,对数据进行操作

    一些常用的过滤器为:

    • grok:解析任意文本数据,Grok 是 Logstash 最重要的插件。它的主要作用就是将文本格式的字符串,转换成为具体的结构化的数据,配合正则表达式使用。内置120多个解析语法。
    • mutate:对字段进行转换。例如对字段进行删除、替换、修改、重命名等。
    • drop:丢弃一部分events不进行处理。
    • clone:拷贝 event,这个过程中也可以添加或移除字段。
    • geoip:添加地理信息(为前台kibana图形化展示使用)

    2. Outputs:outputs是logstash处理管道的最末端组件。

    一个event可以在处理过程中经过多重输出,但是一旦所有的outputs都执行结束,这个event也就完成生命周期。

    一些常见的outputs为:

    • elasticsearch:可以高效的保存数据,并且能够方便和简单的进行查询。
    • file:将event数据保存到文件中。
    • graphite:将event数据发送到图形化组件中,一个很流行的开源存储图形化展示的组件。
    • Codecs:codecs 是基于数据流的过滤器,它可以作为input,output的一部分配置。

    Codecs可以帮助你轻松的分割发送过来已经被序列化的数据。

    一些常见的codecs:

    • json:使用json格式对数据进行编码/解码。
    • multiline:将汇多个事件中数据汇总为一个单一的行。比如:java异常信息和堆栈信息。

    Redis在架构里的作用

    • 存储日志,可以是nginx,apache,tomcat等其他只要产生都可以存储,只要打上标签,logstash在input时就会分好类,就像京东快递仓库一样,把所有的快递全部集中到一起,只要写好目标地址,就会送到你手里
    • 提高冗余性,若redis后面的全部宕机了,也不至于数据丢失
    • 加快日志的读取速度
    • 全部日志集中一起,打好标签,便于操作管理

    三、ELK安装部署

    基础环境搭建.png

    1. Redis安装部署

    参考其他资料

    2. Filebeat安装配置

    1、 下载并解压缩

    wget  [https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.3.2-linux-x86_64.tar.gz](https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.3.2-linux-x86_64.tar.gz)
    tar  -zxvf filebeat-6.2.4-linux-x86_64.tar.gz
    

    2、配置日志的输入输出
    示例:
    input

    - type: log
      enabled: true
      paths:
        - D:\idea\apache-tomcat-8.5.35\bin\logs\duty-web\infoFile.log
      include_lines: ['userOperationLog']
      tags: ["duty_audit"]
      fields:
        type: web_log
        
    - type: log
      enabled: true
      paths:
        - D:\idea\apache-tomcat-8.5.35\bin\logs\police-web\infoFile.log
      include_lines: ['userOperationLog']
      tags: ["police_audit"]
      fields:
        type: web_log
      fields_under_root: true
    

    output.redis

    output.redis:
      hosts: ["192.168.10.1"]
      key: web_log
      datatype: list
    

    将多个系统日志文件 都推送到 redis中

    3.Logstash安装配置

    解压

    tar -zxvf /home/wyk/logstash-7.7.0.tar.gz -C /opt/app/
    

    解压即是安装结束,下面是个小例子:
    新增一个配置文件: 读取日志文件打印到窗口中

    input {
        redis {
            host => "192.168.10.1"
            port => 6379
            db => "0"
            data_type => "list"
            key => web_log
            type => web_log
        }
    }
    filter {
        mutate {
            remove_field => ["@version","@timestamp","host","agent","offset","log","input","ecs","fields"]
        gsub => [ "message","  "," "]
            gsub => [ "message","="," "]
            gsub => [ "message","null",'""']
        }
        grok {
            match => {
                "message" => "%{DATESTAMP:date} %{LOGLEVEL:level} %{NOTSPACE:class} %{NOTSPACE:logresult} - %{NOTSPACE:log_type} %{GREEDYDATA:json_log}"
            }
        }
        json {
            source => "json_log"
        }
        date {
            match => [ "operationStarttime", "yyyy-MM-dd HH:mm:ss" ]
            target => "operationStarttime_date"
        }
        date {
            match => [ "operationEndtime", "yyyy-MM-dd HH:mm:ss"]
            target => "operationEndtime_date"
        }
        mutate {
            remove_field => ["date","level","class","logresult","log_type","message"]
        }
    }
    output {
        stdout { codec => rubydebug }
        if [type] == "web_log" {
            if [tags][0] == "duty_audit" {
                elasticsearch {
                    hosts  => ["192.168.10.1:9200"]
                    index  => "duty_audit"
                }
            }
        }
        if [type] == "web_log" {
            if [tags][0] == "police_audit" {
                elasticsearch {
                    hosts  => ["192.168.10.1:9200"]
                    index  => "police_audit"
                }
            }
        }
    }
    

    执行文件启动

    bin/logstash -f config/logstash-csdn.yml
    

    也可以按日期配置es索引

    elasticsearch {
        hosts => ["192.168.10.1:9200"]
        index => "police_audit-%{+YYYY.MM.dd}"
    }  
    

    因为ES保存日志是永久保存,所以需要定期删除一下日志,下面命令为删除指定时间前的日志

    curl -X DELETE http://xx.xx.com:9200/logstash-*-`date +%Y-%m-%d -d "-$n days"`
    

    4.ES安装配置

    解压

    tar zxvf elasticsearch-7.7.0-linux-x86_64.tar.gz -C /opt/app/
    

    创建data和logs目录

    mkdir /opt/app/elasticsearch-7.7.0/data
    mkdir /opt/app/elasticsearch-7.7.0/logs
    

    配置文件

    vim /opt/app/elasticsearch-7.7.0/config/elasticsearch.yml
    

    修改es配置

    cluster.name: MyES  #集群名称
    node.name: node01    #本节点名称
    path.data: /opt/app/elasticsearch-7.7.0/data    #数据存储目录
    path.logs: /opt/app/elasticsearch-7.7.0/logs    #日志存储目录
    network.host: 0.0.0.0     #所有机器都可监听
    http.port: 9200        #默认端口
    cluster.initial_master_nodes: ["node01"]  #主节点名称,与上面配置的保持一致
    

    JVM配置
    因为ES是基于Java开发的,所以依赖于JVM,有需求可以修改JVM属性:

    vim /opt/app/elasticsearch-7.7.0/config/jvm.options
    -Xms4g
    -Xmx4g
    

    创建elk用户

    # 创建用户
    useradd elk
    # 为用户设置密码
    passwd elk
    # 删除用户
    userdel -r elk
    # 配置用户的文件夹权限
    chown -R elk:elk elasticsearch-7.7.0
    

    启动

    su - elk
    $ES_HOME/bin/elasticsearch
    

    报错

    ERROR: [3] bootstrap checks failed
    [1]: max file descriptors [4096] for elasticsearch process is too low, increase to at least [65535]
    [2]: max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]
    ERROR: Elasticsearch did not exit normally - check the logs at /opt/app/elasticsearch-7.7.0/logs/MyES.log
    

    查看机器限制

    ulimit -Sn
    ulimit -Hn
    

    修改限制:

    vim /etc/security/limits.conf
    
    *       soft    nofile           65535
    *       hard    nofile           65535
    elk       soft    nproc           4096
    elk       hard    nproc           4096
    

    执行命令sysctl -p另其生效

    sysctl -p
    

    切换elk用户再次启动ES,在浏览器中输入node01:9200验证,出现下面的结果说明已正常安装并启动:

    启停脚本
    编写启停脚本 start-es.sh以及stop-es.sh,编写后记得切换到elk用户刷新一下环境变量,否则会找不到$ES_HOME:
    start-es.sh

    sudo -u elk nohup $ES_HOME/bin/elasticsearch >> $ES_HOME/output.log 2>&1 &
    

    stop-es.sh

    #!/bin/bash
    PROCESS=`ps -ef | grep 'elasticsearch-7.7.0' |grep -v grep | grep -v PPID | awk '{ print $2}'`
    echo $PROCESS
    for i in $PROCESS
    do
      echo "Kill the elasticsearch process [ $i ]"
      kill -9 $i
    done
    

    1.es启动不能使用root用户
    2.es-header优先推荐使用google浏览器插件安装

    5. Kibana安装使用

    解压

    tar -zxvf /home/wyk/kibana-7.7.0-linux-x86_64.tar.gz -C /opt/app/
    

    配置

    vim config/kibana.yml
    
    server.port: 5601
    server.host: "0.0.0.0"
    elasticsearch.hosts: ["http://localhost:9200"] #ES所在的ip
    #elasticsearch.username: "kibana"  #如果ES有配置用户则这里需要配置用户密码
    #elasticsearch.password: "123456"
    

    给用户分配文件夹操作权限

    chown -R elk:elk /opt/app/kibana-7.7.0-linux-x86_64
    

    启动

    $KB_HOME/bin/kibana
    

    后台启动,启动后会监听5601端口

    nohup ./kibana &
    

    启停脚本
    start-kb.sh

    #!/bin/bash
    sudo -u elk  nohup $KB_HOME/bin/kibana > $KB_HOME/output.log 2>&1 &
    

    stop-kb.sh

    #!/bin/bash
    PROCESS=`ps -ef | grep 'kibana' |grep -v grep | grep -v PPID | awk '{ print $2}'`
    echo $PROCESS
    for i in $PROCESS
    do
      echo "Kill the kibana process [ $i ]"
      kill -9 $i
    done
    

    四、日志格式

    为实现对应用系统用户操作日志的收集、记录、分析并最终实现审计功能,需要应用系统按照约定的格式、内容、和编码方式将应用系统用户的操作日志输出到指定目录文件中。同时,为了在应用系统出现运行故障,无法正常运行时,现场技术人员能够借助ELK平台快速检索错误日志,及时排查和解决系统问题,需要应用系统将错误日志照约定的格式、内容、和编码方式输出到指定目录文件中。
    1、 输出路径:
        Tomcat应用服务器部署路径/EHL_logs/应用系统标识/infoFile.log
        应用系统标识应唯一标识系统。
        示例:tomcat8.0/EHL_logs/EHL_EW/infoFile.log。
    2、 日志文件编码格式:UTF-8。
    3、 输出标识:
    用户操作日志添加前缀,与其他系统日志区别。
    示例:userOperationLog={"requestIp":"10.2.113.44","userId":"001"……}。
    4、 输出格式:
        用户操作日志内容以json格式输出。
        示例:2019/07/31 18:10:49 INFO LogAspect:77 userOperationLog={"logResult":"成功","logType":"QUERY",…}。


    日志字段.png

    五、Kibana使用简要说明

    1. Stack Management

        Index management     索引管理
        Index Patterns    索引检索

    2. Discover

        根据第一步增加的索引检索,查询索引日志

    3. Dev tools

        Grok Debugger
        grok 语法测试,可以在这将输出日志使用grok语法测试分割

    六、Grok语法补充说明

    在线文档
    ELK Stack权威指南
    https://www.bookstack.cn/read/logstash-best-practice-cn/README.md

    logstash插件grok地址:
    https://www.elastic.co/guide/en/logstash/7.6/plugins-filters-grok.html

    正则在线调试:
    http://grokdebug.herokuapp.com/

    官方预定义的 grok 表达式:
    https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns

    grok·ELKstack中文指南:
    https://elkguide.elasticsearch.cn/logstash/plugins/filter/grok.html

    自定义grok表达式

    %{HOSTNAME},匹配请求的主机名
    %{TIMESTAMP_ISO8601:time},代表时间戳
    %{LOGLEVEL},代表日志级别
    %{URIPATHPARAM},代表请求路径
    %{INT},代表字符串整数数字大小
    %{NUMBER}, 可以匹配整数或者小数
    %{UUID},匹配类似091ece39-5444-44a1-9f1e-019a17286b48
    %{IP}, 匹配ip
    %{WORD}, 匹配请求的方式
    %{GREEDYDATA},匹配所有剩余的数据
    (?([\S+]*)),自定义正则
    \s*或者\s+,代表多个空格
    \S+或者\S*,代表多个字符
    大括号里面:xxx,相当于起别名
    (?<class_info>([\S+]*)), 自定义正则匹配多个字符
    

    logstash配置

    filter {
          grok {
            match => { "message" =>"%{TIMESTAMP_ISO8601:timestamp}\s*\|\-%{LOGLEVEL:log_level}\s*\[%{DATA:thread}\]\s*(?m)(?<msg>.*|\s)" }
            remove_field => ["message","_source"]
          }
    }
    

    具体配置使用可在Kibana-->Grok Debugger中进行语法测试,调试好后配置在logstash中

    相关文章

      网友评论

          本文标题:ELK部署使用

          本文链接:https://www.haomeiwen.com/subject/wnvxqrtx.html