美文网首页
利用logstash将mysql多表数据增量同步到es

利用logstash将mysql多表数据增量同步到es

作者: 挡不住的柳Willow | 来源:发表于2018-07-27 17:40 被阅读512次

首先下载与es版本对应的logstash,解压,打开目录,创建文件夹,该文件夹中放三个东西:

  • 1、jdbc-connector的jar包
  • 2、执行mysql操作的.sql文件
  • 3、配置文件.conf
    其中最主要的是配置文件,本文的配置文件如下:
input {
    stdin {
    }
    jdbc {
      # mysql 数据库链接
      jdbc_connection_string => "jdbc:mysql:ip-address//:3306/database1?characterEncoding=utf8"
      # 用户名和密码
      jdbc_user => "username"
      jdbc_password => "password"
      # 驱动
      jdbc_driver_library => "/data/mysql-sync/logstash-6.3.0/mysql-test/mysql-connector-java-5.1.6.jar"
      # 驱动类名
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"
      # 执行的sql 文件路径+名称
      statement_filepath => ""
      # 设置监听间隔  各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
      schedule => "0 1 * * *"
      # 索引类型
      type => "Type1"
     # 防止自动将大小转为小写
      lowercase_column_names => false
      # 记录上一次运行记录
      record_last_run => true
      # 使用字段值
      use_column_value => true
     # 追踪字段名
      tracking_column => "updateTime"
      # 字段类型
      tracking_column_type => "timestamp"
     # 上一次运行元数据保存路径
      last_run_metadata_path => ""
     # 是否删除记录的数据
      clean_run => false
    }
  jdbc {
      # mysql 数据库链接
      jdbc_connection_string => "jdbc:mysql://ipaddress:3306/database2?characterEncoding=utf8"
      # 用户名和密码
      jdbc_user => "username"
      jdbc_password => "password"
      # 驱动
      jdbc_driver_library => "/data/mysql-sync/logstash-6.3.0/mysql-test/mysql-connector-java-5.1.6.jar"
      # 驱动类名
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"
      # 执行的sql 文件路径+名称
      statement_filepath => "/data/my_sql2.sql"
      # 设置监听间隔  各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
      schedule => "0 1 * * *"
      # 索引类型
      type => "TInquiryInfo"
      lowercase_column_names => false
      #是否记录最后一次运行内容
      record_last_run => true
      # 是否适用列元素
      use_column_value => true
      # 追踪的元素名,对应保存到es上面的字段名而不是数据库字段名
      tracking_column => "updateTime"
      # 默认为number,如果为日期必须声明为timestamp
      tracking_column_type => "timestamp"
      # 设置记录的路径
      last_run_metadata_path => "/data/logstash-6.3.0/mysql_sync/order_last_time"
      # 每次运行是否清除
      clean_run => false
    }
}

filter {
    json {
        source => "message"
        remove_field => ["message"]
    }
}

output {
   if [type]=="TInquiryInfo"{
    elasticsearch {
        hosts => ["localhost:9200"]
       # 索引名称
        index => "ca-inquiry"
      # type名称
        document_type => "TInquiryInfo"
      # 文档id,inquiryId为sql文件中查询出的字段名
        document_id => "%{inquiryId}"
    }
}
if [type]=="TOrderInfo"{
   elasticsearch {
        hosts => ["localhost:9200"]
       # 索引名称
        index => "ca-order"
      # type名称
        document_type => "TOrderInfo"
      # 文档id,orderId为sql文件中查询出的字段名
        document_id => "%{orderId}"
    }
}
}

创建的数据库文件中写查询语句,查询更新时间大于上一次记录时间的文档:

SELECT order_id orderId, message, update_time updataTime FROM sales_order WHERE update_time > :sql_last_value

SELECT inquiry_id inquiryId, message, update_time updataTime FROM inquiry WHERE update_time > :sql_last_value

然后执行命令即可进行同步

logstash路径/bin/logstash -f 你的配置文件.conf

注意:数据库中查询的字段如果有type会报错,需要将type名称as为其他名称

相关文章

网友评论

      本文标题:利用logstash将mysql多表数据增量同步到es

      本文链接:https://www.haomeiwen.com/subject/dazwmftx.html