美文网首页
利用Rollover Api来进行索引管理(上)

利用Rollover Api来进行索引管理(上)

作者: 小周写程序 | 来源:发表于2020-09-10 09:14 被阅读0次

    Rollover Pattern 索引滚动

    1. 为什么要使用滚动索引
      比如现在有一个要实时记录用户定位信息的需求,每隔5秒钟就要收集一次。
      这时我们将数据存储在ES中。
      而我们索引只有一个,伴随着数据越来越多,索引的读写肯定会越来越吃力。
      这时我们就可以将索引拆开,将一个索引拆分为多个索引。
      这种拆分可以利用索引模板来实现,按照时间创建索引可以使用Rollover 来实现。
      rollover使您可以根据索引大小,文档数或使用期限自动过渡到新索引。
      当rollover触发后,将创建新索引,写别名(write alias)将更新为指向新索引,所有后续更新都将写入新索引。
      注意:对于基于时间的rollover来说,基于大小,文档数或使用期限过渡至新索引是比较适合的。
      基于时间的rollover通常会导致许多小的索引,这可能会对性能和资源使用产生负面影响。

      对于Rollover后的历史数据,可能已经没有了价值,我们不得不进行删除,但是有一些数据可能对于分析很有用处
      ES在6.3之后引入了下面的新功能用来处理历史数据的问题
    • 以紧凑的聚合格式保存旧数据
    • 仅保存您感兴趣的数据这两项功能

    2.创建流程

    • 创建索引模板,此时定义模板的别名为zz_user_location_search
    curl -XPUT "http://host:port/_template/zz_user_location" -H 'Content-Type: application/json' -d'
    {
      "index_patterns": "zz_user_location*",
      "settings": {
        "index":{
          "max_result_window": "1000000"
        },
        "number_of_shards": 3,
        "number_of_replicas": 1
      },
      "mappings": {
        "location": {
          "properties": {
            "jd": {
              "type": "keyword"
            },
            "wd": {
              "type": "keyword"
            },
            "wz": {
              "type": "text",
              "fields": {
                "keyword": {
                  "type": "keyword",
                  "ignore_above": 256
                }
              }
            },
            "device": {
              "type": "keyword"
            },
            "create_time": {
              "type": "date",
              "format": "yyyy-MM-dd HH:mm:ss||epoch_millis"
            }
          }
        }
      },
      "aliases": {
        "zz_user_location_search": {}
      }
    }'
    
    • 创建索引并给当前索引创建写别名
    PUT /zz_user_location_20200909
    {
      "aliases": {
        "zz_user_location_save": {}
      }
    }
    
    • 在写别名上执行新增操作
    POST /zz_user_location_save/location
    {
      "jd":"555"
    }
    
    • 查询索引拥有的别名
    GET /zz_user_location_20200909/_alias/* 
    --------------result-----------------
    {
      "zz_user_location_20200909" : {
        "aliases" : {
          "zz_user_location_save" : { },
          "zz_user_location_search" : { }
        }
      }
    }
    
    • 此时新增数据,两个索引均能查询出数据。比如我们设置当zz_user_location_20200909索引的文档数量大于等于5条或者时间大于等于7天或者索引容量大于等于1个G滚动新索引
    dry_run只是测试是否满足条件,并不是真的运行
    POST /zz_user_location_save/_rollover/zz_user_location_20200910?dry_run
    {
      "conditions": {
        "max_age": "7d",
        "max_docs": 2,
        "max_size": "1gb"
      }
    }
    -------------result--------------
    "dry_run" 是false,证明不满足条件,如果其中一个满足条件,那么就是true了
    {
      "acknowledged" : false,
      "shards_acknowledged" : false,
      "old_index" : "zz_user_location_20200909",
      "new_index" : "zz_user_location_20200910",
      "rolled_over" : false,
      "dry_run" : false,
      "conditions" : {
        "[max_size: 1gb]" : false,
        "[max_docs: 2]" : false,
        "[max_age: 7d]" : false
      }
    }
    
    • 如果其中一个满足条件,执行索引滚动
    POST zz_user_location_save/_rollover/zz_user_location_20200910
    {
      "conditions": {
        "max_age":   "7d",
        "max_docs":  5,
        "max_size": "1gb"
      }
    }
    -------------result--------------
    "rolled_over" 是true,证明滚动成功
    {
      "acknowledged" : true,
      "shards_acknowledged" : true,
      "old_index" : "zz_user_location_20200909",
      "new_index" : "zz_user_location_20200910",
      "rolled_over" : true,
      "dry_run" : false,
      "conditions" : {
        "[max_size: 1gb]" : false,
        "[max_docs: 5]" : true,
        "[max_age: 7d]" : false
      }
    }
    

    -此时在zz_user_location_save新增一条数据,查询zz_user_location_search,查询的文档分布在不同的索引上

        {
            "_index" : "zz_user_location_20200909",
            "_type" : "location",
            "_id" : "2GCKdXQBHICozJThgsAb",
            "_score" : 1.0,
            "_source" : {
              "jd" : "555"
            }
          },
          {
            "_index" : "zz_user_location_20200910",
            "_type" : "location",
            "_id" : "d2CPdXQBHICozJThG8X0",
            "_score" : 1.0,
            "_source" : {
              "jd" : "555"
            }
          }
    

    创建模板和索引

    • sh脚本
    #!/bin/bash
    TIME=$(date "+%Y%m%d")
    #ES_URL
    ES_URL=$1
    #模板前缀
    TEMPLATE_PREFIX="zz_user_location"
    #初始化写索引名称
    INIT_INDEX="${TEMPLATE_PREFIX}_${TIME}"
    #写别名
    INDEX_SAVE_ALIASE="${TEMPLATE_PREFIX}_save"
    #读别名
    INDEX_SEARCH_ALIASE="${TEMPLATE_PREFIX}_search"
    
    #获取模板
    GET_TEMPLATE_RESULT=$(curl -XGET ${ES_URL}/_template/${TEMPLATE_PREFIX})
    
    #创建模板
    createTemplate() {
      RESULT=$(curl -XPUT ${ES_URL}"/_template/${TEMPLATE_PREFIX}" -H 'Content-Type: application/json' -d'
    {
      "index_patterns": "'${TEMPLATE_PREFIX}'*",
      "settings": {
        "index": {
          "max_result_window": "1000000"
        },
        "number_of_shards": 3,
        "number_of_replicas": 1
      },
      "mappings": {
        "location": {
          "properties": {
            "user_name": {
              "type": "keyword"
            },
            "jd": {
              "type": "keyword"
            },
            "wd": {
              "type": "keyword"
            },
            "wz": {
              "type": "text",
              "fields": {
                "keyword": {
                  "type": "keyword",
                  "ignore_above": 256
                }
              }
            },
            "device": {
              "type": "keyword"
            },
            "time": {
              "type": "date",
              "format": "yyyy-MM-dd||epoch_millis"
            },
            "create_time": {
              "type": "date",
              "format": "yyyy-MM-dd HH:mm:ss||epoch_millis"
            }
          }
        }
      },
      "aliases": {
        "'${INDEX_SEARCH_ALIASE}'": {}
      }
    }')
      echo "$RESULT"
    }
    
    #初始化索引
    initTemplateIndex() {
      RESULT=$(curl -XPUT "$ES_URL/${INIT_INDEX}" -H 'Content-Type: application/json' -d'
    {
      "aliases": {
        "'${INDEX_SAVE_ALIASE}'": {}
      }
    }')
      echo "$RESULT"
    }
    
    if test "$GET_TEMPLATE_RESULT" = "{}"; then
      echo '模板未创建!'
      echo '开始创建模板!'
      CREATETEMPLATE_RESULT=$(createTemplate)
      if test "$CREATETEMPLATE_RESULT"="{"acknowledged":true}"; then
        echo '模板创建成功!'
        INITTEMPLATE_RESULT=$(initTemplateIndex)
        echo "result${INITTEMPLATE_RESULT}"
      fi
    else
      echo '模板已创建!'
    fi
    
    • shellutil工具类
    public class ShellUtil {
        private static Logger logger = LoggerFactory.getLogger(ShellUtil.class);
    
        /**
         * 执行shell脚本
         *
         * @param shell
         * @return
         */
        public static List<String> runShell(String shell) {
            Process process;
            List<String> processList = new ArrayList<>();
            try {
                process = Runtime.getRuntime().exec(new String[]{"/bin/sh", "-c", shell});
                try {
                    int exitValue = process.waitFor();
                    if (0 != exitValue) {
                        logger.error("call shell failed. error code is :" + exitValue);
                        return processList;
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    return processList;
                }
                BufferedReader input = new BufferedReader(new InputStreamReader(process.getInputStream()));
                String line = "";
                while ((line = input.readLine()) != null) {
                    processList.add(line);
                }
                input.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            return processList;
        }
    }
    
    • 初始化脚本的方法
     String basePath;
            try {
                basePath = getBasePath();
                String userLocationEsShPath = basePath + File.separator + "bin/userlocation/es_init.sh";
                List<String> list = ShellUtil.runShell(userLocationEsShPath + " " + esSetting.getEsHttpHost());
                for (String s : list) {
                    if (s.startsWith("result")) {
                        logger.info("脚本索引创建返回值为{}", s.substring(6));
                        logger.info("{}脚本执行成功", "userlocation");
                    }
                    logger.info(s);
                }
            } catch (IOException e) {
                logger.error("userlocation脚本执行失败", e);
            }
    

    满足一定条件后执行滚动

    • sh脚本
    #!/bin/bash
    TIME=$(date "+%Y%m%d")
    #最长时间
    max_age=$1
    #最大数量
    max_docs=$2
    #url
    url=$3
    curl -XPOST "${url}/zz_user_location_save/_rollover/zz_user_location_${TIME}" -H 'Content-Type: application/json' -d'
    {
      "conditions": {
        "max_age": "'${max_age}'",
        "max_docs": '${max_docs}'
      }
    }'
    
    • java定时任务执行方法
    try {
                String basePath = new File("").getCanonicalPath();
                String userLocationEsShPath = basePath + File.separator + "bin/userlocation/es_rollover.sh";
                //param1 最长时间 param2 最大数量 param3 ES_URL
                List<String> list = ShellUtil.runShell(userLocationEsShPath + " " + esSetting.getRolloverTime() + " " + esSetting.getRolloverSize() + " " + esSetting.getEsHttpHost());
                if (CollectionUtils.isNotEmpty(list)) {
                    String s = list.get(0);
                    try {
                        JSONObject object = JSONObject.parseObject(s);
                        if (!object.getBoolean("rolled_over")) {
                            logger.info("userlocation索引未达滚动要求");
                            return;
                        }
                        logger.info("userlocation已滚动至新索引");
                    } catch (Exception e) {
                        logger.error("snapshot的返回json解析失败", e);
                    }
                }
            } catch (IOException e) {
                logger.error("snapshot脚本执行失败", e);
            }
    

    相关文章

      网友评论

          本文标题:利用Rollover Api来进行索引管理(上)

          本文链接:https://www.haomeiwen.com/subject/ihieektx.html