美文网首页
利用Rollover Api来进行索引管理(上)

利用Rollover Api来进行索引管理(上)

作者: 小周写程序 | 来源:发表于2020-09-10 09:14 被阅读0次

Rollover Pattern 索引滚动

  1. 为什么要使用滚动索引
    比如现在有一个要实时记录用户定位信息的需求,每隔5秒钟就要收集一次。
    这时我们将数据存储在ES中。
    而我们索引只有一个,伴随着数据越来越多,索引的读写肯定会越来越吃力。
    这时我们就可以将索引拆开,将一个索引拆分为多个索引。
    这种拆分可以利用索引模板来实现,按照时间创建索引可以使用Rollover 来实现。
    rollover使您可以根据索引大小,文档数或使用期限自动过渡到新索引。
    当rollover触发后,将创建新索引,写别名(write alias)将更新为指向新索引,所有后续更新都将写入新索引。
    注意:对于基于时间的rollover来说,基于大小,文档数或使用期限过渡至新索引是比较适合的。
    基于时间的rollover通常会导致许多小的索引,这可能会对性能和资源使用产生负面影响。

    对于Rollover后的历史数据,可能已经没有了价值,我们不得不进行删除,但是有一些数据可能对于分析很有用处
    ES在6.3之后引入了下面的新功能用来处理历史数据的问题
  • 以紧凑的聚合格式保存旧数据
  • 仅保存您感兴趣的数据这两项功能

2.创建流程

  • 创建索引模板,此时定义模板的别名为zz_user_location_search
curl -XPUT "http://host:port/_template/zz_user_location" -H 'Content-Type: application/json' -d'
{
  "index_patterns": "zz_user_location*",
  "settings": {
    "index":{
      "max_result_window": "1000000"
    },
    "number_of_shards": 3,
    "number_of_replicas": 1
  },
  "mappings": {
    "location": {
      "properties": {
        "jd": {
          "type": "keyword"
        },
        "wd": {
          "type": "keyword"
        },
        "wz": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "device": {
          "type": "keyword"
        },
        "create_time": {
          "type": "date",
          "format": "yyyy-MM-dd HH:mm:ss||epoch_millis"
        }
      }
    }
  },
  "aliases": {
    "zz_user_location_search": {}
  }
}'
  • 创建索引并给当前索引创建写别名
PUT /zz_user_location_20200909
{
  "aliases": {
    "zz_user_location_save": {}
  }
}
  • 在写别名上执行新增操作
POST /zz_user_location_save/location
{
  "jd":"555"
}
  • 查询索引拥有的别名
GET /zz_user_location_20200909/_alias/* 
--------------result-----------------
{
  "zz_user_location_20200909" : {
    "aliases" : {
      "zz_user_location_save" : { },
      "zz_user_location_search" : { }
    }
  }
}
  • 此时新增数据,两个索引均能查询出数据。比如我们设置当zz_user_location_20200909索引的文档数量大于等于5条或者时间大于等于7天或者索引容量大于等于1个G滚动新索引
dry_run只是测试是否满足条件,并不是真的运行
POST /zz_user_location_save/_rollover/zz_user_location_20200910?dry_run
{
  "conditions": {
    "max_age": "7d",
    "max_docs": 2,
    "max_size": "1gb"
  }
}
-------------result--------------
"dry_run" 是false,证明不满足条件,如果其中一个满足条件,那么就是true了
{
  "acknowledged" : false,
  "shards_acknowledged" : false,
  "old_index" : "zz_user_location_20200909",
  "new_index" : "zz_user_location_20200910",
  "rolled_over" : false,
  "dry_run" : false,
  "conditions" : {
    "[max_size: 1gb]" : false,
    "[max_docs: 2]" : false,
    "[max_age: 7d]" : false
  }
}
  • 如果其中一个满足条件,执行索引滚动
POST zz_user_location_save/_rollover/zz_user_location_20200910
{
  "conditions": {
    "max_age":   "7d",
    "max_docs":  5,
    "max_size": "1gb"
  }
}
-------------result--------------
"rolled_over" 是true,证明滚动成功
{
  "acknowledged" : true,
  "shards_acknowledged" : true,
  "old_index" : "zz_user_location_20200909",
  "new_index" : "zz_user_location_20200910",
  "rolled_over" : true,
  "dry_run" : false,
  "conditions" : {
    "[max_size: 1gb]" : false,
    "[max_docs: 5]" : true,
    "[max_age: 7d]" : false
  }
}

-此时在zz_user_location_save新增一条数据,查询zz_user_location_search,查询的文档分布在不同的索引上

    {
        "_index" : "zz_user_location_20200909",
        "_type" : "location",
        "_id" : "2GCKdXQBHICozJThgsAb",
        "_score" : 1.0,
        "_source" : {
          "jd" : "555"
        }
      },
      {
        "_index" : "zz_user_location_20200910",
        "_type" : "location",
        "_id" : "d2CPdXQBHICozJThG8X0",
        "_score" : 1.0,
        "_source" : {
          "jd" : "555"
        }
      }

创建模板和索引

  • sh脚本
#!/bin/bash
TIME=$(date "+%Y%m%d")
#ES_URL
ES_URL=$1
#模板前缀
TEMPLATE_PREFIX="zz_user_location"
#初始化写索引名称
INIT_INDEX="${TEMPLATE_PREFIX}_${TIME}"
#写别名
INDEX_SAVE_ALIASE="${TEMPLATE_PREFIX}_save"
#读别名
INDEX_SEARCH_ALIASE="${TEMPLATE_PREFIX}_search"

#获取模板
GET_TEMPLATE_RESULT=$(curl -XGET ${ES_URL}/_template/${TEMPLATE_PREFIX})

#创建模板
createTemplate() {
  RESULT=$(curl -XPUT ${ES_URL}"/_template/${TEMPLATE_PREFIX}" -H 'Content-Type: application/json' -d'
{
  "index_patterns": "'${TEMPLATE_PREFIX}'*",
  "settings": {
    "index": {
      "max_result_window": "1000000"
    },
    "number_of_shards": 3,
    "number_of_replicas": 1
  },
  "mappings": {
    "location": {
      "properties": {
        "user_name": {
          "type": "keyword"
        },
        "jd": {
          "type": "keyword"
        },
        "wd": {
          "type": "keyword"
        },
        "wz": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "device": {
          "type": "keyword"
        },
        "time": {
          "type": "date",
          "format": "yyyy-MM-dd||epoch_millis"
        },
        "create_time": {
          "type": "date",
          "format": "yyyy-MM-dd HH:mm:ss||epoch_millis"
        }
      }
    }
  },
  "aliases": {
    "'${INDEX_SEARCH_ALIASE}'": {}
  }
}')
  echo "$RESULT"
}

#初始化索引
initTemplateIndex() {
  RESULT=$(curl -XPUT "$ES_URL/${INIT_INDEX}" -H 'Content-Type: application/json' -d'
{
  "aliases": {
    "'${INDEX_SAVE_ALIASE}'": {}
  }
}')
  echo "$RESULT"
}

if test "$GET_TEMPLATE_RESULT" = "{}"; then
  echo '模板未创建!'
  echo '开始创建模板!'
  CREATETEMPLATE_RESULT=$(createTemplate)
  if test "$CREATETEMPLATE_RESULT"="{"acknowledged":true}"; then
    echo '模板创建成功!'
    INITTEMPLATE_RESULT=$(initTemplateIndex)
    echo "result${INITTEMPLATE_RESULT}"
  fi
else
  echo '模板已创建!'
fi
  • shellutil工具类
public class ShellUtil {
    private static Logger logger = LoggerFactory.getLogger(ShellUtil.class);

    /**
     * 执行shell脚本
     *
     * @param shell
     * @return
     */
    public static List<String> runShell(String shell) {
        Process process;
        List<String> processList = new ArrayList<>();
        try {
            process = Runtime.getRuntime().exec(new String[]{"/bin/sh", "-c", shell});
            try {
                int exitValue = process.waitFor();
                if (0 != exitValue) {
                    logger.error("call shell failed. error code is :" + exitValue);
                    return processList;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
                return processList;
            }
            BufferedReader input = new BufferedReader(new InputStreamReader(process.getInputStream()));
            String line = "";
            while ((line = input.readLine()) != null) {
                processList.add(line);
            }
            input.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return processList;
    }
}
  • 初始化脚本的方法
 String basePath;
        try {
            basePath = getBasePath();
            String userLocationEsShPath = basePath + File.separator + "bin/userlocation/es_init.sh";
            List<String> list = ShellUtil.runShell(userLocationEsShPath + " " + esSetting.getEsHttpHost());
            for (String s : list) {
                if (s.startsWith("result")) {
                    logger.info("脚本索引创建返回值为{}", s.substring(6));
                    logger.info("{}脚本执行成功", "userlocation");
                }
                logger.info(s);
            }
        } catch (IOException e) {
            logger.error("userlocation脚本执行失败", e);
        }

满足一定条件后执行滚动

  • sh脚本
#!/bin/bash
TIME=$(date "+%Y%m%d")
#最长时间
max_age=$1
#最大数量
max_docs=$2
#url
url=$3
curl -XPOST "${url}/zz_user_location_save/_rollover/zz_user_location_${TIME}" -H 'Content-Type: application/json' -d'
{
  "conditions": {
    "max_age": "'${max_age}'",
    "max_docs": '${max_docs}'
  }
}'
  • java定时任务执行方法
try {
            String basePath = new File("").getCanonicalPath();
            String userLocationEsShPath = basePath + File.separator + "bin/userlocation/es_rollover.sh";
            //param1 最长时间 param2 最大数量 param3 ES_URL
            List<String> list = ShellUtil.runShell(userLocationEsShPath + " " + esSetting.getRolloverTime() + " " + esSetting.getRolloverSize() + " " + esSetting.getEsHttpHost());
            if (CollectionUtils.isNotEmpty(list)) {
                String s = list.get(0);
                try {
                    JSONObject object = JSONObject.parseObject(s);
                    if (!object.getBoolean("rolled_over")) {
                        logger.info("userlocation索引未达滚动要求");
                        return;
                    }
                    logger.info("userlocation已滚动至新索引");
                } catch (Exception e) {
                    logger.error("snapshot的返回json解析失败", e);
                }
            }
        } catch (IOException e) {
            logger.error("snapshot脚本执行失败", e);
        }

相关文章

网友评论

      本文标题:利用Rollover Api来进行索引管理(上)

      本文链接:https://www.haomeiwen.com/subject/ihieektx.html