A look at Flink's slot.request.timeout configuration

Author: go4it | Published: 2019-03-04 13:18

    This article takes a look at Flink's slot.request.timeout configuration.

    JobManagerOptions

    flink-release-1.7.2/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java

    @PublicEvolving
    public class JobManagerOptions {
        //......
    
        /**
         * The timeout in milliseconds for requesting a slot from Slot Pool.
         */
        public static final ConfigOption<Long> SLOT_REQUEST_TIMEOUT =
            key("slot.request.timeout")
            .defaultValue(5L * 60L * 1000L)
            .withDescription("The timeout in milliseconds for requesting a slot from Slot Pool.");
    
        //......
    }
    
    • slot.request.timeout defaults to 5 minutes (5L * 60L * 1000L = 300000 milliseconds)
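    To make the default concrete, here is a minimal, stdlib-only sketch of how a long option with a default value is resolved: an explicitly configured value wins, otherwise the default (300000 ms) is returned. This is not Flink's Configuration class; SimpleConfig and its methods are hypothetical stand-ins used only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

class SlotRequestTimeoutDefault {

    // Same key and default arithmetic as JobManagerOptions.SLOT_REQUEST_TIMEOUT.
    static final String KEY = "slot.request.timeout";
    static final long DEFAULT_MS = 5L * 60L * 1000L; // 300000 ms = 5 minutes

    /** Hypothetical key/value configuration; NOT Flink's API. */
    static final class SimpleConfig {
        private final Map<String, String> values = new HashMap<>();

        void setLong(String key, long value) {
            values.put(key, Long.toString(value));
        }

        /** Returns the explicitly configured value, or the supplied default. */
        long getLong(String key, long defaultValue) {
            String raw = values.get(key);
            return raw == null ? defaultValue : Long.parseLong(raw.trim());
        }
    }

    public static void main(String[] args) {
        SimpleConfig conf = new SimpleConfig();
        System.out.println(conf.getLong(KEY, DEFAULT_MS)); // nothing set: 300000

        conf.setLong(KEY, 10L * 60L * 1000L); // raise the timeout to 10 minutes
        System.out.println(conf.getLong(KEY, DEFAULT_MS)); // 600000
    }
}
```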

    SlotManagerConfiguration

    flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java

    public class SlotManagerConfiguration {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(SlotManagerConfiguration.class);
    
        private final Time taskManagerRequestTimeout;
        private final Time slotRequestTimeout;
        private final Time taskManagerTimeout;
    
        public SlotManagerConfiguration(
                Time taskManagerRequestTimeout,
                Time slotRequestTimeout,
                Time taskManagerTimeout) {
            this.taskManagerRequestTimeout = Preconditions.checkNotNull(taskManagerRequestTimeout);
            this.slotRequestTimeout = Preconditions.checkNotNull(slotRequestTimeout);
            this.taskManagerTimeout = Preconditions.checkNotNull(taskManagerTimeout);
        }
    
        public Time getTaskManagerRequestTimeout() {
            return taskManagerRequestTimeout;
        }
    
        public Time getSlotRequestTimeout() {
            return slotRequestTimeout;
        }
    
        public Time getTaskManagerTimeout() {
            return taskManagerTimeout;
        }
    
        public static SlotManagerConfiguration fromConfiguration(Configuration configuration) throws ConfigurationException {
            final String strTimeout = configuration.getString(AkkaOptions.ASK_TIMEOUT);
            final Time rpcTimeout;
    
            try {
                rpcTimeout = Time.milliseconds(Duration.apply(strTimeout).toMillis());
            } catch (NumberFormatException e) {
                throw new ConfigurationException("Could not parse the resource manager's timeout " +
                    "value " + AkkaOptions.ASK_TIMEOUT + '.', e);
            }
    
            final Time slotRequestTimeout = getSlotRequestTimeout(configuration);
            final Time taskManagerTimeout = Time.milliseconds(
                    configuration.getLong(ResourceManagerOptions.TASK_MANAGER_TIMEOUT));
    
            return new SlotManagerConfiguration(rpcTimeout, slotRequestTimeout, taskManagerTimeout);
        }
    
        private static Time getSlotRequestTimeout(final Configuration configuration) {
            final long slotRequestTimeoutMs;
            if (configuration.contains(ResourceManagerOptions.SLOT_REQUEST_TIMEOUT)) {
                LOGGER.warn("Config key {} is deprecated; use {} instead.",
                    ResourceManagerOptions.SLOT_REQUEST_TIMEOUT,
                    JobManagerOptions.SLOT_REQUEST_TIMEOUT);
                slotRequestTimeoutMs = configuration.getLong(ResourceManagerOptions.SLOT_REQUEST_TIMEOUT);
            } else {
                slotRequestTimeoutMs = configuration.getLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT);
            }
            return Time.milliseconds(slotRequestTimeoutMs);
        }
    }
    
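    • The getSlotRequestTimeout method of SlotManagerConfiguration reads JobManagerOptions.SLOT_REQUEST_TIMEOUT from the configuration; if the deprecated ResourceManagerOptions.SLOT_REQUEST_TIMEOUT key is set, it logs a deprecation warning and uses that value instead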
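    The precedence in getSlotRequestTimeout can be reproduced with a small stdlib-only sketch: when the deprecated ResourceManager key is present it wins (even if slot.request.timeout is also set) and a warning is logged; otherwise slot.request.timeout, or its 5-minute default, is used. The article does not show the string name of ResourceManagerOptions.SLOT_REQUEST_TIMEOUT, so DEPRECATED_KEY below is a hypothetical placeholder, and a plain Map stands in for Flink's Configuration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

class SlotRequestTimeoutResolver {

    private static final Logger LOGGER = Logger.getLogger(SlotRequestTimeoutResolver.class.getName());

    static final String KEY = "slot.request.timeout";
    // Hypothetical placeholder; the real deprecated key name is not shown in the article.
    static final String DEPRECATED_KEY = "deprecated.slot.request.timeout";
    static final long DEFAULT_MS = 5L * 60L * 1000L;

    /** Mirrors the branch order of SlotManagerConfiguration.getSlotRequestTimeout. */
    static long resolveSlotRequestTimeoutMs(Map<String, String> conf) {
        if (conf.containsKey(DEPRECATED_KEY)) {
            LOGGER.warning("Config key " + DEPRECATED_KEY + " is deprecated; use " + KEY + " instead.");
            return Long.parseLong(conf.get(DEPRECATED_KEY));
        }
        String raw = conf.get(KEY);
        return raw == null ? DEFAULT_MS : Long.parseLong(raw);
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println(resolveSlotRequestTimeoutMs(conf)); // 300000 (default)

        conf.put(KEY, "120000");
        System.out.println(resolveSlotRequestTimeoutMs(conf)); // 120000

        conf.put(DEPRECATED_KEY, "60000");
        System.out.println(resolveSlotRequestTimeoutMs(conf)); // 60000, plus a warning
    }
}
```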

    SlotManager

    flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java

    public class SlotManager implements AutoCloseable {
        private static final Logger LOG = LoggerFactory.getLogger(SlotManager.class);
    
        /** Scheduled executor for timeouts. */
        private final ScheduledExecutor scheduledExecutor;
    
        /** Timeout for slot requests to the task manager. */
        private final Time taskManagerRequestTimeout;
    
        /** Timeout after which an allocation is discarded. */
        private final Time slotRequestTimeout;
    
        /** Timeout after which an unused TaskManager is released. */
        private final Time taskManagerTimeout;
    
        /** Map for all registered slots. */
        private final HashMap<SlotID, TaskManagerSlot> slots;
    
        /** Index of all currently free slots. */
        private final LinkedHashMap<SlotID, TaskManagerSlot> freeSlots;
    
        /** All currently registered task managers. */
        private final HashMap<InstanceID, TaskManagerRegistration> taskManagerRegistrations;
    
        /** Map of fulfilled and active allocations for request deduplication purposes. */
        private final HashMap<AllocationID, SlotID> fulfilledSlotRequests;
    
        /** Map of pending/unfulfilled slot allocation requests. */
        private final HashMap<AllocationID, PendingSlotRequest> pendingSlotRequests;
    
        private final HashMap<TaskManagerSlotId, PendingTaskManagerSlot> pendingSlots;
    
        /** ResourceManager's id. */
        private ResourceManagerId resourceManagerId;
    
        /** Executor for future callbacks which have to be "synchronized". */
        private Executor mainThreadExecutor;
    
        /** Callbacks for resource (de-)allocations. */
        private ResourceActions resourceActions;
    
        private ScheduledFuture<?> taskManagerTimeoutCheck;
    
        private ScheduledFuture<?> slotRequestTimeoutCheck;
    
        /** True iff the component has been started. */
        private boolean started;
    
        public SlotManager(
                ScheduledExecutor scheduledExecutor,
                Time taskManagerRequestTimeout,
                Time slotRequestTimeout,
                Time taskManagerTimeout) {
            this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor);
            this.taskManagerRequestTimeout = Preconditions.checkNotNull(taskManagerRequestTimeout);
            this.slotRequestTimeout = Preconditions.checkNotNull(slotRequestTimeout);
            this.taskManagerTimeout = Preconditions.checkNotNull(taskManagerTimeout);
    
            slots = new HashMap<>(16);
            freeSlots = new LinkedHashMap<>(16);
            taskManagerRegistrations = new HashMap<>(4);
            fulfilledSlotRequests = new HashMap<>(16);
            pendingSlotRequests = new HashMap<>(16);
            pendingSlots = new HashMap<>(16);
    
            resourceManagerId = null;
            resourceActions = null;
            mainThreadExecutor = null;
            taskManagerTimeoutCheck = null;
            slotRequestTimeoutCheck = null;
    
            started = false;
        }
    
        public void start(ResourceManagerId newResourceManagerId, Executor newMainThreadExecutor, ResourceActions newResourceActions) {
            LOG.info("Starting the SlotManager.");
    
            this.resourceManagerId = Preconditions.checkNotNull(newResourceManagerId);
            mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);
            resourceActions = Preconditions.checkNotNull(newResourceActions);
    
            started = true;
    
            taskManagerTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay(
                () -> mainThreadExecutor.execute(
                    () -> checkTaskManagerTimeouts()),
                0L,
                taskManagerTimeout.toMilliseconds(),
                TimeUnit.MILLISECONDS);
    
            slotRequestTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay(
                () -> mainThreadExecutor.execute(
                    () -> checkSlotRequestTimeouts()),
                0L,
                slotRequestTimeout.toMilliseconds(),
                TimeUnit.MILLISECONDS);
        }
    
        /**
         * Suspends the component. This clears the internal state of the slot manager.
         */
        public void suspend() {
            LOG.info("Suspending the SlotManager.");
    
            // stop the timeout checks for the TaskManagers and the SlotRequests
            if (taskManagerTimeoutCheck != null) {
                taskManagerTimeoutCheck.cancel(false);
                taskManagerTimeoutCheck = null;
            }
    
            if (slotRequestTimeoutCheck != null) {
                slotRequestTimeoutCheck.cancel(false);
                slotRequestTimeoutCheck = null;
            }
    
            for (PendingSlotRequest pendingSlotRequest : pendingSlotRequests.values()) {
                cancelPendingSlotRequest(pendingSlotRequest);
            }
    
            pendingSlotRequests.clear();
    
            ArrayList<InstanceID> registeredTaskManagers = new ArrayList<>(taskManagerRegistrations.keySet());
    
            for (InstanceID registeredTaskManager : registeredTaskManagers) {
                unregisterTaskManager(registeredTaskManager);
            }
    
            resourceManagerId = null;
            resourceActions = null;
            started = false;
        }
    
        public boolean registerSlotRequest(SlotRequest slotRequest) throws SlotManagerException {
            checkInit();
    
            if (checkDuplicateRequest(slotRequest.getAllocationId())) {
                LOG.debug("Ignoring a duplicate slot request with allocation id {}.", slotRequest.getAllocationId());
    
                return false;
            } else {
                PendingSlotRequest pendingSlotRequest = new PendingSlotRequest(slotRequest);
    
                pendingSlotRequests.put(slotRequest.getAllocationId(), pendingSlotRequest);
    
                try {
                    internalRequestSlot(pendingSlotRequest);
                } catch (ResourceManagerException e) {
                    // requesting the slot failed --> remove pending slot request
                    pendingSlotRequests.remove(slotRequest.getAllocationId());
    
                    throw new SlotManagerException("Could not fulfill slot request " + slotRequest.getAllocationId() + '.', e);
                }
    
                return true;
            }
        }
    
        private void checkSlotRequestTimeouts() {
            if (!pendingSlotRequests.isEmpty()) {
                long currentTime = System.currentTimeMillis();
    
                Iterator<Map.Entry<AllocationID, PendingSlotRequest>> slotRequestIterator = pendingSlotRequests.entrySet().iterator();
    
                while (slotRequestIterator.hasNext()) {
                    PendingSlotRequest slotRequest = slotRequestIterator.next().getValue();
    
                    if (currentTime - slotRequest.getCreationTimestamp() >= slotRequestTimeout.toMilliseconds()) {
                        slotRequestIterator.remove();
    
                        if (slotRequest.isAssigned()) {
                            cancelPendingSlotRequest(slotRequest);
                        }
    
                        resourceActions.notifyAllocationFailure(
                            slotRequest.getJobId(),
                            slotRequest.getAllocationId(),
                            new TimeoutException("The allocation could not be fulfilled in time."));
                    }
                }
            }
        }
    
        //......
    
    }
    
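    • SlotManager's constructor takes a slotRequestTimeout parameter, and the class maintains the pendingSlotRequests map. The start method registers slotRequestTimeoutCheck, which runs checkSlotRequestTimeouts on the main thread executor every slotRequestTimeout, starting immediately (scheduleWithFixedDelay with an initial delay of 0). The suspend method calls cancelPendingSlotRequest on every pending request and then clears the pendingSlotRequests map
    • registerSlotRequest first calls checkDuplicateRequest; a duplicate request is ignored and false is returned. Otherwise the slotRequest is added to pendingSlotRequests and internalRequestSlot is called to allocate it; if that throws a ResourceManagerException, the request is removed from pendingSlotRequests and a SlotManagerException is thrown
    • checkSlotRequestTimeouts iterates over pendingSlotRequests and compares the current time with slotRequest.getCreationTimestamp(). If the difference is greater than or equal to slotRequestTimeout, the request is removed from pendingSlotRequests, cancelled if it has already been assigned (isAssigned()), and reported through resourceActions.notifyAllocationFailure with a TimeoutException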
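    The bookkeeping described above can be modeled with a simplified, hypothetical class (not Flink code): String ids stand in for AllocationID, a list of failed ids stands in for resourceActions.notifyAllocationFailure, and an injectable clock replaces System.currentTimeMillis() so the behavior is deterministic. Actual slot allocation (internalRequestSlot), fulfilled-request deduplication, cancellation of assigned requests and the periodic scheduling are all left out; the caller invokes checkTimeouts() directly.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;

/** Hypothetical, simplified model of SlotManager's pending slot request handling. */
class PendingSlotRequestTracker {

    private final long slotRequestTimeoutMs;
    private final LongSupplier clock; // injectable clock instead of System.currentTimeMillis()
    private final Map<String, Long> pendingCreationTimes = new LinkedHashMap<>();
    private final List<String> failedAllocations = new ArrayList<>();

    PendingSlotRequestTracker(long slotRequestTimeoutMs, LongSupplier clock) {
        this.slotRequestTimeoutMs = slotRequestTimeoutMs;
        this.clock = clock;
    }

    /** Like registerSlotRequest: returns false for an allocation id that is already pending. */
    public boolean register(String allocationId) {
        if (pendingCreationTimes.containsKey(allocationId)) {
            return false;
        }
        pendingCreationTimes.put(allocationId, clock.getAsLong());
        return true;
    }

    /** Like checkSlotRequestTimeouts: removes and reports requests whose age is >= the timeout. */
    public void checkTimeouts() {
        long now = clock.getAsLong();
        Iterator<Map.Entry<String, Long>> it = pendingCreationTimes.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (now - entry.getValue() >= slotRequestTimeoutMs) {
                it.remove();
                failedAllocations.add(entry.getKey()); // stands in for notifyAllocationFailure
            }
        }
    }

    /** Like suspend: drops all pending requests without reporting them as failed. */
    public void suspend() {
        pendingCreationTimes.clear();
    }

    public int pendingCount() {
        return pendingCreationTimes.size();
    }

    public List<String> failedAllocations() {
        return failedAllocations;
    }

    public static void main(String[] args) {
        long[] now = {0L};
        PendingSlotRequestTracker tracker = new PendingSlotRequestTracker(300_000L, () -> now[0]);

        tracker.register("alloc-1");
        System.out.println(tracker.register("alloc-1")); // false: duplicate
        now[0] = 100_000L;
        tracker.register("alloc-2");

        now[0] = 300_000L; // alloc-1 is exactly 300000 ms old, alloc-2 only 200000 ms
        tracker.checkTimeouts();
        System.out.println(tracker.failedAllocations()); // [alloc-1]
        System.out.println(tracker.pendingCount());      // 1
    }
}
```

    Because the comparison is >=, a request whose age exactly equals the timeout is already treated as timed out.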

    Summary

    • SlotManagerConfiguration's getSlotRequestTimeout method reads JobManagerOptions.SLOT_REQUEST_TIMEOUT from the configuration; slot.request.timeout defaults to 5 minutes
    • SlotManager's constructor takes a slotRequestTimeout parameter, and the class maintains the pendingSlotRequests map. The start method registers slotRequestTimeoutCheck, which runs checkSlotRequestTimeouts every slotRequestTimeout; the suspend method cancels all pending slot requests and then clears the pendingSlotRequests map
    • registerSlotRequest first calls checkDuplicateRequest; if the request is not a duplicate, it is added to pendingSlotRequests and internalRequestSlot is called to allocate it, and on failure the request is removed from pendingSlotRequests and a SlotManagerException is thrown. checkSlotRequestTimeouts iterates over pendingSlotRequests and treats a request as timed out when the current time minus slotRequest.getCreationTimestamp() is greater than or equal to slotRequestTimeout; a timed-out request is removed from pendingSlotRequests, cancelled if already assigned, and reported via resourceActions.notifyAllocationFailure
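    One consequence of combining the check interval with the >= comparison: since the check itself only runs every slotRequestTimeout, a request is failed at the first check at which its age has reached the timeout, which can be noticeably later than the timeout itself. The sketch below computes that moment under simplifying assumptions that are not from the source: checks fire exactly at 0, T, 2T, ... (the real fixed-delay schedule also adds each run's execution time and main-thread queuing), and creation times are non-negative.

```java
class SlotTimeoutDetectionLatency {

    /**
     * Returns the time of the first check at which a request created at creationMs
     * satisfies (check - creationMs) >= timeoutMs, assuming checks fire exactly at
     * 0, T, 2T, ... with T = timeoutMs and creationMs >= 0.
     */
    static long detectionTimeMs(long creationMs, long timeoutMs) {
        long earliestDue = creationMs + timeoutMs;                   // first instant the age reaches T
        long checkIndex = (earliestDue + timeoutMs - 1) / timeoutMs; // ceil(earliestDue / T)
        return checkIndex * timeoutMs;
    }

    public static void main(String[] args) {
        long t = 5L * 60L * 1000L; // default slot.request.timeout: 300000 ms
        long[] creations = {1L, 150_000L, 300_000L};
        for (long c : creations) {
            long detected = detectionTimeMs(c, t);
            System.out.println("created at " + c + " ms -> failed at " + detected
                + " ms (waited " + (detected - c) + " ms)");
        }
    }
}
```

    Under these assumptions, the wait ranges from T up to just under 2T, so with the 5-minute default a pending request may wait close to 10 minutes before notifyAllocationFailure is triggered.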

    Article link: https://www.haomeiwen.com/subject/mcabuqtx.html