美文网首页
Logstash 从Mysql同步数据到ES

Logstash 从Mysql同步数据到ES

作者: 54番茄 | 来源:发表于2021-08-19 18:29 被阅读0次

       从Mysql导出上亿数据到ES中,Logstash可所谓很方便的工具了,ES详细的安装步骤前面文章有写(ElasticSearch + Kibana基础搭建),就不赘述了。

    这节主要讲下Logstash的使用:Download Logstash官网地址

    首先下载logstash-7.12.0上传到linux服务器然后解压即可,版本最好是跟es的保持一致

    安装后的目录

    1.进入logstash-7.12.0文件夹

    $ cd logstash-7.12.0/
    

    2.进入logstash-7.12.0安装目录,新建一个文件夹,这里面等会需要放你的配置文件,当然你也可以直接放在安装目录

    $ mkdir my
    

    3.进入新建文件夹

    $ cd my
    

    4.创建一个配置文件,这个conf中配置你的mysql链接、过滤规则以及es的地址和索引模板

    $ touch logstash-test-log-sync.conf
    

    5.下载mysql的驱动到my文件夹下

       mysql-connector-java-8.0.19.jar
    

    6.如果你的sql很复杂,最好是使用文件sql文件,然后在conf中配置sql文件的路径,当然你也可以直接写在conf文件中。我这里选择使用sql文件,新建一个sql文件

    $ touch  my.sql 
    写入自己的sql
    $ vim  my.sql 
    

    自定义my文件夹的目录结构

    my文件夹下

    完整的logstash-test-log-sync.conf配置

    input {
        jdbc {
            # 设置 MySql/MariaDB 数据库url以及数据库名称
            jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/ps_manager_test?useUnicode=true&allowMultiQuerie=true&characterEncoding=utf-8&serverTimezone=UTC"
            # 用户名和密码
            jdbc_user => "root"
            jdbc_password => "123456"
            # 数据库驱动mysql-connector-java-8.0.19.jar所在位置,可以是绝对路径或者相对路径
            jdbc_driver_library => "/usr/local/nbin/logstash-7.12.0/my/mysql-connector-java-8.0.19.jar"
            # 驱动类名
            jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
             # 是否开启分页,ture为开启,我这里sql比较复杂所以就放弃使用这个,后面细讲
            jdbc_paging_enabled => false
             # 分页每页数量
            jdbc_page_size => "50"
            # 设置时区
            jdbc_default_timezone =>"Asia/Shanghai"
            # 执行的sql文件路径
            statement_filepath => "/usr/local/nbin/logstash-7.12.0/my/my.sql"
            #使用这个可以直接写sql语句,但是复杂的语句最好是写在文件内
            #statement =>
            # 设置定时任务间隔  含义:分、时、天、月、年,全部为*默认含义为每分钟跑一次任务
            schedule => "* * * * *"
            #是否需要记录某个字段值,如果为true,我们可以自定义要记录的数据库某个字段值,例如id或date字段。如果为false,记录的是上次执行的标记,默认是一个timestamp
            use_column_value => true
            #记录上次执行字段值路径。我们可以在sql语句中这么写:WHERE ID > :last_sql_value。其中 :sql_last_value 取得就是该文件中的值,这个last_id会以文件形式存在,上面截图有
            last_run_metadata_path => "/usr/local/nbin/logstash-7.12.0/my/last_id"
            #如果use_column_value为真,需配置此参数. 指定增量更新的字段名。当然该字段必须是递增的,比如id或date字段。
            tracking_column => "id"
            # tracking_column 对应字段的类型,只能选择timestamp或者numeric(数字类型),默认numeric,所以可以不写这个配置
            tracking_column_type => "numeric"
           #如果为true,每次会记录所更新的字段的值,并保存到 last_run_metadata_path 指定的文件中
            record_last_run => true
            # 是否清除 last_run_metadata_path 的记录,true则每次都从头开始查询所有的数据库记录
            clean_run => false
             # 是否将字段名称转小写。默认是true。这里注意Elasticsearch是区分大小写的
            lowercase_column_names => false
        } 
    }
    ##过滤、格式化数据 这段单独有讲解,这里就不细说了
    filter{
        mutate {
            add_field => {"temp_ts" => "%{actionTimeStamp}"}
        } 
        date {
             match => ["temp_ts", "ISO8601"]
             target => "@timestamp"
        }   
        mutate  { 
            remove_field => ["@version","temp_ts","actionTimeStamp"]
        } 
    }
    output {
        elasticsearch {
             # es地址 集群数组hosts => ["127.0.0.1:9200","127.0.0.1:9201"]     
            hosts => ["127.0.0.1:9200","127.0.0.1:9201"] 
           # 同步的索引名必须要有@timestamp  不然yyyyMM不起效
            index => "ps_sign_log%{+yyyy}"
            # 设置_docID和数据相同
            document_id => "%{id}"
            #自定的模板名称
            template_name => "ps_seal_log"
            #自定义的模板配置文件
            template => "/usr/local/nbin/logstash-7.12.0/my/ps_test_log_template.json"
            #是否重写模板
            template_overwrite => true 
        }
      # 日志输出形式设置
        stdout {
            codec => json
            #codec => rubydebug
        }
    }
    

    其他的配置已经在上面的conf中注释了,这里主要单独说一下jdbc_paging_enabledjdbc_page_size这两个参数
    jdbc_paging_enabled: 开启JDBC启用分页

    • 值类型为布尔值
    • 默认值为 false

    jdbc_page_size:这将导致sql语句分解为多个查询。每个查询将使用限制和偏移量来集体检索完整的结果集。限制大小通过设置jdbc_page_size

    • 值类型是数字
    • 默认值为 100000
      分批处理的结果集意思就是,比如你sql里设置了查询1000条数据,而你jdbc_page_size设置的是500,那么他就会分为两个sql语句,第一条是0~500,第二条是500~1000,你可以观察下日志,最后打印的全部sql是这样的:
    SELECT * FROM (你的sql查询语句) AS `t1` LIMIT 0 OFFSET 500
    
    SELECT * FROM (你的sql查询语句) AS `t1` LIMIT 500 OFFSET 500
    

    注意哈,别理解成它是帮你sql设定大小分页
    比如我开始的sql,是这样写的,因为数据量特别大,所以一直无法返回数据:

    错误
    select id,seal_id,user_id,business_type,action_time from ps_log  where id > :sql_last_value
    正确✔️
    select id,seal_id,user_id,business_type,action_time from ps_log  where id > :sql_last_value limit 1000
    

    ps_test_log_template自定义模板的说明

    ES会根据你输入的数据,自己会对数据类型映射,所以你也可以不用自定义模板,也就是说下面这个三个配置可以去掉,也能正常同步数据到ES

    #自定的模板名称
    template_name => "ps_seal_log"
    #自定义的模板配置文件
    template => "/usr/local/nbin/logstash-7.12.0/my/ps_test_log_template.json"
    #是否重写模板
    template_overwrite => true 
    

    自定义模板索引千万注意不要加这个,不然logstash不会去es创建索引模板。

    manage_template => false
    

    如果自定义模板ps_test_log_template,在启动logstash时,会有打印

    启动日志会创建索引模板的打印

    模板索引创建详见之前的文章 ElasticSearch + Kibana基础搭建

    最后启动Logstash

    ./bin/logstash -f ./my/logstash-seal-log-sync.conf
    

    相关文章

      网友评论

          本文标题:Logstash 从Mysql同步数据到ES

          本文链接:https://www.haomeiwen.com/subject/rkqpvltx.html