美文网首页
logstash 增量同步mysql的一对多记录到ES的Join

logstash 增量同步mysql的一对多记录到ES的Join

作者: 觉释 | 来源:发表于2020-08-05 10:32 被阅读0次

    mysql 数据模型

    在这里插入图片描述
    • esdatabase 的建库语句
    CREATE DATABASE IF NOT EXISTS esdatabase DEFAULT CHARSET utf8 COLLATE utf8_general_ci;
    
    
    • brand建表语句
    CREATE TABLE `brand` (
      `ability_id`  int auto_increment NOT NULL,
      `brand_name` varchar(128) DEFAULT NULL,
      PRIMARY KEY (`ability_id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    
    
    • product 建表语句
    CREATE TABLE `product` (
      `product_id`  int auto_increment NOT NULL,
      `brand_name` varchar(128) DEFAULT NULL,
      `product_name` varchar(128) DEFAULT NULL,
      `product_price`   float(5,2)  DEFAULT NULL,
      `product_color` varchar(128) DEFAULT NULL,
      PRIMARY KEY (`product_id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    
    

    product 表没有创建外键。这里还是看个人观点吧,添加外键有可能影响性能,但是需要应用维护一致性。总之,要维护product和brand的一致性。

    • 插入测试数据
    insert brand(brand_name) value ('Nike'),('Address'),('360');
    insert product(brand_name,product_name,product_price,product_color) value ('1','shoe',31,'red'),('1','cloth',32,'green'),('1','trouser',55,'black');
    insert product(brand_name,product_name,product_price,product_color) value ('3','shoe',11,'white'),('3','cloth',32,'green'),('3','trouser',18,'pink');
    insert product(brand_name,product_name,product_price,product_color) value ('4','shoe',111,'red'),('4','cloth',132,'purple'),('4','trouser',155,'black');
    
    

    ES 创建索引

    http://x.x.x.x:9200/testjon
    
    
    {
      "mappings": {
        "doc": {
          "dynamic": "strict",
          "properties": {
            "type": {
              "type": "text"
            },
             "ability_id": {
              "type": "integer"
            },
             "product_id": {
              "type": "integer"
            },
                    "brand_name": {
              "type": "integer"
            },
            "brand_name": {
              "type": "text"
            },
            "product_name": {
              "type": "text"
            },
            "product_price": {
              "type": "float"
            },
            "product_color": {
              "type": "text"
            },
            "product_list": {
              "type": "join",
              "relations": {
                "brand": "product"
              }
            }
          }
        }
      }
    }
    
    

    type ability_id product_id brand_name 字段其实没有什么用途,我是被迫添加的。
    因为output阶段需要使用这些字段,所以不能再filter阶段删除掉,但是output阶段又没有删除字段的功能
    这个问题困惑着我,还请各位大神指点一下。。。。

    logstash 配置文件

    logstash 用了jdbc和alter两个插件,要是大家不方便安装的话,QQ群里有我安装好的tar包,QQ群号在底部。

    input {
        jdbc {
          type => "brand"
          jdbc_connection_string => "jdbc:mysql://1.1.1.1:3306/esdatabase" 
          jdbc_user => "esuser"    
          jdbc_password => "123.com"
          jdbc_driver_library => "/app/logstash/mysql-connector-java-5.1.47.jar"
          jdbc_driver_class => "com.mysql.jdbc.Driver"
          jdbc_paging_enabled => "true"
          jdbc_page_size => "1000"
          last_run_metadata_path => "/app/logstash/logstash_jdbc_last_run"
          use_column_value => true
          tracking_column => "ability_id"
          statement => "select * from brand where ability_id> :sql_last_value"
          schedule => "*/1 * * * *"
        }
    
        jdbc {
          type => "product"
          jdbc_connection_string => "jdbc:mysql://1.1.1.1::3306/esdatabase" 
          jdbc_user => "esuser"    
          jdbc_password => "123.com"
          jdbc_driver_library =>  "/app/logstash/mysql-connector-java-5.1.47.jar"
          jdbc_driver_class => "com.mysql.jdbc.Driver"
          last_run_metadata_path => "/app/logstash/logstash_jdbc_last_run_02"
          use_column_value => true
          tracking_column => "product_id"
          jdbc_paging_enabled => "true"
          jdbc_page_size => "1000"
          statement => " select * from product where product_id >  :sql_last_value"
          schedule => "*/2 * * * *"
        }
    }
    filter {
            if[type] == "brand"{
                      alter {
                        add_field  => {
                              "[product_list][name]"=> "brand"
                        }
                      }
            }
            if[type] == "product"{
                      alter {
                        add_field  => {
                             "[product_list][name]" => "product"
                             "[product_list][parent]" =>  "%{brand_name}"
                        }
                      }
            }
             mutate {
                remove_field => [ "@timestamp","@version"]
              }
    }
    output{ 
            if [type] == "brand" { 
                    elasticsearch { 
                            hosts => "1.1.1.1::9200" 
                            index => "testjoin" 
                            document_id => "brand_%{ability_id}"
                    } 
            }
            if [type]== "product"  { 
                            elasticsearch { 
                                    hosts => "1.1.1.1::9200" 
                                    index => "testjoin" 
                                    document_id => "product_%{product_id}"
                                    routing => "brand_%{brand_name}"
                                      } 
                                    } 
         } 
    
    
    说几点
    • schedule,建议brand表也就是较小的表应该在较大的表之前全部同步到ES,防止建立Join类型时找不到父文档,所以这里的时间请大家自己衡量。
    • last_run_metadata_path use_column_value tracking_column sql_last_value这几个结合起来用于增量同步,具体字段的功能可以去官网看看
    • document_id 我这里把父文档设置为 brand_ 开头,把子文档设置为 product_ 开头,这样可以避免 id重复的问题,因为在mysql中这两张表的id都是从1开始的
    • routing 这个字段必须有的,保证子文档和父文档在一个分片上

    查看结果

    启动logstash

    logstash -f  testJoin.yml
    
    
    观察ES结果
    在这里插入图片描述
    插入几条数据,看看是不是增量
    insert brand(brand_name) value ('lining');
    insert product(brand_name,product_name,product_price,product_color) value ('1','shoe',678,'blue');
     insert product(brand_name,product_name,product_price,product_color) value ('4','shoe',278,'black');
    
    
    在这里插入图片描述

    相关文章

      网友评论

          本文标题:logstash 增量同步mysql的一对多记录到ES的Join

          本文链接:https://www.haomeiwen.com/subject/dvsnrktx.html