A Look at PowerJob's WorkerHealthReporter

Author: go4it | Published: 2023-12-24 09:36

    This article takes a look at PowerJob's WorkerHealthReporter.

    WorkerHealthReporter

    tech/powerjob/worker/background/WorkerHealthReporter.java

    @Slf4j
    @RequiredArgsConstructor
    public class WorkerHealthReporter implements Runnable {
    
        private final WorkerRuntime workerRuntime;
    
        @Override
        public void run() {
    
            // No server available, so nothing can be reported
            String currentServer = workerRuntime.getServerDiscoveryService().getCurrentServerAddress();
            if (StringUtils.isEmpty(currentServer)) {
                log.warn("[WorkerHealthReporter] no available server,fail to report health info!");
                return;
            }
    
            SystemMetrics systemMetrics;
    
            if (workerRuntime.getWorkerConfig().getSystemMetricsCollector() == null) {
                systemMetrics = SystemInfoUtils.getSystemMetrics();
            } else {
                systemMetrics = workerRuntime.getWorkerConfig().getSystemMetricsCollector().collect();
            }
    
            WorkerHeartbeat heartbeat = new WorkerHeartbeat();
    
            heartbeat.setSystemMetrics(systemMetrics);
            heartbeat.setWorkerAddress(workerRuntime.getWorkerAddress());
            heartbeat.setAppName(workerRuntime.getWorkerConfig().getAppName());
            heartbeat.setAppId(workerRuntime.getAppId());
            heartbeat.setHeartbeatTime(System.currentTimeMillis());
            heartbeat.setVersion(PowerJobWorkerVersion.getVersion());
            heartbeat.setProtocol(workerRuntime.getWorkerConfig().getProtocol().name());
            heartbeat.setClient("KingPenguin");
            heartbeat.setTag(workerRuntime.getWorkerConfig().getTag());
    
            // Report the number of trackers
            heartbeat.setLightTaskTrackerNum(LightTaskTrackerManager.currentTaskTrackerSize());
            heartbeat.setHeavyTaskTrackerNum(HeavyTaskTrackerManager.currentTaskTrackerSize());
            // Whether the worker is overloaded
            if (workerRuntime.getWorkerConfig().getMaxLightweightTaskNum() <= LightTaskTrackerManager.currentTaskTrackerSize() || workerRuntime.getWorkerConfig().getMaxHeavyweightTaskNum() <= HeavyTaskTrackerManager.currentTaskTrackerSize()){
                heartbeat.setOverload(true);
            }
            // Collect the containers currently deployed on this worker
            heartbeat.setContainerInfos(OmsContainerFactory.getDeployedContainerInfos());
            // Send the request
            if (StringUtils.isEmpty(currentServer)) {
                return;
            }
            // log
            log.info("[WorkerHealthReporter] report health status,appId:{},appName:{},isOverload:{},maxLightweightTaskNum:{},currentLightweightTaskNum:{},maxHeavyweightTaskNum:{},currentHeavyweightTaskNum:{}" ,
                    heartbeat.getAppId(),
                    heartbeat.getAppName(),
                    heartbeat.isOverload(),
                    workerRuntime.getWorkerConfig().getMaxLightweightTaskNum(),
                    heartbeat.getLightTaskTrackerNum(),
                    workerRuntime.getWorkerConfig().getMaxHeavyweightTaskNum(),
                    heartbeat.getHeavyTaskTrackerNum()
            );
    
            TransportUtils.reportWorkerHeartbeat(heartbeat, currentServer, workerRuntime.getTransporter());
        }
    }
    

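    WorkerHealthReporter implements Runnable. Its run method first looks up currentServer and returns early if none is available. It then collects systemMetrics (from the configured SystemMetricsCollector if there is one, otherwise from SystemInfoUtils), builds a WorkerHeartbeat, and finally reports it through TransportUtils.reportWorkerHeartbeat.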
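
    The overload flag set in run() comes down to comparing each tracker count with its configured maximum. The following is a minimal sketch of that comparison in isolation; OverloadCheck and its parameter names are hypothetical and are not part of PowerJob.

```java
// Hypothetical helper, not part of PowerJob: mirrors the overload condition used in run().
final class OverloadCheck {

    private OverloadCheck() {
    }

    // A worker counts as overloaded once either tracker count reaches its configured maximum.
    static boolean isOverload(int maxLight, int currentLight, int maxHeavy, int currentHeavy) {
        return maxLight <= currentLight || maxHeavy <= currentHeavy;
    }
}
```

    Because the comparison uses <=, a worker that sits exactly at one of its limits is already reported as overloaded.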

    reportWorkerHeartbeat

    tech/powerjob/worker/common/utils/TransportUtils.java

        public static void reportWorkerHeartbeat(WorkerHeartbeat req, String address, Transporter transporter) {
            final URL url = easyBuildUrl(ServerType.SERVER, S4W_PATH, S4W_HANDLER_WORKER_HEARTBEAT, address);
            transporter.tell(url, req);
        }
    
        public static URL easyBuildUrl(ServerType serverType, String rootPath, String handlerPath, String address) {
            HandlerLocation handlerLocation = new HandlerLocation()
                    .setRootPath(rootPath)
                    .setMethodPath(handlerPath);
            return new URL()
                    .setServerType(serverType)
                    .setAddress(Address.fromIpv4(address))
                    .setLocation(handlerLocation);
        }    
    

    reportWorkerHeartbeat sends the request via transporter.tell; the rootPath is server and the handlerPath is workerHeartbeat.

    processWorkerHeartbeat

    tech/powerjob/server/core/handler/AbWorkerRequestHandler.java

        @Handler(path = S4W_HANDLER_WORKER_HEARTBEAT, processType = ProcessType.NO_BLOCKING)
        public void processWorkerHeartbeat(WorkerHeartbeat heartbeat) {
            long startMs = System.currentTimeMillis();
            WorkerHeartbeatEvent event = new WorkerHeartbeatEvent()
                    .setAppName(heartbeat.getAppName())
                    .setAppId(heartbeat.getAppId())
                    .setVersion(heartbeat.getVersion())
                    .setProtocol(heartbeat.getProtocol())
                    .setTag(heartbeat.getTag())
                    .setWorkerAddress(heartbeat.getWorkerAddress())
                    .setDelayMs(startMs - heartbeat.getHeartbeatTime())
                    .setScore(heartbeat.getSystemMetrics().getScore());
            processWorkerHeartbeat0(heartbeat, event);
            monitorService.monitor(event);
        }
    

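    processWorkerHeartbeat converts the heartbeat into a WorkerHeartbeatEvent, then calls processWorkerHeartbeat0 followed by monitorService.monitor(event). The event's delayMs is the server's receive time minus the worker's heartbeatTime, so it also reflects any clock difference between the two machines.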

    processWorkerHeartbeat0

    tech/powerjob/server/core/handler/WorkerRequestHandlerImpl.java

        protected void processWorkerHeartbeat0(WorkerHeartbeat heartbeat, WorkerHeartbeatEvent event) {
            WorkerClusterManagerService.updateStatus(heartbeat);
        }
    

    processWorkerHeartbeat0 updates the worker's status through WorkerClusterManagerService.updateStatus(heartbeat).

    WorkerClusterManagerService.updateStatus

    tech/powerjob/server/remote/worker/WorkerClusterManagerService.java

        public static void updateStatus(WorkerHeartbeat heartbeat) {
            Long appId = heartbeat.getAppId();
            String appName = heartbeat.getAppName();
            ClusterStatusHolder clusterStatusHolder = APP_ID_2_CLUSTER_STATUS.computeIfAbsent(appId, ignore -> new ClusterStatusHolder(appName));
            clusterStatusHolder.updateStatus(heartbeat);
        }
    

    updateStatus looks up the ClusterStatusHolder for the appId, creating it on first use, and then delegates the status update to it.
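
    The get-or-create step relies on Map.computeIfAbsent. Below is a minimal sketch of the same pattern, assuming the backing map is a concurrent map; AppRegistrySketch, Holder, and onHeartbeat are hypothetical names used only for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a per-app registry; the names are illustrative, not PowerJob's.
final class AppRegistrySketch {

    // Minimal holder: remembers the app name and counts the heartbeats it has seen.
    static final class Holder {
        final String appName;
        final AtomicInteger heartbeats = new AtomicInteger();

        Holder(String appName) {
            this.appName = appName;
        }
    }

    private final Map<Long, Holder> holders = new ConcurrentHashMap<>();

    // On a ConcurrentHashMap, computeIfAbsent creates the holder at most once per appId,
    // even when several heartbeats for the same app arrive concurrently.
    Holder onHeartbeat(long appId, String appName) {
        Holder holder = holders.computeIfAbsent(appId, ignore -> new Holder(appName));
        holder.heartbeats.incrementAndGet();
        return holder;
    }

    int size() {
        return holders.size();
    }
}
```

    One consequence of this pattern is that the holder keeps the appName from the first heartbeat; later heartbeats for the same appId reuse the existing holder.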

    ClusterStatusHolder.updateStatus

    tech/powerjob/server/remote/worker/ClusterStatusHolder.java

        public void updateStatus(WorkerHeartbeat heartbeat) {
    
            String workerAddress = heartbeat.getWorkerAddress();
            long heartbeatTime = heartbeat.getHeartbeatTime();
    
            WorkerInfo workerInfo = address2WorkerInfo.computeIfAbsent(workerAddress, ignore -> {
                WorkerInfo wf = new WorkerInfo();
                wf.refresh(heartbeat);
                return wf;
            });
            long oldTime = workerInfo.getLastActiveTime();
            if (heartbeatTime < oldTime) {
                log.warn("[ClusterStatusHolder-{}] receive the expired heartbeat from {}, serverTime: {}, heartTime: {}", appName, heartbeat.getWorkerAddress(), System.currentTimeMillis(), heartbeat.getHeartbeatTime());
                return;
            }
    
            workerInfo.refresh(heartbeat);
    
            List<DeployedContainerInfo> containerInfos = heartbeat.getContainerInfos();
            if (!CollectionUtils.isEmpty(containerInfos)) {
                containerInfos.forEach(containerInfo -> {
                    Map<String, DeployedContainerInfo> infos = containerId2Infos.computeIfAbsent(containerInfo.getContainerId(), ignore -> Maps.newConcurrentMap());
                    infos.put(workerAddress, containerInfo);
                });
            }
        }
    

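    ClusterStatusHolder.updateStatus first fetches the WorkerInfo for the worker address, creating it from the heartbeat if it does not exist yet. If heartbeatTime is earlier than lastActiveTime, the heartbeat is treated as expired and the method returns. Otherwise it calls workerInfo.refresh(heartbeat) and finally records the containers from heartbeat.getContainerInfos() in containerId2Infos, keyed by container ID and then by worker address.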
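
    The expired-heartbeat check protects the stored state against heartbeats that arrive out of order. Here is a minimal sketch of that guard, tracking only timestamps; HeartbeatGuardSketch and its methods are hypothetical and simplify away everything else WorkerInfo stores.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the stale-heartbeat guard; only timestamps are tracked here.
final class HeartbeatGuardSketch {

    private final Map<String, Long> lastActiveTime = new ConcurrentHashMap<>();

    // Returns true if the heartbeat was accepted, false if it is older than the stored one.
    boolean accept(String workerAddress, long heartbeatTime) {
        long oldTime = lastActiveTime.computeIfAbsent(workerAddress, ignore -> heartbeatTime);
        if (heartbeatTime < oldTime) {
            return false; // out-of-order heartbeat: keep the newer state
        }
        lastActiveTime.put(workerAddress, heartbeatTime);
        return true;
    }

    // Only meaningful for addresses that have already sent at least one heartbeat.
    long lastActive(String workerAddress) {
        return lastActiveTime.get(workerAddress);
    }
}
```

    As in the original check, a heartbeat with the same timestamp as the stored one is still accepted; only strictly older ones are dropped.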

    refresh

    tech/powerjob/server/common/module/WorkerInfo.java

        public void refresh(WorkerHeartbeat workerHeartbeat) {
            address = workerHeartbeat.getWorkerAddress();
            lastActiveTime = workerHeartbeat.getHeartbeatTime();
            protocol = workerHeartbeat.getProtocol();
            client = workerHeartbeat.getClient();
            tag = workerHeartbeat.getTag();
            systemMetrics = workerHeartbeat.getSystemMetrics();
            containerInfos = workerHeartbeat.getContainerInfos();
    
            lightTaskTrackerNum = workerHeartbeat.getLightTaskTrackerNum();
            heavyTaskTrackerNum = workerHeartbeat.getHeavyTaskTrackerNum();
    
            if (workerHeartbeat.isOverload()) {
                overloading = true;
                lastOverloadTime = workerHeartbeat.getHeartbeatTime();
                log.warn("[WorkerInfo] worker {} is overload!", getAddress());
            } else {
                overloading = false;
            }
        }
    

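    WorkerInfo.refresh copies the heartbeat's fields into the WorkerInfo, updating lastActiveTime, overloading, and related state. lastOverloadTime is updated only when the heartbeat reports overload.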

    DisconnectedWorkerFilter

    tech/powerjob/server/extension/defaultimpl/workerfilter/DisconnectedWorkerFilter.java

    @Slf4j
    @Component
    public class DisconnectedWorkerFilter implements WorkerFilter {
    
        @Override
        public boolean filter(WorkerInfo workerInfo, JobInfoDO jobInfo) {
            boolean timeout = workerInfo.timeout();
            if (timeout) {
                log.info("[Job-{}] filter worker[{}] due to timeout(lastActiveTime={})", jobInfo.getId(), workerInfo.getAddress(), workerInfo.getLastActiveTime());
            }
            return timeout;
        }
    }
    

    DisconnectedWorkerFilter implements WorkerFilter. Its filter method returns workerInfo.timeout(), so a return value of true means the worker should be excluded.

    timeout

    tech/powerjob/server/common/module/WorkerInfo.java

        private static final long WORKER_TIMEOUT_MS = 60000;
    
        public boolean timeout() {
            long timeout = System.currentTimeMillis() - lastActiveTime;
            return timeout > WORKER_TIMEOUT_MS;
        }
    

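    timeout computes the difference between the current time and lastActiveTime and compares it with WORKER_TIMEOUT_MS (60 s). The worker counts as timed out only when the gap exceeds that value.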
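
    To make the boundary behaviour concrete, the sketch below repeats the same comparison but takes the current time as a parameter so it can be exercised deterministically. TimeoutSketch is a hypothetical class, not part of PowerJob.

```java
// Hypothetical sketch: the same comparison as WorkerInfo.timeout(), with "now" passed in.
final class TimeoutSketch {

    static final long WORKER_TIMEOUT_MS = 60000;

    private TimeoutSketch() {
    }

    // True only when more than WORKER_TIMEOUT_MS has elapsed since the last heartbeat.
    static boolean timeout(long nowMs, long lastActiveTimeMs) {
        return nowMs - lastActiveTimeMs > WORKER_TIMEOUT_MS;
    }
}
```

    With a strict greater-than, a worker whose last heartbeat is exactly 60000 ms old is still considered connected.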

    getSuitableWorkers

    tech/powerjob/server/remote/worker/WorkerClusterQueryService.java

        public List<WorkerInfo> getSuitableWorkers(JobInfoDO jobInfo) {
    
            List<WorkerInfo> workers = Lists.newLinkedList(getWorkerInfosByAppId(jobInfo.getAppId()).values());
    
            workers.removeIf(workerInfo -> filterWorker(workerInfo, jobInfo));
    
            DispatchStrategy dispatchStrategy = DispatchStrategy.of(jobInfo.getDispatchStrategy());
            switch (dispatchStrategy) {
                case RANDOM:
                    Collections.shuffle(workers);
                    break;
                case HEALTH_FIRST:
                    workers.sort((o1, o2) -> o2.getSystemMetrics().calculateScore() - o1.getSystemMetrics().calculateScore());
                    break;
                default:
                    // do nothing
            }
    
            // Cap the cluster size (0 means no limit)
            if (!workers.isEmpty() && jobInfo.getMaxWorkerCount() > 0 && workers.size() > jobInfo.getMaxWorkerCount()) {
                workers = workers.subList(0, jobInfo.getMaxWorkerCount());
            }
            return workers;
        }
    
        private boolean filterWorker(WorkerInfo workerInfo, JobInfoDO jobInfo) {
            for (WorkerFilter filter : workerFilters) {
                if (filter.filter(workerInfo, jobInfo)) {
                    return true;
                }
            }
            return false;
        }    
    

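    getSuitableWorkers removes every worker for which filterWorker(workerInfo, jobInfo) returns true, orders the remaining workers according to the job's dispatch strategy, and truncates the list to maxWorkerCount when that limit is greater than 0.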
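
    The filter, sort, and cap steps can be sketched on their own as follows. This is a simplified illustration that covers only the HEALTH_FIRST ordering; WorkerSelectionSketch, Worker, and the plain int score are hypothetical stand-ins, and it uses Integer.compare and a copied sublist where the original uses subtraction and a subList view.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch of the selection pipeline: filter, order by score, cap the size.
final class WorkerSelectionSketch {

    static final class Worker {
        final String address;
        final int score;
        final boolean timedOut;

        Worker(String address, int score, boolean timedOut) {
            this.address = address;
            this.score = score;
            this.timedOut = timedOut;
        }
    }

    private WorkerSelectionSketch() {
    }

    static List<Worker> select(List<Worker> all, List<Predicate<Worker>> filters, int maxWorkerCount) {
        List<Worker> workers = new LinkedList<>(all);
        // A worker is dropped as soon as any filter returns true.
        workers.removeIf(w -> filters.stream().anyMatch(f -> f.test(w)));
        // Highest score first; Integer.compare avoids the overflow risk of subtracting ints.
        workers.sort((a, b) -> Integer.compare(b.score, a.score));
        // 0 means "no limit"; copying the sublist detaches the result from the backing list.
        if (maxWorkerCount > 0 && workers.size() > maxWorkerCount) {
            return new ArrayList<>(workers.subList(0, maxWorkerCount));
        }
        return workers;
    }
}
```

    A filter such as DisconnectedWorkerFilter would correspond to a predicate like w -> w.timedOut in this sketch.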

    Summary

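    PowerJob's WorkerHealthReporter implements Runnable. Its run method first looks up currentServer, then collects systemMetrics, builds a WorkerHeartbeat, and finally reports it through TransportUtils.reportWorkerHeartbeat. reportWorkerHeartbeat sends the request via transporter.tell, with rootPath server and handlerPath workerHeartbeat. On the server side, WorkerClusterManagerService.updateStatus(heartbeat) updates the state, mainly by calling WorkerInfo.refresh, which updates lastActiveTime, overloading, and related fields from the heartbeat. DisconnectedWorkerFilter implements WorkerFilter and its filter method returns workerInfo.timeout(), which excludes workers whose heartbeat has timed out.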
