美文网首页
将 MySQL 中的业务数据接入到 ElasticSearch

将 MySQL 中的业务数据接入到 ElasticSearch

作者: 乌鲁木齐001号程序员 | 来源:发表于2020-06-11 16:02 被阅读0次

    业务数据接入 | 概述

    所谓业务数据接入,就是将业务操作中产生的,存储在 MySQL 中的数据,导入到 ElasticSearch 中。整个导入的过程,大致上可以分成三步:索引构建、全量数据导入、增量数据导入。

    索引构建
    • 在关系型数据库 MySQL 中,搜索涉及的数据可能分散在不同的表中;
    • 在构建 ElasticSearch 中的索引的时候,一般会将分散在不同表中的数据,平铺到一个索引中;
    • 技术上讲,索引的构建,就是设置索引的 mappings 和 settings;
    全量数据导入
    • 全量数据导入的粒度一般以天为单位,在业务低峰时段完成;
    增量数据导入
    • 增量数据导入的粒度一般以秒或分钟为单位,实时的将业务操作对 MySQL 的更新同步到 ElasticSearch 中;

    索引构建 | 考量

    索引要包含哪些字段
    • 根据搜索需要的字段,排序需要的字段等维度去决定需要哪些字段;
    索引中字段的类型
    • 哪些字段需要用 text 类型分词,从而被搜索引擎感知;
    • 哪些字段需要做排序;
    • 哪些字段是地理位置信息;
    • 字段索引时的 Analyzer 和搜索时的 Analyzer 分别用什么;

    索引构建 | 门店

    涉及到 MySQL 中 shop,seller,category 三张表。

    PUT /shop
    {
      "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 1
      },
      "mappings": {
        "properties": {
          "id":{"type": "integer"},
          "name":{"type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_smart"},
          "tags":{"type": "text", "analyzer": "whitespace", "fielddata": true},
          "location":{"type": "geo_point"},
          "remark_score":{"type": "double"},
          "price_per_man":{"type": "integer"},
          "category_id":{"type": "integer"},
          "category_name":{"type":"keyword"},
          "seller_id":{"type": "integer"},
          "seller_remark_score":{"type":"double"},
          "seller_disabled_flag":{"type":"integer"}
        }
      }
    }
    

    全量数据导入

    安装 logstash-input-jdbc
    • 默认情况下的话,这个插件是安装了的;
    • 在使用命令 bin/logstash-plugins list 的时候,有时候会报错 Package jdk.internal.jimage.decompressor in both module java.base and module jtr.fs,解决方案是把 jdk 目录下的 jrt-fs.jar 删掉;
    自备 MySQL 驱动
    • 拷贝驱动到 /home/lixinlei/application/logstash/7.3.0/logstash-7.3.0/logstash-core/lib/jars/mysql-connector-java-8.0.11.jar
    写个配置文件 | jdbc.conf
    input {
        jdbc {
          jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/dianping?useSSL=false"
          jdbc_user => "root"
          jdbc_password => "Jiangdi_2018"
          jdbc_driver_class => "com.mysql.jdbc.Driver"
    
          jdbc_paging_enabled => "true"
          jdbc_page_size => "50000"
          statement_filepath => "/home/lixinlei/application/logstash/7.3.0/logstash-7.3.0/bin/mysql/jdbc.sql"
          # 设置监听间隔  各字段含义(由左至右)分、时、天、月、年,全部为*,6 个 * 为每秒钟都更新
          schedule => "* * * * * *"
        }
    }
    
    output {
        elasticsearch {
            hosts => ["localhost:9200"]
            index => "shop"
            document_type => "_doc"
            document_id => "%{id}"
        }
        stdout {
            codec => json_lines
        }
    }
    
    定时任务要执行的 SQL 写在一个文件中 | jdbc.sql
    • 美化过的,最好写在一行里吧;
    SELECT 
        a.id,
        a.name,
        a.tags,
        CONCAT(a.latitude, ',', a.longitude) AS location,
        a.remark_score,
        a.price_per_man,
        a.category_id,
        b.name AS category_name,
        a.seller_id,
        c.remark_score AS seller_remark_score,
        c.disabled_flag AS seller_disabled_flag
    FROM
        shop a
            INNER JOIN
        category b ON a.category_id = b.id
            INNER JOIN
        seller c ON c.id = a.seller_id
    
    启动 logstash

    ./logstash -f mysql/jdbc.conf

    数据导入成功了,通过 ElasticSearch 的 API POST /shop/_search 查看 ;


    增量数据导入 | 基于 updated_at

    修改 jdbc.conf
    • 设置 JDBC 的时区;
    • 指定 last_run_metadata_path 的路径,用来记录 MySQL 中的数据,上一次更新时间的;
    input {
        jdbc {
          jdbc_default_timezone => "Asia/Shanghai"
          jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/dianping?useSSL=false"
          jdbc_user => "root"
          jdbc_password => "Jiangdi_2018"
          jdbc_driver_class => "com.mysql.jdbc.Driver"
    
          jdbc_paging_enabled => "true"
          jdbc_page_size => "50000"
          last_run_metadata_path => "/home/lixinlei/application/logstash/7.3.0/logstash-7.3.0/bin/mysql/last_value_meta"
          statement_filepath => "/home/lixinlei/application/logstash/7.3.0/logstash-7.3.0/bin/mysql/jdbc.sql"
          # 设置监听间隔  各字段含义(由左至右)分、时、天、月、年,全部为*,6 个 * 为每秒钟都更新
          schedule => "* * * * *"
        }
    }
    
    output {
        elasticsearch {
            hosts => ["localhost:9200"]
            index => "shop"
            document_type => "_doc"
            document_id => "%{id}"
        }
        stdout {
            codec => json_lines
        }
    }
    
    在 last_value_meta 文件中设置初始值
    2010-11-11 11:11:11
    
    修改 jdbc.sql
    • 美化过的,最好写在一行里;
    SELECT 
        a.id,
        a.name,
        a.tags,
        CONCAT(a.latitude, ',', a.longitude) AS location,
        a.remark_score,
        a.price_per_man,
        a.category_id,
        b.name AS category_name,
        a.seller_id,
        c.remark_score AS seller_remark_score,
        c.disabled_flag AS seller_disabled_flag
    FROM
        shop a
            INNER JOIN
        category b ON a.category_id = b.id
            INNER JOIN
        seller c ON c.id = a.seller_id
    WHERE a.updated_at > :sql_last_value or b.updated_at > :sql_last_value or c.updated_at > :sql_last_value
    
    运行 logstash

    ./logstash -f mysql/jdbc.conf;

    在 MySQL 中修改了数据之后,1 分钟内,会更新到 ElasticSearch 中;

    增量数据导入 | 基于 updated_at | 缺陷
    • 如果 1 分钟内,MySQL 中有很多数据的更新,超过了 Logstash 和 ElasticSearch 的处理能力,延迟将无限制的放大;
    • 不管跑批的间隔设置的多小,仍然会有延迟;

    相关文章

      网友评论

          本文标题:将 MySQL 中的业务数据接入到 ElasticSearch

          本文链接:https://www.haomeiwen.com/subject/dtvrtktx.html