美文网首页
使用ELK离线同步Mysql数据(非实时)

使用ELK离线同步Mysql数据(非实时)

作者: DreamsonMa | 来源:发表于2019-04-20 00:42 被阅读0次

    该文内容为利用Logstash的 logstash-input-jdbc 插件同步Mysql数据,实现Mysql到ElasticSearch的数据异构。这个方案非实时,调度任务最短间延1m。

    新增方案

    Logstashinput-jdbc-plugin插件天然支持全量,增量同步。

    更新方案

    使用一个更新时间的字段,作为每次Logstash增量更新的tracking_column。这样Logstash每次增量更新就会根据更新时间来作为标记。索引的document_id必须是数据库中的主键, 这样在每次增量更新的时候, 之前ID相同的数据就会被覆盖, 从而达到update的效果。

    删除方案

    删除是建立在上面更新的原理之上, 就是再加一个删除标记的字段作软删除。

    安装ELK

    很简单,按官网文档操作一遍就行。这里给出安装文档连接。文档写得很清晰,基本一看就懂,安装方法也多样。完美。Elasticsearch | Logstash | Kibana

    核心配置

    添加配置后,重启logstash,等一会儿就能看到结果。配置很简单,对着文档看看就能理解。快速略过看结果。。

    [root@local14 ~]# /usr/share/logstash/bin/logstash-plugin install logstash-input-jdbc
    [root@local14 ~]# mkdir -p /tmp/logstash ;touch /tmp/logstash/last_run_value.txt
    [root@local14 ~]# cat /etc/logstash/conf.d/logstash-mysql.conf 
    input {
        stdin {
        }
        jdbc {
            jdbc_connection_string => "jdbc:mysql://192.168.56.101:3306/test?autoReconnect=true&characterEncoding=UTF8"
            jdbc_user => "root"
            jdbc_password => "123456"
            jdbc_driver_library => "/mnt/mysql-connector-java-5.1.30.jar"
            jdbc_driver_class => "com.mysql.jdbc.Driver"
            jdbc_paging_enabled => "true"
            jdbc_page_size => "1000"
            statement => "SELECT * from user where  test_id > :sql_last_value"
            schedule => "* * * * *"
            record_last_run => true
            last_run_metadata_path => "/tmp/logstash/last_run_value.txt"
            use_column_value => true
            lowercase_column_names =>true
            tracking_column => "test_id"
            tracking_column_type => "numeric"
            type => "jdbc"
        }
    }
    
    filter {
        json {
            source => "message"
            remove_field => ["message"]
        }
    }
    
    output {
      elasticsearch {
        hosts => ["http://localhost:9200"]
        index => "test"
        document_type => "user"
        document_id => "%{test_id}"
        #user => "elastic"
        #password => "changeme"
      }
      stdout {
        codec => json_lines
      }
    }
    [root@local14 ~]# systemctl start logstash ; systemctl enable logstash
    [root@local14 ~]# tail -f /var/log/logstash/logstash-plain.log 
    [2019-04-19T20:42:00,231][INFO ][logstash.inputs.jdbc     ] (0.000951s) SELECT version()
    [2019-04-19T20:42:00,236][INFO ][logstash.inputs.jdbc     ] (0.002074s) SELECT version()
    [2019-04-19T20:42:00,263][INFO ][logstash.inputs.jdbc     ] (0.010025s) SELECT count(*) AS `count` FROM (SELECT * from user where  test_id > 491604) AS `t1` LIMIT 1
    [2019-04-19T20:43:00,045][INFO ][logstash.inputs.jdbc     ] (0.000790s) SELECT version()
    [2019-04-19T20:43:00,053][INFO ][logstash.inputs.jdbc     ] (0.000930s) SELECT version()
    [2019-04-19T20:43:00,058][INFO ][logstash.inputs.jdbc     ] (0.001285s) SELECT count(*) AS `count` FROM (SELECT * from user where  test_id > 491604) AS `t1` LIMIT 1
    [2019-04-19T20:43:00,101][INFO ][logstash.inputs.jdbc     ] (0.015010s) SELECT * FROM (SELECT * from user where  test_id > 491604) AS `t1` LIMIT 1000 OFFSET 0
    

    对比同步结果

    数据库结果 Kibana中结果

    存在问题

    第一,从上面结果,可以看到任务每分钟执行一次,也就是有1分钟的间延。因此,并不适合做实时性要求高的处理。对实时性要求高的,可以使用canal+撸代码解决。
    第二,只能做软删除,如果有硬删要求,还请自行撸代码。
    第三,比对的标为更新时间,如果出现遗漏处理情况,请自行脑补。

    Mark些ES设计技巧

    先记录下,后面会另外写篇详细点的article。

    1、多表关联查询
    canal -> mq -> db -> es 多表关联(mq-db 用主键关联查),存部分字段即可。通过es查出主键,拿主键再查db (es相当于索引库)

    2、单表查询
    canal -> mq -> es 单表 ,存全量字段,查询只走es (es做db用)

    相关文章

      网友评论

          本文标题:使用ELK离线同步Mysql数据(非实时)

          本文链接:https://www.haomeiwen.com/subject/wayhgqtx.html