美文网首页
logstash同步ES数据

logstash同步ES数据

作者: wbool | 来源:发表于2017-10-16 15:55 被阅读0次

如何使用logstash同步es数据

1.下载

https://download.elastic.co/logstash/logstash/logstash-2.4.1.zip

2.安装

2.1 解压安装到目录

image.png

2.2 安装插件 logstash-input-mysql


cd logstash-2.4.1

bin/logstash-plugin install logstash-input-jdbc

如果插件更新失败, 是因为RubyGems 一直以来在国内都非常难访问到


请尽可能用比较新的 RubyGems 版本,建议 2.6.x 以上。

# 这里请翻墙一下

$ gem update --system

$ gem -v

2.6.3

$ gem sources --add https://gems.ruby-china.org/ --remove https://rubygems.org/

$ gem sources -l

https://gems.ruby-china.org

# 确保只有 gems.ruby-china.org

# 如果遇到 SSL 证书问题,你又无法解决,请直接用 http://gems.ruby-china.org 避免 SSL 的问题

(gem sources --add http://gems.ruby-china.org/ --remove https://gems.ruby-china.org/)

#修改源

cd logstash-2.4.1

vim Gemfile

vim Gemfile.jruby-1.9.lock

3 使用

参考

plugins-input-jdbc

plugins-output-elasticsearch

3.1 logstash.conf 配置


# 这里的路径相对 你执行命令所在路径

input {

   jdbc {

     #驱动

    jdbc_driver_library => "/opt/lib/mysql-connector-java-5.1.40.jar"

    #驱动类

    jdbc_driver_class => "com.mysql.jdbc.Driver"

    #数据库

    jdbc_connection_string => "jdbc:mysql://localhost:3306/database_name"

     #数据库用户名

    jdbc_user => "root"

    #数据库密码

    jdbc_password => "password"

    #任务调度 (分、时、天、月、年,全部为*默认含义为每分钟都更新)

    schedule => "* * * * *"

    #脚本路径

    statement_filepath => "../scripts/patent.sql"

    #分页

    jdbc_paging_enabled => "true"

    #分页大小(默认100,000)

    jdbc_page_size => "100000"

    #---- 增量更新 ----

    #是否记录上次执行结果(执行最后一条记录的结果,为了增量更新,数据量大时,不需要重新查询)

    #默认true

    record_last_run=>"true"

    #是否记录某个column的值,如果false存储同步最后一条记录的时间戳

    use_column_value => "true"

    #如果 use_column_value 为真,需配置此参数,表示记录哪一列(注意是elasticsearch中document里的列,不是sql的列)

    tracking_column => "updateDate"

    #指定文件(记录上次执行到的 tracking_column 字段的值)

    last_run_metadata_path => "../lastrun/logstash_jdbc_last_run_patent_payable_fee"

    #是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录

    #避免踩坑,在同步缴费信息过程中,将clean_run设置为true,

    #导致第一次同步50w条数据后,关闭logstash,再次开启同步,发现logstash居然从头查询,没有使用tracking_column的value

    #默认false

    clean_run => "false"

    #时区设置

    #建议不要使用该配置,可能会导致数据同步时遗漏数据

    #修改了之后发现last_run中记录的时间变成本地时间,当时es里还是存储的utc标准时间(比本地时间早8个小时)

    jdbc_default_timezone => "Asia/Shanghai"

  }

}

output {

    elasticsearch {

        #全文检索服务器

        hosts => "localhost:9200"

        #索引(数据库)

        index => "zl_dev"

        #类型(数据库表)

        document_type => "%{type}"

        #主键(防止重复)

        document_id => "%{id}"

    }

}

3.2 sql 脚本

这里sql_last_value 我们记录的是修改时间,这样容易完成增量更新


SELECT * FROM t_dp_patent WHERE update_date > :sql_last_value

3.3 执行脚本


cd /opt/logstash-2.4.1/bin

./logstash -f  ../scripts/patent.conf &

4.疑难杂症

映射文件

{

"settings": {

    "analysis": {

        "analyzer": {

              "my_analyzer": {

                  "type":        "pattern",

                  "pattern":    ",",

                  "tokenizer":    "whitespace",

                 "char_filter":  [ "html_strip"]

          }

      }

  }

},

"mappings": {

      "patent": {

          "properties": {

                 "patentHolder": {

                       "type": "string",

                        "analyzer": "ik"

                  },

                "patentHolder_": {

                   "type": "string",

                    "index":"not_analyzed"

                },

              "patentHolder_1": {

                    "type": "string",

                    "analyzer":"my_analyzer"

                  },

              "applicant_": {

                    "type": "string",

                    "index":"not_analyzed"

                  },

              "name": {

                    "type": "string",

                    "analyzer": "ik"

                  },

              "name_": {

                    "type": "string",

                    "index":"not_analyzed"

                  },

              "applicationNumber": {

                    "type": "string",

                    "analyzer": "ik"

                  },

              "applicationNumber_": {

                    "type": "string",

                    "index":"not_analyzed"

                  },

              "applicant": {

                    "type": "string",

                    "analyzer": "ik"

                  },

              "applicant_": {

                    "type": "string",

                    "index":"not_analyzed"

                  },

              "applicant_1": {

                    "type": "string",

                    "analyzer":"my_analyzer"

                  },

              "agent": {

                    "type": "string",

                    "analyzer": "ik"

                  },

              "agency": {

                    "type": "string",

                    "analyzer": "ik"

                  },

              "agency_": {

                    "type": "string",

                    "index":"not_analyzed"

                  },

              "applicantAddr": {

                    "type": "string",

                    "analyzer": "ik"

                  },

              "inventor": {

                    "type": "string",

                    "analyzer": "ik"

                  },

              "synoptic": {

                    "type": "string",

                    "analyzer": "ik"

                  },

              "createDate": {

                    "format": "strict_date_optional_time||epoch_millis",

                    "type": "date"

                  },

              "updateDate": {

                    "format": "strict_date_optional_time||epoch_millis",

                    "type": "date"

                  },

              "ipc": {

                    "type": "string"

                  },

              "loc": {

                    "type": "string"

                  },

              "synopticPicId": {

                    "type": "string"

                  },

              "censor": {

                    "type": "string"

                  },

              "applicantArea": {

                    "type": "string"

                  },

              "applicationDate": {

                    "format": "strict_date_optional_time||epoch_millis",

                    "type": "date"

                  },

              "notificationDate": {

                    "format": "strict_date_optional_time||epoch_millis",

                    "type": "date"

                  },

              "notificationNumber": {

                    "type": "string"

                  },

              "id": {

                    "type": "long"

                  },

              "keysHash": {

                    "type": "string"

              },

              "patentNo": {

                    "type": "string"

              },

              "type_": {

                    "type": "string"

              },

              "status": {

                    "type": "long"

              },

              "delFlag": {

                    "type": "long"

              }

          }

      }

    }

}

相关文章

网友评论

      本文标题:logstash同步ES数据

      本文链接:https://www.haomeiwen.com/subject/vcozdxtx.html