在冷热分离的Elasticsearch集群架构中,往往会使用拥有较高规格cpu和内存、以及SSD盘的机器作为热节点,用于保证高并发的写入。通常索引都是按天创建的,只有当天的索引会进行写入,存量的索引会定期比如说在索引创建15天后迁移到冷节点或者warm节点上,冷节点或者warm节点的cpu和内存配置都可以低一些,并且使用SATA盘存储降低成本。
在热节点上的索引,为了保证写入性能,通常分片数会设置的和热节点的数量一致,使得每台机器的资源都可以利用上。但是随着索引数量的不断增加,集群整体的分片数量也越来越多,如果分片数量达到了数万,对集群的稳定性和性能都会有不小的影响。所以需要对集群整体的分片数量进行控制,避免分片数过多而导致集群不稳定,好在ES本身有shrink 索引的功能,可以降低索引的分片数,但是shrink操作有一些前置条件和使用限制,不是直接对索引的分片数调低,而是新建一个分片数量少的索引,硬链接到老的索引,然后对新索引执行recovery直至新索引变green。我们可以在新索引变green后,删除老的索引,然后对新索引建立别名,别名和老索引的名称完全一样,可以按照老索引的名称查询数据。
本文尝试使用SCF云函数对存量的大量的老索引,通过shrink,降低索引的分片数量。
实施步骤
1. 创建SCF云函数
如图,基于名为"ES写入函数"的模板,创建一个新的函数:
image
点击"下一步"进入函数编辑界面,直接复制如下函数代码粘贴到编辑框,修改ES的vip和用户名密码,以及索引前缀名称等信息:
# -*- coding: utf8 -*-
from datetime import datetime
from elasticsearch import Elasticsearch
import time
import math
ESServer = Elasticsearch(["xxx:9200"],http_auth=('elastic', 'xxx'))
# 临时索引,用于记录当前正在执行哪一个老的索引
tempIndex = "temp-snapshot"
# 老的索引前缀
indexPattern = "test-*"
currentIndex = None
# 根据磁盘使用量,获取用量最小的warm节点id
def get_warm_node():
rsp = ESServer.nodes.stats(metric="indices",index_metric="store")
nodesInfo = rsp["nodes"]
minStorageSizeNodeId = None
minStorageSize = 0
for node in nodesInfo:
nodeInfo = nodesInfo[node]
if nodeInfo["attributes"]["temperature"] != "warm":
continue
nodeStorageSize = nodeInfo["indices"]["store"]["size_in_bytes"]
if minStorageSize == 0:
minStorageSize = nodeStorageSize
minStorageSizeNodeId = node
if nodeStorageSize < minStorageSize:
minStorageSize = nodeStorageSize
minStorageSizeNodeId = node
return minStorageSizeNodeId
# 检查老索引的状态,判断是否有正在迁移中的分片
def check_old_index_status(index):
params = {}
params["format"] = "json"
rsp = ESServer.cat.recovery(index = index, params = params)
for shardStats in rsp:
if shardStats["stage"] != "done":
return False
return True
# 检查新索引的状态,判断是否green
def check_new_index_status(index):
rsp = ESServer.cluster.health(index = index)
if rsp != None and rsp["status"] == "green":
return True
return False
# 根据索引数据量确定要shrink到几个分片
def calTargeIndexShardsNum(index):
params = {}
params["format"] = "json"
rsp = ESServer.cat.indices(index = index, params = params)
indexInfo = rsp[0]
storageSize = indexInfo["pri.store.size"]
shardNum = indexInfo["pri"]
if storageSize.endswith("gb"):
size = float(storageSize[0:rfind("gb")])
targetShardsNum = int(math.ceil(size/50))
while shardNum / targetShardsNum * targetShardsNum < shardNum:
targetShardsNum = targetShardsNum + 1
return targetShardsNum
else:
return 1
# 执行shrink
def shrink_index(index):
body = {}
body["settings"]={}
body["settings"]["index.number_of_replicas"]=0
targetShardsNum = calTargeIndexShardsNum(index)
print "shrink index: " + index + ", target shards num:" + str(targetShardsNum)
body["settings"]["index.number_of_shards"] = targetShardsNum
body["settings"]["index.routing.allocation.require._id"] = None
rsp = ESServer.indices.shrink(index=index, target="shrink-"+index, body=body)
if rsp is not None and rsp["acknowledged"] == True:
return True
else:
return False
# 添加别名
def add_alias(index):
shrinkIndex = "shrink-"+index
rsp = ESServer.indices.put_alias(index=shrinkIndex, name=index)
if rsp is not None and rsp["acknowledged"] == True:
return True
else:
return False
# 删除索引
def delete_index(index):
rsp = ESServer.indices.delete(index=index, ignore=[400, 404])
if rsp is not None and rsp["acknowledged"] == True:
return True
else:
return False
# 选择需要执行shrink的老索引
def selectNeedToShrinkIndex():
params = {}
params["format"] = "json"
rsp = ESServer.cat.indices(index = indexPattern, params = params)
for indexInfo in rsp:
indexName = indexInfo["index"]
if indexName.startswith("shrink-"):
continue
if indexInfo["health"] == 'green' and indexInfo["status"] == 'open' and indexInfo["pri"] == "60":
indexSettings = ESServer.indices.get_settings(index=indexName)
allocationSettings = indexSettings[indexName]["settings"]["index"]["routing"]["allocation"]["require"]
if allocationSettings["temperature"] == 'warm' and "_id" not in allocationSettings:
return indexName
return None
# 把老索引的分片都迁移至一个节点上
def reallocatingOldIndex(index):
warmNodeId = get_warm_node()
if warmNodeId == None:
print "warmNodeId is null"
return
print "warmNodeId: " + warmNodeId
params = {}
params["index.blocks.write"] = "true"
params["index.routing.allocation.require._id"] = warmNodeId
ESServer.indices.put_settings(index= index, body=params)
# 记录当前正在执行的老索引,便于后续轮询
def recordCurrentIndex(currentIndex):
indexBody ={}
indexBody["currentIndex"] = currentIndex
headers= {}
headers["Content-Type"] = "application/json"
indexResult = ESServer.index(index=tempIndex, doc_type="_doc", body=indexBody,id="2")
print "index current index success!"
# 记录当前正在执行中的新索引,便于后续轮询
def recordShrinkIndex(shrinkIndex):
indexBody ={}
indexBody["shrinkIndex"] = shrinkIndex
headers= {}
headers["Content-Type"] = "application/json"
indexResult = ESServer.index(index=tempIndex, doc_type="_doc", body=indexBody,id="3")
print "index current index success!"
# 检查shrink操作
def checkShrink(index):
shrinkIndex = "shrink-"+index
isShrinkIndexReady = check_new_index_status(shrinkIndex)
if isShrinkIndexReady == True:
deleteSuccess = delete_index(index)
if deleteSuccess == True:
success = add_alias(index)
if success == True:
deleteDocument(tempIndex, "2")
deleteDocument(tempIndex, "3")
body = {}
body["indexName"] = index
addDocument(tempIndex, body)
forceMerge(index)
print "shrink index "+ index + "finished!"
else:
print "add alias failed"
else:
print "delete old index: "+ index + "failed"
# 删除临时索引中的记录
def deleteDocument(index, docId):
rsp = ESServer.delete(index=index, id=docId, doc_type="_doc")
if rsp is not None:
print "delete document: " + index + ", id: "+ docId + "success"
return True
return False
# 在临时索引中记录所有的已完成shrink的索引名称
def addDocument(index, body):
rsp = ESServer.index(index=index, doc_type="_doc", body = body)
if rsp is not None:
print "record document: " + index + " success"
return True
return False
# 对新索引执行merge
def forceMerge(index):
params = {}
params["max_num_segments"] = 1
rsp = ESServer.indices.forcemerge(index=index, params =params)
if rsp is not None:
print "forcemerge index: " + index + " success"
return True
return False
# 执行shrink
def execShrink(index):
isOldIndexReady = check_old_index_status(index)
if isOldIndexReady == True:
print "old index: " + index + " is ready"
success = shrink_index(index)
if success == True:
recordShrinkIndex("shrink-"+index)
else:
print "shrink failed"
else:
print "old index: " + index + " is reallocating"
# 选择老索引
def selectIndexToExecShrink():
currentIndex = selectNeedToShrinkIndex()
if currentIndex == None:
print "No new index needs to be shrinken"
else:
print "current index: " + currentIndex
recordCurrentIndex(currentIndex)
reallocatingOldIndex(currentIndex)
return currentIndex
def check():
existed = ESServer.indices.exists(tempIndex)
if existed == True:
getTempDoc = ESServer.get(tempIndex, doc_type="_doc", id="2", ignore=[404])
if getTempDoc["found"] == False:
currentIndex = selectIndexToExecShrink()
else:
currentIndex = getTempDoc["_source"]["currentIndex"]
print "current index: " + currentIndex
if currentIndex == None:
return
tempDoc = ESServer.get(tempIndex, doc_type="_doc", id="3", ignore=[404])
if tempDoc["found"] == True:
checkShrink(currentIndex)
else:
execShrink(currentIndex)
else:
selectIndexToExecShrink()
def main_handler(event,context):
check()
该函数主要的逻辑有:
- 通过索引名称通配符找到老的索引,选择一个索引
- 选择一个固定的warm节点,把1中选出的索引的分片全部移动到这个warm节点上去(索引完整的一份数据都在一个节点上,才能执行shrink,因为要进行硬链接)
- 根据老索引的数据量确定目标分片数量,按一个分片50GB确定,向上取整,并且使得目标分片数量为老索引分片数量的因子
- 分片移动完毕后,执行shrink操作,新索引的名称为shrink-{老索引的名称},新索引的分片数量只能为老索引分片数量的因子,比如老索引的分片数为10, 则新索引的分片数只能为1、2或者5(为了保证数据不用rehash)
- 检查新索引的状态,等待状态变为green并且没有在初始化中的分片
- 删除老索引
- 对新索引添加别名,别名为老索引的名称
- 继续执行步骤1-7, 直至所有的老索引都执行完毕,此举是为了避免大量的分片移动、初始化的操作对索引的新建产生影响,在规模较大的集群中容易出现该问题
点击"完成"即可完成云函数的创建。
2. 配置云函数
创建完云函数后,需要进行配置才能使用,如下图,可以配置函数的私有网络VPC和Subnet(选择和ES相同的VPC和Subnet):
image
3. 测试云函数
配置完云函数后,可以对函数代码进行测试,保证能够正常运行;如果需要进行编辑,可以直接编辑然后点击"保存并测试":
image
4. 配置触发器
配置触发器,可以自定义选择执行周期触发函数:
image
网友评论