美文网首页
聊聊PowerJob的SystemInfoController

聊聊PowerJob的SystemInfoController

作者: go4it | 来源:发表于2024-01-28 09:17 被阅读0次

    本文主要研究一下PowerJob的SystemInfoController

    SystemInfoController

    tech/powerjob/server/web/controller/SystemInfoController.java

    /**
     * REST controller under {@code /system} that exposes worker status and an
     * overview of job/instance statistics for one app.
     */
    @Slf4j
    @RestController
    @RequestMapping("/system")
    @RequiredArgsConstructor
    public class SystemInfoController {

        // NOTE: field order defines the parameter order of the Lombok-generated constructor.
        private final JobInfoRepository jobInfoRepository;

        private final InstanceInfoRepository instanceInfoRepository;

        private final ServerInfoService serverInfoService;

        private final WorkerClusterQueryService workerClusterQueryService;

        /**
         * Lists the workers known for an app.
         *
         * @param appId id of the app whose workers are listed
         * @return one {@code WorkerStatusVO} per worker, in the order produced by
         *         {@code WorkerClusterQueryService#getAllWorkers(Long)}
         */
        @GetMapping("/listWorker")
        public ResultDTO<List<WorkerStatusVO>> listWorker(Long appId) {
            List<WorkerStatusVO> statusViews = workerClusterQueryService.getAllWorkers(appId)
                    .stream()
                    .map(WorkerStatusVO::new)
                    .collect(Collectors.toList());
            return ResultDTO.success(statusViews);
        }

        /**
         * Builds the system overview for an app: job/instance counters plus the
         * server's timezone, current time and server info.
         *
         * @param appId id of the app the counters are computed for
         * @return the populated overview
         */
        @GetMapping("/overview")
        public ResultDTO<SystemOverviewVO> getSystemOverview(Long appId) {
            SystemOverviewVO overview = new SystemOverviewVO();
            fillJobStatistics(overview, appId);
            fillServerEnvironment(overview);
            return ResultDTO.success(overview);
        }

        /** Sets the non-deleted job count, the running instance count and the instances failed in the last 24 hours. */
        private void fillJobStatistics(SystemOverviewVO overview, Long appId) {
            overview.setJobCount(jobInfoRepository.countByAppIdAndStatusNot(appId, SwitchableStatus.DELETED.getV()));
            overview.setRunningInstanceCount(instanceInfoRepository.countByAppIdAndStatus(appId, InstanceStatus.RUNNING.getV()));
            // Failure window: instances created after "now minus one day".
            Date oneDayAgo = DateUtils.addDays(new Date(), -1);
            overview.setFailedInstanceCount(instanceInfoRepository.countByAppIdAndStatusAndGmtCreateAfter(appId, InstanceStatus.FAILED.getV(), oneDayAgo));
        }

        /** Sets the display name of the server's default timezone, the current server time and the server info. */
        private void fillServerEnvironment(SystemOverviewVO overview) {
            overview.setTimezone(TimeZone.getDefault().getDisplayName());
            overview.setServerTime(DateFormatUtils.format(new Date(), OmsConstant.TIME_PATTERN));
            overview.setServerInfo(serverInfoService.fetchServiceInfo());
        }

    }
    

    SystemInfoController提供了listWorker、getSystemOverview方法：listWorker根据当前登录应用的appId获取其WorkerInfo列表，并转换为WorkerStatusVO返回；getSystemOverview统计了该appId的总任务数量（未删除的任务）、运行中的实例数、近24小时内失败的实例数，并附带服务器时区、服务器时间以及服务器信息(serverInfo)

    getAllWorkers

    tech/powerjob/server/remote/worker/WorkerClusterQueryService.java

        /**
         * Returns all workers currently registered for the given app, ordered by
         * {@code getSystemMetrics().calculateScore()} from highest to lowest.
         *
         * @param appId id of the app
         * @return a new list of the app's workers; empty when no worker is known for the app
         */
        @DesignateServer
        public List<WorkerInfo> getAllWorkers(Long appId) {
            // Array-backed copy: the list is sorted in place and then only iterated by callers,
            // so a linked list brings no benefit here.
            List<WorkerInfo> workers = Lists.newArrayList(getWorkerInfosByAppId(appId).values());
            // Descending by score. Integer.compare is used instead of "o2 - o1", which can overflow.
            workers.sort((o1, o2) -> Integer.compare(
                    o2.getSystemMetrics().calculateScore(),
                    o1.getSystemMetrics().calculateScore()));
            return workers;
        }
    
        /**
         * Looks up the workers of an app, keyed as returned by {@code ClusterStatusHolder#getAllWorkers()}.
         *
         * @param appId id of the app
         * @return the holder's worker map, or an empty map (after logging a warning) when the app
         *         has no ClusterStatusHolder yet
         */
        private Map<String, WorkerInfo> getWorkerInfosByAppId(Long appId) {
            final ClusterStatusHolder holder = getAppId2ClusterStatus().get(appId);
            if (holder != null) {
                return holder.getAllWorkers();
            }
            log.warn("[WorkerManagerService] can't find any worker for app(appId={}) yet.", appId);
            return Collections.emptyMap();
        }
    
        /**
         * Exposes the appId -> ClusterStatusHolder registry maintained by WorkerClusterManagerService.
         *
         * @return the registry map exactly as returned by {@code WorkerClusterManagerService.getAppId2ClusterStatus()}
         */
        public Map<Long, ClusterStatusHolder> getAppId2ClusterStatus() {
            final Map<Long, ClusterStatusHolder> registry = WorkerClusterManagerService.getAppId2ClusterStatus();
            return registry;
        }
    

    getAllWorkers通过getWorkerInfosByAppId获取该appId下的WorkerInfo（若尚无对应的ClusterStatusHolder，则打印告警并返回空集合），然后按getSystemMetrics().calculateScore()的得分从高到低排序

    WorkerClusterManagerService

    tech/powerjob/server/remote/worker/WorkerClusterManagerService.java

    /**
     * Static registry of worker health information, keyed by appId.
     */
    @Slf4j
    public class WorkerClusterManagerService {

        /**
         * Worker health information: appId -> ClusterStatusHolder.
         */
        private static final Map<Long, ClusterStatusHolder> APP_ID_2_CLUSTER_STATUS = Maps.newConcurrentMap();

        /**
         * Records a worker heartbeat, creating the app's ClusterStatusHolder on first sight.
         *
         * @param heartbeat the heartbeat packet sent by a worker
         */
        public static void updateStatus(WorkerHeartbeat heartbeat) {
            final Long appId = heartbeat.getAppId();
            final String appName = heartbeat.getAppName();
            APP_ID_2_CLUSTER_STATUS
                    .computeIfAbsent(appId, ignore -> new ClusterStatusHolder(appName))
                    .updateStatus(heartbeat);
        }

        /**
         * Drops the worker information of every app that is not listed.
         *
         * @param usingAppIds appIds whose data must be kept; all other entries are removed
         */
        public static void clean(List<Long> usingAppIds) {
            final Set<Long> retained = Sets.newHashSet(usingAppIds);
            // Removing through the key-set view removes the corresponding map entries.
            APP_ID_2_CLUSTER_STATUS.keySet().retainAll(retained);
        }


        /**
         * Releases cached information held by every ClusterStatusHolder, to prevent OOM.
         */
        public static void cleanUp() {
            for (ClusterStatusHolder holder : APP_ID_2_CLUSTER_STATUS.values()) {
                holder.release();
            }
        }

        /**
         * @return the live (not copied) appId -> ClusterStatusHolder map
         */
        protected static Map<Long, ClusterStatusHolder> getAppId2ClusterStatus() {
            return APP_ID_2_CLUSTER_STATUS;
        }

    }
    

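    WorkerClusterManagerService通过静态的APP_ID_2_CLUSTER_STATUS维护了appId到具体ClusterStatusHolder的映射：updateStatus接收WorkerHeartbeat，按需创建该appId的ClusterStatusHolder后执行clusterStatusHolder.updateStatus(heartbeat)；clean会移除不在usingAppIds中的appId；cleanUp则对所有ClusterStatusHolder执行release，以防止OOM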

    updateStatus

    tech/powerjob/server/remote/worker/ClusterStatusHolder.java

        /**
         * Applies a worker heartbeat to this app's cluster state.
         * <p>
         * The worker's WorkerInfo is created on first sight and refreshed from this heartbeat
         * before it is stored. A heartbeat older than the worker's last active time is logged
         * and ignored. Otherwise the worker info is refreshed, and each reported container is
         * recorded as {@code containerId -> (workerAddress -> DeployedContainerInfo)}.
         *
         * @param heartbeat heartbeat packet received from a worker
         */
        public void updateStatus(WorkerHeartbeat heartbeat) {
            final String address = heartbeat.getWorkerAddress();
            final long reportedAt = heartbeat.getHeartbeatTime();

            final WorkerInfo worker = address2WorkerInfo.computeIfAbsent(address, ignore -> {
                WorkerInfo created = new WorkerInfo();
                created.refresh(heartbeat);
                return created;
            });

            // Out-of-order heartbeat: keep the newer state already recorded.
            if (reportedAt < worker.getLastActiveTime()) {
                log.warn("[ClusterStatusHolder-{}] receive the expired heartbeat from {}, serverTime: {}, heartTime: {}", appName, address, System.currentTimeMillis(), reportedAt);
                return;
            }

            // NOTE(review): for a freshly created entry refresh(heartbeat) presumably runs a second time here;
            // harmless if refresh is idempotent — confirm.
            worker.refresh(heartbeat);

            final List<DeployedContainerInfo> containers = heartbeat.getContainerInfos();
            if (CollectionUtils.isEmpty(containers)) {
                return;
            }
            for (DeployedContainerInfo container : containers) {
                containerId2Infos
                        .computeIfAbsent(container.getContainerId(), ignore -> Maps.newConcurrentMap())
                        .put(address, container);
            }
        }
    

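    updateStatus方法先根据workerAddress获取workerInfo（不存在则新建，并用该心跳初始化）；若heartbeatTime小于lastActiveTime，则视为过期心跳，打印告警后直接返回；否则执行workerInfo.refresh(heartbeat)，并将心跳中的containerInfos按containerId记录到containerId2Infos中（containerId -> workerAddress -> DeployedContainerInfo）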

    getSystemMetrics

    tech/powerjob/worker/common/utils/SystemInfoUtils.java

    /**
     * Collects CPU, JVM-memory and disk metrics of the local machine into a {@code SystemMetrics}.
     */
    public class SystemInfoUtils {

        /** Fraction digits kept when rounding reported values (rounding mode HALF_UP). */
        private static final int FRACTION_DIGITS = 4;

        private static final double BYTES_PER_GB = 1024.0 * 1024 * 1024;

        // JMX bean can be accessed externally and is meant for management tools like hyperic ( or even nagios ) - It would delegate to Runtime anyway.
        private static final Runtime RUNTIME = Runtime.getRuntime();
        private static final OperatingSystemMXBean OS_MX_BEAN = ManagementFactory.getOperatingSystemMXBean();

        /**
         * Samples the current CPU, memory and disk metrics and pre-computes the score.
         *
         * @return a new, fully populated metrics object
         */
        public static SystemMetrics getSystemMetrics() {

            SystemMetrics metrics = new SystemMetrics();

            fillCPUInfo(metrics);
            fillMemoryInfo(metrics);
            fillDiskInfo(metrics);

            // Compute the score on the worker to reduce the load on the server.
            metrics.calculateScore();
            return metrics;
        }

        private static void fillCPUInfo(SystemMetrics metrics) {
            metrics.setCpuProcessors(OS_MX_BEAN.getAvailableProcessors());
            // getSystemLoadAverage() returns a negative value when the load average is unavailable.
            metrics.setCpuLoad(miniDouble(OS_MX_BEAN.getSystemLoadAverage()));
        }

        private static void fillMemoryInfo(SystemMetrics metrics) {
            // maxMemory: the most memory the JVM will attempt to use (the -Xmx value);
            // used: currently committed heap (totalMemory) minus its free part.
            long maxMemory = RUNTIME.maxMemory();
            long usedMemory = RUNTIME.totalMemory() - RUNTIME.freeMemory();
            metrics.setJvmMaxMemory(bytes2GB(maxMemory));
            metrics.setJvmUsedMemory(bytes2GB(usedMemory));
            // Fraction of the maximum heap currently in use.
            metrics.setJvmMemoryUsage(miniDouble((double) usedMemory / maxMemory));
        }

        private static void fillDiskInfo(SystemMetrics metrics) {
            long free = 0;
            long total = 0;
            File[] roots = File.listRoots();
            // File.listRoots() may return null when the roots cannot be determined.
            if (roots != null) {
                for (File root : roots) {
                    free += root.getFreeSpace();
                    total += root.getTotalSpace();
                }
            }

            long used = total - free;
            metrics.setDiskUsed(bytes2GB(used));
            metrics.setDiskTotal(bytes2GB(total));
            // Use raw byte counts (not the rounded GB values) for the ratio, and avoid 0/0 = NaN
            // when no root reports any space.
            metrics.setDiskUsage(total > 0 ? miniDouble((double) used / total) : 0D);
        }

        private static double bytes2GB(long bytes) {
            return miniDouble(bytes / BYTES_PER_GB);
        }

        /**
         * Rounds a value to {@value #FRACTION_DIGITS} fraction digits (HALF_UP).
         * <p>
         * This replaces the previous shared NumberFormat, which had two problems: DecimalFormat
         * is not thread-safe, and its locale-dependent decimal separator (e.g. ',') made
         * Double.parseDouble throw. NaN and infinities are returned unchanged because
         * BigDecimal cannot represent them.
         *
         * @param origin the value to round
         * @return the rounded value
         */
        private static double miniDouble(double origin) {
            if (!Double.isFinite(origin)) {
                return origin;
            }
            return java.math.BigDecimal.valueOf(origin)
                    .setScale(FRACTION_DIGITS, RoundingMode.HALF_UP)
                    .doubleValue();
        }

    }
    

    SystemInfoUtils提供了getSystemMetrics方法,它通过fillCPUInfo、fillMemoryInfo、fillDiskInfo填充cpu、memory、disk信息,最后执行metrics.calculateScore();cpu信息通过osMXBean.getAvailableProcessors()、osMXBean.getSystemLoadAverage()获取;memory信息通过Runtime获取;disk信息则通过遍历File.listRoots()去统计freeSpace及totalSpace

    calculateScore

    tech/powerjob/common/model/SystemMetrics.java

        /**
         * Computes and caches a scheduling score for this machine.
         * <p>
         * The score is twice the free JVM memory (jvmMaxMemory - jvmUsedMemory) plus the spare
         * CPU capacity (cpuProcessors - cpuLoad), truncated to an int. A positive cached score
         * is returned as-is; a non-positive one is recomputed on every call.
         *
         * @return the score
         */
        public int calculateScore() {
            if (score <= 0) {
                // Memory is vital to the TaskTracker, so it is weighted by 2.
                final double memoryScore = 2 * (jvmMaxMemory - jvmUsedMemory);
                // Remaining CPU capacity, weighted by 1. An unreadable load (e.g. on Windows) is
                // negative, which pushes the value above cpuProcessors; fall back to 1 in that case.
                final double spareCpu = cpuProcessors - cpuLoad;
                final double cpuScore = spareCpu > cpuProcessors ? 1 : spareCpu;
                score = (int) (memoryScore + cpuScore);
            }
            return score;
        }
    

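    SystemMetrics的calculateScore由memScore、cpuScore两部分相加后取整而成：memScore为(jvmMaxMemory - jvmUsedMemory) * 2，cpuScore为cpuProcessors - cpuLoad；若cpuLoad为负（无法获取CPU负载，例如Windows），导致cpuScore大于cpuProcessors，则cpuScore取1。计算结果缓存在score字段中，score大于0时直接返回缓存值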

    小结

    SystemInfoController提供了listWorker、getSystemOverview方法;listWorker则是根据当前登录的appId来获取其WorkerInfo;getSystemOverview则是统计了当前appId的总任务数量、运行任务数、近期失败任务数;WorkerClusterManagerService定义了APP_ID_2_CLUSTER_STATUS,维护了appId到具体ClusterStatusHolder的映射;其中updateStatus接收WorkerHeartbeat,然后执行clusterStatusHolder.updateStatus(heartbeat);WorkerInfo包含了SystemMetrics,SystemInfoUtils提供了getSystemMetrics方法,它通过fillCPUInfo、fillMemoryInfo、fillDiskInfo填充cpu、memory、disk信息,最后执行metrics.calculateScore()。

    相关文章

      网友评论

          本文标题:聊聊PowerJob的SystemInfoController

          本文链接:https://www.haomeiwen.com/subject/firwodtx.html