美文网首页
利用logstash将mysql多表数据增量同步到es

利用logstash将mysql多表数据增量同步到es

作者: 挡不住的柳Willow | 来源:发表于2018-07-27 17:40 被阅读512次

    首先下载与es版本对应的logstash,解压,打开目录,创建文件夹,该文件夹中放三个东西:

    • 1、jdbc-connector的jar包
    • 2、执行mysql操作的.sql文件
    • 3、配置文件.conf
      其中最主要的是配置文件,本文的配置文件如下:
    input {
        stdin {
        }
        jdbc {
          # mysql 数据库链接
          jdbc_connection_string => "jdbc:mysql:ip-address//:3306/database1?characterEncoding=utf8"
          # 用户名和密码
          jdbc_user => "username"
          jdbc_password => "password"
          # 驱动
          jdbc_driver_library => "/data/mysql-sync/logstash-6.3.0/mysql-test/mysql-connector-java-5.1.6.jar"
          # 驱动类名
          jdbc_driver_class => "com.mysql.jdbc.Driver"
          jdbc_paging_enabled => "true"
          jdbc_page_size => "50000"
          # 执行的sql 文件路径+名称
          statement_filepath => ""
          # 设置监听间隔  各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
          schedule => "0 1 * * *"
          # 索引类型
          type => "Type1"
         # 防止自动将大小转为小写
          lowercase_column_names => false
          # 记录上一次运行记录
          record_last_run => true
          # 使用字段值
          use_column_value => true
         # 追踪字段名
          tracking_column => "updateTime"
          # 字段类型
          tracking_column_type => "timestamp"
         # 上一次运行元数据保存路径
          last_run_metadata_path => ""
         # 是否删除记录的数据
          clean_run => false
        }
      jdbc {
          # mysql 数据库链接
          jdbc_connection_string => "jdbc:mysql://ipaddress:3306/database2?characterEncoding=utf8"
          # 用户名和密码
          jdbc_user => "username"
          jdbc_password => "password"
          # 驱动
          jdbc_driver_library => "/data/mysql-sync/logstash-6.3.0/mysql-test/mysql-connector-java-5.1.6.jar"
          # 驱动类名
          jdbc_driver_class => "com.mysql.jdbc.Driver"
          jdbc_paging_enabled => "true"
          jdbc_page_size => "50000"
          # 执行的sql 文件路径+名称
          statement_filepath => "/data/my_sql2.sql"
          # 设置监听间隔  各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
          schedule => "0 1 * * *"
          # 索引类型
          type => "TInquiryInfo"
          lowercase_column_names => false
          #是否记录最后一次运行内容
          record_last_run => true
          # 是否适用列元素
          use_column_value => true
          # 追踪的元素名,对应保存到es上面的字段名而不是数据库字段名
          tracking_column => "updateTime"
          # 默认为number,如果为日期必须声明为timestamp
          tracking_column_type => "timestamp"
          # 设置记录的路径
          last_run_metadata_path => "/data/logstash-6.3.0/mysql_sync/order_last_time"
          # 每次运行是否清除
          clean_run => false
        }
    }
    
    filter {
        json {
            source => "message"
            remove_field => ["message"]
        }
    }
    
    output {
       if [type]=="TInquiryInfo"{
        elasticsearch {
            hosts => ["localhost:9200"]
           # 索引名称
            index => "ca-inquiry"
          # type名称
            document_type => "TInquiryInfo"
          # 文档id,inquiryId为sql文件中查询出的字段名
            document_id => "%{inquiryId}"
        }
    }
    if [type]=="TOrderInfo"{
       elasticsearch {
            hosts => ["localhost:9200"]
           # 索引名称
            index => "ca-order"
          # type名称
            document_type => "TOrderInfo"
          # 文档id,orderId为sql文件中查询出的字段名
            document_id => "%{orderId}"
        }
    }
    }
    

    创建的数据库文件中写查询语句,查询更新时间大于上一次记录时间的文档:

    SELECT order_id orderId, message, update_time updataTime FROM sales_order WHERE update_time > :sql_last_value
    
    SELECT inquiry_id inquiryId, message, update_time updataTime FROM inquiry WHERE update_time > :sql_last_value
    

    然后执行命令即可进行同步

    logstash路径/bin/logstash -f 你的配置文件.conf
    

    注意:数据库中查询的字段如果有type会报错,需要将type名称as为其他名称

    相关文章

      网友评论

          本文标题:利用logstash将mysql多表数据增量同步到es

          本文链接:https://www.haomeiwen.com/subject/dazwmftx.html