本文集主要是总结自己在项目中使用ES 的经验教训,包括各种实战和调优。
需求
为实现将mysql数据导入到elasticsearch中进行调研,发现使用插件可以很好地实现该功能。所以进行测试环境搭建,主要的参考链接如下。
环境搭建链接:http://blog.csdn.net/yeyuma/article/details/50240595#quote
input-jdbc官方文档:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html
实现原理
通过执行一条sql语句,将每一行查询结果生成为一个input events。
Important Fields:
1.statement:任何有效的sql声明都会执行,根据需求可以写复杂的sql语句,如使用INNER JOIN 或UNION ALL等
2.parameters:通过:<filedname>引用语法可以在sql声明中指定参数值
3.schedule:在执行sql时,可以同时进行有效的调度(这句话不是很理解)
安装过程
因为测试机不予许使用install、apt-get等方式,所以我们主要采用wget {url}或者本地下载好传到测试机的方式进行插件的下载。
需要安装一下插件:
1.安装gem:wget https://rubygems.org/rubygems/rubygems-2.6.11.tgz 参照链接修改数据源地址为https://ruby.taobao.org
2.logstash-input-jdbc下载:wget https://github.com/logstash-plugins/logstash-input-jdbc/archive/v4.1.3.tar.gz
可通过https://github.com/logstash-plugins/logstash-input-jdbc/releases进行版本选择,然后复制下载链接以后通过wget下载,也可下载以后传到测试机上。
3.下载mysql-connector-java-5.1.36-bin.jar:直接百度搜索以后从csdn上下载传到测试机上。
4.jdbc.conf 和jdbc.sql文件,见文档底部
启动过程
执行如下命令: ./bin/logstash -f /home/appops/logstash-5.3.0/conf/jdbc.conf
同时通过curl命令查询相关信息。
logstash-input-jdbc 相关用法总结
Example:
input {
jdbc {
jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
jdbc_user => "mysql"
parameters => { "favorite_artist" => "Beethoven" }
schedule => "* * * * *"
statement => "SELECT * from songs where artist = :favorite_artist"
}
}
也可以通过设置statement_filepath的方式来指定sql语句。
内置字段声明:可参照官方文档:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html#_predefined_parameters
sql_last_value:用来计算数据执行到哪一行,一般为语句插入es的时间,如果clean_run设置为true则会忽略sql_last_value的值。
设置use_column_value为true且tracking_column设置为数据库中的某一个字段,则可实现自定义判断条件,默认起始为0。
必需配置项:
jdbc_connection_string => ...
jdbc_driver_class => ...
jdbc_user => ...
可选配置项:
add_field : Value type is [hash](https://www.elastic.co/guide/en/logstash/current/configuration-file-structure.html#hash) ;Add a field to an event
charset : Value type is string;默认utf-8,可以用columns_charset给特殊的列设置charset。
columns_charset:Value type is [hash](https://www.elastic.co/guide/en/logstash/current/configuration-file-structure.html#hash);columns_charset => { "column0" => "ISO-8859-1" } //只对column0修改charset
clean_run:Value type is boolean;Default value is false ;Whether the previous run state should be preserved
codec:Value type is codec;Default value is "plain" The codec used for input data. Input codecs are a convenient method for decoding your data before it enters the input, without needing a separate filter in your Logstash pipeline.
connection_retry_attempts:Default value is 1 Maximum number of times to try connecting to database
connection_retry_attempts_wait_time:Default value is 0.5 Number of seconds to sleep between connection attempts
enable_metric:Value type is [boolean](https://www.elastic.co/guide/en/logstash/current/configuration-file-structure.html#boolean);Default value is true
id:暂时未整理,涉及多个相同类型的插件时会用到。
jdbc_default_timezone:设置默认时区,这个很有用,Asia/Shanghai
jdbc_driver_library:设置jdbc_driver库,可以使用第三方库的方法。
jdbc_fetch_size:可能就是限制jdbc读入数据大小
的地方,还没有测试。如果没有设置,则会使用各自驱动的默认值。默认值为Integer.MIN_VALUE。
jdbc_page_size:默认是100000;This will cause a sql statement to be broken up into multiple queries. Each query will use limits and offsets to collectively retrieve the full result-set. The limit size is set with jdbc_page_size.主要为取数据的sql设置limit和offsets
jdbc_paging_enabled:默认是false,设置为true以后可以自定义jdbc_page_size的大小。
jdbc_password:
jdbc_password_filepath:
jdbc_pool_timeout:后面的就先自己看看,暂不总结
关于使用logstash_input_jdbc中的sql_last_value比标准时间提前8小时的解决方案
由于提前8小时,导致logstash_input_jdbc通过时间作为判断对新增数据写入es时会不停写入最近8小时的数据,导致version剧增。通过看文档从配置文件入手和从sql语句入手两个思路来解决。建议推荐使用方法2。
方法一:
修改sql语句写法
select
h.id as id,
h.title as title,
h.update_time as update_time
from
articles h
where
h.update_time >= DATE_ADD(:sql_last_value,INTERVAL 8 HOUR)
借助DATE_ADD方法实现对:sql_last_value的自增八小时。
方法二:
可通过jdbc_default_timezone属性设置任意标准时区。如 Asia/Shanghai
注:添加在jdbc.conf里。可参照最后整理的搭建环境文档。
相关配置和jar包可以从网盘下载。
链接: https://pan.baidu.com/s/1HVhuTOvJJ4ux3BL61nBVOA 提取码: q6cn
网友评论