(9) Syncing Database Data to ES

Author: 木人呆呆 | Published: 2020-08-18 10:32

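    1. Requirements

    Sync data from the MySQL database into Elasticsearch (ES) so that searches go through ES full-text search, which is more efficient than searching the database directly.

    • When a user record is updated, the change must be synced to ES;
    • Incremental sync: newly added rows must be synced to ES;
    • After a user deletes their account, they must no longer show up in searches.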
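    All three requirements rely on the application cooperating: every insert or update must change a `last_updated` value that Logstash can compare against, and deleting a user must be a soft delete (the `is_deleted` flag that the Kibana queries filter on) rather than a SQL `DELETE`, because a physically removed row never matches a `last_updated > :sql_last_value` query and so never reaches ES. Below is a minimal Python sketch of that write path, assuming an in-memory dict in place of the MySQL `user` table; the `UserRow`/`UserTable` names are hypothetical, and an increasing counter stands in for the numeric timestamp a real application would store.

```python
import itertools
from dataclasses import dataclass, field


@dataclass
class UserRow:
    id: int
    name: str
    email: str
    tags: str
    last_updated: int
    is_deleted: bool = False


@dataclass
class UserTable:
    """Hypothetical in-memory stand-in for the MySQL `user` table."""
    rows: dict = field(default_factory=dict)
    # Counters standing in for auto-increment ids and a numeric timestamp (assumption).
    _ids: itertools.count = field(default_factory=lambda: itertools.count(1))
    _clock: itertools.count = field(default_factory=lambda: itertools.count(1))

    def add(self, name, email, tags):
        # New rows get a fresh id and a fresh last_updated value.
        row = UserRow(next(self._ids), name, email, tags, next(self._clock))
        self.rows[row.id] = row
        return row

    def update(self, user_id, **changes):
        row = self.rows[user_id]
        for key, value in changes.items():
            setattr(row, key, value)
        # Bumping last_updated is what makes the change visible to the incremental query.
        row.last_updated = next(self._clock)
        return row

    def delete(self, user_id):
        # Soft delete: keep the row, flag it, and bump last_updated so the flag gets synced.
        return self.update(user_id, is_deleted=True)
```

    Because `delete` goes through `update`, a deletion is just another change with a newer `last_updated`, which is exactly what an incremental `last_updated >` query picks up.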

    2. Start Logstash (Elasticsearch must already be running)

    First, prepare the configuration file:

    input{
      jdbc {
        jdbc_driver_library => "D:/software/elasticsearch/logstash-7.1.0/config/mysql-connector-java-5.1.32.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://localhost:3306/db_example"
        jdbc_user => root
        jdbc_password => root
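        # Track a column value; when true, tracking_column must also be set
        use_column_value => true
        # The column to track
        tracking_column => "last_updated"
        # Type of the tracked column: only numeric and timestamp are supported; the default is numeric
        tracking_column_type => "numeric"
        # Save the value from the last run (sql_last_value)
        record_last_run => true
        # File the saved value is written to
        last_run_metadata_path => "jdbc-position.txt"
        # The saved value comes from the last row returned, so order by the tracked column
        statement => "SELECT * FROM user WHERE last_updated > :sql_last_value ORDER BY last_updated"
        # Six-field cron with seconds first: run every second
        schedule => "* * * * * *"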
      }
    }
    output{
      elasticsearch {
        document_id => "%{id}"
        document_type => "_doc"
        index => "users"
        hosts => ["http://localhost:9200"]
      }
      stdout{
        codec => rubydebug
      }
    }
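    To make the tracking options concrete, here is a rough Python model of what one scheduled run does with this configuration: read the saved value from the metadata file (for a numeric tracking column the starting value is 0), select rows whose `last_updated` is greater than it, ordered by `last_updated` (the plugin saves the tracking value of the last row it receives, so ordering matters), write each row to the index under its `id` as `document_id => "%{id}"` does, and save the new value. It is a simulation under stated assumptions, not the plugin's code: the `run_once` name, the list-of-dicts "table", the dict "index", and the plain-number file format (the plugin's own metadata file format differs) are all hypothetical.

```python
import os


def run_once(table_rows, index, metadata_path):
    """Simulate one scheduled run: jdbc input with column tracking, then the elasticsearch output."""
    # record_last_run / last_run_metadata_path: resume from the saved value, or 0 on the first run.
    sql_last_value = 0
    if os.path.exists(metadata_path):
        with open(metadata_path, encoding="utf-8") as f:
            sql_last_value = int(f.read().strip() or 0)

    # Like the statement: last_updated > :sql_last_value, ordered by last_updated.
    changed = sorted(
        (row for row in table_rows if row["last_updated"] > sql_last_value),
        key=lambda row: row["last_updated"],
    )

    for row in changed:
        # document_id => "%{id}": a row that was already synced overwrites its old document.
        index[str(row["id"])] = dict(row)
        # use_column_value + tracking_column: keep the tracking value of the last row received.
        sql_last_value = row["last_updated"]

    with open(metadata_path, "w", encoding="utf-8") as f:
        f.write(str(sql_last_value))
    return len(changed)
```

    Keying documents by the database `id` is what turns an update into an overwrite instead of a duplicate, and it also makes re-sending a row harmless.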
    

    Start Logstash:

    bin\logstash.bat -f ..\config\test.conf
    

    Startup takes a little while.

    Troubleshooting

    Logstash failed with the following error:

    [2020-08-17T14:59:16,334][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
    [2020-08-17T14:59:16,354][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"7.1.0"}
    [2020-08-17T14:59:17,147][ERROR][logstash.agent           ] Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of #, input, filter, output at line 1, column 1 (byte 1)", :backtrace=>["D:/software/elasticsearch/logstash-7.1.0/logstash-core/lib/logstash/compiler.rb:41:in `compile_imperative'", "D:/software/elasticsearch/logstash-7.1.0/logstash-core/lib/logstash/compiler.rb:49:in `compile_graph'", "D:/software/elasticsearch/logstash-7.1.0/logstash-core/lib/logstash/compiler.rb:11:in `block in compile_sources'", "org/jruby/RubyArray.java:2577:in `map'", "D:/software/elasticsearch/logstash-7.1.0/logstash-core/lib/logstash/compiler.rb:10:in `compile_sources'", "org/logstash/execution/AbstractPipelineExt.java:151:in `initialize'", "org/logstash/execution/JavaBasePipelineExt.java:47:in `initialize'", "D:/software/elasticsearch/logstash-7.1.0/logstash-core/lib/logstash/java_pipeline.rb:23:in `initialize'", "D:/software/elasticsearch/logstash-7.1.0/logstash-core/lib/logstash/pipeline_action/create.rb:36:in `execute'", "D:/software/elasticsearch/logstash-7.1.0/logstash-core/lib/logstash/agent.rb:325:in `block in converge_state'"]}
    [2020-08-17T14:59:17,486][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
    [2020-08-17T14:59:22,357][INFO ][logstash.runner          ] Logstash shut down.
    

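    The cause was the encoding of the configuration file. The error points at line 1, column 1 (byte 1), which is exactly where a UTF-8 byte order mark (BOM) sits.
    Re-saving the file in EditPlus with "Save As" and choosing UTF-8 fixed it; make sure to choose plain UTF-8, not UTF-8 + BOM.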
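    If EditPlus is not at hand, the BOM (the three bytes EF BB BF at the start of the file) can be detected and stripped with a few lines of Python. This is a minimal sketch; the function name is hypothetical, and the path is whatever config file you pass to Logstash with `-f`.

```python
import codecs
from pathlib import Path


def strip_utf8_bom(path):
    """Remove a leading UTF-8 BOM from a file; return True if one was removed."""
    data = Path(path).read_bytes()
    if data.startswith(codecs.BOM_UTF8):
        # Without this, the config parser sees U+FEFF at line 1, column 1.
        Path(path).write_bytes(data[len(codecs.BOM_UTF8):])
        return True
    return False
```

    For example, `strip_utf8_bom("config/test.conf")` rewrites the file in place only if it starts with a BOM, so it is safe to run more than once.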

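    Create a new Spring Boot project that writes users to MySQL through the `/demo` endpoints used below.

    Then use it to put some data into MySQL: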

    # Add users
    curl localhost:8080/demo/add -d name=Mike -d email=mike@xyz.com -d tags=Elasticsearch,IntelliJ
    curl localhost:8080/demo/add -d name=Jack -d email=jack@xyz.com -d tags=Mysql,IntelliJ
    curl localhost:8080/demo/add -d name=Bob -d email=bob@xyz.com -d tags=Mysql,IntelliJ
    
    # Update a user
    curl -X PUT localhost:8080/demo/update -d id=16 -d name=Bob2 -d email=bob2@xyz.com -d tags=Mysql,IntelliJ
    
    # Delete a user
    curl -X DELETE localhost:8080/demo/delete -d id=3
    

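    Search with Kibana. Deleting a user only marks the user as deleted (the `is_deleted` field), so the `users` index still contains deleted users; a filtered alias hides them from searches: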

    # Create an alias that only exposes users not marked as deleted
    POST /_aliases
    {
      "actions": [
        {
          "add": {
            "index": "users",
            "alias": "view_users",
            "filter": { "term": { "is_deleted": false } }
          }
        }
      ]
    }
    
    # Search through the alias: users marked as deleted are not returned
    POST view_users/_search
    {}
    
    # A term query through view_users does not find users marked as deleted
    POST view_users/_search
    {
      "query": {
        "term": {
          "name.keyword": {
            "value": "Jack"
          }
        }
      }
    }
    
    # Querying the users index directly still finds the deleted user
    POST users/_search
    {
      "query": {
        "term": {
          "name.keyword": {
            "value": "Jack"
          }
        }
      }
    }
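    The effect of the filtered alias can be modeled in a few lines: searching an alias that has a `filter` behaves like searching the index with that filter always applied, so the same `term` query finds a deleted user through `users` but not through `view_users`. The sketch below is a plain-Python simulation over dicts, not an Elasticsearch client call; the function names and the sample documents (with Jack as the deleted user) are assumptions for illustration.

```python
def term_search(docs, field, value, alias_filter=None):
    """Return ids of docs whose `field` equals `value` exactly, like a term query on a keyword field."""
    hits = []
    for doc_id, doc in docs.items():
        # A filtered alias restricts which documents the query can see at all.
        if alias_filter is not None and not alias_filter(doc):
            continue
        if doc.get(field) == value:
            hits.append(doc_id)
    return hits


def not_deleted(doc):
    # Mirrors { "term": { "is_deleted": false } }: a missing field does not match either.
    return doc.get("is_deleted") is False
```

    Usage: with `users = {"1": {"name": "Mike", "is_deleted": False}, "2": {"name": "Jack", "is_deleted": True}}`, searching `users` for Jack returns `["2"]`, while the same search with `alias_filter=not_deleted` returns an empty list.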
    
    

    Done.

        Original link: https://www.haomeiwen.com/subject/sovfjktx.html