Batch shrinking Elasticsearch indices with SCF

Author: bellengao | Published 2020-07-31 19:31

    In a hot-warm Elasticsearch cluster architecture, machines with higher CPU and memory specs and SSD disks usually serve as hot nodes, to sustain high-concurrency writes. Indices are typically created per day, and only the current day's index receives writes; older indices are migrated periodically, say 15 days after creation, to cold or warm nodes, which can run with lower CPU and memory configurations and SATA disks to reduce cost.
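    Hot-to-warm migration like this is usually driven by shard-allocation filtering on a node attribute. As a minimal sketch (the `temperature` attribute name, the index name, and the 15-day threshold are assumptions following the setup above), the routing decision for a daily index could look like:

```python
from datetime import date

# Build the settings that route an index to hot or warm nodes via
# shard-allocation filtering on a node attribute named "temperature".
# Indices older than warm_after_days are required to live on warm nodes.
def allocation_settings_for(index_date, today, warm_after_days=15):
    age_days = (today - index_date).days
    temperature = "warm" if age_days >= warm_after_days else "hot"
    return {"index.routing.allocation.require.temperature": temperature}

settings = allocation_settings_for(date(2020, 7, 1), date(2020, 7, 31))
print(settings)  # {'index.routing.allocation.require.temperature': 'warm'}

# Applying it would be a single settings call (hypothetical client/index name):
# ESServer.indices.put_settings(index="test-2020-07-01", body=settings)
```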

    To make full use of every machine, indices on hot nodes usually have their shard count set equal to the number of hot nodes, which guarantees write performance. But as indices accumulate, the cluster's total shard count keeps growing; once it reaches the tens of thousands, cluster stability and performance suffer noticeably. The overall shard count therefore needs to be kept under control. Fortunately, ES has a built-in shrink API for reducing an index's shard count. Shrink has preconditions and limitations: instead of lowering the shard count in place, it creates a new index with fewer shards, hard-links it to the old index's files, and then runs recovery on the new index until it turns green. Once the new index is green, we can delete the old index and give the new one an alias identical to the old index's name, so data can still be queried under the old name.
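    The prepare-then-shrink sequence described above boils down to two request bodies. A minimal sketch with hypothetical helper names (the actual client calls are shown as comments):

```python
# Settings applied to the source index before shrinking: block writes and
# force a complete copy of every shard onto one node (selected by node id).
def prepare_shrink_settings(node_id):
    return {
        "index.blocks.write": True,
        "index.routing.allocation.require._id": node_id,
    }

# Body of the shrink request: the target primary count must be a factor of
# the source's; clearing the _id requirement lets the new index's shards
# spread across the cluster again during recovery.
def shrink_body(target_shards):
    return {
        "settings": {
            "index.number_of_shards": target_shards,
            "index.number_of_replicas": 0,
            "index.routing.allocation.require._id": None,
        }
    }

# Hypothetical call sequence against an elasticsearch-py client:
# ESServer.indices.put_settings(index="test-2020-07-01",
#                               body=prepare_shrink_settings(warm_node_id))
# ESServer.indices.shrink(index="test-2020-07-01",
#                         target="shrink-test-2020-07-01",
#                         body=shrink_body(2))
```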

    This article uses an SCF cloud function (Tencent Cloud Serverless Cloud Function) to shrink a large backlog of old indices and reduce their shard counts.

    Implementation Steps

    1. Create the SCF function

    Create a new function based on the template named "ES写入函数" (ES write function).



    Click "下一步" (Next) to open the function editor, paste the following code into the edit box, and fill in the ES VIP address, the username and password, and the index-name prefix:

    # -*- coding: utf8 -*-
    from datetime import datetime
    from elasticsearch import Elasticsearch
    import time
    import math
    
    ESServer = Elasticsearch(["xxx:9200"],http_auth=('elastic', 'xxx'))
    
    # Temporary index used to record which old index is currently being processed
    tempIndex = "temp-snapshot"
    # Name prefix of the old indices
    indexPattern = "test-*"
    currentIndex = None
    
    # Pick the warm node with the lowest disk usage, returning its node id
    def get_warm_node():
        rsp = ESServer.nodes.stats(metric="indices",index_metric="store")
        nodesInfo = rsp["nodes"]
        minStorageSizeNodeId = None
        minStorageSize = 0
        for node in nodesInfo:
            nodeInfo = nodesInfo[node]
            if nodeInfo["attributes"]["temperature"] != "warm":
                continue
            nodeStorageSize = nodeInfo["indices"]["store"]["size_in_bytes"]
            # Keep the warm node with the smallest store size seen so far
            if minStorageSizeNodeId is None or nodeStorageSize < minStorageSize:
                minStorageSize = nodeStorageSize
                minStorageSizeNodeId = node
        return minStorageSizeNodeId
    
    # Check the old index: return True only when no shards are still recovering
    def check_old_index_status(index):
        params = {}
        params["format"] = "json"
        rsp = ESServer.cat.recovery(index = index, params = params)
        for shardStats in rsp:
            if shardStats["stage"] != "done":
                return False
        return True
    
    # Check whether the new index has turned green
    def check_new_index_status(index):
        rsp = ESServer.cluster.health(index = index)
        if rsp != None and rsp["status"] == "green":
            return True
        return False
    
    # Determine the target shard count from the index's primary storage size
    def calTargeIndexShardsNum(index):
        params = {}
        params["format"] = "json"
        rsp = ESServer.cat.indices(index = index, params = params)
        indexInfo = rsp[0]
        storageSize = indexInfo["pri.store.size"]
        shardNum = int(indexInfo["pri"])
        if storageSize.endswith("gb"):
            size = float(storageSize[0:storageSize.rfind("gb")])
            # Aim for about 50GB per shard, then round up until the target
            # is a factor of the old shard count (required by shrink)
            targetShardsNum = max(int(math.ceil(size/50)), 1)
            while shardNum % targetShardsNum != 0:
                targetShardsNum = targetShardsNum + 1
            return targetShardsNum
        else:
            return 1
    
    # Execute the shrink operation
    def shrink_index(index):
        body = {}
        body["settings"] = {}
        body["settings"]["index.number_of_replicas"] = 0
        targetShardsNum = calTargeIndexShardsNum(index)
        print "shrink index: " + index + ", target shards num: " + str(targetShardsNum)
        body["settings"]["index.number_of_shards"] = targetShardsNum
        body["settings"]["index.routing.allocation.require._id"] = None
        rsp = ESServer.indices.shrink(index=index, target="shrink-"+index, body=body)
        if rsp is not None and rsp["acknowledged"] == True:
            return True
        else:
            return False
    
    # Create an alias on the new index, named after the old index
    def add_alias(index):
        shrinkIndex = "shrink-"+index
        rsp = ESServer.indices.put_alias(index=shrinkIndex, name=index)
        if rsp is not None and rsp["acknowledged"] == True:
            return True
        else:
            return False
    
    # Delete an index
    def delete_index(index):
        rsp = ESServer.indices.delete(index=index, ignore=[400, 404])
        if rsp is not None and rsp["acknowledged"] == True:
            return True
        else:
            return False
    
    # Pick the next old index to shrink: green, open, still at its original
    # primary shard count (60 here), already on warm nodes, and not yet
    # pinned to a single node by a previous run
    def selectNeedToShrinkIndex():
        params = {}
        params["format"] = "json"
        rsp = ESServer.cat.indices(index = indexPattern, params = params)
        for indexInfo in rsp:
            indexName = indexInfo["index"]
            if indexName.startswith("shrink-"):
                continue
            if indexInfo["health"] == 'green' and indexInfo["status"] == 'open' and indexInfo["pri"] == "60":
                indexSettings = ESServer.indices.get_settings(index=indexName)
                allocationSettings = indexSettings[indexName]["settings"]["index"]["routing"]["allocation"]["require"]
                if allocationSettings["temperature"] == 'warm' and "_id" not in allocationSettings:
                    return indexName
        return None
    
    # Move all shards of the old index onto a single warm node
    def reallocatingOldIndex(index):
        warmNodeId = get_warm_node()
        if warmNodeId == None:
            print "warmNodeId is null"
            return
        print "warmNodeId: " + warmNodeId
        params = {}
        params["index.blocks.write"] = "true"
        params["index.routing.allocation.require._id"] = warmNodeId
        ESServer.indices.put_settings(index= index, body=params)
    
    # Record the old index currently being processed, for later polling
    def recordCurrentIndex(currentIndex):
        indexBody = {}
        indexBody["currentIndex"] = currentIndex
        ESServer.index(index=tempIndex, doc_type="_doc", body=indexBody, id="2")
        print "record current index success!"
    
    # Record the shrink index currently in progress, for later polling
    def recordShrinkIndex(shrinkIndex):
        indexBody = {}
        indexBody["shrinkIndex"] = shrinkIndex
        ESServer.index(index=tempIndex, doc_type="_doc", body=indexBody, id="3")
        print "record shrink index success!"
    
    # Check whether the shrink has finished; if so, swap in the new index
    def checkShrink(index):
        shrinkIndex = "shrink-"+index
        isShrinkIndexReady = check_new_index_status(shrinkIndex)
        if isShrinkIndexReady == True:
            deleteSuccess = delete_index(index)
            if deleteSuccess == True:
                success = add_alias(index)
                if success == True:
                    deleteDocument(tempIndex, "2")
                    deleteDocument(tempIndex, "3")
                    body = {}
                    body["indexName"] = index
                    addDocument(tempIndex, body)
                    forceMerge(index)
                    print "shrink index " + index + " finished!"
                else:
                    print "add alias failed"
            else:
                print "delete old index: " + index + " failed"
    
    # Delete a bookkeeping document from the temporary index
    def deleteDocument(index, docId):
        rsp = ESServer.delete(index=index, id=docId, doc_type="_doc")
        if rsp is not None:
            print "delete document: " + index + ", id: " + docId + " success"
            return True
        return False
    
    # Record in the temporary index the name of every index whose shrink has completed
    def addDocument(index, body):
        rsp = ESServer.index(index=index, doc_type="_doc", body = body)
        if rsp is not None:
            print "record document: " + index + " success"
            return True
        return False
    
    # Force-merge the new index down to one segment
    def forceMerge(index):
        params = {}
        params["max_num_segments"] = 1
        rsp = ESServer.indices.forcemerge(index=index, params =params)
        if rsp is not None:
            print "forcemerge index: " + index + " success"
            return True
        return False
    
    # Run the shrink once the old index's shards have finished relocating
    def execShrink(index):
        isOldIndexReady = check_old_index_status(index)
        if isOldIndexReady == True:
            print "old index: " + index + " is ready"
            success = shrink_index(index)
            if success == True:
                recordShrinkIndex("shrink-"+index)
            else:
                print "shrink failed"
        else:
            print "old index: " + index + " is reallocating"
    
    # Pick an old index and start processing it
    def selectIndexToExecShrink():
        currentIndex = selectNeedToShrinkIndex()
        if currentIndex == None:
            print "no index needs to be shrunk"
        else:
            print "current index: " + currentIndex
            recordCurrentIndex(currentIndex)
            reallocatingOldIndex(currentIndex)
        return currentIndex
    
    def check():
        existed = ESServer.indices.exists(tempIndex)
        if existed == True:
            getTempDoc = ESServer.get(tempIndex, doc_type="_doc", id="2", ignore=[404])
            if getTempDoc["found"] == False:
                currentIndex = selectIndexToExecShrink()
            else:
                currentIndex = getTempDoc["_source"]["currentIndex"]
                print "current index: " + currentIndex
            if currentIndex == None:
                return
    
            tempDoc = ESServer.get(tempIndex, doc_type="_doc", id="3", ignore=[404])
            if tempDoc["found"] == True:
                checkShrink(currentIndex)
            else:
                execShrink(currentIndex)
        else:
            selectIndexToExecShrink()
    
    
    def main_handler(event,context):
        check()
    
    

    The main logic of the function:

    1. Find the old indices via the name wildcard and pick one of them
    2. Pick a single warm node and move all shards of the index chosen in step 1 onto it (shrink requires a complete copy of the index on one node, because it works by hard-linking the files)
    3. Determine the target shard count from the old index's size: roughly 50GB per shard, rounded up, then adjusted so the target is a factor of the old shard count
    4. Once the shards have moved, run the shrink; the new index is named shrink-{old index name}. The target shard count must be a factor of the old one: for an old index with 10 shards, the new index can only have 1, 2, or 5 shards (so that no data needs to be rehashed)
    5. Check the new index and wait until it is green with no shards still initializing
    6. Delete the old index
    7. Add an alias to the new index, named after the old index
    8. Repeat steps 1-7 until all old indices are done. Processing one index at a time keeps mass shard relocation and initialization from interfering with the creation of new indices, a real risk in larger clusters
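    Steps 3 and 4 can be captured in one pure helper: aim for roughly 50GB per shard, then round the target up to the nearest factor of the old shard count. (`pick_target_shards` is a hypothetical name, a cleaned-up version of `calTargeIndexShardsNum` in the code above.)

```python
import math

# Choose a shrink target: about gb_per_shard GB per primary shard, rounded
# up to the nearest divisor of the source shard count (the shrink API
# requires the target count to be a factor of the source count).
def pick_target_shards(source_shards, size_gb, gb_per_shard=50):
    target = max(1, int(math.ceil(size_gb / float(gb_per_shard))))
    while source_shards % target != 0:
        target += 1
    return min(target, source_shards)

print(pick_target_shards(60, 120))  # 120GB over 60 shards -> prints 3
print(pick_target_shards(10, 260))  # needs >5 shards, factor of 10 -> prints 10
```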

    Click "完成" (Finish) to complete creation of the function.

    2. Configure the function

    After the function is created, it needs to be configured before use: set the function's VPC and subnet, choosing the same VPC and subnet as the ES cluster.



    3. Test the function

    With the configuration done, test the function code to make sure it runs correctly; to make changes, edit the code and click "保存并测试" (Save and Test).



    4. Configure a trigger

    Configure a timer trigger with a custom schedule so the function runs periodically.
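    Since check() advances the workflow only one step per invocation, a fairly short period works well for polling shrink progress. As an illustration only (assuming SCF's seven-field cron format with a leading seconds field; verify the exact syntax in the SCF console), a trigger firing every five minutes might be written as:

```
0 */5 * * * * *
```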




    Original link: https://www.haomeiwen.com/subject/ojterktx.html