该文内容为利用Logstash的 logstash-input-jdbc 插件同步Mysql数据,实现Mysql到ElasticSearch的数据异构。这个方案非实时,调度任务最短间延1m。
新增方案
Logstash
的input-jdbc-plugin
插件天然支持全量,增量同步。
更新方案
使用一个更新时间的字段,作为每次Logstash
增量更新的tracking_column
。这样Logstash
每次增量更新就会根据更新时间来作为标记。索引的document_id
必须是数据库中的主键, 这样在每次增量更新的时候, 之前ID相同的数据就会被覆盖, 从而达到update的效果。
删除方案
删除是建立在上面更新的原理之上, 就是再加一个删除标记的字段作软删除。
安装ELK
很简单,按官网文档操作一遍就行。这里给出安装文档连接。文档写得很清晰,基本一看就懂,安装方法也多样。完美。Elasticsearch | Logstash | Kibana
核心配置
添加配置后,重启logstash
,等一会儿就能看到结果。配置很简单,对着文档看看就能理解。快速略过看结果。。
[root@local14 ~]# /usr/share/logstash/bin/logstash-plugin install logstash-input-jdbc
[root@local14 ~]# mkdir -p /tmp/logstash ;touch /tmp/logstash/last_run_value.txt
[root@local14 ~]# cat /etc/logstash/conf.d/logstash-mysql.conf
input {
stdin {
}
jdbc {
jdbc_connection_string => "jdbc:mysql://192.168.56.101:3306/test?autoReconnect=true&characterEncoding=UTF8"
jdbc_user => "root"
jdbc_password => "123456"
jdbc_driver_library => "/mnt/mysql-connector-java-5.1.30.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "1000"
statement => "SELECT * from user where test_id > :sql_last_value"
schedule => "* * * * *"
record_last_run => true
last_run_metadata_path => "/tmp/logstash/last_run_value.txt"
use_column_value => true
lowercase_column_names =>true
tracking_column => "test_id"
tracking_column_type => "numeric"
type => "jdbc"
}
}
filter {
json {
source => "message"
remove_field => ["message"]
}
}
output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "test"
document_type => "user"
document_id => "%{test_id}"
#user => "elastic"
#password => "changeme"
}
stdout {
codec => json_lines
}
}
[root@local14 ~]# systemctl start logstash ; systemctl enable logstash
[root@local14 ~]# tail -f /var/log/logstash/logstash-plain.log
[2019-04-19T20:42:00,231][INFO ][logstash.inputs.jdbc ] (0.000951s) SELECT version()
[2019-04-19T20:42:00,236][INFO ][logstash.inputs.jdbc ] (0.002074s) SELECT version()
[2019-04-19T20:42:00,263][INFO ][logstash.inputs.jdbc ] (0.010025s) SELECT count(*) AS `count` FROM (SELECT * from user where test_id > 491604) AS `t1` LIMIT 1
[2019-04-19T20:43:00,045][INFO ][logstash.inputs.jdbc ] (0.000790s) SELECT version()
[2019-04-19T20:43:00,053][INFO ][logstash.inputs.jdbc ] (0.000930s) SELECT version()
[2019-04-19T20:43:00,058][INFO ][logstash.inputs.jdbc ] (0.001285s) SELECT count(*) AS `count` FROM (SELECT * from user where test_id > 491604) AS `t1` LIMIT 1
[2019-04-19T20:43:00,101][INFO ][logstash.inputs.jdbc ] (0.015010s) SELECT * FROM (SELECT * from user where test_id > 491604) AS `t1` LIMIT 1000 OFFSET 0
对比同步结果
数据库结果 Kibana中结果存在问题
第一,从上面结果,可以看到任务每分钟执行一次,也就是有1分钟的间延。因此,并不适合做实时性要求高的处理。对实时性要求高的,可以使用canal+撸代码解决。
第二,只能做软删除,如果有硬删要求,还请自行撸代码。
第三,比对的标为更新时间,如果出现遗漏处理情况,请自行脑补。
Mark些ES设计技巧
先记录下,后面会另外写篇详细点的article。
1、多表关联查询
canal -> mq -> db -> es 多表关联(mq-db 用主键关联查),存部分字段即可。通过es查出主键,拿主键再查db (es相当于索引库)
2、单表查询
canal -> mq -> es 单表 ,存全量字段,查询只走es (es做db用)
网友评论