使用工具 :
- elasticdump
- logstash
初步方案:
1. 直接通过logstash 进行不同集群之间的数据迁移。
优点 :处理方便,没有繁琐步骤。
缺点 :同一版本集群,或集群版本相差不大
2. 可以通过elasticdump 进行数据导出,再进行logtash 将导出数据写入目标集群。
优点:跨版本导数据不易出现异常。频繁读写,不会影响线上集群服务 。
缺点:分段操作较繁琐,占用本地磁盘
3. 导到目标集群前,需先建好索引及索引配置,可以通过模板预建立
准备步骤:
logstash 下载
下载地址:https://www.elastic.co/cn/downloads/logstash
环境准备 :
环境 | 备注 |
---|---|
JDK 8 或 11 | 需要配置环境jdk 为 jdk8 或 jdk11 |
ruby | 如需要通过脚本做特属于处理需要安装,不做处理可以不安装 |
logstash 导数据 elasticsearch:
input {
elasticsearch {
hosts => "ip:port"
index => "index_name"
query => '{ "query": { "match_all":{ }} }'
size => 500
scroll => "5m"
docinfo => true
codec => json
}
}
filter {
mutate {
rename => {
# 将字段http_referer重命名为http_source
"createTime" => "created_at"
"likeNum" => "like_counter"
}
# logstash 导出数据会产生一些特殊字段
remove_tag => ["_grokparsefailure"]
remove_field => ["@timestamp" , "@version"]
}
# 导出数据可以添加一些过滤条件
if[delete] == 1 {
drop {}
}
}
# 输出到目标集群
output {
elasticsearch {
hosts => ["target_ip:target_port"]
action => "update" #upsert 有则更新,无则插入
document_type => "_doc" # 索引type
document_id => "%{[@metadata][_id]}" # 索引对应的id 字段
index => "target_index_name" # 目标索引名称
doc_as_upsert => true
}
}
elasticdump 下载
安装准备:
一 、
安装NodeJS
下载源码:wget http://nodejs.org/dist/v0.10.32/node-v0.10.32-linux-x64.tar.gz
解压:tar xvf node-v0.10.22-linux-x64.tar.gz
二 、
安装NPM
curl -L https://npmjs.org/install.sh | sh**
三、
安装elasticdump
npm install elasticdump -g
数据导出:
elasticdump 导出数据脚本:
elasticdump --input=ip:port/index_name --output=file_path --type=data --sourceOnly=true --ignore-errors=true
参数说明 :
--ignore-errors=true :忽略异常造成的中断
--sourceOnly=true : 只导出source 下的数据
其余参数参考官方文档:https://github.com/taskrabbit/elasticsearch-dump
logstash 通过文件导入数据:
input {
file{
path => ["file_path"]
codec => "json"
start_position => "beginning"
}
}
filter {
mutate {
remove_tag => ["_grokparsefailure"]
remove_field => ["@timestamp" , "@version", "host", "path"]
}
## 通过ruby 进行特殊字段添加处理
ruby {
code => ' time = event.get("createTime") ;
likeNum = event.get("likeNum");
if !time.nil?
event.set("create_time_range",Time.at(Integer(time)).strftime("%Y-%m"))
end
if !likeNum.nil?
if Integer(likeNum)>300
event.set("like_range","1")
end
end
'
}
}
output {
elasticsearch {
hosts => ["target_ip:target_port"]
action => "update"
document_type => "index_type"
document_id => "%{id}"
index => "index_name"
doc_as_upsert => true
}
}
备注:
logstash 多次使用后,需要清理游标,否则会导致数据写入不进去。
-
清理sincedb_path
image.png
sincedb_path :表示文件读取进度的记录,每行表示一个文件,每行有两个数字
-
清理logstash/data/ 目录下隐藏文件 .lock
网友评论