1. Requirements
Sync the data in the database to ES and rely on ES full-text search to speed up queries (a sketch of the assumed table follows the list below).
- After a user is updated, the change must be synced to ES;
- Incremental updates must be supported, so newly added rows are synced to ES;
- After a user is deactivated, they must no longer be searchable.
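The walkthrough assumes a user table roughly like the following. The column names are inferred from the Logstash configuration and the alias filter used later (last_updated for incremental tracking, is_deleted as a soft-delete flag), so treat this as a sketch rather than the exact entity from the Spring Boot project:

# Hypothetical schema sketch only; adjust to the real entity definition
mysql -uroot -proot db_example <<'SQL'
CREATE TABLE IF NOT EXISTS user (
  id           BIGINT AUTO_INCREMENT PRIMARY KEY,
  name         VARCHAR(64),
  email        VARCHAR(128),
  tags         VARCHAR(255),
  is_deleted   BOOLEAN NOT NULL DEFAULT FALSE,  -- soft-delete flag checked by the alias filter
  last_updated BIGINT NOT NULL                  -- epoch millis, matching tracking_column_type => "numeric"
);
SQL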
2. Start Logstash (Elasticsearch must already be running)
First, prepare the configuration file:
input {
  jdbc {
    jdbc_driver_library => "D:/software/elasticsearch/logstash-7.1.0/config/mysql-connector-java-5.1.32.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/db_example"
    jdbc_user => "root"
    jdbc_password => "root"
    # Enable value tracking; when true, tracking_column must be specified
    use_column_value => true
    # Column to track
    tracking_column => "last_updated"
    # Type of the tracked column: only numeric and timestamp are supported, the default is numeric
    tracking_column_type => "numeric"
    # Record the value seen in the last run
    record_last_run => true
    # Where to persist that last-run value
    last_run_metadata_path => "jdbc-position.txt"
    statement => "SELECT * FROM user WHERE last_updated > :sql_last_value;"
    schedule => "* * * * * *"
  }
}
output {
  elasticsearch {
    document_id => "%{id}"
    document_type => "_doc"
    index => "users"
    hosts => ["http://localhost:9200"]
  }
  stdout {
    codec => rubydebug
  }
}
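Before launching Logstash, it is worth double-checking that Elasticsearch is reachable at the host configured in the output above:

# Should return the cluster name and version info if ES is running
curl http://localhost:9200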
Start Logstash:
bin\logstash.bat -f ..\config\test.conf
Startup takes a little while.
Error handling
The following exception was reported:
[2020-08-17T14:59:16,334][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2020-08-17T14:59:16,354][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"7.1.0"}
[2020-08-17T14:59:17,147][ERROR][logstash.agent ] Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of #, input, filter, output at line 1, column 1 (byte 1)", :backtrace=>["D:/software/elasticsearch/logstash-7.1.0/logstash-core/lib/logstash/compiler.rb:41:in `compile_imperative'", "D:/software/elasticsearch/logstash-7.1.0/logstash-core/lib/logstash/compiler.rb:49:in `compile_graph'", "D:/software/elasticsearch/logstash-7.1.0/logstash-core/lib/logstash/compiler.rb:11:in `block in compile_sources'", "org/jruby/RubyArray.java:2577:in `map'", "D:/software/elasticsearch/logstash-7.1.0/logstash-core/lib/logstash/compiler.rb:10:in `compile_sources'", "org/logstash/execution/AbstractPipelineExt.java:151:in `initialize'", "org/logstash/execution/JavaBasePipelineExt.java:47:in `initialize'", "D:/software/elasticsearch/logstash-7.1.0/logstash-core/lib/logstash/java_pipeline.rb:23:in `initialize'", "D:/software/elasticsearch/logstash-7.1.0/logstash-core/lib/logstash/pipeline_action/create.rb:36:in `execute'", "D:/software/elasticsearch/logstash-7.1.0/logstash-core/lib/logstash/agent.rb:325:in `block in converge_state'"]}
[2020-08-17T14:59:17,486][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2020-08-17T14:59:22,357][INFO ][logstash.runner ] Logstash shut down.
The cause was that the configuration file was saved with the wrong encoding.
Re-saving it in EditPlus as UTF-8 fixed the problem; note that it must be UTF-8, not UTF-8 with BOM.
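Once the configuration parses and Logstash stays up, the log shows its API endpoint on port 9600; you can use it to confirm the pipeline is actually loaded, for example:

# Lists the loaded pipelines; the default "main" pipeline should appear
curl "http://localhost:9600/_node/pipelines?pretty"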
Create a new Spring Boot project (link).
First, insert some data into the MySQL database:
# Add users
curl localhost:8080/demo/add -d name=Mike -d email=mike@xyz.com -d tags=Elasticsearch,IntelliJ
curl localhost:8080/demo/add -d name=Jack -d email=jack@xyz.com -d tags=Mysql,IntelliJ
curl localhost:8080/demo/add -d name=Bob -d email=bob@xyz.com -d tags=Mysql,IntelliJ
# Update a user
curl -X PUT localhost:8080/demo/update -d id=16 -d name=Bob2 -d email=bob2@xyz.com -d tags=Mysql,IntelliJ
# Delete a user
curl -X DELETE localhost:8080/demo/delete -d id=3
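Because the schedule above polls every second, these inserts and updates should show up in the users index almost immediately. A quick way to verify from the command line (id 16 here is just the id used in the update example above):

# Document count should match the number of rows in the user table
curl "http://localhost:9200/users/_count?pretty"
# Fetch the updated row by its MySQL id (the output uses document_id => "%{id}")
curl "http://localhost:9200/users/_doc/16?pretty"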
Searching with Kibana
# Create an alias that only exposes users not flagged as deleted
POST /_aliases
{
  "actions": [
    {
      "add": {
        "index": "users",
        "alias": "view_users",
        "filter": { "term": { "is_deleted": false } }
      }
    }
  ]
}
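To confirm the alias and its filter were created as expected, read the alias definition back (shown as curl here; the equivalent GET works in the Kibana console):

# Returns the alias definition, including the is_deleted filter
curl "http://localhost:9200/users/_alias/view_users?pretty"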
# Queries through the alias do not return users marked as deleted
POST view_users/_search
{
}
# Searching the view_users alias does not return users that were marked as deleted
POST view_users/_search
{
  "query": {
    "term": {
      "name.keyword": {
        "value": "Jack"
      }
    }
  }
}
# Searching the users index directly still returns the soft-deleted user
POST users/_search
{
  "query": {
    "term": {
      "name.keyword": {
        "value": "Jack"
      }
    }
  }
}
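Since the original goal was full-text search, application queries would normally go through the filtered view_users alias with an analyzed query instead of an exact term; for example, a match query on tags (assuming tags is mapped as text by the default dynamic mapping):

# Full-text search via the alias; soft-deleted users are excluded automatically
curl -X POST "http://localhost:9200/view_users/_search?pretty" -H 'Content-Type: application/json' -d'
{
  "query": { "match": { "tags": "Mysql" } }
}'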
Done.