ELK Stack

作者: 轻轻敲醒沉睡的心灵 | 来源:发表于2024-08-13 17:41 被阅读0次

    我们使用elasticsearch作为搜索引擎,基本上会把相关的相关的软件和功能都用上了。官网提供了和它相辅相成的一套软件,起名:Elastic Stack。这里我们就看看Elastic Stack的功能和简单使用。

    1. Elastic Stack功能

    • 先看看官方是怎么定义的:
      Elastic Stack:了解可帮助您构建搜索体验、解决问题并取得成功的搜索平台
      核心产品包括 ElasticsearchKibanaBeatsLogstash(也称为 ELK Stack)等等。能够安全可靠地从任何来源获取任何格式的数据,然后对数据进行搜索、分析和可视化。
      它集成的功能插件可以从这里有一个整体的认识:
      image.png
    • 再看看官方介绍的功能,那叫一个强啊:
      Elastic Stack 功能:Elastic Stack 包含各种功能(之前统一称为 X-Pack),从企业级安全性和开发人员友好型 API,到 Machine Learning 和图表分析,非常全面;借助这些功能,您能够对所有类型的数据进行大规模采集、分析、搜索和可视化。
      自身
      功能

    功能有很多,这里截取了一部分,大家可以自己去看看

    2 Logstash

    官方介绍
    Logstash 是免费且开放的服务器端数据处理管道,能够从多个来源采集数据,转换数据,然后将数据发送到您最喜欢的“存储库”中。
    Logstash 能够动态地采集、转换和传输数据,不受格式或复杂度的影响。利用 Grok 从非结构化数据中派生出结构,从 IP 地址解码出地理坐标,匿名化或排除敏感字段,并简化整体处理过程。
    它的整个工作流程是三段式的:输入 -> 筛选 -> 输出

    工作流程
    2.1 输入

    采集各种样式、大小和来源的数据。常见的输入方式有:文件、beats、http、jdbc、kafka等。
    数据往往以各种各样的形式,或分散或集中地存在于很多系统中。Logstash 支持各种输入选择,可以同时从众多常用来源捕捉事件。能够以连续的流式传输方式,轻松地从您的日志、指标、Web 应用、数据存储以及各种 AWS 服务采集数据。

    输入源
    2.2 筛选

    实时解析和转换数据
    数据从源传输到存储库的过程中,Logstash 筛选器能够解析各个事件,识别已命名的字段以构建结构,并将它们转换成通用格式,以便进行更强大的分析和实现商业价值。
    Logstash 能够动态地转换和解析数据,不受格式或复杂度的影响:

    • 利用 Grok 从非结构化数据中派生出结构
    • 从 IP 地址破译出地理坐标
    • 将 PII 数据匿名化,完全排除敏感字段
    • 简化整体处理,不受数据源、格式或架构的影响。

    使用我们丰富的筛选器库和功能多样的 Elastic Common Schema,您可以实现无限丰富的可能。

    帅选
    2.3 输出

    选择您的存储库,传输您的数据
    尽管 Elasticsearch 是我们的首选输出方向,能够为我们的搜索和分析带来无限可能,但它并非唯一选择。
    Logstash 提供了众多输出选择,您可以将数据发送到所需的位置,并且能够灵活地解锁众多下游用例。

    输出

    3. elk搭建

    前面我们已经搭建了elasticsearchkibana,并做了监控安装了分词器
    这里我们只用docker compose安装Logstash就可以了。官网有教程,比较粗糙,可以参考。

    3.1 docker compose文件

    docker-compose-logstash.yml

    version: '3.9'
    services:
        beats:
            image: 'docker.elastic.co/logstash/logstash:8.14.3'
            ports: 
                - 5044:5044
            container_name: logstash
            hostname: logstash
            restart: always
            environment: 
                - TZ=Asia/Shanghai
                - ES_JAVA_OPTS=-Xms512m -Xmx512m
            privileged: true
            volumes:
                - /etc/timezone:/etc/timezone
                - /etc/localtime:/etc/localtime:ro
                # 映射日志文件,要读取的
                - '/logs/test:/logs/test'
                # 映射pipeline配置文件目录
                - '/opt/soft/logstash/pipeline:/usr/share/logstash/pipeline'
                # 映射logstash配置文件
                - '/opt/soft/logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml'
                # 映射elasticsearch证书目录,连接时候用
                - '/opt/soft/elasticsearch/config/certs:/usr/share/logstash/config/certs'
            networks:
                elastic: 
                    # {}
                    ipv4_address: 172.18.0.16
    networks:
        elastic: 
          external: true
          driver: bridge
    
    3.2 配置文件

    Logstash 的配置主要分为两部分:Pipeline 配置文件Settings 配置文件

    • Pipeline 配置文件(logstash.conf):这是 Logstash 的核心配置,用于定义数据处理的流程,包括输入(input)、过滤(filter)和输出(output)三个部分。每个部分都可以使用多种插件来完成特定的任务。例如,输入部分可以使用 file 插件从文件中读取数据,过滤部分可以使用 grok 插件解析日志,输出部分可以使用 elasticsearch 插件将数据发送到 Elasticsearch。位置在./pipeline/logstash.conf
    • Settings 配置文件(logstash.yml):这是 Logstash 的全局配置,包括 Logstash 实例的名称、数据存储路径、配置文件路径、自动重载配置、工作线程数量等。位置在:./config/logstash.yml
      在 Logstash 启动时,它会首先读取 Settings 配置文件,然后加载并执行 Pipeline 配置文件。
    3.2.1 logstash.conf

    注意:证书文件需要处理一下,不支持.crt格式的,复制一份,后缀改成.pem

    input {
        # 输入插件支持哪些,可参考https://www.elastic.co/guide/en/logstash/current/plugins-inputs-file.html
        # 选择一个即可
        file {
          path => ["/logs/app1/app1.log", "/logs/app2/app3.log", "/logs/httpd/*"]
          # 从文件开头就获取
         start_position => "beginning"
         exclude => "*.zip"
      }
    #  beats {
    #    port => 5044
    #    codec => json {
    #      charset => "UTF-8"
    #    }
    #  }
    #   jdbc {
    #    jdbc_driver_library => "/path/to/your/jdbc/driver"
    #    jdbc_driver_class => "com.mysql.jdbc.Driver"
    #    jdbc_connection_string => "jdbc:mysql://localhost:3306/yourdatabase"
    #    jdbc_user => "yourusername"
    #    jdbc_password => "yourpassword"
    #  }
    #   kafka {
    #    bootstrap_servers => "localhost:9092"
    #    topics => ["your_topic"]
    #  }
    }
    
    filter {  
        # grok 过滤器用于解析非结构化的日志数据,将其转换为结构化的数据
        # 效果就是输出中添加了一个字段message,按指定格式来的
        # 如果就想看 日志中的原格式,这个就不用加。比如java项目的日志,格式就挺好的
      #grok {
        #   match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} \[%{DATA:thread}\] %{LOGLEVEL:loglevel} %{DATA:logger} - %{GREEDYDATA:message}" }
      #}
        # date 过滤器用于解析日期和时间信息,将其转换为 Logstash 的 @timestamp 字段
      #date {
      #  match => ["timestamp", "yyyy-MM-dd HH:mm:ss.SSS"]
        #target => "@timestamp"
        #   locale => "cn"
      #}
        # 排除一些没用字段,logstash输出的内容中有一些字段,看需不需要
        mutate {
            remove_field => ["@version", "tags"]
        }
    }
    
    # 输出支持选项参考官方文档:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-file.html
    # 一般都是接elasticsearch
    output {
        # elasticsearch的配置项参考这里:https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html#plugins-outputs-elasticsearch-options
        # 有的配置项可能会改名字,自己进去确认
            # elasticsearch 2种方式:'file文件'、'filebeat'。用一个,另一个注释掉
        # 直接从file文件读取时
     elasticsearch { 
            hosts => ["https://es01:9200"]
            index => "test-%{+YYYY.MM.dd}" 
            user => "elastic"
            password => "123456"
            ssl_enabled => true
            ssl_certificate_authorities => "config/certs/ca/ca.pem"
        }  
      # 从filebeats获取数据,'tags'是filebeat传过来数据中的一个字段,在filebeat中配置
      if "test-01-server" in [tags] {
            elasticsearch { 
            hosts => ["https://es01:9200"]
            index => "test-01-server-%{+YYYY.MM.dd}" 
            user => "elastic"
            password => "123456"
            ssl_enabled => true
            ssl_certificate_authorities => "config/certs/ca/ca.pem"
        }  
      }
       if "test-02-server" in [tags] {
            elasticsearch { 
            hosts => ["https://es01:9200"]
            index => "test-02-server-%{+YYYY.MM.dd}" 
            user => "elastic"
            password => "123456"
            ssl_enabled => true
            ssl_certificate_authorities => "config/certs/ca/ca.pem"
        }  
      }
      # 输出到启动logstash的shell页面 
      stdout { 
        codec => rubydebug 
      }
    }
    
    3.2.2 logstash.yml

    没有配置什么,可以按需设置

    #监听地址
    http.host: "0.0.0.0"
    #节点名称,${index}是脚本变量,两个节点,1和2
    #node.name: logstash-node-${index} 
    
    #开启logstash指标监测,将指标数据发送到es集群
    #xpack.monitoring.enabled: true  
    #xpack.monitoring.elasticsearch.hosts: ["https://172.18.0.11:9200"]
    #xpack.monitoring.elasticsearch.username: "elastic"
    #xpack.monitoring.elasticsearch.password: "123456"
    #xpack.monitoring.elasticsearch.ssl.certificate_authority: "config/certs/ca.pem"
    
    #设置output或filter插件的工作线程数
    #pipeline.workers: 8 
    #设置批量执行event的最大值
    #pipeline.batch.size: 125 
    #批处理的最大等待值
    #pipeline.batch.delay: 5000 
    #开启持久化
    #queue.type: persisted 
    #队列存储路径;如果队列类型为persisted,则生效
    #path.queue: /usr/share/logstash/data 
    #队列为持久化,单个队列大小
    #queue.page_capacity: 250mb 
    #当启用持久化队列时,队列中未读事件的最大数量,0为不限制
    #queue.max_events: 0 
    #队列最大容量
    #queue.max_bytes: 1024mb 
    #在启用持久队列时强制执行检查点的最大数量,0为不限制
    #queue.checkpoint.acks: 1024 
    #在启用持久队列时强制执行检查点之前的最大数量的写入事件,0为不限制
    #queue.checkpoint.writes: 1024
    #当启用持久队列时,在头页面上强制一个检查点的时间间隔 
    #queue.checkpoint.interval: 1000 
    
    3.3.3 logstash.conf(API Key

    注意:很不幸,logstash启动不成功,因为连接不上elasticsearch,证书文件打不开,但是kibana和metricbeat连的时候都是用的这个文件啊!查看官方文档,上面写着支持 .cer.pem文件,但是当初elasticsearch这个吊毛给我们生成的是.crt证书,没有.pem。我也不会转,只怪自己对证书概念了解的不够,客户端和服务端不一样吧。真是坑啊!!!
    所以我们输出连接elasticsearch的时候换个方式,用API Key试试。

    input {
        # 输入插件支持哪些,可参考https://www.elastic.co/guide/en/logstash/current/plugins-inputs-file.html
        # 选择一个即可
        file {
        path => ["/logs/app1/app1.log", "/logs/app2/app3.log", "/logs/httpd/*"]
            exclude => "*.zip"
      }
    }
    
    filter {  
        # grok 过滤器用于解析非结构化的日志数据,将其转换为结构化的数据
      grok {
        match => ["message", "%{TIMESTAMP_ISO8601:logdate}"]
      }
        # date 过滤器用于解析日期和时间信息,将其转换为 Logstash 的 @timestamp 字段
      date {
        match => ["logdate", "yyyy-MM-dd HH:mm:ss.SSS"]
        target => "@timestamp"
      }
    }
    
    # 输出支持选项参考官方文档:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-file.html
    # 一般都是接elasticsearch
    output {
        # elasticsearch的配置项参考这里:https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html#plugins-outputs-elasticsearch-options
        # 有的配置项可能会改名字,自己进去确认
        elasticsearch { 
            hosts => ["https://es01:9200"]
            index => "test-%{+YYYY.MM.dd}" 
            ssl_enabled => true
            ssl_verification_mode => full
            api_key => "apiId(自己的):SWtoZ1RwRUIyODlSYU8zX056Sy06TDZBTHF0NHhSckdCQXFaRFNyMDBlQQ=="
        }  
      # 输出到启动logstash的shell页面 
      stdout { 
        codec => rubydebug 
      }
    }
    

    好吧,又失败了。容器是起来了,但是连接elasticsearch连不上,还是一直找证书。再换使用ca_trusted_fingerprint连接还是报错:PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    没有办法,百度了一下,crt格式后缀可以直接改成pem后缀使用,我就改了,按第一种证书的连接就可以了。

    logstash success

    4. 查看日志

    当logstash启动成功以后,会把日志文件写到elasticsearch中,但是此时,我们没法直接在kibana中查看内容。需要将数据在kibana中做一个数据视图,然后才能看数据。

    目录
    创建视图
    然后就可以在discover中查看这个索引了。
    image.png

    我们排除了2个字段,不排除接收到的数据原来是这样的:

    {
      "@version" => "1",
      "tags" => [
                [0] "_grokparsefailure"
            ],
      "log" => {
             "file" => {
                     "path" => "/logs/test/test.log"
                }
        },
      "@timestamp" => 2024-08-14T09:02:24.211705249Z,
      "event" => {
          "original" => "2024-07-27 17:27:54.453 [http-nio-9001-exec-6] INFO  org.springdoc.api.AbstractOpenApiResource.getOpenApi(390) -- Init duration for springdoc-openapi is: 79 ms\r"
       },
      "host" => {
          "name" => "d8a03914b903"
        }
     }
    

    5. 将logstash添加到metricbeat监控

    这个官网给了的,直接参考拿来用就行。

      1. logstash.yml中将自身监控关闭,这个上面已经配置了
    monitoring.enabled: false
    
      1. 在metricbeat的配置文件中加上logstash模块
      - module: logstash
        metricsets:
          - node
          - node_stats
        period: 10s
        hosts: ["http://172.18.0.16:9600"]
        xpack.enabled: true
    
      1. 官网建议关闭metricbeat的系统监控
        system监控默认是开启的,可以选择关闭。官网说:除非你有其他目的。我先不不管这个了。
      1. 重启metricbeat,等2分钟,进kibana那会自动添加logstash监控应该能看到。


        image.png
        image.png

    相关文章

      网友评论

          本文标题:ELK Stack

          本文链接:https://www.haomeiwen.com/subject/fbnrkjtx.html