首先下载与es版本对应的logstash,解压,打开目录,创建文件夹,该文件夹中放三个东西:
- 1、jdbc-connector的jar包
- 2、执行mysql操作的.sql文件
- 3、配置文件.conf
其中最主要的是配置文件,本文的配置文件如下:
input {
stdin {
}
jdbc {
# mysql 数据库链接
jdbc_connection_string => "jdbc:mysql:ip-address//:3306/database1?characterEncoding=utf8"
# 用户名和密码
jdbc_user => "username"
jdbc_password => "password"
# 驱动
jdbc_driver_library => "/data/mysql-sync/logstash-6.3.0/mysql-test/mysql-connector-java-5.1.6.jar"
# 驱动类名
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
# 执行的sql 文件路径+名称
statement_filepath => ""
# 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
schedule => "0 1 * * *"
# 索引类型
type => "Type1"
# 防止自动将大小转为小写
lowercase_column_names => false
# 记录上一次运行记录
record_last_run => true
# 使用字段值
use_column_value => true
# 追踪字段名
tracking_column => "updateTime"
# 字段类型
tracking_column_type => "timestamp"
# 上一次运行元数据保存路径
last_run_metadata_path => ""
# 是否删除记录的数据
clean_run => false
}
jdbc {
# mysql 数据库链接
jdbc_connection_string => "jdbc:mysql://ipaddress:3306/database2?characterEncoding=utf8"
# 用户名和密码
jdbc_user => "username"
jdbc_password => "password"
# 驱动
jdbc_driver_library => "/data/mysql-sync/logstash-6.3.0/mysql-test/mysql-connector-java-5.1.6.jar"
# 驱动类名
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
# 执行的sql 文件路径+名称
statement_filepath => "/data/my_sql2.sql"
# 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
schedule => "0 1 * * *"
# 索引类型
type => "TInquiryInfo"
lowercase_column_names => false
#是否记录最后一次运行内容
record_last_run => true
# 是否适用列元素
use_column_value => true
# 追踪的元素名,对应保存到es上面的字段名而不是数据库字段名
tracking_column => "updateTime"
# 默认为number,如果为日期必须声明为timestamp
tracking_column_type => "timestamp"
# 设置记录的路径
last_run_metadata_path => "/data/logstash-6.3.0/mysql_sync/order_last_time"
# 每次运行是否清除
clean_run => false
}
}
filter {
json {
source => "message"
remove_field => ["message"]
}
}
output {
if [type]=="TInquiryInfo"{
elasticsearch {
hosts => ["localhost:9200"]
# 索引名称
index => "ca-inquiry"
# type名称
document_type => "TInquiryInfo"
# 文档id,inquiryId为sql文件中查询出的字段名
document_id => "%{inquiryId}"
}
}
if [type]=="TOrderInfo"{
elasticsearch {
hosts => ["localhost:9200"]
# 索引名称
index => "ca-order"
# type名称
document_type => "TOrderInfo"
# 文档id,orderId为sql文件中查询出的字段名
document_id => "%{orderId}"
}
}
}
创建的数据库文件中写查询语句,查询更新时间大于上一次记录时间的文档:
SELECT order_id orderId, message, update_time updataTime FROM sales_order WHERE update_time > :sql_last_value
SELECT inquiry_id inquiryId, message, update_time updataTime FROM inquiry WHERE update_time > :sql_last_value
然后执行命令即可进行同步
logstash路径/bin/logstash -f 你的配置文件.conf
网友评论