需求
将数据库中的数据同步到 ES,借助 ES 的全文搜索,提高搜索的速度。
- 需要把新增用户信息同步到 ElasticSearch 中;
- 用户信息被 Update 后,需要能被更新到 ElasticSearch 中;
- 支持增量更新;
- 用户注销后,不能被 ES 搜索到;
JDBC Input Plugin
JDBC Input Plugin 可以将数据从数据库读到 Logstash。
- 需要自己提供所需的 JDBC Driver;
- JDBC Input Plugin 支持定时任务 Scheduling,其语法来自 Rufus-scheduler,其扩展了 Cron,使用 Cron 的语法可以完成任务的触发;
- JDBC Input Plugin 支持通过 Tracking_column / sql_last_value 的方式记录 State,最终实现增量的更新;
JDBC Input Plugin | 举个栗子
把驱动拷贝到 Logstash 的目录下
/home/lixinlei/application/logstash-7.7.0/logstash-core/lib/jars/mysql-connector-java-8.0.11.jar
准备好 JDBC Input Plugin 需要的 .yaml 文件
input {
jdbc {
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/db_example?useSSL=false"
jdbc_user => root
jdbc_password => Jiangdi_2018
#启用增量更新,如果为true,则需要指定 tracking_column
use_column_value => true
#指定追踪的字段,
tracking_column => "last_updated"
#追踪字段的类型,目前只有数字(numeric)和时间类型(timestamp),默认是数字类型
tracking_column_type => "numeric"
#记录最后一次运行的结果
record_last_run => true
#最后一次运行的结果的保存位置,在 Logstash 的工作目录下
last_run_metadata_path => "jdbc-position.txt"
# 触发时执行的 SQL,sql_last_value 就是保存在 jdbc-position.txt 中的
statement => "SELECT * FROM user where last_updated >:sql_last_value;"
# Cron 语法,每秒做一次触发
schedule => " * * * * * *"
}
}
output {
elasticsearch {
document_id => "%{id}"
document_type => "_doc"
index => "users"
hosts => ["http://localhost:9200"]
}
stdout{
codec => rubydebug
}
}
准备 Springboot 的程序
- 这个程序的作用主要是对 MySQL 中的一张表做增删改查;
用 mysql-demo.yaml 启动 Logstash
bin/logstash -f mysql-demo.conf
通过 Springboot 程序向 MySQL 中写入数据
curl localhost:8080/demo/add -d name=Mike -d email=mike@xyz.com -d tags=Elasticsearch,IntelliJ
curl localhost:8080/demo/add -d name=Jack -d email=jack@xyz.com -d tags=Mysql,IntelliJ
curl localhost:8080/demo/add -d name=Bob -d email=bob@xyz.com -d tags=Mysql,IntelliJ
- Logstash 读到新数据,并写进 ElasticSearch;
- 通过 Kibana 可以查询到数据
GET users/_search
;
更新 MySQL 中的数据
curl -X PUT localhost:8080/demo/update -d id=3 -d name=Bob2 -d email=bob2@xyz.com -d tags=Mysql,IntelliJ
Logstash 可以读到更新,完了把更新也写到 ElasticSearch 中;
删除 MySQL 中的用户
curl -X DELETE localhost:8080/demo/delete -d id=3
Logstash 可以读到更新,完了把更新也写到 ElasticSearch 中;
通过给索引 users 设置 alias,使得查询只查没被删除的用户
- 通过 view_users 查询用户,就只能查到没有被删除的用户;
POST /_aliases
{
"actions": [
{
"add": {
"index": "users",
"alias": "view_users",
"filter" : {
"term" : {
"is_deleted" : false
}
}
}
}
]
}
POST view_users/_search
{
}
网友评论