Rollover Pattern: Index Rollover
1. Why use rollover indices
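Suppose we need to record users' locations in real time, collecting a sample every 5 seconds, and we store that data in Elasticsearch (ES).
If there is only one index, reads and writes on it become more and more expensive as the data keeps growing.
The remedy is to split that one index into multiple indices.
The split can be implemented with an index template, and creating new indices over time can be handled with Rollover.
Rollover lets you automatically switch to a new index based on index size, document count, or index age.
When a rollover is triggered, a new index is created, the write alias is updated to point to the new index, and all subsequent writes go to the new index.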
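Note: rolling over based on size, document count, or age is preferable to rolling over at fixed points in time.
Rolling over on a fixed schedule often produces many small indices, which can hurt performance and resource usage.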
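Elasticsearch evaluates the rollover conditions on the server, and a rollover happens as soon as any one of them is met. The plain-Java model below is not an Elasticsearch API (the class and method names are made up for illustration); it only spells out that "any one condition" rule, using the ">=" comparisons described in this article's example.

```java
import java.time.Duration;

/**
 * Hypothetical model of rollover condition checks. Elasticsearch does this
 * server-side; the point here is that meeting ANY one limit is enough.
 */
final class RolloverConditionSketch {
    private final Duration maxAge;    // corresponds to "max_age", e.g. 7d
    private final long maxDocs;       // corresponds to "max_docs", e.g. 5
    private final long maxSizeBytes;  // corresponds to "max_size", e.g. 1gb

    RolloverConditionSketch(Duration maxAge, long maxDocs, long maxSizeBytes) {
        this.maxAge = maxAge;
        this.maxDocs = maxDocs;
        this.maxSizeBytes = maxSizeBytes;
    }

    /** True if at least one limit has been reached (each check is ">="). */
    boolean shouldRollOver(Duration indexAge, long docCount, long sizeBytes) {
        boolean ageReached = indexAge.compareTo(maxAge) >= 0;
        boolean docsReached = docCount >= maxDocs;
        boolean sizeReached = sizeBytes >= maxSizeBytes;
        return ageReached || docsReached || sizeReached;
    }

    public static void main(String[] args) {
        RolloverConditionSketch c =
            new RolloverConditionSketch(Duration.ofDays(7), 5, 1L << 30);
        // 1 day old, 5 docs, 10 KB: only max_docs is reached, so roll over
        System.out.println(c.shouldRollOver(Duration.ofDays(1), 5, 10_240));
        // 1 day old, 1 doc, 10 KB: nothing is reached, so keep writing here
        System.out.println(c.shouldRollOver(Duration.ofDays(1), 1, 10_240));
    }
}
```

Running `main` prints `true` and then `false`.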
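After a rollover, historical data may no longer be worth keeping and has to be deleted, yet some of it can still be useful for analysis.
Starting with version 6.3, ES offers the Rollup feature for handling historical data, which provides two capabilities:
- Keep old data in a compact, aggregated format
- Keep only the data you are interested in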
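2. Setup steps
- Create the index template. The template defines the alias zz_user_location_search, which every index matching zz_user_location* receives; it serves as the read alias.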
curl -XPUT "http://host:port/_template/zz_user_location" -H 'Content-Type: application/json' -d'
{
"index_patterns": "zz_user_location*",
"settings": {
"index":{
"max_result_window": "1000000"
},
"number_of_shards": 3,
"number_of_replicas": 1
},
"mappings": {
"location": {
"properties": {
"jd": {
"type": "keyword"
},
"wd": {
"type": "keyword"
},
"wz": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"device": {
"type": "keyword"
},
"create_time": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss||epoch_millis"
}
}
}
},
"aliases": {
"zz_user_location_search": {}
}
}'
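Every index whose name matches `index_patterns` picks up this template's settings, mappings, and the zz_user_location_search alias at creation time, which is why indices produced by later rollovers are searchable through the same read alias. A small sketch of that name match, assuming only simple `*` wildcards (the class name is hypothetical, not Elasticsearch code):

```java
import java.util.regex.Pattern;

/** Hypothetical helper: does an index name match a simple "*" pattern? */
final class TemplatePatternSketch {
    static boolean matches(String indexPattern, String indexName) {
        // Split on '*' and keep trailing empty pieces so "abc*" gives ["abc", ""]
        String[] literals = indexPattern.split("\\*", -1);
        StringBuilder regex = new StringBuilder();
        for (int i = 0; i < literals.length; i++) {
            if (i > 0) {
                regex.append(".*"); // each '*' matches any run of characters
            }
            regex.append(Pattern.quote(literals[i])); // everything else is literal
        }
        return indexName.matches(regex.toString());
    }

    public static void main(String[] args) {
        String pattern = "zz_user_location*";
        String[] names = {"zz_user_location_20200909", "zz_user_location_20200910", "other_index"};
        for (String index : names) {
            System.out.println(index + " -> " + matches(pattern, index));
        }
    }
}
```

The two date-suffixed indices match (`true`) and `other_index` does not (`false`).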
- Create the first index and give it the write alias
PUT /zz_user_location_20200909
{
"aliases": {
"zz_user_location_save": {}
}
}
- Index a document through the write alias
POST /zz_user_location_save/location
{
"jd":"555"
}
- Check which aliases the index has
GET /zz_user_location_20200909/_alias/*
--------------result-----------------
{
"zz_user_location_20200909" : {
"aliases" : {
"zz_user_location_save" : { },
"zz_user_location_search" : { }
}
}
}
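- Documents added now can be queried through both aliases. Suppose we want to roll over to a new index once zz_user_location_20200909 holds at least 5 documents, is at least 7 days old, or has grown to at least 1 GB.
The dry_run flag only checks whether the conditions are met; it does not actually perform the rollover.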
POST /zz_user_location_save/_rollover/zz_user_location_20200910?dry_run
{
"conditions": {
"max_age": "7d",
"max_docs": 2,
"max_size": "1gb"
}
}
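-------------result--------------
Whether each condition is met is reported under "conditions": all three are false here, so no rollover takes place ("rolled_over" is false). The "dry_run" field only reports whether the request was a dry run; it does not indicate whether the conditions were met.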
{
"acknowledged" : false,
"shards_acknowledged" : false,
"old_index" : "zz_user_location_20200909",
"new_index" : "zz_user_location_20200910",
"rolled_over" : false,
"dry_run" : false,
"conditions" : {
"[max_size: 1gb]" : false,
"[max_docs: 2]" : false,
"[max_age: 7d]" : false
}
}
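To see programmatically which condition tripped, the keys under "conditions" (such as "[max_docs: 2]") can be read back from the response. The sketch below uses a regex so that it needs nothing beyond the JDK; that is a simplification under the assumption that the response looks like the one above, and real code should use a JSON library (the application code later in this article uses fastjson). Class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: extract per-condition results from a rollover response. */
final class RolloverConditionsParser {
    // Matches entries like "[max_docs: 2]" : false
    private static final Pattern CONDITION =
        Pattern.compile("\"\\[([^\\]]+)\\]\"\\s*:\\s*(true|false)");

    /** Maps e.g. "max_docs: 2" to false, preserving the response order. */
    static Map<String, Boolean> parse(String responseJson) {
        Map<String, Boolean> result = new LinkedHashMap<>();
        Matcher m = CONDITION.matcher(responseJson);
        while (m.find()) {
            result.put(m.group(1), Boolean.parseBoolean(m.group(2)));
        }
        return result;
    }

    /** True if at least one condition reported true. */
    static boolean anyMet(String responseJson) {
        return parse(responseJson).containsValue(Boolean.TRUE);
    }
}
```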
- Once any one of the conditions is met, perform the rollover
POST /zz_user_location_save/_rollover/zz_user_location_20200910
{
"conditions": {
"max_age": "7d",
"max_docs": 5,
"max_size": "1gb"
}
}
-------------result--------------
"rolled_over" is true, so the rollover succeeded: the max_docs: 5 condition was met, and one met condition is enough.
{
"acknowledged" : true,
"shards_acknowledged" : true,
"old_index" : "zz_user_location_20200909",
"new_index" : "zz_user_location_20200910",
"rolled_over" : true,
"dry_run" : false,
"conditions" : {
"[max_size: 1gb]" : false,
"[max_docs: 5]" : true,
"[max_age: 7d]" : false
}
}
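The same rollover request can also be sent from Java without a shell script. This is a sketch using the JDK 11+ `java.net.http` client; the JSON body is assembled by hand, the values are passed through unvalidated, and the class name and the example address `http://localhost:9200` are assumptions for illustration.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Hypothetical sketch: send a rollover request with the JDK HTTP client. */
final class RolloverRequestSketch {
    /** e.g. /zz_user_location_save/_rollover/zz_user_location_20200910?dry_run */
    static String path(String writeAlias, String newIndex, boolean dryRun) {
        return "/" + writeAlias + "/_rollover/" + newIndex + (dryRun ? "?dry_run" : "");
    }

    /** Builds the "conditions" body shown above (no escaping or validation). */
    static String body(String maxAge, long maxDocs, String maxSize) {
        return "{\"conditions\":{"
            + "\"max_age\":\"" + maxAge + "\","
            + "\"max_docs\":" + maxDocs + ","
            + "\"max_size\":\"" + maxSize + "\"}}";
    }

    /** POSTs the request and returns the raw JSON response body. */
    static String send(String esUrl, String path, String body)
            throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(URI.create(esUrl + path))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(body))
            .build();
        return HttpClient.newHttpClient()
            .send(request, HttpResponse.BodyHandlers.ofString())
            .body();
    }
}
```

For example, `send("http://localhost:9200", path("zz_user_location_save", "zz_user_location_20200910", false), body("7d", 5, "1gb"))` would issue the same request as the console example above.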
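- Now index another document through zz_user_location_save (which now points to zz_user_location_20200910) and search zz_user_location_search: the returned documents come from different indices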
{
"_index" : "zz_user_location_20200909",
"_type" : "location",
"_id" : "2GCKdXQBHICozJThgsAb",
"_score" : 1.0,
"_source" : {
"jd" : "555"
}
},
{
"_index" : "zz_user_location_20200910",
"_type" : "location",
"_id" : "d2CPdXQBHICozJThG8X0",
"_score" : 1.0,
"_source" : {
"jd" : "555"
}
}
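Creating the template and the initial index
- Shell script (bin/userlocation/es_init.sh); its only argument is the ES URL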
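#!/bin/bash
# Date suffix for the initial index, e.g. 20200909
TIME=$(date "+%Y%m%d")
# ES URL, passed as the first argument
ES_URL=$1
# Template prefix (also used as the index name prefix)
TEMPLATE_PREFIX="zz_user_location"
# Name of the initial write index
INIT_INDEX="${TEMPLATE_PREFIX}_${TIME}"
# Write alias
INDEX_SAVE_ALIASE="${TEMPLATE_PREFIX}_save"
# Read alias
INDEX_SEARCH_ALIASE="${TEMPLATE_PREFIX}_search"
# Fetch the template; ES answers {} when it does not exist (-s hides curl's progress meter)
GET_TEMPLATE_RESULT=$(curl -s -XGET "${ES_URL}/_template/${TEMPLATE_PREFIX}")
# Create the template
createTemplate() {
RESULT=$(curl -s -XPUT "${ES_URL}/_template/${TEMPLATE_PREFIX}" -H 'Content-Type: application/json' -d'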
{
"index_patterns": "'${TEMPLATE_PREFIX}'*",
"settings": {
"index": {
"max_result_window": "1000000"
},
"number_of_shards": 3,
"number_of_replicas": 1
},
"mappings": {
"location": {
"properties": {
"user_name": {
"type": "keyword"
},
"jd": {
"type": "keyword"
},
"wd": {
"type": "keyword"
},
"wz": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"device": {
"type": "keyword"
},
"time": {
"type": "date",
"format": "yyyy-MM-dd||epoch_millis"
},
"create_time": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss||epoch_millis"
}
}
}
},
"aliases": {
"'${INDEX_SEARCH_ALIASE}'": {}
}
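}')
echo "$RESULT"
}
# Create the initial index and attach the write alias to it
initTemplateIndex() {
RESULT=$(curl -s -XPUT "${ES_URL}/${INIT_INDEX}" -H 'Content-Type: application/json' -d'
{
"aliases": {
"'${INDEX_SAVE_ALIASE}'": {}
}
}')
echo "$RESULT"
}
if test "$GET_TEMPLATE_RESULT" = "{}"; then
echo 'Template not created yet.'
echo 'Creating template...'
CREATETEMPLATE_RESULT=$(createTemplate)
# "=" needs spaces around it, and single quotes keep the JSON double quotes intact
if test "$CREATETEMPLATE_RESULT" = '{"acknowledged":true}'; then
echo 'Template created.'
INITTEMPLATE_RESULT=$(initTemplateIndex)
# The Java caller looks for the line that starts with "result"
echo "result${INITTEMPLATE_RESULT}"
else
echo "Template creation failed: ${CREATETEMPLATE_RESULT}"
fi
else
echo 'Template already exists.'
fi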
- ShellUtil utility class
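import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ShellUtil {
    private static final Logger logger = LoggerFactory.getLogger(ShellUtil.class);

    /**
     * Runs a command through /bin/sh -c and returns its stdout lines.
     *
     * @param shell the command line to run
     * @return the stdout lines, or an empty list if the command fails
     */
    public static List<String> runShell(String shell) {
        List<String> processList = new ArrayList<>();
        try {
            ProcessBuilder builder = new ProcessBuilder("/bin/sh", "-c", shell);
            // Forward stderr to this JVM's stderr so it can never fill a pipe and block the child
            builder.redirectError(ProcessBuilder.Redirect.INHERIT);
            Process process = builder.start();
            // Read stdout BEFORE waitFor(): a full pipe buffer would otherwise deadlock
            try (BufferedReader input = new BufferedReader(
                    new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = input.readLine()) != null) {
                    processList.add(line);
                }
            }
            int exitValue = process.waitFor();
            if (exitValue != 0) {
                logger.error("call shell failed. error code is: {}", exitValue);
                return new ArrayList<>();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            logger.error("interrupted while running shell: {}", shell, e);
            return new ArrayList<>();
        } catch (IOException e) {
            logger.error("failed to run shell: {}", shell, e);
        }
        return processList;
    }
}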
- Method that runs the init script
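String basePath;
try {
    // getBasePath() resolves the application's base directory; it is not shown here
    basePath = getBasePath();
    String userLocationEsShPath = basePath + File.separator + "bin/userlocation/es_init.sh";
    List<String> list = ShellUtil.runShell(userLocationEsShPath + " " + esSetting.getEsHttpHost());
    for (String s : list) {
        if (s.startsWith("result")) {
            // The script prints "result" followed by the index-creation response
            logger.info("Index creation response from the script: {}", s.substring("result".length()));
            logger.info("{} script executed successfully", "userlocation");
        }
        logger.info(s);
    }
} catch (IOException e) {
    logger.error("userlocation script execution failed", e);
}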
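The `startsWith("result")` / `substring` step above can be isolated into a small, testable helper. This is a sketch; the class and method names are hypothetical, and it assumes, as the init script does, that only the final line carries the "result" prefix.

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical helper: find the "result"-prefixed line printed by es_init.sh. */
final class InitOutputSketch {
    private static final String PREFIX = "result";

    /** Returns the text after the prefix, or empty if the script never printed it. */
    static Optional<String> extractResult(List<String> scriptOutput) {
        return scriptOutput.stream()
            .filter(line -> line.startsWith(PREFIX))
            .map(line -> line.substring(PREFIX.length()))
            .findFirst();
    }
}
```

An empty `Optional` means the template already existed or its creation failed, so no initial index was created on this run.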
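Rolling over once the conditions are met
- Shell script (bin/userlocation/es_rollover.sh). Arguments: maximum age, maximum document count, ES URL. Because the target index name is built from the current date, at most one rollover per day can succeed: a rollover whose conditions are met on the same day as an earlier rollover (or as the creation of the initial index) targets an index that already exists and is rejected.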
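#!/bin/bash
# Date suffix for the target index, e.g. 20200910
TIME=$(date "+%Y%m%d")
# Maximum age, e.g. 7d
max_age=$1
# Maximum document count
max_docs=$2
# ES URL
url=$3
# -s hides curl's progress meter; the Java caller parses the JSON printed on stdout
curl -s -XPOST "${url}/zz_user_location_save/_rollover/zz_user_location_${TIME}" -H 'Content-Type: application/json' -d'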
{
"conditions": {
"max_age": "'${max_age}'",
"max_docs": '${max_docs}'
}
}'
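The target name is the Java counterpart of `TIME=$(date "+%Y%m%d")` plus the zz_user_location_ prefix. The sketch below (hypothetical class name) builds it from a `LocalDate`, which also makes the one-rollover-per-day limit easy to see: two calls on the same day yield the same name. In practice the date would come from `LocalDate.now()`, which, like `date`, uses the local time zone.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper: date-based rollover target name, as in es_rollover.sh. */
final class RolloverTargetNameSketch {
    private static final DateTimeFormatter YYYYMMDD = DateTimeFormatter.ofPattern("yyyyMMdd");

    static String targetIndex(String prefix, LocalDate day) {
        return prefix + "_" + day.format(YYYYMMDD);
    }
}
```

`targetIndex("zz_user_location", LocalDate.of(2020, 9, 10))` returns `zz_user_location_20200910`.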
- Method run by the Java scheduled task
try {
    String basePath = new File("").getCanonicalPath();
    String userLocationEsShPath = basePath + File.separator + "bin/userlocation/es_rollover.sh";
    // param1: max age, param2: max document count, param3: ES_URL
    List<String> list = ShellUtil.runShell(userLocationEsShPath + " " + esSetting.getRolloverTime() + " " + esSetting.getRolloverSize() + " " + esSetting.getEsHttpHost());
    if (CollectionUtils.isNotEmpty(list)) {
        // curl prints the rollover response as a single JSON line
        String s = list.get(0);
        try {
            JSONObject object = JSONObject.parseObject(s);
            // A missing "rolled_over" field (e.g. an error response) counts as "not rolled over"
            if (!Boolean.TRUE.equals(object.getBoolean("rolled_over"))) {
                logger.info("userlocation index was not rolled over: {}", s);
                return;
            }
            logger.info("userlocation rolled over to a new index");
        } catch (Exception e) {
            logger.error("failed to parse the rollover response JSON", e);
        }
    }
} catch (IOException e) {
    logger.error("rollover script execution failed", e);
}
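The article does not show how this method is scheduled (Spring `@Scheduled`, Quartz and similar are all possible). As one assumption, a plain JDK `ScheduledExecutorService` could drive it; the class name and the idea of passing the method above as a `Runnable` are hypothetical. The wrapper matters: if a periodic task throws, the executor suppresses all of its later runs.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: run the rollover check periodically with the JDK scheduler. */
final class RolloverSchedulerSketch {
    static ScheduledExecutorService start(Runnable rolloverCheck, long delay, TimeUnit unit) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // Catch runtime failures so one bad run does not cancel every later run
        Runnable safe = () -> {
            try {
                rolloverCheck.run();
            } catch (RuntimeException e) {
                System.err.println("rollover check failed: " + e);
            }
        };
        // Fixed delay: the next check starts only after the previous one has finished
        scheduler.scheduleWithFixedDelay(safe, 0, delay, unit);
        return scheduler;
    }
}
```

Callers should call `shutdown()` on the returned executor when the application stops, since its worker thread is not a daemon thread.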