美文网首页
mysql数据同步elasticsearch(es)全文检索容器

mysql数据同步elasticsearch(es)全文检索容器

作者: AaChoxsu | 来源:发表于2018-03-26 12:53 被阅读0次

    一、安装ElasticSearch(下面统称es,版本6.0.0,环境windows10)

    直接上下载地址:https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.0.0.zip

    解压后目录如下:
    1

    启动es,./bin/elasticsearch.bat;启动成功如图

    2

    默认cluster_name是elasticsearch,默认端口是9200;如需修改,可在config/elasticsearch.yml中配置;上图

    3

    二、安装logstash

    下载地址:https://artifacts.elastic.co/downloads/logstash/logstash-6.0.0.zip

    解压目录
    4

    先安装logstash-input-jdbc插件
    ./bin/logstash-plugin.bat install logstash-input-jdbc

    5

    在logstash目录下创建config-mysql,见图4

    6

    创建配置文件load_data.conf,配置文件随便取名,可以创建sql文件,也可以在conf配置文件中定义,具体下面有说明

    先上配置文件内容
    input {
        stdin {
        }
        jdbc {
          jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/jfinal_club?characterEncoding=utf8&useSSL=false"
          jdbc_user => "root"
          jdbc_password => "root"
          jdbc_driver_library => "D:/ELK/6.0.0/logstash-6.0.0/config-mysql/mysql-connector-java-5.1.43.jar"
          jdbc_driver_class => "com.mysql.jdbc.Driver"
          jdbc_paging_enabled => "true"
          jdbc_page_size => "50000"
          statement_filepath => "D:/ELK/6.0.0/logstash-6.0.0/config-mysql/store_list.sql"
          schedule => "* * * * *"
          use_column_value => false
          record_last_run => true
          last_run_metadata_path => "D:/ELK/6.0.0/logstash-6.0.0/config-mysql/run/store_list"
          type => "sl"
        }
    
    jdbc {
          jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/jfinal_club?characterEncoding=utf8&useSSL=false"
          jdbc_user => "root"
          jdbc_password => "root"
          jdbc_driver_library => "D:/ELK/6.0.0/logstash-6.0.0/config-mysql/mysql-connector-java-5.1.43.jar"
          jdbc_driver_class => "com.mysql.jdbc.Driver"
          jdbc_paging_enabled => "true"
          jdbc_page_size => "50000"
          statement => "select * from store where updated > date_add(:sql_last_value, interval 8 hour)"
          schedule => "* * * * *"
          use_column_value => false
          record_last_run => true
          last_run_metadata_path => "D:/ELK/6.0.0/logstash-6.0.0/config-mysql/run/store_s"
          type => "st"
        }
    }
    
    filter {
        json {
            source => "message"
            remove_field => ["message"]
        }
    }
    
    output {
    
        if [type] == "sl" {
            elasticsearch {
                  hosts => ["127.0.0.1:9200"]
                  index => "store_list"
                  document_type => "jdbc"
                  document_id => "%{store_id}"
            }
        }
    
        if [type] == "st" {
            elasticsearch {
                  hosts => ["127.0.0.1:9200"]
                  index => "store_st"
                  document_type => "jdbc"
                  document_id => "%{id}"
            }
        }
    
        stdout {
            codec => json_lines
        }
    }
    

    字段解释;具体的见:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html

    图6中有个run目录,在这里是用来存放:sql_last_value的时间值的
    store_list.sql
    7
    先在es中生成index
    PUT /store_list
    {
        "settings": {
        "number_of_shards": 3,
        "number_of_replicas": 1
      },
      "mappings": {
        "jdbc": {
          "properties": {
            "@timestamp": {
              "type": "date"
            },
            "@version": {
              "type": "keyword"
            },
            "store_id": {
              "type": "long"
            },
            "store_name": {
              "type": "keyword"
            },
            "uid": {
              "type": "text"
            },
            "telephone": {
              "type": "text"
            },
            "street_id": {
              "type": "text"
            },
            "detail": {
              "type": "keyword"
            },
            "address": {
              "type": "keyword"
            },
            "store_created": {
              "type": "date"
            },
            "store_updated": {
              "type": "date"
            },
            "detail_id": {
              "type": "long"
            },
            "type_name": {
              "type": "text"
            },
            "tag": {
              "type": "keyword"
            },
            "overall_rating": {
              "type": "text"
            },
            "navi_location_lng": {
              "type": "double"
            },
            "navi_location_lat": {
              "type": "double"
            },
            "detail_url": {
              "type": "text"
            },
            "comment_num": {
              "type": "integer"
            },
            "detail_created": {
              "type": "date"
            },
            "detail_updated": {
              "type": "date"
            },
            "location_id": {
              "type": "long"
            },
            "lng": {
              "type": "double"
            },
            "lat": {
              "type": "double"
            }
          }
        }
      }
    }
    

    上面这种方式可以通过es管理工具执行,比如kibana->dev tools;或者使用curl的方式也可以

    curl -XPUT "http://localhost:9200/store_list" -H 'Content-Type: application/json' -d'
    {
      "settings": {
        "number_of_shards": 3,
        "number_of_replicas": 1
      },
      "mappings": {
        "jdbc": {
          "properties": {
            "@timestamp": {
              "type": "date"
            },
            "@version": {
              "type": "keyword"
            },
            "store_id": {
              "type": "long"
            },
            "store_name": {
              "type": "keyword"
            },
            "uid": {
              "type": "text"
            },
            "telephone": {
              "type": "text"
            },
            "street_id": {
              "type": "text"
            },
            "detail": {
              "type": "keyword"
            },
            "address": {
              "type": "keyword"
            },
            "store_created": {
              "type": "date"
            },
            "store_updated": {
              "type": "date"
            },
            "detail_id": {
              "type": "long"
            },
            "type_name": {
              "type": "text"
            },
            "tag": {
              "type": "keyword"
            },
            "overall_rating": {
              "type": "text"
            },
            "navi_location_lng": {
              "type": "double"
            },
            "navi_location_lat": {
              "type": "double"
            },
            "detail_url": {
              "type": "text"
            },
            "comment_num": {
              "type": "integer"
            },
            "detail_created": {
              "type": "date"
            },
            "detail_updated": {
              "type": "date"
            },
            "location_id": {
              "type": "long"
            },
            "lng": {
              "type": "double"
            },
            "lat": {
              "type": "double"
            }
          }
        }
      }
    }'
    

    然后通过http://localhost:9200/store_list/查看字段生成情况

    store_list就是index,相当于数据库的database

    8

    然后回到logstash目录下

    执行 nohup.exe ./bin/logstash.bat -f config-mysql/load_data.conf &

    9

    最好加上& 结尾,后台运行

    然后看数据库同步情况

    10

    可能有些细节没能写全,如果在集成中遇到什么情况,可以评论指出

    相关文章

      网友评论

          本文标题:mysql数据同步elasticsearch(es)全文检索容器

          本文链接:https://www.haomeiwen.com/subject/dtfrcftx.html