美文网首页diboot
如何通过Logstash同步多表关联数据至Elasticsear

如何通过Logstash同步多表关联数据至Elasticsear

作者: 一个鸡蛋壳儿 | 来源:发表于2019-08-07 21:53 被阅读0次

    如果你对 使用Logstash保持Elasticsearch与数据库同步 方案还不是很熟悉,建议先花点时间精读它。
    上面的文章以单表同步场景为例,清楚讲述了如何通过JDBC同步数据至ES,而对于实际开发中经常出现的多表关联同步并未提及,以下是我针对多表关联同步的趟坑过程希望对你有所帮助。

    数据库表的约定原则

    同步单表时我们对于表字段的约定:

    • 表中要有主键字段(如id),最近变更时间字段(如modification_time),软删除标记字段(如is_deleted),以便jdbc-input数据采集的轮询Job可以识别出增量变动的数据。
    • 提示:jdbc input轮询需要基于modification_time条件查询,所以给该字段加上索引。

    多表关联同步方案

    多表关联的情况下我们需要JOIN其他表查询得到结果,这个结果就是ES需要的打平后的宽表。ES新的版本中也增加了join操作,但这事不是ES擅长的,我们选择交给更擅长的数据库处理,让ES只存储打平后的单层索引。

    如果你理解单表同步而困惑多表关联同步的话,试着将关联查询的复杂SQL想象(定义)为视图,是不是后续操作就跟单表没区别了!

    我们来逐个看下多表关联的同步问题 (假设表a多对多关联表b):

    • 单表的id字段绑定到ES document的_id,可以实现ES索引幂等性,不会出现job原因导致索引文档重复。那对于多表关联的情况呢,可以使用各表id的组合作为document的_id。如SELECT:

      concat(a.id, '_', b.id) AS docid
      

      (如果你不关注幂等,也可以用_id默认生成策略。)

    • 单表基于modification_time就可以识别出自上次轮询后新的变化数据,多表关联的情况呢也类似:

      (CASE WHEN a.modification_time > b.modification_time THEN a.modification_time ELSE b.modification_time END) AS modification_time
      
    • 同理软删除字段is_deleted的处理逻辑:

      (CASE WHEN a.is_deleted=0 AND b.is_deleted=0 THEN 0 ELSE 1 END) AS is_deleted
      

      这样无论表a还是表b发生变更,都可以被logstash识别出来采集到。

    如此我们就可以写出多表关联同步的SQL了,为了方便更新维护SQL及保持logstash-jdbc端conf配置文件的简洁,你可以把SQL定义成一张视图,conf文件中的SQL statement可以像写单表处理一样了。

    示例conf:

    input {
      jdbc {
        jdbc_driver_library => "../drivers/mysql-connector-java-8.0.16.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://localhost:3306/es_db?serverTimezone=UTC"
        jdbc_user => "usr"
        jdbc_password => "pwd"
        jdbc_paging_enabled => true
        tracking_column => "unix_ts_in_secs"
        use_column_value => true
        tracking_column_type => "numeric"
        schedule => "*/5 * * * * *"
        statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM esview WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()) ORDER BY modification_time ASC"
      }
    }
    filter {
      mutate {
        copy => { "docid" => "[@metadata][_id]"}
        remove_field => ["docid", "@version", "unix_ts_in_secs"]
      }
    }
    output {
      elasticsearch {
          index => "test_idx"
          document_id => "%{[@metadata][_id]}"
      }
    }
    

    diboot 简单高效的轻代码开发框架 (求star)

    相关文章

      网友评论

        本文标题:如何通过Logstash同步多表关联数据至Elasticsear

        本文链接:https://www.haomeiwen.com/subject/fehzdctx.html