美文网首页
使用logstash同步MySQL数据到elasticsearc

使用logstash同步MySQL数据到elasticsearc

作者: 惜鸟 | 来源:发表于2020-05-29 19:54 被阅读0次

    需求:将MySQL的数据同步到es中,通过es提供搜索和数据分析服务
    网上有很多教程,但是自己在使用过程中还是遇到了很多坑,为了避免遇到同样的问题
    下面将自己学习过程和资料整理下来,方便自己查阅,也希望对有同样需求的同学提供帮助

    上一篇文章简单介绍了logstash的安装步骤,这篇文章主要介绍logstash同步mysql数据到es的配置文件的编写

    一、logstash 配置文件介绍

    logstash 设计了自己的 DSL(Domain-Specific Language 领域特定语言) 包括有区域,注释,数据类型(布尔值,字符串,数值,数组,哈希),条件判断,字段引用等。

    logstash{} 来定义区域,区域内可以包括插件区域定义,你可以在一个区域内定义多个插件,插件区域内则可以定义键值对设置,示例如下:

    input {}
    filter {}
    output {}
    

    logstash input插件
    logstash filter插件
    logstash output插件

    二、同步MySQL数据到ES的配置文件logstash-mysql.conf

    1. 创建logstash-mysql.conf

    首先创建logstash的配置文件,直接从logstash-6.7.0/config/logstash-sample.conf中复制一份配置文件logstash-mysql.conf, 使用如下命令:

    # 复制配置文件,并重命名
    cp config/logstash-sample.conf mysql/logstash-mysql.conf
    # 编辑配置文件
    vim mysql/logstash-mysql.conf
    

    修改内容如下:

    input {
      # 用于接收控制台输入数据,进行测试
      stdin { }
      jdbc {
          # 数据库  数据库名称为elk,表名为book_table
          jdbc_connection_string => "jdbc:mysql://localhost:3306/elk?characterEncoding=UTF-8"
          # 数据库用户名
          jdbc_user => "root"
          # 数据库密码
          jdbc_password => "root"
          # jar包的位置
          jdbc_driver_library => "mysql/mysql-connector-java-5.1.40.jar"
          # mysql的Driver
          jdbc_driver_class => "com.mysql.jdbc.Driver"
          # 开启分页
          jdbc_paging_enabled => "true"
          # 每页条数
          jdbc_page_size => "100"
          # 需要执行的mysql文件的位置,会执行文件中的sql语句
          #statement_filepath => "mysql/test.sql"
          statement => "select * from book_table where id > :sql_last_value"
          # 调度时间,默认为1分钟执行一次
          schedule => "* * * * *"
          #是否记录上次执行结果, 如果为true,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
          record_last_run => true
     
          #是否需要记录某个column 的值,如果 record_last_run 为true,可以自定义我们需要 track 的 column 名称,此时该参数就要为 true. 否则默认 track 的是 timestamp 的值.
          use_column_value => true
     
          #如果 use_column_value 为true,需配置此参数. track 的数据库 column 名,该 column 必须是递增的.比如:ID.
          tracking_column => id
     
          #指定文件,来记录上次执行到的 tracking_column 字段的值
          #比如上次数据库有 10000 条记录,查询完后该文件中就会有数字 10000 这样的记录,下次执行 SQL 查询可以从 10001 条处开始.
          #我们只需要在 SQL 语句中 WHERE MY_ID > :sql_last_value 即可. 其中 :sql_last_value 取得就是该文件中的值(10000).
          last_run_metadata_path => "config/metadata"
     
          #是否清除 last_run_metadata_path 的记录,如果为true那么每次都相当于从头开始查询所有的数据库记录
          clean_run => false
     
          #是否将 column 名称转小写
          #lowercase_column_names => false
          # 标识输入的类型,自定义字符串,可以用于输出类型选择
          type => "book_table"
        }
    }
    filter {
     # 默认使用的是Unix时间,我们需要将logstash同步数据到ES时间+8小时
    #  数据库中的时间字段也可以这样设置
       ruby {
          code => "event.set('@timestamp',event.get('@timestamp').time.localtime + 8*60*60)"
       }
    }
    
    output {
     if [type] == "book_table" {
      elasticsearch {
        hosts => ["http://localhost:9200"]
        user => "elastic"
            password => "changeme"
        #按分钟
        #index => "mysql-%{+YYYY.MM.dd.HH.mm}"
        #按小时
        # 索引名字,必须小写
        index => "mysql-%{+YYYY.MM.dd.HH}"
        #index => "logstash-%{[fields][document_type]}-%{+YYYY.MM.dd}"
            #index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
        # 数据唯一索引(建议使用数据库主键 KeyID)
        document_id => "%{id}"
      }
      # 以json格式输出到控制台
      stdout {
            codec => json_lines
      }
     }
        
    }
    

    注意点:
    1、 logstash默认使用的是Unix时间,我们需要将logstash同步数据到ES时间+8小时
    2、数据库中的时间字段也可以这样设置

     filter {
       ruby {
         code => "event.set('@timestamp',event.get('@timestamp').time.localtime + 8*60*60)"
        }
      }
    

    3、logstash里面不支持驼峰字段
    4、因为logstash里面使用了type关键字,所有数据库中的字段不能命名为type,如果有type需要使用别名替换

    2. 配置说明

    从上面的配置中可以看出,需要创建一个mysql目录,并且下载mysql-connector-java-5.1.40.jar用于连接mysql数据库,可以直接从maven仓库:https://mvnrepository.com/中搜索mysql-connector-java下载jar包

    在logstash-6.7.0目录下创建mysql目录

    # 创建mysql目录
    mkdir mysql
    # 进入mysql 目录
    cd mysql
    # 下载mysql-connector-java.jar
    wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/5.1.40/mysql-connector-java-5.1.40.jar
    

    3. 运行logstash

    bin/logstash -f mysql/logstash-mysql.conf
    

    参考文档:
    Logstash 简易教程
    https://blog.csdn.net/diaobatian/article/details/98624709

    相关文章

      网友评论

          本文标题:使用logstash同步MySQL数据到elasticsearc

          本文链接:https://www.haomeiwen.com/subject/lbwxzhtx.html