美文网首页DBA
使用Elastic Stack搭建日志集中分析平台

使用Elastic Stack搭建日志集中分析平台

作者: mysia | 来源:发表于2019-08-01 14:58 被阅读8次

    前言

    一般我们需要进行日志分析场景:直接在日志文件中 grep、awk 就可以获得自己想要的信息。但在规模较大的场景中,此方法效率低下,面临问题包括日志量太大如何归档、文本搜索太慢怎么办、如何多维度查询。需要集中化的日志管理,所有服务器上的日志收集汇总。常见解决思路是建立集中式日志收集系统,将所有节点上的日志统一收集,管理,访问。

    一般大型系统是一个分布式部署的架构,不同的服务模块部署在不同的服务器上,问题出现时,大部分情况需要根据问题暴露的关键信息,定位到具体的服务器和服务模块,构建一套集中式日志系统,可以提高定位问题的效率。

    一个完整的集中式日志系统,需要包含以下几个主要特点:

    • 收集-能够采集多种来源的日志数据;
    • 传输-能够稳定的把日志数据传输到中央系统;
    • 存储-如何存储日志数据;
    • 分析-可以支持 UI 分析;
    • 警告-能够提供错误报告,监控机制;

    ELK提供了一整套解决方案,并且都是开源软件,之间互相配合使用,完美衔接,高效的满足了很多场合的应用。

    Elastic Stack简介

    Elastic Stack包括Beats、Elasticsearch、Logstash、Kibana、APM等,ELK是其核心套件。

    • Elasticsearch是实时全文搜索和分析引擎,提供搜集、分析、存储数据三大功能;是一套开放REST和JAVA API等结构提供高效搜索功能,可扩展的分布式系统。它构建于Apache Lucene搜索引擎库之上。

    • Logstash是一个用来搜集、分析、过滤日志的工具。它支持几乎任何类型的日志,包括系统日志、错误日志和自定义应用程序日志。它可以从许多来源接收日志,这些来源包括 syslog、消息传递(例如 RabbitMQ)和JMX,它能够以多种方式输出数据,包括电子邮件、websockets和Elasticsearch。

    • Kibana是一个基于Web的图形界面,用于搜索、分析和可视化存储在 Elasticsearch指标中的日志数据。它利用Elasticsearch的REST接口来检索数据,不仅允许用户创建他们自己的数据的定制仪表板视图,还允许他们以特殊的方式查询和过滤数据。

    • Beats是轻量级数据采集工具,包括:

      • Packetbeat(搜集网络流量数据);
      • Topbeat(搜集系统、进程和文件系统级别的 CPU 和内存使用情况等数据);
      • Filebeat(搜集文件数据);
      • Winlogbeat(搜集 Windows 事件日志数据)
      • Metricbeat(收集系统级的 CPU 使用率、内存、文件系统、磁盘 IO 和网络 IO 统计数据);
      • Auditbeat(采集linux审计日志);

    系统架构

    第一种ELK架构,是最简单的一种ELK架构方式。优点是搭建简单,易于上手。缺点是Logstash耗资源较大,运行占用CPU和内存高。另外没有消息队列缓存,存在数据丢失隐患。建议小规模集群使用。此架构首先由Logstash分布于各个节点上搜集相关日志、数据,并经过分析、过滤后发送给远端服务器上的Elasticsearch进行存储。Elasticsearch将数据以分片的形式压缩存储并提供多种API供用户查询,操作。用户亦可以更直观的通过配置Kibana Web Portal方便的对日志查询,并根据数据生成报表。


    基本ELK.png

    第二种架构,引入了消息队列机制,位于各个节点上的Logstash Agent先将数据/日志传递给Kafka(或者Redis),并将队列中消息或数据间接传递给Logstash,Logstash过滤、分析后将数据传递给Elasticsearch存储。最后由Kibana将日志和数据呈现给用户。因为引入了Kafka(或者Redis),所以即使远端Logstash server因故障停止运行,数据将会先被存储下来,从而避免数据丢失。这种架构适合于较大集群的解决方案,但由于Logstash中心节点和Elasticsearch的负荷会比较重,可将他们配置为集群模式,以分担负荷,这种架构的优点在于引入了消息队列机制,均衡了网络传输,从而降低了网络闭塞尤其是丢失数据的可能性,但依然存在Logstash占用系统资源过多的问题。


    ELK进阶架构.png

    第三种架构,引入了Logstash-forwarder。首先,Logstash-forwarder将日志数据搜集并统一发送给主节点上的Logstash,Logstash分析、过滤日志数据后发送至Elasticsearch存储,并由Kibana最终将数据呈现给用户。这种架构解决了Logstash在各计算机点上占用系统资源较高的问题。经测试得出,相比Logstash,Logstash-forwarder所占用系统CPU和MEM几乎可以忽略不计。另外,Logstash-forwarder和Logstash间的通信是通过SSL加密传输,起到了安全保障。如果是较大集群,用户亦可以如结构三那样配置logstash集群和Elasticsearch集群,引入High Available机制,提高数据传输和存储安全。更主要的配置多个Elasticsearch服务,有助于搜索和数据存储效率。但在此种架构下发现Logstash-forwarder和Logstash间通信必须由SSL加密传输,这样便有了一定的限制性。

    Logstash-forwarder

    第四种架构,将Logstash-forwarder替换为Beats。经测试,Beats满负荷状态所耗系统资源和Logstash-forwarder相当,但其扩展性和灵活性有很大提高。Beats platform目前包含有Packagebeat、Topbeat和Filebeat三个产品,均为Apache 2.0 License。同时用户可根据需要进行二次开发。这种架构原理基于第三种架构,但是更灵活,扩展性更强。同时可配置Logstash 和Elasticsearch 集群用于支持大集群系统的运维日志数据监控和查询。

    ELK with Beats

    一个例子:MySQL日志审计系统

    MySQL日志审计系统,采用percona audit插件审计MySQL的访问情况,结果记录到指定文件中。通过Rsyslog将每个MySQL审计日志集中到Rsyslog Server的指定目录中,使用filebeat监控文件变化,上报到kafka。使用Logstash消费数据,把数据过滤切割后,写入ES中,用户通过kibana查询相关数据。

    系统架构图如下:


    MySQL审计日志系统.png

    MySQL审计采用percona的审计插件,配置如下:

    +----------------------------+---------------+
    | Variable_name              | Value         |
    +----------------------------+---------------+
    | audit_log_buffer_size      | 1048576       |
    | audit_log_exclude_accounts |               |
    | audit_log_exclude_commands |               |
    | audit_log_file             | audit.log     |
    | audit_log_flush            | OFF           |
    | audit_log_format           | OLD           |
    | audit_log_handler          | FILE          |
    | audit_log_include_accounts |               |
    | audit_log_include_commands |               |
    | audit_log_policy           | ALL           |
    | audit_log_rotate_on_size   | 0             |
    | audit_log_rotations        | 0             |
    | audit_log_strategy         | ASYNCHRONOUS  |
    | audit_log_syslog_facility  | LOG_USER      |
    | audit_log_syslog_ident     | percona-audit |
    | audit_log_syslog_priority  | LOG_INFO      |
    +----------------------------+---------------+
    

    收集到的审计日志,通过Rsyslog的imfile模块,采集审计日志,之后发送到Rsyslog Server。

    input(type="imfile" File="audit.log" Tag="mysqlaudit" Severity="info" Facility="local2")
    local2.info                             @Rsyslog.Server.dns:514
    

    在Rsyslog上,创建相关目录接收上传的日志:

    $template mysql_audit_file, "/data/log/mysql_audit_log/%$YEAR%/%$MONTH%/%$DAY%/%fromhost-ip%.log"
    if ($syslogfacility-text == 'local2') and ($syslogseverity-text == 'info') then -?mysql_audit_file
    & stop
    

    Rsyslog上接收到的文件,通过filebeat上报kafka:

    filebeat.prospectors:
    - type: log
      enabled: true
      paths:
        - /data/log/mysql_audit_log/*/*/*/*
      multiline:
        pattern: "^{\"audit_record\":"
        negate: true
        match: after
      document_type: mysqlauditlog
    

    Logstash负责消费kafka的数据,过滤切割后,写入到ES中:

    input {
      kafka {
        add_field => {"myid"=>"mysql_audit"}
        bootstrap_servers => ""
        group_id => "logstash-mysql-audit"
        topics => ["mysql_audit"]
        client_id => "mysql_audit"
        consumer_threads => 1
        auto_offset_reset => "latest"
        decorate_events => true
      }
    }
    
    filter {
      if [myid] == "mysql_audit" {
        ruby {
          code => "
            array=event.get('message')
    
            timestamp = array.scan(/\"@timestamp\":\"(.*?)\"/)
            if timestamp.length > 0
                    event.set('timestamp',timestamp[0][0])
            end
     
            client_ip = array.scan(/\"client_ip\":\"(.*?)\"/)
            if client_ip.length > 0
                    event.set('client_ip',client_ip[0][0])
            end
    
            request = array.scan(/\"request\":\"(.*?)\",/)
            if request.length > 0
                    event.set('request',request[0][0])
            end
    
            status = array.scan(/\"status\":\"(.*?)\",/)
            if status.length > 0
                    event.set('status',status[0][0])
            end
    
            bytes_in = array.scan(/\"bytes_in\":(\d+)/)
            if bytes_in.length > 0
                    event.set('bytes_in',bytes_in[0][0])
            end 
    
            type = array.scan(/\"type\":\"(.*?)\",/)
            if type.length > 0
                    event.set('type',type[0][0])
            end 
    
            query = array.scan(/\"query\":\"(.*?)\",/)
            if query.length > 0
                    event.set('query',query[0][0])
            end
    
            bytes_out = array.scan(/\"bytes_out\":(\d+)/)
            if bytes_out.length > 0
                    event.set('bytes_out',bytes_out[0][0])
            end
    
            responsetime = array.scan(/\"responsetime\":(\d+)/)
            if responsetime.length > 0
                    event.set('responsetime',responsetime[0][0])
            end
    
            response = array.scan(/\"response\":\"(\{.*?\})\",/)
            if response.length > 0
                    event.set('response',response[0][0])
            end 
    
            port = array.scan(/\"port\":(\d+)/)
            if port.length > 0
                    event.set('port',port[0][0])
            end  
    
            client_port = array.scan(/\"client_port\":(\d+)/)
            if client_port.length > 0
                    event.set('client_port',client_port[0][0])
            end
    
            method = array.scan(/\"method\":\"(.*?)\",/)
            if method.length > 0
                    event.set('method',method[0][0])
            end
    
            ip = array.scan(/\"ip\":\"(.*?)\"/)
            if ip.length > 0
                    event.set('ip',ip[0][0])
            end
    
            host = array.scan(/\"host\":\"(.*?)\"/)
            if host.length > 0
                    event.set('host',host[0][0])
            end
    
          "
        }
        mutate {
          remove_field =>["message"]
        }
      }
    }
    
    output {
      if [myid] == "mysql_audit" {
        elasticsearch {
          hosts => [""]
          index => "mysql_audit-%{+YYYY-MM-dd}"
          codec => "json"
        }
      }
    }
    

    用户可以在kibana中查询自己所需的数据:


    kibana

    总结

    目前,上报到公司kafka的日志,皆可接入数据库部门的ES,可通过kibana统一查询、分析,协助排查错误、分析性能。后续通过接入更多的beats组件,来丰富ES日志平台的使用场景。

    相关文章

      网友评论

        本文标题:使用Elastic Stack搭建日志集中分析平台

        本文链接:https://www.haomeiwen.com/subject/pghzrctx.html