背景
算法同学经常要在我们的Alpha机器学习平台(以下简称Alpha)上启停实验,但由于机器学习任务耗费资源较多,且资源比较紧张,我们需要在用户启动实验之前根据用户所用资源类型,告知用户最大可启动的实例数。从而避免由于资源不足导致实验启动失败,而用户由不知道有多少资源可用的尴尬情形。如下图所示:
实时展示可用资源kube-dashboard nodes接口
在刚开始使用 K8s 时,我查遍了 K8s 的所有 API,都没能找到可以直接使用的接口。后来灵机一动,可以直接使用 kube-dashboard 的/api/v1/node
接口来获取到各个节点的可用资源状态,从而实现想要的效果,如下图所示:
于是开开心心地接入/api/v1/node
接口(以下简称node接口),测试效果达到预期,就高高兴兴地上线了。
接口超时
前期使用都很正常,但随着集群中节点数量逐渐增多,这种方案的问题逐步暴露出来。当节点数量超过150台
后,通过node接口实时计算并返回结果的耗时已经达到了用户无法忍受的程度,我先是通过后端异步更新数据并缓存
的方式临时解决了问题。
当节点数量超过250台
之后,我发现数据开始不太准确了,经过排查,发现后端异步调用node接口也超时了,导致缓存长时间不能更新。没办法,我就把全量查询换成了分页查询
,但由于分页查询更新时间比较长,导致数据准确性无法保证。
事情已经到了这步,再基于node接口,已经很难再有更好的优化方案。于是,我决定抛弃node接口,自己来设计一套方案。
翻译 kube-dashboard 源码
既然要自己实现,关键就是要知道怎么算出每个node上可用资源量,node接口不就是现成的参考吗?于是,我从github上clone了 kube-dashboard 的源码,在 dashboard/src/app/backend/resource/node/detail.go
中找到了代码实现,然后一行一行翻译成 Java 代码。
前方高能预警,一大波儿 Java 代码来袭~
KubeNode
为了保存各项指标数据,我定义了 KubeNode 类,继承自 io.fabric8.kubernetes.api.model.Node
类:
@Data
public class KubeNode extends Node {
// 节点是否 ready
private boolean ready;
// 存放cpuRequests、cpuLimits、memoryRequests、memoryLimits等指标
private final Map<String, Double> allocatedResources = Maps.newHashMap();
private List<Pod> podList;
// 节点是否不可调度
private boolean unschedulable;
}
计算过程
计算过程的代码比较长,我就不再详细讲解,这里简单讲解一下原理:
- 获取到Node关联的所有 Pod 信息
- 将这些 Pod 所申请的资源按 request 和 limits 分别对memory、cpu等相加
- 从
node.getStatus().getCapacity()
获取到该 Node 各项可分配资源总量,减去上一步计算得到的Pod已申请资源量,就能得到该 Node 当前可分配资源量
/**
* 将 node 和 podList 转换为 KubeNode 对象
* @param node
* @param podList
* @return
*/
private KubeNode toNodeDetail(Node node, List<Pod> podList) {
final KubeNode nodeDetail = new KubeNode();
nodeDetail.setPodList(podList);
// node 是否处于 Ready 状态
nodeDetail.setReady(StringUtils.isBlank(node.getStatus().getPhase()));
// node 是否不可调度
nodeDetail.setUnschedulable(node.getSpec().getTaints().size() > 0);
nodeDetail.getAdditionalProperties().putAll(node.getAdditionalProperties());
nodeDetail.setApiVersion(node.getApiVersion());
nodeDetail.setKind(node.getKind());
nodeDetail.setMetadata(node.getMetadata());
nodeDetail.setSpec(node.getSpec());
nodeDetail.setStatus(node.getStatus());
// 计算cpuRequests、cpuLimits、memoryRequests、memoryLimits等指标
nodeDetail.getAllocatedResources().putAll(getNodeAllocatedResources(node, podList));
return nodeDetail;
}
private ReqsAndLimits podRequestsAndLimits(Pod pod) {
final Map<String, Double> reqs = Maps.newHashMap();
final Map<String, Double> limits = Maps.newHashMap();
final List<Container> containers = pod.getSpec().getContainers();
for(Container container : containers) {
addResourceList(reqs, container.getResources().getRequests());
addResourceList(limits, container.getResources().getLimits());
}
final List<Container> initContainers = pod.getSpec().getInitContainers();
// init containers define the minimum of any resource
for(Container container : initContainers) {
maxResourceList(reqs, container.getResources().getRequests());
maxResourceList(limits, container.getResources().getLimits());
}
// Add overhead for running a pod to the sum of requests and to non-zero limits:
final Map<String, Quantity> overhead = pod.getSpec().getOverhead();
if(overhead != null) {
addResourceList(reqs, overhead);
addResourceList(limits, overhead);
}
return new ReqsAndLimits(reqs, limits);
}
// addResourceList adds the resources in newList to list
void addResourceList(Map<String, Double> multimap, Map<String, Quantity> quantityMap) {
if(quantityMap == null) {
return;
}
final Iterator<Map.Entry<String, Quantity>> iterator = quantityMap.entrySet().iterator();
while (iterator.hasNext()) {
final Map.Entry<String, Quantity> entry = iterator.next();
final String name = entry.getKey();
double request;
if(name.equals(MEMORY_LOWER_CASE)) {
request = ByteConverter.convertToMB(entry.getValue());
} else if(name.equals(CPU_LOWER_CASE)) {
request = ByteConverter.convertToM(entry.getValue());
} else {
continue;
}
final Double value = multimap.get(name);
if(value == null) {
multimap.put(name, request);
} else {
multimap.put(name, request + value);
}
}
}
// maxResourceList sets list to the greater of list/newList for every resource
// either list
void maxResourceList(Map<String, Double> multimap, Map<String, Quantity> quantityMap) {
if(quantityMap == null) {
return;
}
final Iterator<Map.Entry<String, Quantity>> iterator = quantityMap.entrySet().iterator();
while (iterator.hasNext()) {
final Map.Entry<String, Quantity> entry = iterator.next();
final String name = entry.getKey();
double request;
if (name.equals(MEMORY_LOWER_CASE)) {
request = ByteConverter.convertToMB(entry.getValue());
} else if (name.equals(CPU_LOWER_CASE)) {
request = ByteConverter.convertToM(entry.getValue());
} else {
continue;
}
final Double value = multimap.get(name);
if (value == null) {
multimap.put(name, request);
} else if (value < request) {
multimap.put(name, request);
}
}
}
private void mergeMaps(Map<String, Double> mergedMap, Map<String, Double> targetMap) {
Iterator<Map.Entry<String, Double>> iterator = targetMap.entrySet().iterator();
while (iterator.hasNext()) {
final Map.Entry<String, Double> entry = iterator.next();
final String name = entry.getKey();
final Double value = mergedMap.get(name);
if(value == null) {
mergedMap.put(name, entry.getValue());
} else {
mergedMap.put(name, value + entry.getValue());
}
}
}
private Map<String, Double> getNodeAllocatedResources(Node node, List<Pod> podList) {
final Map<String, Double> reqs = Maps.newHashMap();
final Map<String, Double> limits = Maps.newHashMap();
for (Pod pod : podList) {
final ReqsAndLimits reqsAndLimits = podRequestsAndLimits(pod);
final Map<String, Double> podLimits = reqsAndLimits.getLimits();
final Map<String, Double> podReqs = reqsAndLimits.getReqs();
mergeMaps(reqs, podReqs);
mergeMaps(limits, podLimits);
}
Double cpuRequests = reqs.get(CPU_LOWER_CASE);
if(cpuRequests == null) {
cpuRequests = 0d;
}
Double cpuLimits = limits.get(CPU_LOWER_CASE);
if(cpuLimits == null) {
cpuLimits = 0d;
}
Double memoryRequests = reqs.get(MEMORY_LOWER_CASE);
if(memoryRequests == null) {
memoryRequests = 0d;
}
Double memoryLimits = limits.get(MEMORY_LOWER_CASE);
if(memoryLimits == null) {
memoryLimits = 0d;
}
double cpuRequestsFraction = 0, cpuLimitsFraction = 0;
final double cpuCapacity = ByteConverter.convertToM(node.getStatus().getCapacity().get(CPU_LOWER_CASE));
if(cpuCapacity > 0) {
cpuRequestsFraction = cpuRequests / cpuCapacity * 100;
cpuLimitsFraction = cpuLimits / cpuCapacity * 100;
}
double memoryRequestsFraction = 0, memoryLimitsFraction = 0;
final double memoryCapacity = ByteConverter.convertToMB(node.getStatus().getCapacity().get(MEMORY_LOWER_CASE));
if(memoryCapacity > 0) {
memoryRequestsFraction = memoryRequests / memoryCapacity * 100;
memoryLimitsFraction = memoryLimits / memoryCapacity * 100;
}
final Map<String, Double> allocatedResources = Maps.newHashMap();
allocatedResources.put(CPU_REQUESTS, cpuRequests);
allocatedResources.put(CPU_LIMITS, cpuLimits);
allocatedResources.put(CPU_CAPACITY, cpuCapacity);
allocatedResources.put(MEMORY_REQUESTS, memoryRequests * 1024 * 1024);
allocatedResources.put(MEMORY_LIMITS, memoryLimits * 1024 * 1024);
allocatedResources.put(MEMORY_CAPACITY, memoryCapacity * 1024 * 1024);
allocatedResources.put(CPU_REQUESTS_FRACTION, cpuRequestsFraction);
allocatedResources.put(CPU_LIMITS_FRACTION, cpuLimitsFraction);
allocatedResources.put(MEMORY_REQUESTS_FRACTION, memoryRequestsFraction);
allocatedResources.put(MEMORY_LIMITS_FRACTION, memoryLimitsFraction);
return allocatedResources;
}
常量定义
public static final String CPU_LOWER_CASE = "cpu";
public static final String MEMORY_LOWER_CASE = "memory";
public static final String CPU_REQUESTS = "cpuRequests";
public static final String CPU_LIMITS = "cpuLimits";
public static final String MEMORY_CAPACITY = "memoryCapacity";
public static final String CPU_CAPACITY = "cpuCapacity";
public static final String MEMORY_LIMITS = "memoryLimits";
public static final String CPU_AVG_FRACTION = "cpuAvgFraction";
public static final String MEMORY_REQUESTS = "memoryRequests";
public static final String MEMORY_AVG_FRACTION = "memoryAvgFraction";
public static final String MEMORY_LIMITS_FRACTION = "memoryLimitsFraction";
public static final String MEMORY_REQUESTS_FRACTION = "memoryRequestsFraction";
public static final String CPU_LIMITS_FRACTION = "cpuLimitsFraction";
public static final String CPU_REQUESTS_FRACTION = "cpuRequestsFraction";
Redis缓存 + List-Watch
通过翻译 kube-dashboard 源码实现通过 Fabric8
调用 K8s nodes 和 pods接口实时计算集群可用资源,我成功将接口耗时降低到了47s,至少不会导致前端和后端超时了。但47s对用户来说还是不能接受的,需要想办法优化。
说到性能优化,第一反应就应该是缓存。
至于选用哪种缓存方案,就看自己手边哪个用着趁手了,我这里选用的是 Redis。
通过 nodes 和 pods 接口,我们可以定期获取所需的信息,并更新到缓存中。但是,当集群规模很庞大时,这两个接口的耗时还是很可观的,不一定能保证数据的实时性。
好在 K8s 有 List-Watch 机制
(如果对 List-Watch 机制不太了解,可以阅读 理解 K8S 的设计精髓之 List-Watch机制和Informer模块),可以让我们及时监听到 Node 和 Pod 的状态变化,从而对缓存进行增量更新。
前方高能预警,一大波儿 Java 代码来袭~
K8sCacheUtils
@Data
@Slf4j
@AllArgsConstructor
@NoArgsConstructor
@RequiredArgsConstructor
public class K8sCacheUtils extends Thread {
@NonNull
// Fabric8 k8s 客户端
private KubernetesClient kubernetesClient;
@NonNull
// 用于操作 Redis
private RedisUtils redisUtils;
@NonNull
// 支持多个 K8s 集群,自定义枚举值
private K8sCluster cluster;
private String POD_CACHE_PREFIX = "K8sPodCache_test_";
private String NODE_CACHE_PREFIX = "K8sNodeCache_test_";
// 记录缓存已完成更新的 K8s node的hostname,node 缓存信息更新后再处理与其关联 list-watch 事件
// 从而避免 node 信息不一致
private final Set<String> cachedNodes = Sets.newConcurrentHashSet();
// 缓存更新完成标志,缓存全部更新后,再开始处理 node 相关 list-watch 事件
private volatile boolean cacheInitFinished = false;
@Override
public void run() {
log.info("Start init cache of cluster : {}", cluster);
// 监听 Pods 和 Nodes 变化
kubernetesClient.nodes().watch(new NodeWatcher(this));
kubernetesClient.pods().watch(new PodWatcher(this));
final List<Node> nodes = kubernetesClient.nodes().list().getItems();
final Map<Object, Object> nodeMap = Maps.newHashMap();
for(Node node : nodes) {
final String hostname = node.getMetadata().getName();
nodeMap.put(hostname, node);
// 更新前先清空
this.removePods(hostname);
final PodList nodePods = getNodePods(kubernetesClient, node);
addPods(hostname, nodePods);
cachedNodes.add(hostname);
}
// 先清空缓存数据
removeNodes();
// 更新节点列表,防止有节点不能被正常删除
final String redisKey = NODE_CACHE_PREFIX + cluster;
redisUtils.hmputAll(redisKey, nodeMap);
log.info("Finished init cache of cluster : {}", cluster);
cacheInitFinished = true;
}
/**
* 添加 node 信息到缓存中
* @param node
* @return
*/
public boolean addNode(Node node) {
if(! cacheInitFinished) {
return false;
}
final String hostname = node.getMetadata().getName();
if(StringUtils.isBlank(hostname)) {
return false;
}
final String redisKey = NODE_CACHE_PREFIX + cluster;
log.info("Insert node, redisKey={}, hostname={}", redisKey, hostname);
return redisUtils.hmput(redisKey, hostname, node);
}
/**
* 从缓存中移除 node 信息
* @param node
* @return
*/
public boolean removeNode(Node node) {
if(! cacheInitFinished) {
return false;
}
final String hostname = node.getMetadata().getName();
final String redisKey = NODE_CACHE_PREFIX + cluster;
log.info("Remove node, redisKey={}, hostname={}", redisKey, hostname);
redisUtils.hmdelete(redisKey, hostname);
return true;
}
/**
* 清空所有 nodes 缓存信息
* @return
*/
private boolean removeNodes() {
final String redisKey = NODE_CACHE_PREFIX + cluster;
redisUtils.del(redisKey);
return true;
}
/**
* 从缓存中获取 hostname 对应的 node 信息
* @param hostname
* @return
*/
public Node node(String hostname) {
final String redisKey = NODE_CACHE_PREFIX + cluster;
final Object value = redisUtils.hmget(redisKey, hostname);
if(value == null) {
return null;
}
return FastJsonUtils.toBean(value.toString(), Node.class);
}
/**
* 从缓存中获取所有 nodes 信息
* @return
*/
public List<Node> nodes() {
final List<Node> nodes = Lists.newArrayList();
final String redisKey = NODE_CACHE_PREFIX + cluster;
final Map<Object, Object> nodeMap = redisUtils.entries(redisKey);
final Iterator<Object> iterator = nodeMap.values().iterator();
while (iterator.hasNext()) {
nodes.add(FastJsonUtils.toBean(iterator.next().toString(), Node.class));
}
return nodes;
}
/**
* 从 K8s 获取 node 关联的 pods 信息
* @param client
* @param node
* @return
*/
private PodList getNodePods(KubernetesClient client, Node node) {
final String fieldSelector = "spec.nodeName=" + node.getMetadata().getName() +
",status.phase!=PodSucceeded,status.phase!=PodFailed" ;
final ListOptions listOptions = new ListOptions();
listOptions.setFieldSelector(fieldSelector);
return client.pods().list(listOptions);
}
/**
* 添加 hostname 关联的 pods 信息到缓存中
* @param hostname
* @param podList
* @return
*/
private boolean addPods(String hostname, PodList podList) {
final Map<Object, Object> podMap = Maps.newHashMap();
for(Pod pod : podList.getItems()) {
// 如果 pod 已停止,则不加入缓存
final String phase = pod.getStatus().getPhase();
if("Succeeded".equals(phase) || "Failed".equals(phase)) {
continue;
}
final String uid = pod.getMetadata().getUid();
podMap.put(uid, pod);
}
final String redisKey = POD_CACHE_PREFIX + cluster + "_" + hostname;
log.info("Insert pods, redisKey={}, size={}", redisKey, podMap.size());
return redisUtils.hmputAll(redisKey, podMap);
}
/**
* 添加 pod 信息到缓存中
* @param pod
* @return
*/
public boolean addPod(Pod pod) {
// 如果 pod 已停止,则不加入缓存
final String phase = pod.getStatus().getPhase();
if("Succeeded".equals(phase) || "Failed".equals(phase)) {
removePod(pod);
return false;
}
final String hostname = pod.getSpec().getNodeName();
if(StringUtils.isBlank(hostname) || ! cachedNodes.contains(hostname)) {
return false;
}
final String uid = pod.getMetadata().getUid();
final String redisKey = POD_CACHE_PREFIX + cluster + "_" + hostname;
log.info("Insert pod, redisKey={}, uid={}", redisKey, uid);
return redisUtils.hmput(redisKey, uid, pod);
}
/**
* 从缓存中移除 hostname 关联的 pods 信息
* @param hostname
* @return
*/
public boolean removePods(String hostname) {
log.info("Remove pods of hostname {}", hostname);
final String redisKey = POD_CACHE_PREFIX + cluster + "_" + hostname;
redisUtils.del(redisKey);
return true;
}
/**
* 从缓存中移除 pod 信息
* @param pod
* @return
*/
public boolean removePod(Pod pod) {
final String hostname = pod.getSpec().getNodeName();
if(StringUtils.isBlank(hostname) || ! cachedNodes.contains(hostname)) {
return false;
}
final String uid = pod.getMetadata().getUid();
final String redisKey = POD_CACHE_PREFIX + cluster + "_" + hostname;
log.info("Remove pod, redisKey={}, uid={}", redisKey, uid);
redisUtils.hmdelete(redisKey, uid);
return true;
}
/**
* 从缓存中获取 hostname 关联的 pods 信息
* @param hostname
* @return
*/
public List<Pod> pods(String hostname) {
final List<Pod> pods = Lists.newArrayList();
final String redisKey = POD_CACHE_PREFIX + cluster + "_" + hostname;
final Map<Object, Object> podMap = redisUtils.entries(redisKey);
final Iterator<Object> iterator = podMap.values().iterator();
while (iterator.hasNext()) {
pods.add(FastJsonUtils.toBean(iterator.next().toString(), Pod.class));
}
return pods;
}
}
NodeWatcher
@Data
@Slf4j
@AllArgsConstructor
@NoArgsConstructor
public class NodeWatcher implements Watcher<Node> {
@NonNull
private K8sCacheUtils k8sCacheUtils;
@Override
public void eventReceived(Action action, Node resource) {
switch (action) {
case ADDED:
k8sCacheUtils.addNode(resource);
break;
case ERROR:
k8sCacheUtils.removeNode(resource);
break;
case DELETED:
k8sCacheUtils.removeNode(resource);
break;
case MODIFIED:
k8sCacheUtils.addNode(resource);
break;
default:
}
}
@Override
public void onClose(KubernetesClientException cause) {
}
}
PodWatcher
@Data
@Slf4j
@AllArgsConstructor
@NoArgsConstructor
public class PodWatcher implements Watcher<Pod> {
@NonNull
private K8sCacheUtils k8sCacheUtils;
@Override
public void eventReceived(Action action, Pod resource) {
log.info("eventReceived action={}, podName={}", action, resource.getMetadata().getName());
switch (action) {
case ADDED:
k8sCacheUtils.addPod(resource);
break;
case ERROR:
k8sCacheUtils.removePod(resource);
break;
case DELETED:
k8sCacheUtils.removePod(resource);
break;
case MODIFIED:
k8sCacheUtils.addPod(resource);
break;
default:
}
}
@Override
public void onClose(KubernetesClientException cause) {
}
}
缓存初始化
在系统启动时,为了避免缓存初始化导致启动时间过长,我启动了一个新线程来完成缓存的初始化。
public void initCache(K8sCluster cluster, KubernetesClient kubernetesClient, RedisUtils redisUtils, boolean cacheAndWatch) {
final K8sCacheUtils k8sCacheUtils = new K8sCacheUtils(kubernetesClient, redisUtils, cluster);
// 通过变量控制是否在系统启动时初始化 K8s 缓存
if(cacheAndWatch) {
k8sCacheUtils.start();
}
}
细心的朋友可能注意到了,在缓存初始化的过程中
,我们并没有去处理 List-Watch 事件,如果在这个过程中 Node 或 Pod 状态变更会不会导致数据不一致呢?答案是:会。我们主要是基于两点考虑:
- 缓存初始化的时间比较短(大约1分钟),而集群中Node变更频率非常低,在这期间我们几乎不可能对K8s增减节点
- 每处理一个 Node 时,都会实时获取其关联的 Pod 信息,缓存 Pod 信息后立刻启用 List-Watch 事件处理,这个时间非常短,几乎不可能漏掉事件
当然,如果一定要把方案做的滴水不漏,可以在缓存初始化期间把接收到的 List-Watch 事件暂且缓存起来,初始化完成后再处理一遍即可。
本地缓存
通过 Redis缓存 + List-Watch 机制,我们即实现了数据的快速获取,也保证了数据的一致性,这样下来,就把接口耗时缩短到了18s
。但,谁不喜欢那种快如闪电的畅爽呢?
于是,我通过定时从 Redis 中读取缓存数据,并聚合后缓存到本地,使接口耗时缩短到了1s内
,那种快如闪电的感觉,feel倍儿爽~。
虽然定时更新会带来一定的数据延迟(20s以上),但对我们来说完全可以接收。至此,我的改造方案就完成了!
进一步优化
我刚才也介绍过了,上面的方案会带来20s以上的数据更新延迟,如果有读者在实际生产过程中不能接受这个延迟的话,可以修改缓存初始化和 List-Watch 的处理过程。
可以在初始化时直接从nodes和pods接口把数据聚合好,然后接受到 List-Watch 事件后实时更新聚合后的数据,如此可以使用真正的实时更新。
总结
我的方案大致分成了三步来实现:
第一步:根据nodes和pods信息计算得到各个node可以资源数量。
第二步:通过Redis缓存+List-Watch实现对nodes和pods信息的缓存和实时增量更新,从而加快数据获取速度。
第三步:通过定时任务定期聚合,计算得到各个node可以资源数量,并保存到本地内存中,从而在前端请求到来时快速返回。
希望能给被相同问题困扰的朋友带来一些帮助。
网友评论