[elasticsearch hands-on] 1. Importing MySQL data with logstash


Author: cutieagain | Published 2020-03-21 23:26

    Elasticsearch installation

    [elasticsearch] 2. Kibana installation

    Logstash installation

    [elasticsearch] 3. Logstash installation

    Download the MySQL Connector jar

    MySQL Connector/J jar
    I used mysql-connector-java-5.1.38.jar here

    Install the required plugins

    #install the jdbc input plugin
    bin/logstash-plugin install logstash-input-jdbc
    #install the elasticsearch output plugin
    bin/logstash-plugin install logstash-output-elasticsearch
    

    Write a logstash-mysql-es.conf file to drive the data transfer

    input {
      jdbc {
        # MySQL JDBC connection settings
        jdbc_connection_string => "jdbc:mysql://127.0.0.1/database_name?useUnicode=true&characterEncoding=utf8&useSSL=true"
        jdbc_user => "xxx"
        jdbc_password => "xxx"
    
        # path to the MySQL JDBC driver jar; download it from https://dev.mysql.com/downloads/connector/j/
        jdbc_driver_library => "../config/jar/mysql-connector-java-5.1.38.jar"
        # the name of the driver class for mysql
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_paging_enabled => true
        jdbc_page_size => "50000"
    
        jdbc_default_timezone =>"Asia/Shanghai"
    
        # the SQL statement can be written inline here as below, or loaded from a file via statement_filepath:
        statement => "select * from exp_store where create_time >= :sql_last_value"
        # statement_filepath => "./config/jdbc.sql"
    
        # crontab-like schedule; e.g. run the sync once a minute (minute hour day-of-month month day-of-week)
        schedule => "* * * * *"
        #type => "jdbc"
    
        # whether to record the last run; if true, the value that tracking_column reached on the last run is saved to the file given by last_run_metadata_path
        #record_last_run => true
    
        # whether to track a specific column's value; if record_last_run is true, set this to true to track a column of our choosing, otherwise the run timestamp is tracked by default
        use_column_value => true
    
        # required when use_column_value is true: the database column to track, which must be monotonically increasing (often the MySQL primary key; here the create_time timestamp)
        tracking_column => "create_time"
        
        tracking_column_type => "timestamp"
    
        last_run_metadata_path => "./logstash_ineyes_exp_store_last_id"
    
        # whether to clear the last_run_metadata_path record; if true, every run starts from scratch and queries all database records again
        clean_run => false
    
        #whether to lowercase column names
        lowercase_column_names => false
      }
    }
    
    output {
      elasticsearch {
        hosts => "127.0.0.1:9200"
        index => "ineyes_exp_store"
        document_id => "%{id}"
        template_overwrite => true
      }
    
      # debug output to stdout; comment this out in production
      stdout {
          codec => json_lines
      } 
    }
    
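The incremental part of this config (use_column_value, tracking_column, last_run_metadata_path) works by remembering how far the previous run got and substituting that value for the :sql_last_value placeholder on the next run. A minimal Python sketch of that bookkeeping, simplified and with an illustrative file name; the real plugin persists the value as YAML and handles column types and timezones:

```python
import os

METADATA_PATH = "./logstash_demo_last_run"  # stands in for last_run_metadata_path
STATEMENT = "select * from exp_store where create_time >= :sql_last_value"

def read_last_value(default="1970-01-01 00:00:00"):
    """Return the tracking_column value saved by the previous run."""
    if os.path.exists(METADATA_PATH):
        with open(METADATA_PATH) as f:
            return f.read().strip()
    return default

def build_query():
    """Substitute the saved value for the :sql_last_value placeholder."""
    return STATEMENT.replace(":sql_last_value", "'" + read_last_value() + "'")

def run_once(rows):
    """One scheduled run: fetch rows newer than the marker, then advance it."""
    last = read_last_value()
    fetched = [r for r in rows if r["create_time"] >= last]
    if fetched:
        with open(METADATA_PATH, "w") as f:
            f.write(max(r["create_time"] for r in fetched))
    return fetched
```

With clean_run => false the saved marker survives restarts, which is why the log further down re-queries only rows with create_time >= '2020-01-16 15:00:34'.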

    Start the transfer

    #start the transfer in the background
    nohup bin/logstash -f config/transfer-config/logstash-mysql-es.conf > logs/logstash.out &
    

    Startup problems

    block in load_driver_jars

    Sending Logstash logs to /Users/cutie/workspace/elasticsearch/logstash-7.6.0/logs which is now configured via log4j2.properties
    [2020-03-21T17:07:30,361][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
    [2020-03-21T17:07:30,466][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"7.6.0"}
    [2020-03-21T17:07:32,347][INFO ][org.reflections.Reflections] Reflections took 33 ms to scan 1 urls, producing 20 keys and 40 values 
    [2020-03-21T17:07:33,115][INFO ][logstash.outputs.elasticsearch][main] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://127.0.0.1:9200/]}}
    [2020-03-21T17:07:33,281][WARN ][logstash.outputs.elasticsearch][main] Restored connection to ES instance {:url=>"http://127.0.0.1:9200/"}
    [2020-03-21T17:07:33,326][INFO ][logstash.outputs.elasticsearch][main] ES Output version determined {:es_version=>7}
    [2020-03-21T17:07:33,330][WARN ][logstash.outputs.elasticsearch][main] Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>7}
    [2020-03-21T17:07:33,527][INFO ][logstash.outputs.elasticsearch][main] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//127.0.0.1:9200"]}
    [2020-03-21T17:07:33,571][INFO ][logstash.outputs.elasticsearch][main] Using default mapping template
    [2020-03-21T17:07:33,604][WARN ][org.logstash.instrument.metrics.gauge.LazyDelegatingGauge][main] A gauge metric of an unknown type (org.jruby.specialized.RubyArrayOneObject) has been create for key: cluster_uuids. This may result in invalid serialization.  It is recommended to log an issue to the responsible developer/development team.
    [2020-03-21T17:07:33,609][INFO ][logstash.javapipeline    ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>12, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>1500, "pipeline.sources"=>["/Users/cutie/workspace/elasticsearch/logstash-7.6.0/config/transfer-config/logstash-mysql-es.conf"], :thread=>"#<Thread:0x4f7fd90b run>"}
    [2020-03-21T17:07:33,631][INFO ][logstash.outputs.elasticsearch][main] Attempting to install template {:manage_template=>{"index_patterns"=>"logstash-*", "version"=>60001, "settings"=>{"index.refresh_interval"=>"5s", "number_of_shards"=>1}, "mappings"=>{"dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date"}, "@version"=>{"type"=>"keyword"}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}
    [2020-03-21T17:07:33,649][INFO ][logstash.outputs.elasticsearch][main] Installing elasticsearch template to _template/logstash
    [2020-03-21T17:07:34,622][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
    [2020-03-21T17:07:34,683][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
    [2020-03-21T17:07:34,976][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9601}
    /Users/cutie/workspace/elasticsearch/logstash-7.6.0/vendor/bundle/jruby/2.5.0/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/cronline.rb:77: warning: constant ::Fixnum is deprecated
    { 2054 rufus-scheduler intercepted an error:
      2054   job:
      2054     Rufus::Scheduler::CronJob "* * * * *" {}
      2054   error:
      2054     2054
      2054     LogStash::PluginLoadingError
      2054     unable to load /Users/cutie/workspace/elasticsearch/logstash-7.6.0/config/transfer-config/logstash-mysql-es.conf from :jdbc_driver_library, no such file to load -- /Users/cutie/workspace/elasticsearch/logstash-7.6.0/config/transfer-config/logstash-mysql-es.conf
      2054       /Users/cutie/workspace/elasticsearch/logstash-7.6.0/vendor/bundle/jruby/2.5.0/gems/logstash-integration-jdbc-5.0.0/lib/logstash/plugin_mixins/jdbc/jdbc.rb:151:in `block in load_driver_jars'
      2054       org/jruby/RubyArray.java:1814:in `each'
      2054       /Users/cutie/workspace/elasticsearch/logstash-7.6.0/vendor/bundle/jruby/2.5.0/gems/logstash-integration-jdbc-5.0.0/lib/logstash/plugin_mixins/jdbc/jdbc.rb:144:in `load_driver_jars'
      2054       /Users/cutie/workspace/elasticsearch/logstash-7.6.0/vendor/bundle/jruby/2.5.0/gems/logstash-integration-jdbc-5.0.0/lib/logstash/plugin_mixins/jdbc/jdbc.rb:166:in `open_jdbc_connection'
      2054       /Users/cutie/workspace/elasticsearch/logstash-7.6.0/vendor/bundle/jruby/2.5.0/gems/logstash-integration-jdbc-5.0.0/lib/logstash/plugin_mixins/jdbc/jdbc.rb:242:in `execute_statement'
      2054       /Users/cutie/workspace/elasticsearch/logstash-7.6.0/vendor/bundle/jruby/2.5.0/gems/logstash-integration-jdbc-5.0.0/lib/logstash/inputs/jdbc.rb:309:in `execute_query'
      2054       /Users/cutie/workspace/elasticsearch/logstash-7.6.0/vendor/bundle/jruby/2.5.0/gems/logstash-integration-jdbc-5.0.0/lib/logstash/inputs/jdbc.rb:276:in `block in run'
      2054       /Users/cutie/workspace/elasticsearch/logstash-7.6.0/vendor/bundle/jruby/2.5.0/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/jobs.rb:234:in `do_call'
      2054       /Users/cutie/workspace/elasticsearch/logstash-7.6.0/vendor/bundle/jruby/2.5.0/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/jobs.rb:258:in `do_trigger'
      2054       /Users/cutie/workspace/elasticsearch/logstash-7.6.0/vendor/bundle/jruby/2.5.0/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/jobs.rb:300:in `block in start_work_thread'
      2054       /Users/cutie/workspace/elasticsearch/logstash-7.6.0/vendor/bundle/jruby/2.5.0/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/jobs.rb:299:in `block in start_work_thread'
      2054       org/jruby/RubyKernel.java:1446:in `loop'
      2054       /Users/cutie/workspace/elasticsearch/logstash-7.6.0/vendor/bundle/jruby/2.5.0/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/jobs.rb:289:in `block in start_work_thread'
      2054   tz:
      2054     ENV['TZ']: 
      2054     Time.now: 2020-03-21 17:08:00 +0800
      2054   scheduler:
      2054     object_id: 2052
      2054     opts:
      2054       {:max_work_threads=>1}
      2054       frequency: 0.3
      2054       scheduler_lock: #<Rufus::Scheduler::NullLock:0x403e43b8>
      2054       trigger_lock: #<Rufus::Scheduler::NullLock:0x7eba4eef>
      2054     uptime: 26.022160999999997 (26s23)
      2054     down?: false
      2054     threads: 2
      2054       thread: #<Thread:0x67e9c5b3>
      2054       thread_key: rufus_scheduler_2052
      2054       work_threads: 1
      2054         active: 1
      2054         vacant: 0
      2054         max_work_threads: 1
      2054       mutexes: {}
      2054     jobs: 1
      2054       at_jobs: 0
      2054       in_jobs: 0
      2054       every_jobs: 0
      2054       interval_jobs: 0
      2054       cron_jobs: 1
      2054     running_jobs: 1
      2054     work_queue: 0
    } 
    
    

    Check the configuration file. The stack trace above points at :jdbc_driver_library: the relative jar path could not be resolved (relative paths are resolved against the directory Logstash is launched from), so the driver jar was not found. Use an absolute path for jdbc_driver_library, and also verify that the database connection settings are correct.
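For instance, pointing jdbc_driver_library at an absolute path avoids the ambiguity. The path below is illustrative, pieced together from the install directory in the log; substitute wherever your jar actually lives:

```conf
# illustrative absolute path; adjust to your own install location
jdbc_driver_library => "/Users/cutie/workspace/elasticsearch/logstash-7.6.0/config/jar/mysql-connector-java-5.1.38.jar"
```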

    Successful startup

    Once running, the configured SQL is executed on schedule to pull the specified data for downstream processing

    [2020-03-21T23:13:00,350][INFO ][logstash.inputs.jdbc     ][main] (0.006952s) SELECT version()
    [2020-03-21T23:13:00,373][INFO ][logstash.inputs.jdbc     ][main] (0.019545s) SELECT version()
    [2020-03-21T23:13:00,391][INFO ][logstash.inputs.jdbc     ][main] (0.013471s) SELECT count(*) AS `count` FROM (select * from exp_store where create_time >= '2020-01-16 15:00:34') AS `t1` LIMIT 1
    [2020-03-21T23:13:00,417][INFO ][logstash.inputs.jdbc     ][main] (0.023679s) SELECT * FROM (select * from exp_store where create_time >= '2020-01-16 15:00:34') AS `t1` LIMIT 50000 OFFSET 0
    {"time_out":null,"create_time":"2020-01-16T07:00:34.000Z","retention_pay_amount":0,"exp_mail_id":2114,"express_name":null,"receiver_city":null,"receiver_address":null,"exp_no":null,"retention_status":0,"paid_exp_delay_reward":false,"express_id":null,"receiver_province":null,"courier_id":null,"update_time":"2020-01-16T07:00:34.000Z","arriver_time":"2020-01-16T07:00:34.000Z","push_type":null,"exp_locker_no":"C46E7B1A1D1D","exp_store_success_reason":null,"quit_store_exp_reason":null,"exp_type":"EXP_LOCKER","receiver_phone":null,"exp_agent_company_id":-1,"reverse_receiver_phone":null,"exp_locker_detail_id":150,"exp_locker_manager_id":12,"paid_exp_delivery_pay":false,"exp_locker_detail_money":0,"express_status":9,"take_delivery_code":null,"waybill_no":null,"push_flag":null,"exp_locker_id":5,"shop_id":null,"id":2106,"receiver_district":null,"leave_time":null,"delivery_pay_amount":0,"@version":"1","receiver_name":null,"is_secret":false,"@timestamp":"2020-03-21T15:13:00.419Z"}
    
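The count and LIMIT/OFFSET statements in this log come from jdbc_paging_enabled: the plugin wraps the configured statement in a derived table, counts the rows, and then walks the result set one jdbc_page_size page at a time. A rough Python sketch of that rewriting; illustrative only, since the real plugin generates its SQL through the Sequel library:

```python
def count_query(statement):
    """Wrap the statement to count the total rows, as in the log above."""
    return f"SELECT count(*) AS `count` FROM ({statement}) AS `t1` LIMIT 1"

def page_queries(statement, total, page_size=50000):
    """Yield one wrapped SELECT per page of the result set."""
    for offset in range(0, total, page_size):
        yield (f"SELECT * FROM ({statement}) AS `t1` "
               f"LIMIT {page_size} OFFSET {offset}")

stmt = "select * from exp_store where create_time >= '2020-01-16 15:00:34'"
```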

    Running logstash as a service

    https://www.jianshu.com/p/54cdddf89989
    How to shut logstash down gracefully: https://www.elastic.co/guide/en/logstash/current/shutdown.html

    Does adding a database column affect the sync?

    When adding a column to the database, declare the new field's mapping in ES first, then add the column and let it sync; existing data is unaffected
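For example, the new field could be declared up front on the index from this article with a mapping update like the following (the field name new_column and its keyword type are illustrative, not from the original):

```json
PUT ineyes_exp_store/_mapping
{
  "properties": {
    "new_column": { "type": "keyword" }
  }
}
```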


          Original article: https://www.haomeiwen.com/subject/zzldehtx.html