美文网首页
[2] 如何快速将存量mysql 数据导入到ES

[2] 如何快速将存量mysql 数据导入到ES

作者: 不怕天黑_0819 | 来源:发表于2020-08-06 15:16 被阅读0次

    本文集主要是总结自己在项目中使用ES 的经验教训,包括各种实战和调优。

    需求

    为实现将mysql数据导入到elasticsearch中进行调研,发现使用插件可以很好地实现该功能。所以进行测试环境搭建,主要的参考链接如下。

    环境搭建链接:http://blog.csdn.net/yeyuma/article/details/50240595#quote

    input-jdbc官方文档:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html


    实现原理

    通过执行一条sql语句,将每一行查询结果生成为一个input events。


    Important Fields:

    1.statement:任何有效的sql声明都会执行,根据需求可以写复杂的sql语句,如使用INNER JOIN 或UNION ALL等
    2.parameters:通过:<filedname>引用语法可以在sql声明中指定参数值
    3.schedule:在执行sql时,可以同时进行有效的调度(这句话不是很理解)


    安装过程

    因为测试机不予许使用install、apt-get等方式,所以我们主要采用wget {url}或者本地下载好传到测试机的方式进行插件的下载。

    需要安装一下插件:

    1.安装gem:wget https://rubygems.org/rubygems/rubygems-2.6.11.tgz 参照链接修改数据源地址为https://ruby.taobao.org

    2.logstash-input-jdbc下载:wget https://github.com/logstash-plugins/logstash-input-jdbc/archive/v4.1.3.tar.gz

    可通过https://github.com/logstash-plugins/logstash-input-jdbc/releases进行版本选择,然后复制下载链接以后通过wget下载,也可下载以后传到测试机上。

    3.下载mysql-connector-java-5.1.36-bin.jar:直接百度搜索以后从csdn上下载传到测试机上。

    4.jdbc.conf 和jdbc.sql文件,见文档底部


    启动过程

    执行如下命令: ./bin/logstash -f /home/appops/logstash-5.3.0/conf/jdbc.conf

    同时通过curl命令查询相关信息。


    logstash-input-jdbc 相关用法总结

    Example:

    input {
      jdbc {
        jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
        jdbc_user => "mysql"
        parameters => { "favorite_artist" => "Beethoven" }
        schedule => "* * * * *"
        statement => "SELECT * from songs where artist = :favorite_artist"
      }
    }
    

    也可以通过设置statement_filepath的方式来指定sql语句。

    内置字段声明:可参照官方文档:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html#_predefined_parameters

    sql_last_value:用来计算数据执行到哪一行,一般为语句插入es的时间,如果clean_run设置为true则会忽略sql_last_value的值。

    设置use_column_value为true且tracking_column设置为数据库中的某一个字段,则可实现自定义判断条件,默认起始为0。


    必需配置项:

    jdbc_connection_string => ... 
    jdbc_driver_class => ... 
    jdbc_user => ...
    

    可选配置项:

    add_field : Value type is [hash](https://www.elastic.co/guide/en/logstash/current/configuration-file-structure.html#hash) ;Add a field to an event
    
    charset : Value type is string;默认utf-8,可以用columns_charset给特殊的列设置charset。
    
    columns_charset:Value type is [hash](https://www.elastic.co/guide/en/logstash/current/configuration-file-structure.html#hash);columns_charset => { "column0" => "ISO-8859-1" } //只对column0修改charset
    
    clean_run:Value type is boolean;Default value is false ;Whether the previous run state should be preserved
    
    codec:Value type is codec;Default value is "plain" The codec used for input data. Input codecs are a convenient method for decoding your data before it enters the input, without needing a separate filter in your Logstash pipeline.
    
    connection_retry_attempts:Default value is 1 Maximum number of times to try connecting to database
    
    connection_retry_attempts_wait_time:Default value is 0.5 Number of seconds to sleep between connection attempts
    
    enable_metric:Value type is [boolean](https://www.elastic.co/guide/en/logstash/current/configuration-file-structure.html#boolean);Default value is true
    
    id:暂时未整理,涉及多个相同类型的插件时会用到。
    
    jdbc_default_timezone:设置默认时区,这个很有用,Asia/Shanghai
    
    jdbc_driver_library:设置jdbc_driver库,可以使用第三方库的方法。
    
    jdbc_fetch_size:可能就是限制jdbc读入数据大小
    
    的地方,还没有测试。如果没有设置,则会使用各自驱动的默认值。默认值为Integer.MIN_VALUE。
    
    jdbc_page_size:默认是100000;This will cause a sql statement to be broken up into multiple queries. Each query will use limits and offsets to collectively retrieve the full result-set. The limit size is set with jdbc_page_size.主要为取数据的sql设置limit和offsets
    
    jdbc_paging_enabled:默认是false,设置为true以后可以自定义jdbc_page_size的大小。
    
    jdbc_password:
    
    jdbc_password_filepath:
    
    jdbc_pool_timeout:后面的就先自己看看,暂不总结
    


    关于使用logstash_input_jdbc中的sql_last_value比标准时间提前8小时的解决方案

    由于提前8小时,导致logstash_input_jdbc通过时间作为判断对新增数据写入es时会不停写入最近8小时的数据,导致version剧增。通过看文档从配置文件入手和从sql语句入手两个思路来解决。建议推荐使用方法2。

    方法一:
    修改sql语句写法
    select 
        h.id as id, 
        h.title as title, 
        h.update_time as update_time
    from 
        articles h
    where 
        h.update_time >= DATE_ADD(:sql_last_value,INTERVAL 8 HOUR)
    借助DATE_ADD方法实现对:sql_last_value的自增八小时。
    
    方法二:
    可通过jdbc_default_timezone属性设置任意标准时区。如 Asia/Shanghai
    注:添加在jdbc.conf里。可参照最后整理的搭建环境文档。
    

    相关配置和jar包可以从网盘下载。
    链接: https://pan.baidu.com/s/1HVhuTOvJJ4ux3BL61nBVOA 提取码: q6cn

    相关文章

      网友评论

          本文标题:[2] 如何快速将存量mysql 数据导入到ES

          本文链接:https://www.haomeiwen.com/subject/gkiirktx.html