业务数据接入 | 概述
所谓业务数据接入,就是将业务操作中产生的,存储在 MySQL 中的数据,导入到 ElasticSearch 中。整个导入的过程,大致上可以分成三步:索引构建、全量数据导入、增量数据导入。
索引构建
- 在关系型数据库 MySQL 中,搜索涉及的数据可能分散在不同的表中;
- 在构建 ElasticSearch 中的索引的时候,一般会将分散在不同表中的数据,平铺到一个索引中;
- 技术上讲,索引的构建,就是设置索引的 mappings 和 settings;
全量数据导入
- 全量数据导入的粒度一般以天为单位,在业务低峰时段完成;
增量数据导入
- 增量数据导入的粒度一般以秒或分钟为单位,实时的将业务操作对 MySQL 的更新同步到 ElasticSearch 中;
索引构建 | 考量
索引要包含哪些字段
- 根据搜索需要的字段,排序需要的字段等维度去决定需要哪些字段;
索引中字段的类型
- 哪些字段需要用 text 类型分词,从而被搜索引擎感知;
- 哪些字段需要做排序;
- 哪些字段是地理位置信息;
- 字段索引时的 Analyzer 和搜索时的 Analyzer 分别用什么;
索引构建 | 门店
涉及到 MySQL 中 shop,seller,category 三张表。
PUT /shop
{
"settings": {
"number_of_shards": 1,
"number_of_replicas": 1
},
"mappings": {
"properties": {
"id":{"type": "integer"},
"name":{"type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_smart"},
"tags":{"type": "text", "analyzer": "whitespace", "fielddata": true},
"location":{"type": "geo_point"},
"remark_score":{"type": "double"},
"price_per_man":{"type": "integer"},
"category_id":{"type": "integer"},
"category_name":{"type":"keyword"},
"seller_id":{"type": "integer"},
"seller_remark_score":{"type":"double"},
"seller_disabled_flag":{"type":"integer"}
}
}
}
全量数据导入
安装 logstash-input-jdbc
- 默认情况下的话,这个插件是安装了的;
- 在使用命令
bin/logstash-plugins list
的时候,有时候会报错Package jdk.internal.jimage.decompressor in both module java.base and module jtr.fs
,解决方案是把 jdk 目录下的 jrt-fs.jar 删掉;
自备 MySQL 驱动
- 拷贝驱动到
/home/lixinlei/application/logstash/7.3.0/logstash-7.3.0/logstash-core/lib/jars/mysql-connector-java-8.0.11.jar
;
写个配置文件 | jdbc.conf
input {
jdbc {
jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/dianping?useSSL=false"
jdbc_user => "root"
jdbc_password => "Jiangdi_2018"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
statement_filepath => "/home/lixinlei/application/logstash/7.3.0/logstash-7.3.0/bin/mysql/jdbc.sql"
# 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*,6 个 * 为每秒钟都更新
schedule => "* * * * * *"
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "shop"
document_type => "_doc"
document_id => "%{id}"
}
stdout {
codec => json_lines
}
}
定时任务要执行的 SQL 写在一个文件中 | jdbc.sql
- 美化过的,最好写在一行里吧;
SELECT
a.id,
a.name,
a.tags,
CONCAT(a.latitude, ',', a.longitude) AS location,
a.remark_score,
a.price_per_man,
a.category_id,
b.name AS category_name,
a.seller_id,
c.remark_score AS seller_remark_score,
c.disabled_flag AS seller_disabled_flag
FROM
shop a
INNER JOIN
category b ON a.category_id = b.id
INNER JOIN
seller c ON c.id = a.seller_id
启动 logstash
./logstash -f mysql/jdbc.conf
;
数据导入成功了,通过 ElasticSearch 的 API POST /shop/_search
查看 ;
增量数据导入 | 基于 updated_at
修改 jdbc.conf
- 设置 JDBC 的时区;
- 指定 last_run_metadata_path 的路径,用来记录 MySQL 中的数据,上一次更新时间的;
input {
jdbc {
jdbc_default_timezone => "Asia/Shanghai"
jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/dianping?useSSL=false"
jdbc_user => "root"
jdbc_password => "Jiangdi_2018"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
last_run_metadata_path => "/home/lixinlei/application/logstash/7.3.0/logstash-7.3.0/bin/mysql/last_value_meta"
statement_filepath => "/home/lixinlei/application/logstash/7.3.0/logstash-7.3.0/bin/mysql/jdbc.sql"
# 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*,6 个 * 为每秒钟都更新
schedule => "* * * * *"
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "shop"
document_type => "_doc"
document_id => "%{id}"
}
stdout {
codec => json_lines
}
}
在 last_value_meta 文件中设置初始值
2010-11-11 11:11:11
修改 jdbc.sql
- 美化过的,最好写在一行里;
SELECT
a.id,
a.name,
a.tags,
CONCAT(a.latitude, ',', a.longitude) AS location,
a.remark_score,
a.price_per_man,
a.category_id,
b.name AS category_name,
a.seller_id,
c.remark_score AS seller_remark_score,
c.disabled_flag AS seller_disabled_flag
FROM
shop a
INNER JOIN
category b ON a.category_id = b.id
INNER JOIN
seller c ON c.id = a.seller_id
WHERE a.updated_at > :sql_last_value or b.updated_at > :sql_last_value or c.updated_at > :sql_last_value
运行 logstash
./logstash -f mysql/jdbc.conf
;
在 MySQL 中修改了数据之后,1 分钟内,会更新到 ElasticSearch 中;
增量数据导入 | 基于 updated_at | 缺陷
- 如果 1 分钟内,MySQL 中有很多数据的更新,超过了 Logstash 和 ElasticSearch 的处理能力,延迟将无限制的放大;
- 不管跑批的间隔设置的多小,仍然会有延迟;
网友评论