美文网首页
Elasticsearch 与mysql数据同步

Elasticsearch 与mysql数据同步

作者: wycdavid | 来源:发表于2018-08-15 11:53 被阅读0次

    简介

    Elasticsearch有一系列的api,可以进行索引的建立,数据的增删改查等,当mysql插入数据的时候,你当然可以同时调用Elasticsearch的api来插入检索数据,但是这样感觉有点麻烦,所有这里介绍另外一种方案,利用logstash插件、logstash-input-jdbc插件(额,这个是logstash的插件),与mysql数据进行增量同步。

    这里增量同步包括,初始数据,新增数据、修改数据的同步,但是不包括删除数据,删除数据还是需要调用api来操作。简单描述下工作原理,就是定时去查询mysql数据库,把新数据和修改的数据批量插入到索引中。

    那么什么样的数据才是新数据或者修改过的数据呢,这里需要数据库的表设计来配合,比如设定一个时间戳字段updatetime,logstash插入索引数据的时候会自动缓存最后一条记录的指定的字段(假设指定这个updatetime),那么只要updatetime大于缓存值就是新数据或者修改过的数据,对于没有修改需求的记录我们可以直接指定主键id就可以。

    安装

    cd /opt/logstash/
    sudo bin/plugin install logstash-input-jdbc
    

    这一步可能会安装的很慢,这个插件是ruby开发的,国内访问不稳定,可以先替换淘宝镜像,具体操作自己百度吧,不过我使用的是阿里云服务器,直接就安装好了 (不过不排除我以前已经替换过淘宝镜像了)

    打开这个地址,然后选择下载,或者你自己去百度下载也行
    我当时下载的是 mysql-connector-java-5.1.46-bin.jar

    具体实施

    如果上面所有的安装都已经成功,那么我可以开是具体的配置了

    • 总结下前提条件
    1. 假设已经有db_search的数据库,里面有张msg的表, msg表里面有主键id字段 (可以根据实际情况来)
    2. 已经启动 elasticsearch . 地址是 119.215.99.45 端口:9200.
    3. 已经安装 logstash, 地址在 /opt/logstash
    4. 已经安装 logstash-input-jdbc插件
    • 准备两个文件jdbc.conf、msg.sql,名字不重要随便起
      jdbc.conf ,输入下面的内容
    input {
        jdbc {
          # 数据库连接字符串
          jdbc_connection_string => "jdbc:mysql://数据库IP地址:3306/db_search?characterEncoding=UTF-8&useSSL=false"
          # 数据库用户名和密码
          jdbc_user => "root"
          jdbc_password => "password"
          # 数据库java驱动包目录
          jdbc_driver_library => "/opt/logstash/mysql-connector-java-5.1.46-bin.jar"
          # the name of the driver class for mysql
          jdbc_driver_class => "com.mysql.jdbc.Driver"
          jdbc_paging_enabled => "true"
          jdbc_page_size => "50000"
          # 关联第二个文件
          statement_filepath => "msg.sql"
          # 开启用户字段追踪判断,缓存最后记录的对应字段,这里缓存的是主键id字段
          use_column_value => true
          tracking_column => "id"
          record_last_run => true
          # 缓存文件目录,会把最后操作的记录的id缓存到这个文件中,可以先建好放在合适的位置
          last_run_metadata_path => "/opt/logstash/msg-id.last"
          # 设定同步周期,当前设置是每分钟一次,更多设置请自己百度
          schedule => "* * * * *"
          # 索引的分类
          type => "msg"
        }
    }
    
    filter {
        json {
            source => "message"
            remove_field => ["message"]
        }
    }
    
    output {
        elasticsearch {
            # 地址
            hosts => "119.215.99.45:9200"
            # 索引的名称,可以随便起,我这边跟数据库起了一样的名字
            index => "db_search"
            # 索引下面的分类,直接用了上面配置中的值
            document_type => "%{type}"
        }
        stdout {
            codec => json_lines
        }
    }
    
    

    msg.sql 里面输入以下内容

    select 
        id,
        msg_title,
        msg_link,
        publish_time
    from 
        msg
    where 
        id > :sql_last_value
    

    这个就不需要多解释了,根据你的实际情况来设置需要存到索引中的字段,其中 :sql_last_value 这个值会自动读取之前设置的缓存文件msg-id.last中的值
    很显然我这个配置是不支持数据修改的,因为我用的是主键id字段,如果有数据修改的需求可以指定其他的字段,原理我在文章开头已经讲的很清楚了。

    测试运行

    • 启动
    sudo bin/logstash -f jdbc.conf
    

    然后你就可以往mysql中插入数据了,等会再到elasticsearch中查看是否已经自动建立相应的索引,索引的文档数是否等于mysql的新增数据数。具体我就不演示了

    注意事项,很重要!!!

    此方法自动建立的索引有个很重要的问题,就是分词器是用的系统默认的,对中文支持很烂,而且索引一旦建立是不能修改分词器的(切记,切记)!所以如果你需要指定分词器(比如IK),就必须先建立索引,然后再进行数据同步。

    相关文章

      网友评论

          本文标题:Elasticsearch 与mysql数据同步

          本文链接:https://www.haomeiwen.com/subject/upuibftx.html