美文网首页机器学习平台Kubernetes
《实时展示K8s可用资源》方案实现

《实时展示K8s可用资源》方案实现

作者: 王勇1024 | 来源:发表于2020-12-08 17:51 被阅读0次

背景

算法同学经常要在我们的Alpha机器学习平台(以下简称Alpha)上启停实验,但由于机器学习任务耗费资源较多,且资源比较紧张,我们需要在用户启动实验之前根据用户所用资源类型,告知用户最大可启动的实例数。从而避免由于资源不足导致实验启动失败,而用户由不知道有多少资源可用的尴尬情形。如下图所示:

实时展示可用资源

kube-dashboard nodes接口

在刚开始使用 K8s 时,我查遍了 K8s 的所有 API,都没能找到可以直接使用的接口。后来灵机一动,可以直接使用 kube-dashboard 的/api/v1/node 接口来获取到各个节点的可用资源状态,从而实现想要的效果,如下图所示:

kube-dashboard节点资源接口

于是开开心心地接入/api/v1/node接口(以下简称node接口),测试效果达到预期,就高高兴兴地上线了。

接口超时

前期使用都很正常,但随着集群中节点数量逐渐增多,这种方案的问题逐步暴露出来。当节点数量超过150台后,通过node接口实时计算并返回结果的耗时已经达到了用户无法忍受的程度,我先是通过后端异步更新数据并缓存的方式临时解决了问题。
当节点数量超过250台之后,我发现数据开始不太准确了,经过排查,发现后端异步调用node接口也超时了,导致缓存长时间不能更新。没办法,我就把全量查询换成了分页查询,但由于分页查询更新时间比较长,导致数据准确性无法保证。
事情已经到了这步,再基于node接口,已经很难再有更好的优化方案。于是,我决定抛弃node接口,自己来设计一套方案。

翻译 kube-dashboard 源码

既然要自己实现,关键就是要知道怎么算出每个node上可用资源量,node接口不就是现成的参考吗?于是,我从github上clone了 kube-dashboard 的源码,在 dashboard/src/app/backend/resource/node/detail.go中找到了代码实现,然后一行一行翻译成 Java 代码。

前方高能预警,一大波儿 Java 代码来袭~

KubeNode
为了保存各项指标数据,我定义了 KubeNode 类,继承自 io.fabric8.kubernetes.api.model.Node 类:

@Data
public class KubeNode extends Node {
    // 节点是否 ready
    private boolean ready;
    // 存放cpuRequests、cpuLimits、memoryRequests、memoryLimits等指标
    private final Map<String, Double> allocatedResources = Maps.newHashMap();
    private List<Pod> podList;
    // 节点是否不可调度
    private boolean unschedulable;
}

计算过程
计算过程的代码比较长,我就不再详细讲解,这里简单讲解一下原理:

  1. 获取到Node关联的所有 Pod 信息
  2. 将这些 Pod 所申请的资源按 request 和 limits 分别对memory、cpu等相加
  3. node.getStatus().getCapacity()获取到该 Node 各项可分配资源总量,减去上一步计算得到的Pod已申请资源量,就能得到该 Node 当前可分配资源量
    /**
     * 将 node 和 podList 转换为 KubeNode 对象
     * @param node
     * @param podList
     * @return
     */
    private KubeNode toNodeDetail(Node node, List<Pod> podList) {
        final KubeNode nodeDetail = new KubeNode();
        nodeDetail.setPodList(podList);
        // node 是否处于 Ready 状态
        nodeDetail.setReady(StringUtils.isBlank(node.getStatus().getPhase()));
        // node 是否不可调度
        nodeDetail.setUnschedulable(node.getSpec().getTaints().size() > 0);
        nodeDetail.getAdditionalProperties().putAll(node.getAdditionalProperties());
        nodeDetail.setApiVersion(node.getApiVersion());
        nodeDetail.setKind(node.getKind());
        nodeDetail.setMetadata(node.getMetadata());
        nodeDetail.setSpec(node.getSpec());
        nodeDetail.setStatus(node.getStatus());
        // 计算cpuRequests、cpuLimits、memoryRequests、memoryLimits等指标
        nodeDetail.getAllocatedResources().putAll(getNodeAllocatedResources(node, podList));
        return nodeDetail;
    }

    private ReqsAndLimits podRequestsAndLimits(Pod pod) {
        final Map<String, Double> reqs = Maps.newHashMap();
        final Map<String, Double> limits = Maps.newHashMap();
        final List<Container> containers = pod.getSpec().getContainers();
        for(Container container : containers) {
            addResourceList(reqs, container.getResources().getRequests());
            addResourceList(limits, container.getResources().getLimits());
        }
        final List<Container> initContainers = pod.getSpec().getInitContainers();
        // init containers define the minimum of any resource
        for(Container container : initContainers) {
            maxResourceList(reqs, container.getResources().getRequests());
            maxResourceList(limits, container.getResources().getLimits());
        }

        // Add overhead for running a pod to the sum of requests and to non-zero limits:
        final Map<String, Quantity> overhead = pod.getSpec().getOverhead();
        if(overhead != null) {
            addResourceList(reqs, overhead);
            addResourceList(limits, overhead);

        }
        return new ReqsAndLimits(reqs, limits);
    }

    // addResourceList adds the resources in newList to list
    void addResourceList(Map<String, Double> multimap, Map<String, Quantity> quantityMap) {
        if(quantityMap == null) {
            return;
        }
        final Iterator<Map.Entry<String, Quantity>> iterator = quantityMap.entrySet().iterator();
        while (iterator.hasNext()) {
            final Map.Entry<String, Quantity> entry = iterator.next();
            final String name = entry.getKey();

            double request;
            if(name.equals(MEMORY_LOWER_CASE)) {
                request = ByteConverter.convertToMB(entry.getValue());
            } else if(name.equals(CPU_LOWER_CASE)) {
                request = ByteConverter.convertToM(entry.getValue());
            } else {
                continue;
            }
            final Double value = multimap.get(name);
            if(value == null) {
                multimap.put(name, request);
            } else {
                multimap.put(name, request + value);
            }
        }
    }

    // maxResourceList sets list to the greater of list/newList for every resource
// either list
    void maxResourceList(Map<String, Double> multimap, Map<String, Quantity> quantityMap) {
        if(quantityMap == null) {
            return;
        }
        final Iterator<Map.Entry<String, Quantity>> iterator = quantityMap.entrySet().iterator();
        while (iterator.hasNext()) {
            final Map.Entry<String, Quantity> entry = iterator.next();
            final String name = entry.getKey();

            double request;
            if (name.equals(MEMORY_LOWER_CASE)) {
                request = ByteConverter.convertToMB(entry.getValue());
            } else if (name.equals(CPU_LOWER_CASE)) {
                request = ByteConverter.convertToM(entry.getValue());
            } else {
                continue;
            }
            final Double value = multimap.get(name);
            if (value == null) {
                multimap.put(name, request);
            } else if (value < request) {
                multimap.put(name, request);
            }
        }
    }

    private void mergeMaps(Map<String, Double> mergedMap, Map<String, Double> targetMap) {

        Iterator<Map.Entry<String, Double>> iterator = targetMap.entrySet().iterator();
        while (iterator.hasNext()) {
            final Map.Entry<String, Double> entry = iterator.next();
            final String name = entry.getKey();
            final Double value = mergedMap.get(name);
            if(value == null) {
                mergedMap.put(name, entry.getValue());
            } else {
                mergedMap.put(name, value + entry.getValue());
            }
        }
    }

    private Map<String, Double> getNodeAllocatedResources(Node node, List<Pod> podList) {
        final Map<String, Double> reqs = Maps.newHashMap();
        final Map<String, Double> limits = Maps.newHashMap();

        for (Pod pod : podList) {
            final ReqsAndLimits reqsAndLimits = podRequestsAndLimits(pod);
            final Map<String, Double> podLimits = reqsAndLimits.getLimits();
            final Map<String, Double> podReqs = reqsAndLimits.getReqs();

            mergeMaps(reqs, podReqs);
            mergeMaps(limits, podLimits);
        }

        Double cpuRequests = reqs.get(CPU_LOWER_CASE);
        if(cpuRequests == null) {
            cpuRequests = 0d;
        }
        Double cpuLimits = limits.get(CPU_LOWER_CASE);
        if(cpuLimits == null) {
            cpuLimits = 0d;
        }
        Double memoryRequests = reqs.get(MEMORY_LOWER_CASE);
        if(memoryRequests == null) {
            memoryRequests = 0d;
        }
        Double memoryLimits = limits.get(MEMORY_LOWER_CASE);
        if(memoryLimits == null) {
            memoryLimits = 0d;
        }

        double cpuRequestsFraction = 0, cpuLimitsFraction  = 0;
        final double cpuCapacity = ByteConverter.convertToM(node.getStatus().getCapacity().get(CPU_LOWER_CASE));
        if(cpuCapacity > 0) {
            cpuRequestsFraction = cpuRequests / cpuCapacity * 100;
            cpuLimitsFraction = cpuLimits / cpuCapacity * 100;
        }

        double memoryRequestsFraction = 0, memoryLimitsFraction = 0;
        final double memoryCapacity = ByteConverter.convertToMB(node.getStatus().getCapacity().get(MEMORY_LOWER_CASE));
        if(memoryCapacity > 0) {
            memoryRequestsFraction = memoryRequests / memoryCapacity * 100;
            memoryLimitsFraction = memoryLimits / memoryCapacity * 100;
        }
        final Map<String, Double> allocatedResources = Maps.newHashMap();
        allocatedResources.put(CPU_REQUESTS, cpuRequests);
        allocatedResources.put(CPU_LIMITS, cpuLimits);
        allocatedResources.put(CPU_CAPACITY, cpuCapacity);
        allocatedResources.put(MEMORY_REQUESTS, memoryRequests * 1024 * 1024);
        allocatedResources.put(MEMORY_LIMITS, memoryLimits * 1024 * 1024);
        allocatedResources.put(MEMORY_CAPACITY, memoryCapacity * 1024 * 1024);
        allocatedResources.put(CPU_REQUESTS_FRACTION, cpuRequestsFraction);
        allocatedResources.put(CPU_LIMITS_FRACTION, cpuLimitsFraction);
        allocatedResources.put(MEMORY_REQUESTS_FRACTION, memoryRequestsFraction);
        allocatedResources.put(MEMORY_LIMITS_FRACTION, memoryLimitsFraction);
        return allocatedResources;
    }

常量定义

public static final String CPU_LOWER_CASE = "cpu";
public static final String MEMORY_LOWER_CASE = "memory";
public static final String CPU_REQUESTS = "cpuRequests";
public static final String CPU_LIMITS = "cpuLimits";
public static final String MEMORY_CAPACITY = "memoryCapacity";
public static final String CPU_CAPACITY = "cpuCapacity";
public static final String MEMORY_LIMITS = "memoryLimits";
public static final String CPU_AVG_FRACTION = "cpuAvgFraction";
public static final String MEMORY_REQUESTS = "memoryRequests";
public static final String MEMORY_AVG_FRACTION = "memoryAvgFraction";
public static final String MEMORY_LIMITS_FRACTION = "memoryLimitsFraction";
public static final String MEMORY_REQUESTS_FRACTION = "memoryRequestsFraction";
public static final String CPU_LIMITS_FRACTION = "cpuLimitsFraction";
public static final String CPU_REQUESTS_FRACTION = "cpuRequestsFraction";

Redis缓存 + List-Watch

通过翻译 kube-dashboard 源码实现通过 Fabric8 调用 K8s nodes 和 pods接口实时计算集群可用资源,我成功将接口耗时降低到了47s,至少不会导致前端和后端超时了。但47s对用户来说还是不能接受的,需要想办法优化。

说到性能优化,第一反应就应该是缓存。至于选用哪种缓存方案,就看自己手边哪个用着趁手了,我这里选用的是 Redis。
通过 nodes 和 pods 接口,我们可以定期获取所需的信息,并更新到缓存中。但是,当集群规模很庞大时,这两个接口的耗时还是很可观的,不一定能保证数据的实时性。
好在 K8s 有 List-Watch 机制(如果对 List-Watch 机制不太了解,可以阅读 理解 K8S 的设计精髓之 List-Watch机制和Informer模块),可以让我们及时监听到 Node 和 Pod 的状态变化,从而对缓存进行增量更新。

Redis缓存 + List-Watch

前方高能预警,一大波儿 Java 代码来袭~

K8sCacheUtils

@Data
@Slf4j
@AllArgsConstructor
@NoArgsConstructor
@RequiredArgsConstructor
public class K8sCacheUtils extends Thread {
    @NonNull
    // Fabric8 k8s 客户端
    private KubernetesClient kubernetesClient;
    @NonNull
    // 用于操作 Redis
    private RedisUtils redisUtils;
    @NonNull
    // 支持多个 K8s 集群,自定义枚举值
    private K8sCluster cluster;
    private String POD_CACHE_PREFIX = "K8sPodCache_test_";
    private String NODE_CACHE_PREFIX = "K8sNodeCache_test_";
    // 记录缓存已完成更新的 K8s node的hostname,node 缓存信息更新后再处理与其关联 list-watch 事件
    // 从而避免 node 信息不一致
    private final Set<String> cachedNodes = Sets.newConcurrentHashSet();
    // 缓存更新完成标志,缓存全部更新后,再开始处理 node 相关 list-watch 事件
    private volatile boolean cacheInitFinished = false;

    @Override
    public void run() {

        log.info("Start init cache of cluster : {}", cluster);
        // 监听 Pods 和 Nodes 变化
        kubernetesClient.nodes().watch(new NodeWatcher(this));
        kubernetesClient.pods().watch(new PodWatcher(this));

        final List<Node> nodes = kubernetesClient.nodes().list().getItems();
        final Map<Object, Object> nodeMap = Maps.newHashMap();
        for(Node node : nodes) {
            final String hostname = node.getMetadata().getName();
            nodeMap.put(hostname, node);
            // 更新前先清空
            this.removePods(hostname);
            final PodList nodePods = getNodePods(kubernetesClient, node);
            addPods(hostname, nodePods);
            cachedNodes.add(hostname);
        }
        // 先清空缓存数据
        removeNodes();
        // 更新节点列表,防止有节点不能被正常删除
        final String redisKey = NODE_CACHE_PREFIX + cluster;
        redisUtils.hmputAll(redisKey, nodeMap);
        log.info("Finished init cache of cluster : {}", cluster);
        cacheInitFinished = true;
    }

    /**
     * 添加 node 信息到缓存中
     * @param node
     * @return
     */
    public boolean addNode(Node node) {
        if(! cacheInitFinished) {
            return false;
        }
        final String hostname = node.getMetadata().getName();
        if(StringUtils.isBlank(hostname)) {
            return false;
        }
        final String redisKey = NODE_CACHE_PREFIX + cluster;
        log.info("Insert node, redisKey={}, hostname={}", redisKey, hostname);
        return redisUtils.hmput(redisKey, hostname, node);
    }

    /**
     * 从缓存中移除 node 信息
     * @param node
     * @return
     */
    public boolean removeNode(Node node) {
        if(! cacheInitFinished) {
            return false;
        }
        final String hostname = node.getMetadata().getName();
        final String redisKey = NODE_CACHE_PREFIX + cluster;
        log.info("Remove node, redisKey={}, hostname={}", redisKey, hostname);
        redisUtils.hmdelete(redisKey, hostname);
        return true;
    }

    /**
     * 清空所有 nodes 缓存信息
     * @return
     */
    private boolean removeNodes() {
        final String redisKey = NODE_CACHE_PREFIX + cluster;
        redisUtils.del(redisKey);
        return true;
    }

    /**
     * 从缓存中获取 hostname 对应的 node 信息
     * @param hostname
     * @return
     */
    public Node node(String hostname) {
        final String redisKey = NODE_CACHE_PREFIX + cluster;
        final Object value = redisUtils.hmget(redisKey, hostname);
        if(value == null) {
            return null;
        }
        return FastJsonUtils.toBean(value.toString(), Node.class);
    }

    /**
     * 从缓存中获取所有 nodes 信息
     * @return
     */
    public List<Node> nodes() {
        final List<Node> nodes = Lists.newArrayList();
        final String redisKey = NODE_CACHE_PREFIX + cluster;
        final Map<Object, Object> nodeMap = redisUtils.entries(redisKey);
        final Iterator<Object> iterator = nodeMap.values().iterator();
        while (iterator.hasNext()) {
            nodes.add(FastJsonUtils.toBean(iterator.next().toString(), Node.class));
        }
        return nodes;
    }

    /**
     * 从 K8s 获取 node 关联的 pods 信息
     * @param client
     * @param node
     * @return
     */
    private PodList getNodePods(KubernetesClient client, Node node) {
        final  String fieldSelector = "spec.nodeName=" + node.getMetadata().getName() +
                ",status.phase!=PodSucceeded,status.phase!=PodFailed" ;
        final ListOptions listOptions = new ListOptions();
        listOptions.setFieldSelector(fieldSelector);
        return client.pods().list(listOptions);
    }

    /**
     * 添加 hostname 关联的 pods 信息到缓存中
     * @param hostname
     * @param podList
     * @return
     */
    private boolean addPods(String hostname, PodList podList) {
        final Map<Object, Object> podMap = Maps.newHashMap();
        for(Pod pod : podList.getItems()) {
            // 如果 pod 已停止,则不加入缓存
            final String phase = pod.getStatus().getPhase();
            if("Succeeded".equals(phase) || "Failed".equals(phase)) {
                continue;
            }
            final String uid = pod.getMetadata().getUid();
            podMap.put(uid, pod);
        }
        final String redisKey = POD_CACHE_PREFIX + cluster + "_" + hostname;
        log.info("Insert pods, redisKey={}, size={}", redisKey, podMap.size());
        return redisUtils.hmputAll(redisKey, podMap);
    }

    /**
     * 添加 pod 信息到缓存中
     * @param pod
     * @return
     */
    public boolean addPod(Pod pod) {
        // 如果 pod 已停止,则不加入缓存
        final String phase = pod.getStatus().getPhase();
        if("Succeeded".equals(phase) || "Failed".equals(phase)) {
            removePod(pod);
            return false;
        }

        final String hostname = pod.getSpec().getNodeName();
        if(StringUtils.isBlank(hostname) || ! cachedNodes.contains(hostname)) {
            return false;
        }
        final String uid = pod.getMetadata().getUid();
        final String redisKey = POD_CACHE_PREFIX + cluster + "_" + hostname;
        log.info("Insert pod, redisKey={}, uid={}", redisKey, uid);
        return redisUtils.hmput(redisKey, uid, pod);
    }

    /**
     * 从缓存中移除 hostname 关联的 pods 信息
     * @param hostname
     * @return
     */
    public boolean removePods(String hostname) {
        log.info("Remove pods of hostname {}", hostname);
        final String redisKey = POD_CACHE_PREFIX + cluster + "_" + hostname;
        redisUtils.del(redisKey);
        return true;
    }

    /**
     * 从缓存中移除 pod 信息
     * @param pod
     * @return
     */
    public boolean removePod(Pod pod) {
        final String hostname = pod.getSpec().getNodeName();
        if(StringUtils.isBlank(hostname) || ! cachedNodes.contains(hostname)) {
            return false;
        }
        final String uid = pod.getMetadata().getUid();
        final String redisKey = POD_CACHE_PREFIX + cluster + "_" + hostname;

        log.info("Remove pod, redisKey={}, uid={}", redisKey, uid);
        redisUtils.hmdelete(redisKey, uid);
        return true;
    }

    /**
     * 从缓存中获取 hostname 关联的 pods 信息
     * @param hostname
     * @return
     */
    public List<Pod> pods(String hostname) {
        final List<Pod> pods = Lists.newArrayList();
        final String redisKey = POD_CACHE_PREFIX + cluster + "_" + hostname;
        final Map<Object, Object> podMap = redisUtils.entries(redisKey);
        final Iterator<Object> iterator = podMap.values().iterator();
        while (iterator.hasNext()) {
            pods.add(FastJsonUtils.toBean(iterator.next().toString(), Pod.class));
        }
        return pods;
    }
}

NodeWatcher

@Data
@Slf4j
@AllArgsConstructor
@NoArgsConstructor
public class NodeWatcher implements Watcher<Node> {
    @NonNull
    private K8sCacheUtils k8sCacheUtils;

    @Override
    public void eventReceived(Action action, Node resource) {
        switch (action) {
            case ADDED:
                k8sCacheUtils.addNode(resource);
                break;
            case ERROR:
                k8sCacheUtils.removeNode(resource);
                break;
            case DELETED:
                k8sCacheUtils.removeNode(resource);
                break;
            case MODIFIED:
                k8sCacheUtils.addNode(resource);
                break;
            default:
        }
    }

    @Override
    public void onClose(KubernetesClientException cause) {

    }
}

PodWatcher

@Data
@Slf4j
@AllArgsConstructor
@NoArgsConstructor
public class PodWatcher implements Watcher<Pod> {
    @NonNull
    private K8sCacheUtils k8sCacheUtils;

    @Override
    public void eventReceived(Action action, Pod resource) {
        log.info("eventReceived action={}, podName={}", action, resource.getMetadata().getName());
        switch (action) {
            case ADDED:
                k8sCacheUtils.addPod(resource);
                break;
            case ERROR:
                k8sCacheUtils.removePod(resource);
                break;
            case DELETED:
                k8sCacheUtils.removePod(resource);
                break;
            case MODIFIED:
                k8sCacheUtils.addPod(resource);
                break;
            default:
        }
    }

    @Override
    public void onClose(KubernetesClientException cause) {

    }
}

缓存初始化
在系统启动时,为了避免缓存初始化导致启动时间过长,我启动了一个新线程来完成缓存的初始化。

public void initCache(K8sCluster cluster, KubernetesClient kubernetesClient, RedisUtils redisUtils, boolean cacheAndWatch) {
    final K8sCacheUtils k8sCacheUtils = new K8sCacheUtils(kubernetesClient, redisUtils, cluster);
    // 通过变量控制是否在系统启动时初始化 K8s 缓存
    if(cacheAndWatch) {
        k8sCacheUtils.start();
    }
}

细心的朋友可能注意到了,在缓存初始化的过程中,我们并没有去处理 List-Watch 事件,如果在这个过程中 Node 或 Pod 状态变更会不会导致数据不一致呢?答案是:会。我们主要是基于两点考虑:

  1. 缓存初始化的时间比较短(大约1分钟),而集群中Node变更频率非常低,在这期间我们几乎不可能对K8s增减节点
  2. 每处理一个 Node 时,都会实时获取其关联的 Pod 信息,缓存 Pod 信息后立刻启用 List-Watch 事件处理,这个时间非常短,几乎不可能漏掉事件

当然,如果一定要把方案做的滴水不漏,可以在缓存初始化期间把接收到的 List-Watch 事件暂且缓存起来,初始化完成后再处理一遍即可。

本地缓存

通过 Redis缓存 + List-Watch 机制,我们即实现了数据的快速获取,也保证了数据的一致性,这样下来,就把接口耗时缩短到了18s。但,谁不喜欢那种快如闪电的畅爽呢?
于是,我通过定时从 Redis 中读取缓存数据,并聚合后缓存到本地,使接口耗时缩短到了1s内,那种快如闪电的感觉,feel倍儿爽~。
虽然定时更新会带来一定的数据延迟(20s以上),但对我们来说完全可以接收。至此,我的改造方案就完成了!

进一步优化

我刚才也介绍过了,上面的方案会带来20s以上的数据更新延迟,如果有读者在实际生产过程中不能接受这个延迟的话,可以修改缓存初始化和 List-Watch 的处理过程。
可以在初始化时直接从nodes和pods接口把数据聚合好,然后接受到 List-Watch 事件后实时更新聚合后的数据,如此可以使用真正的实时更新。

总结

我的方案大致分成了三步来实现:
第一步:根据nodes和pods信息计算得到各个node可以资源数量。
第二步:通过Redis缓存+List-Watch实现对nodes和pods信息的缓存和实时增量更新,从而加快数据获取速度。
第三步:通过定时任务定期聚合,计算得到各个node可以资源数量,并保存到本地内存中,从而在前端请求到来时快速返回。
希望能给被相同问题困扰的朋友带来一些帮助。

相关文章

网友评论

    本文标题:《实时展示K8s可用资源》方案实现

    本文链接:https://www.haomeiwen.com/subject/mxzfgktx.html