美文网首页dibo
数据采集--Logstash(四)

数据采集--Logstash(四)

作者: 无剑_君 | 来源:发表于2019-09-25 16:58 被阅读0次

    一、Logstash简介

      Logstash是一个开源数据收集引擎,具有实时管道功能。Logstash可以动态地将来自不同数据源的数据统一起来,并将数据标准化到你所选择的目的地。


    Logstash

      Logstash管道有两个必需的元素,输入和输出,以及一个可选元素过滤器。输入插件从数据源那里消费数据,过滤器插件根据你的期望修改数据,输出插件将数据写入目的地。


    Logstash管道

    输入:采集各种样式、大小和来源的数据
    数据往往以各种各样的形式,或分散或集中地存在于很多系统中。Logstash 支持各种输入选择 ,可以在同一时间从众多常用来源捕捉事件。能够以连续的流式传输方式,轻松地从您的日志、指标、Web 应用、数据存储以及各种 AWS 服务采集数据。

    输入

    过滤器:实时解析和转换数据
    数据从源传输到存储库的过程中,Logstash 过滤器能够解析各个事件,识别已命名的字段以构建结构,并将它们转换成通用格式,以便更轻松、更快速地分析和实现商业价值。

    Logstash 能够动态地转换和解析数据,不受格式或复杂度的影响:

    利用 Grok 从非结构化数据中派生出结构
    从 IP 地址破译出地理坐标
    将 PII(个人验证信息) 数据匿名化,完全排除敏感字段
    整体处理不受数据源、格式或架构的影响

    输出:选择你的存储,导出你的数据
    尽管 Elasticsearch 是我们的首选输出方向,能够为我们的搜索和分析带来无限可能,但它并非唯一选择。
    Logstash 提供众多输出选择,您可以将数据发送到您要指定的地方,并且能够灵活地解锁众多下游用例。

    logstash是做数据采集的,类似于flume。
    Logstash架构:


    Logstash架构图

    Batcher负责批量的从queue中取数据;
    Queue分类:
    In Memory : 无法处理进程Crash、机器宕机等情况,会导致数据丢失
    Persistent Queue In Disk:可处理进程Crash等情况,保证数据不丢失,保证数据至少消费一次,充当缓冲区,可以替代kafka等消息队列的作用。

    官网:https://www.elastic.co/cn/products/logstash

    二、下载安装

    1. 下载安装
    # 下载
    [root@localhost ~]# wget https://artifacts.elastic.co/downloads/logstash/logstash-6.4.3.tar.gz
    # 解压
    [root@localhost ~]# tar -zxvf logstash-6.4.3.tar.gz -C /usr/local
    # 查看内容
    [root@localhost ~]# ls /usr/local/logstash-6.4.3/
    # 测试logstash-6.4.3
    [root@localhost logstash-6.4.3]# ./bin/logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}'
    Sending Logstash logs to /usr/local/logstash-6.4.3/logs which is now configured via log4j2.properties
    [2019-09-25T15:12:41,903][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
    [2019-09-25T15:12:42,613][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"6.4.3"}
    [2019-09-25T15:12:45,490][INFO ][logstash.pipeline        ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
    [2019-09-25T15:12:45,845][INFO ][logstash.pipeline        ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x310540d4 run>"}
    The stdin plugin is now waiting for input:
    [2019-09-25T15:12:46,543][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
    [2019-09-25T15:12:47,020][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
    
    

    如果启动成功会出现一下的提示语句:


    启动成功

    接着屏幕就等着你输入了,比如输入一个Hello World,会出现以下的提示语句。

    HelloWorld
    {
        "@timestamp" => 2019-09-25T07:14:40.491Z,
              "host" => "localhost",
          "@version" => "1",
           "message" => "HelloWorld"
    }
    
    
    1. 配置文件简单测试
      pipeline配置简介:
      Pipeline用于配置input、filter和output插件
    input {
            ...
    }
    filter {
            ...
    }
    output {
            ...
    }
    

    创建配置文件logstash.conf:

    [root@localhost logstash-6.4.3]# vi config/logstash.conf
    # 输入内容
    
    input {
        stdin { }
    }
    
    output {
        stdout {
            codec => rubydebug { }
        }
        elasticsearch {
            hosts => ["0.0.0.0:9200"]
            user => elastic
            password => xW9dqAxThD5U4ShQV1JT
        }
    }
    
    # 启动elasticsearch
    # 指定配置文件启动
    [root@localhost logstash-6.4.3]# ./bin/logstash -f config/logstash.conf
    # 同样命令行等着你输入指令
    Hello World
    {
          "@version" => "1",
              "host" => "localhost",
        "@timestamp" => 2019-09-25T07:25:03.292Z,
           "message" => "Hello World"
    }
    
    

    访问:
    http://192.168.77.132:9200/_search?q=Hello

    测试访问

    常见问题:

    1. 错误提示:Unrecognized VM option 'UseParNewGC'
      JDK版本不正确。
    2. (LoadError) Unsupported platform: x86_64-linux
      切换JDK为1.8版本
    # 安装JDK
    [root@localhost ~]# yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
    # 切换JDK
    [root@localhost logstash-6.4.3]# alternatives --config java  
    
    共有 2 个提供“java”的程序。
    
      选项    命令
    -----------------------------------------------
     + 1           java-11-openjdk.x86_64 (/usr/lib/jvm/java-11-openjdk-11.0.4.11-1.el7_7.x86_64/bin/java)
    *  2           java-1.8.0-openjdk.x86_64 (/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.222.b10-1.el7_7.x86_64/jre/bin/java)
    
    按 Enter 保留当前选项[+],或者键入选项编号:2
    
    
    1. error=>"Got response code '401' contacting Elasticsearch at URL 'http://0.0.0.0:9200/'"
      原因:
      没有指定Elasticsearch 权限,修改配置添加elasticsearch 的用户与密码:
    output {
        stdout {
            codec => rubydebug { }
        }
        elasticsearch {
            hosts => ["0.0.0.0:9200"]
            user => elastic
            password => xW9dqAxThD5U4ShQV1JT
        }
    }
    
    1. YUM安装
      参考档安装:
      https://www.elastic.co/guide/en/logstash/6.4/installing-logstash.html#_yum

    三、采集MySql数据库数据

    1. 下载MySql-JDBC
      MySql-JDBC下载地址:https://dev.mysql.com/downloads/connector/j/5.1.html
      MySql-JDBC下载地址
      5.1下载:
    [root@localhost ~]# wget https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-java-5.1.48.tar.gz
     解压:
    [root@localhost ~]# tar -zxvf mysql-connector-java-5.1.48.tar.gz
    
    

    8.0下载地址:
    https://dev.mysql.com/downloads/connector/j/8.0.html

    8.0下载地址
    [root@localhost ~]# wget https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-java-8.0.17.tar.gz
    
    
    1. 安装MySql-JDBC插件
    #   安装插件
    [root@localhost logstash-6.4.3]# ./bin/logstash-plugin install logstash-input-jdbc
    Validating logstash-input-jdbc
    Installing logstash-input-jdbc
    Installation successful
    
    
    1. 创建数据库、表、与sql文件
      数据库:log_test 表:news


      数据库与表

      数据脚本:

    SET NAMES utf8mb4;
    SET FOREIGN_KEY_CHECKS = 0;
    
    -- ----------------------------
    -- Table structure for news
    -- ----------------------------
    DROP TABLE IF EXISTS `news`;
    CREATE TABLE `news`  (
      `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
      `title` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '标题',
      `content` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL COMMENT '内容',
      PRIMARY KEY (`id`) USING BTREE
    ) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;
    
    SET FOREIGN_KEY_CHECKS = 1;
    
    

    创建执行SQL:

    # 创建目录
    [root@localhost logstash-6.4.3]# mkdir sql
    # 添加内容
    [root@localhost sql]# vi sql/jdbc.sql
    # 内容为
    select * from news
    

    要注意:这里的内容就是logstash依赖执行的sql命令,所以这里的表名要跟你实际的数据库表名一致,否则会失败。
    至于jdbc.sql的内容就是你的业务sql,只能有一条,且末尾不要加分号,否则出错!

    1. 单数据源同步
      单数据源同步是指,数据只写入一个index下(注意:6.x版本下,一个index下只能有一个type),jdbc块和elasticsearch块也是 一 一 对应的关系,具体看下同步conf的配置(示例配置文件名称: singledb.conf):
    # 创建配置文件
    [root@localhost logstash-6.4.3]# vi config/singledb.conf
    # 内容
    input {
        stdin {
        }
        jdbc {
          # mysql jdbc connection 连接地址
          jdbc_connection_string => "jdbc:mysql://10.2.129.166:3306/log_test?serverTimezone=Asia/Shanghai&useSSL=true&useUnicode=true&characterEncoding=UTF-8"
          # 登录数据库的用户名、密码
          jdbc_user => "root"
          jdbc_password => "1234"
          # jdbc 驱动包路径
          jdbc_driver_library => "/usr/local/logstash-6.4.3/config/mysql-connector-java-8.0.17.jar"      # 连接驱动类名
          jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
          jdbc_paging_enabled => "true"
          jdbc_page_size => "50000"
          statement_filepath => "/usr/local/logstash-6.4.3/config/jdbc.sql"
          # 以下表示定时执行任务,使用cron表达式,本文只全量同步一次,所以不配置定时器,如果要实现增量更新,需要配合定时器以及上次查询的最后一个值,具体要根据你的业务来定。
          # schedule => "* * * * *"
          type => "jdbc"
        }
    }
    filter {
        json {
            source => "message"
            remove_field => ["message"]
        }
    }
    output {
        elasticsearch {
            hosts => ["192.168.77.132:9200"]
            index => "mynews"
            document_id => "%{id}"
            user => elastic
            password => xW9dqAxThD5U4ShQV1JT
        }
        stdout {
            codec => json_lines
        }
    }
    
    #  复制jdbc jar  文件 
    [root@localhost logstash-6.4.3]# cp /root/mysql-connector-java-8.0.17/mysql-connector-java-8.0.17.jar config
    
    # 启动同步
    [root@localhost logstash-6.4.3]# ./bin/logstash -f config/singledb.conf
    
    

    注意:配置文件中的jar与sql一定要使用绝对路径。
    启动同步,同步的内容会输出到屏幕上。执行完成后可在es中查看数据。

    同步结果
    kibana中查看
    结果
    1. 多数据源同步
      多数据源同步是指,需要同步多种类型的数据到es中,input的配置添加相应的jdbc模块,output中根据type类型判断添加对应的elasticsearch模块:
    input {
        jdbc {
            jdbc_connection_string => "jdbc:mysql://192.168.91.149:3306/test"
            jdbc_user => "root"
            jdbc_password => "1234"
            # 此处的路径最好是绝对路径,相对路径取决与允许命令的目录
            jdbc_driver_library => "/usr/local/logstash-6.4.3/config/mysql-connector-java-8.0.17.jar"
            jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
            jdbc_paging_enabled => "true"
            jdbc_page_size => "50000"
            # 此处的路径最好是绝对路径,相对路径取决与允许命令的目录
            statement_filepath =>  "/usr/local/logstash-6.4.3/config/jdbc0.sql"
            type => "user"
        }
        jdbc {
            jdbc_connection_string => "jdbc:mysql://192.168.91.149:3306/test"
            jdbc_user => "root"
            jdbc_password => "1234"
            # 此处的路径最好是绝对路径,相对路径取决与允许命令的目录
            jdbc_driver_library => "/usr/local/logstash-6.4.3/config/mysql-connector-java-8.0.17.jar"
            jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
            jdbc_paging_enabled => "true"
            jdbc_page_size => "50000"
            # 此处的路径最好是绝对路径,相对路径取决与允许命令的目录
            statement_filepath =>  "/usr/local/logstash-6.4.3/config/jdbc1.sql"
            type => "user_address"
        }
    }
    output {
        #设置窗口日志输出
        stdout {
            codec => json_lines
        }
           # 与JDBC定义的type对应
        if[type] == "user" {
            elasticsearch {
                hosts => ["192.168.77.132:9200"]
                # 注意index的值不支持大写字母
                index => "user"
                # document_type自行设置,不设置时,默认为doc
                # document_type => ""
                # 此处的值来自查询sql中的列名称,根据需要自行配置
                document_id => "%{id}"
                           user => elastic
                           password => xW9dqAxThD5U4ShQV1JT
            }
        }
        if[type] == "user_news" {
            elasticsearch {
                hosts => ["192.168.77.132:9200"]
                # 注意index的值不支持大写字母
                index => "user_news"
                # document_type自行设置,不设置时,默认为doc
                # document_type => ""
                # 此处的值来自查询sql中的列名称,根据需要自行配置
                document_id => "%{id}"
                           user => elastic
                           password => xW9dqAxThD5U4ShQV1JT
            }
        }
        
    }
    
    
    1. 全量同步
      以上的单数据源/多数据源同步都是全量同步,即没有任何条件地进行同步。

    2. 增量同步
      增量同步需要在jdbc模块添加相应的增量配置

    配置参数
    Schedule:是cron格式的同步周期,其它几个都是用来记录同步增量指标的,
    Tracking_column:是数据库中的增量指标字段名
    Tracking_columu_type:目前只支持两种numeric,timestamp,
    Last_run_metadata_path:是保存上次同步的增量指标值。
    而 :sql_last_value如果input里面use_column_value => true, 即如果设置为true的话,可以是我们设定的字段的上一次的值。
    默认 use_column_value => false, 这样 :sql_last_value为上一次更新的最后时刻值。
    也就是说,对于新增的值,才会更新。这样就实现了增量更新的目的。

    四、相关配置

    1. 持久队列基本配置(pipelines.yml)
    queue.type:persisted    # 默认是memory
    queue.max_bytes:4gb     # 队列存储最大数据量
    
    1. 线程相关配置(logstash.yml)
    pipeline.worksers | -w
    # pipeline线程数,即filter_output的处理线程数,默认是cpu核数
    pipeline.batch.size | -b
    # Batcher一次批量获取的待处理文档数,默认是125,可以根据输出进行调整,越大会占用越多的heap空间,可以通过jvm.options调整
    pipeline.batch.delay | -u
    # Batcher等待的时长,单位为ms
    
    
    1. Logstash配置文件:
      logstash设置相关的配置文件(在conf文件夹中)
      logstash.yml:logstash相关配置,比如node.name、path.data、pipeline.workers、queue.type等,这其中的配置可以被命令行参数中的相关参数覆盖
      jvm.options:修改jvm的相关参数,比如修改heap size等
      pipeline配置文件:定义数据处理流程的文件,以.conf结尾
      logstash.yml配置项:
    node.name:   节点名称,便于识别
    path.data:   持久化存储数据的文件夹,默认是logstash home目录下的data
    path.config: 设定pipeline配置文件的目录(如果指定文件夹,会默认把文件夹下的所有.conf文件按照字母顺序拼接为一个文件)
    path.log:    设定pipeline日志文件的目录
    pipeline.workers: 设定pipeline的线程数(filter+output),优化的常用项
    pipeline.batch.size/delay: 设定批量处理数据的数据和延迟
    queue.type: 设定队列类型,默认是memory
    queue.max_bytes: 队列总容量,默认是1g
    
    

    五、常见问题

    1. Error: Java::JavaSql::SQLException: null, message from server: "Host 'DESKTOP-T92R1EE' is not allowed to connect to this MySQL server"
      数据库没有开启远程连接
    select user,host from user;
    # 在Mysql数据库中执行以下sql
    update user set host='%' where user ='root';
    flush privileges;
    

    最好重启下Mysql


    执行SQL

    相关文章

      网友评论

        本文标题:数据采集--Logstash(四)

        本文链接:https://www.haomeiwen.com/subject/zgpvsctx.html