美文网首页
将 MySQL 中的业务数据接入到 ElasticSearch

将 MySQL 中的业务数据接入到 ElasticSearch

作者: 乌鲁木齐001号程序员 | 来源:发表于2020-06-11 16:02 被阅读0次

业务数据接入 | 概述

所谓业务数据接入,就是将业务操作中产生的,存储在 MySQL 中的数据,导入到 ElasticSearch 中。整个导入的过程,大致上可以分成三步:索引构建、全量数据导入、增量数据导入。

索引构建
  • 在关系型数据库 MySQL 中,搜索涉及的数据可能分散在不同的表中;
  • 在构建 ElasticSearch 中的索引的时候,一般会将分散在不同表中的数据,平铺到一个索引中;
  • 技术上讲,索引的构建,就是设置索引的 mappings 和 settings;
全量数据导入
  • 全量数据导入的粒度一般以天为单位,在业务低峰时段完成;
增量数据导入
  • 增量数据导入的粒度一般以秒或分钟为单位,实时的将业务操作对 MySQL 的更新同步到 ElasticSearch 中;

索引构建 | 考量

索引要包含哪些字段
  • 根据搜索需要的字段,排序需要的字段等维度去决定需要哪些字段;
索引中字段的类型
  • 哪些字段需要用 text 类型分词,从而被搜索引擎感知;
  • 哪些字段需要做排序;
  • 哪些字段是地理位置信息;
  • 字段索引时的 Analyzer 和搜索时的 Analyzer 分别用什么;

索引构建 | 门店

涉及到 MySQL 中 shop,seller,category 三张表。

PUT /shop
{
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 1
  },
  "mappings": {
    "properties": {
      "id":{"type": "integer"},
      "name":{"type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_smart"},
      "tags":{"type": "text", "analyzer": "whitespace", "fielddata": true},
      "location":{"type": "geo_point"},
      "remark_score":{"type": "double"},
      "price_per_man":{"type": "integer"},
      "category_id":{"type": "integer"},
      "category_name":{"type":"keyword"},
      "seller_id":{"type": "integer"},
      "seller_remark_score":{"type":"double"},
      "seller_disabled_flag":{"type":"integer"}
    }
  }
}

全量数据导入

安装 logstash-input-jdbc
  • 默认情况下的话,这个插件是安装了的;
  • 在使用命令 bin/logstash-plugins list 的时候,有时候会报错 Package jdk.internal.jimage.decompressor in both module java.base and module jtr.fs,解决方案是把 jdk 目录下的 jrt-fs.jar 删掉;
自备 MySQL 驱动
  • 拷贝驱动到 /home/lixinlei/application/logstash/7.3.0/logstash-7.3.0/logstash-core/lib/jars/mysql-connector-java-8.0.11.jar
写个配置文件 | jdbc.conf
input {
    jdbc {
      jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/dianping?useSSL=false"
      jdbc_user => "root"
      jdbc_password => "Jiangdi_2018"
      jdbc_driver_class => "com.mysql.jdbc.Driver"

      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"
      statement_filepath => "/home/lixinlei/application/logstash/7.3.0/logstash-7.3.0/bin/mysql/jdbc.sql"
      # 设置监听间隔  各字段含义(由左至右)分、时、天、月、年,全部为*,6 个 * 为每秒钟都更新
      schedule => "* * * * * *"
    }
}

output {
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "shop"
        document_type => "_doc"
        document_id => "%{id}"
    }
    stdout {
        codec => json_lines
    }
}
定时任务要执行的 SQL 写在一个文件中 | jdbc.sql
  • 美化过的,最好写在一行里吧;
SELECT 
    a.id,
    a.name,
    a.tags,
    CONCAT(a.latitude, ',', a.longitude) AS location,
    a.remark_score,
    a.price_per_man,
    a.category_id,
    b.name AS category_name,
    a.seller_id,
    c.remark_score AS seller_remark_score,
    c.disabled_flag AS seller_disabled_flag
FROM
    shop a
        INNER JOIN
    category b ON a.category_id = b.id
        INNER JOIN
    seller c ON c.id = a.seller_id
启动 logstash

./logstash -f mysql/jdbc.conf

数据导入成功了,通过 ElasticSearch 的 API POST /shop/_search 查看 ;


增量数据导入 | 基于 updated_at

修改 jdbc.conf
  • 设置 JDBC 的时区;
  • 指定 last_run_metadata_path 的路径,用来记录 MySQL 中的数据,上一次更新时间的;
input {
    jdbc {
      jdbc_default_timezone => "Asia/Shanghai"
      jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/dianping?useSSL=false"
      jdbc_user => "root"
      jdbc_password => "Jiangdi_2018"
      jdbc_driver_class => "com.mysql.jdbc.Driver"

      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"
      last_run_metadata_path => "/home/lixinlei/application/logstash/7.3.0/logstash-7.3.0/bin/mysql/last_value_meta"
      statement_filepath => "/home/lixinlei/application/logstash/7.3.0/logstash-7.3.0/bin/mysql/jdbc.sql"
      # 设置监听间隔  各字段含义(由左至右)分、时、天、月、年,全部为*,6 个 * 为每秒钟都更新
      schedule => "* * * * *"
    }
}

output {
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "shop"
        document_type => "_doc"
        document_id => "%{id}"
    }
    stdout {
        codec => json_lines
    }
}
在 last_value_meta 文件中设置初始值
2010-11-11 11:11:11
修改 jdbc.sql
  • 美化过的,最好写在一行里;
SELECT 
    a.id,
    a.name,
    a.tags,
    CONCAT(a.latitude, ',', a.longitude) AS location,
    a.remark_score,
    a.price_per_man,
    a.category_id,
    b.name AS category_name,
    a.seller_id,
    c.remark_score AS seller_remark_score,
    c.disabled_flag AS seller_disabled_flag
FROM
    shop a
        INNER JOIN
    category b ON a.category_id = b.id
        INNER JOIN
    seller c ON c.id = a.seller_id
WHERE a.updated_at > :sql_last_value or b.updated_at > :sql_last_value or c.updated_at > :sql_last_value
运行 logstash

./logstash -f mysql/jdbc.conf;

在 MySQL 中修改了数据之后,1 分钟内,会更新到 ElasticSearch 中;

增量数据导入 | 基于 updated_at | 缺陷
  • 如果 1 分钟内,MySQL 中有很多数据的更新,超过了 Logstash 和 ElasticSearch 的处理能力,延迟将无限制的放大;
  • 不管跑批的间隔设置的多小,仍然会有延迟;

相关文章

网友评论

      本文标题:将 MySQL 中的业务数据接入到 ElasticSearch

      本文链接:https://www.haomeiwen.com/subject/dtvrtktx.html