A look at Flink's slot.idle.timeout configuration

Author: go4it | Published: 2019-03-05 12:52

    This article looks at how Flink's slot.idle.timeout configuration option is defined and where it is used.

    JobManagerOptions

    flink-release-1.7.2/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java

    @PublicEvolving
    public class JobManagerOptions {
        //......
    
        /**
         * The timeout in milliseconds for a idle slot in Slot Pool.
         */
        public static final ConfigOption<Long> SLOT_IDLE_TIMEOUT =
            key("slot.idle.timeout")
                // default matches heartbeat.timeout so that sticky allocation is not lost on timeouts for local recovery
                .defaultValue(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT.defaultValue())
                .withDescription("The timeout in milliseconds for a idle slot in Slot Pool.");
    
        //......
    }
    
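    • slot.idle.timeout defaults to HeartbeatManagerOptions.HEARTBEAT_TIMEOUT.defaultValue(), i.e. 50000L milliseconds. Note that it reuses the default value of heartbeat.timeout, not whatever value heartbeat.timeout is configured to.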
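    The default-value behaviour can be illustrated with a small sketch. The nested Option class below is a simplified, hypothetical stand-in for Flink's ConfigOption (it is not the real API), and the plain Map stands in for a loaded configuration. It only shows that slot.idle.timeout copies the default of heartbeat.timeout and is resolved independently of it.

```java
import java.util.HashMap;
import java.util.Map;

final class SlotIdleTimeoutDefaultSketch {

    // Hypothetical, simplified stand-in for Flink's ConfigOption<Long>.
    static final class Option {
        final String key;
        final long defaultValue;

        Option(String key, long defaultValue) {
            this.key = key;
            this.defaultValue = defaultValue;
        }

        // Return the configured value, or the option's own default when the key is absent.
        long resolve(Map<String, String> conf) {
            String raw = conf.get(key);
            return raw == null ? defaultValue : Long.parseLong(raw.trim());
        }
    }

    static final Option HEARTBEAT_TIMEOUT = new Option("heartbeat.timeout", 50000L);

    // Copies the *default* of heartbeat.timeout at definition time.
    static final Option SLOT_IDLE_TIMEOUT =
        new Option("slot.idle.timeout", HEARTBEAT_TIMEOUT.defaultValue);

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println(SLOT_IDLE_TIMEOUT.resolve(conf)); // 50000

        // Changing heartbeat.timeout does not change slot.idle.timeout.
        conf.put("heartbeat.timeout", "80000");
        System.out.println(SLOT_IDLE_TIMEOUT.resolve(conf)); // 50000

        // An explicit setting overrides the default.
        conf.put("slot.idle.timeout", "120000");
        System.out.println(SLOT_IDLE_TIMEOUT.resolve(conf)); // 120000
    }
}
```

    In practice this suggests that anyone raising heartbeat.timeout and wanting the two values to keep matching would also have to set slot.idle.timeout explicitly.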

    SlotPool

    flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java

    public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedSlotActions {
    
        /** The interval (in milliseconds) in which the SlotPool writes its slot distribution on debug level. */
        private static final int STATUS_LOG_INTERVAL_MS = 60_000;
    
        private final JobID jobId;
    
        private final SchedulingStrategy schedulingStrategy;
    
        private final ProviderAndOwner providerAndOwner;
    
        /** All registered TaskManagers, slots will be accepted and used only if the resource is registered. */
        private final HashSet<ResourceID> registeredTaskManagers;
    
        /** The book-keeping of all allocated slots. */
        private final AllocatedSlots allocatedSlots;
    
        /** The book-keeping of all available slots. */
        private final AvailableSlots availableSlots;
    
        /** All pending requests waiting for slots. */
        private final DualKeyMap<SlotRequestId, AllocationID, PendingRequest> pendingRequests;
    
        /** The requests that are waiting for the resource manager to be connected. */
        private final HashMap<SlotRequestId, PendingRequest> waitingForResourceManager;
    
        /** Timeout for external request calls (e.g. to the ResourceManager or the TaskExecutor). */
        private final Time rpcTimeout;
    
        /** Timeout for releasing idle slots. */
        private final Time idleSlotTimeout;
    
        private final Clock clock;
    
        /** Managers for the different slot sharing groups. */
        protected final Map<SlotSharingGroupId, SlotSharingManager> slotSharingManagers;
    
        /** the fencing token of the job manager. */
        private JobMasterId jobMasterId;
    
        /** The gateway to communicate with resource manager. */
        private ResourceManagerGateway resourceManagerGateway;
    
        private String jobManagerAddress;
    
        //......
    
        /**
         * Start the slot pool to accept RPC calls.
         *
         * @param jobMasterId The necessary leader id for running the job.
         * @param newJobManagerAddress for the slot requests which are sent to the resource manager
         */
        public void start(JobMasterId jobMasterId, String newJobManagerAddress) throws Exception {
            this.jobMasterId = checkNotNull(jobMasterId);
            this.jobManagerAddress = checkNotNull(newJobManagerAddress);
    
            // TODO - start should not throw an exception
            try {
                super.start();
            } catch (Exception e) {
                throw new RuntimeException("This should never happen", e);
            }
    
            scheduleRunAsync(this::checkIdleSlot, idleSlotTimeout);
    
            if (log.isDebugEnabled()) {
                scheduleRunAsync(this::scheduledLogStatus, STATUS_LOG_INTERVAL_MS, TimeUnit.MILLISECONDS);
            }
        }
    
        /**
         * Check the available slots, release the slot that is idle for a long time.
         */
        private void checkIdleSlot() {
    
            // The timestamp in SlotAndTimestamp is relative
            final long currentRelativeTimeMillis = clock.relativeTimeMillis();
    
            final List<AllocatedSlot> expiredSlots = new ArrayList<>(availableSlots.size());
    
            for (SlotAndTimestamp slotAndTimestamp : availableSlots.availableSlots.values()) {
                if (currentRelativeTimeMillis - slotAndTimestamp.timestamp > idleSlotTimeout.toMilliseconds()) {
                    expiredSlots.add(slotAndTimestamp.slot);
                }
            }
    
            final FlinkException cause = new FlinkException("Releasing idle slot.");
    
            for (AllocatedSlot expiredSlot : expiredSlots) {
                final AllocationID allocationID = expiredSlot.getAllocationId();
                if (availableSlots.tryRemove(allocationID) != null) {
    
                    log.info("Releasing idle slot [{}].", allocationID);
                    final CompletableFuture<Acknowledge> freeSlotFuture = expiredSlot.getTaskManagerGateway().freeSlot(
                        allocationID,
                        cause,
                        rpcTimeout);
    
                    freeSlotFuture.whenCompleteAsync(
                        (Acknowledge ignored, Throwable throwable) -> {
                            if (throwable != null) {
                                if (registeredTaskManagers.contains(expiredSlot.getTaskManagerId())) {
                                    log.debug("Releasing slot [{}] of registered TaskExecutor {} failed. " +
                                        "Trying to fulfill a different slot request.", allocationID, expiredSlot.getTaskManagerId(),
                                        throwable);
                                    tryFulfillSlotRequestOrMakeAvailable(expiredSlot);
                                } else {
                                    log.debug("Releasing slot [{}] failed and owning TaskExecutor {} is no " +
                                        "longer registered. Discarding slot.", allocationID, expiredSlot.getTaskManagerId());
                                }
                            }
                        },
                        getMainThreadExecutor());
                }
            }
    
            scheduleRunAsync(this::checkIdleSlot, idleSlotTimeout);
        }
    
        //......
    }
    
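    • In its start method, SlotPool calls scheduleRunAsync to run checkIdleSlot after a delay of idleSlotTimeout. checkIdleSlot iterates over the SlotAndTimestamp entries in availableSlots and adds a slot to expiredSlots when the current relative time minus slotAndTimestamp.timestamp exceeds idleSlotTimeout. For each expired slot it calls availableSlots.tryRemove and, if the removal succeeds, releases the slot through TaskManagerGateway.freeSlot. If the release fails while the owning TaskExecutor is still registered, the slot is handed back through tryFulfillSlotRequestOrMakeAvailable; otherwise it is discarded. Finally, checkIdleSlot calls scheduleRunAsync(this::checkIdleSlot, idleSlotTimeout) again to schedule the next check.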
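    The expiry check itself can be sketched without any Flink classes. In the minimal sketch below, the class name, the String allocation ids and the explicit nowMillis parameter are all assumptions made for illustration; the real SlotPool uses AllocationID, SlotAndTimestamp and a Clock, and additionally calls freeSlot on the TaskManager. Only the two-phase "collect expired, then remove" logic and the strict greater-than comparison are taken from the code above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class IdleSlotCheckSketch {

    // Hypothetical bookkeeping: allocation id -> relative time (ms) at which the slot became available.
    private final Map<String, Long> availableSlots = new LinkedHashMap<>();
    private final long idleTimeoutMillis;

    IdleSlotCheckSketch(long idleTimeoutMillis) {
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    void markAvailable(String allocationId, long nowMillis) {
        availableSlots.put(allocationId, nowMillis);
    }

    // One pass of the check: returns the ids that were removed because they idled too long.
    List<String> checkIdleSlots(long nowMillis) {
        // Phase 1: collect candidates without modifying the map while iterating over it.
        List<String> expired = new ArrayList<>();
        for (Map.Entry<String, Long> entry : availableSlots.entrySet()) {
            // Strictly greater than: a slot idle for exactly the timeout is kept.
            if (nowMillis - entry.getValue() > idleTimeoutMillis) {
                expired.add(entry.getKey());
            }
        }

        // Phase 2: remove; only slots that were still present count as released.
        List<String> released = new ArrayList<>();
        for (String allocationId : expired) {
            if (availableSlots.remove(allocationId) != null) {
                released.add(allocationId);
            }
        }
        return released;
    }

    public static void main(String[] args) {
        IdleSlotCheckSketch pool = new IdleSlotCheckSketch(50_000L);
        pool.markAvailable("slot-a", 0L);
        pool.markAvailable("slot-b", 1L);

        System.out.println(pool.checkIdleSlots(50_000L));  // []
        System.out.println(pool.checkIdleSlots(50_001L));  // [slot-a]
        System.out.println(pool.checkIdleSlots(100_000L)); // [slot-b]
    }
}
```

    Because the check only runs once per idleSlotTimeout, a slot that becomes available just after a check can sit idle for close to twice the timeout before it is released, as slot-b does in the example.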

    RpcEndpoint

    flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java

    public abstract class RpcEndpoint implements RpcGateway {
        //......
    
        /**
         * Execute the runnable in the main thread of the underlying RPC endpoint, with
         * a delay of the given number of milliseconds.
         *
         * @param runnable Runnable to be executed
         * @param delay    The delay after which the runnable will be executed
         */
        protected void scheduleRunAsync(Runnable runnable, Time delay) {
            scheduleRunAsync(runnable, delay.getSize(), delay.getUnit());
        }
    
        /**
         * Execute the runnable in the main thread of the underlying RPC endpoint, with
         * a delay of the given number of milliseconds.
         *
         * @param runnable Runnable to be executed
         * @param delay    The delay after which the runnable will be executed
         */
        protected void scheduleRunAsync(Runnable runnable, long delay, TimeUnit unit) {
            rpcServer.scheduleRunAsync(runnable, unit.toMillis(delay));
        }
    
        //......
    }
    
    • RpcEndpoint provides scheduleRunAsync. Both overloads end up calling rpcServer.scheduleRunAsync, with the delay converted to milliseconds.
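    The pattern used by checkIdleSlot, a one-shot delayed task that re-schedules itself at the end of each run, can be sketched with the JDK alone. Here a single-threaded ScheduledExecutorService stands in for the endpoint's main thread; that substitution, the class name and the runPeriodically helper are assumptions for illustration and say nothing about how rpcServer is actually implemented.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class ScheduleRunAsyncSketch {

    // Assumption: one thread plays the role of the RPC endpoint's main thread.
    private final ScheduledExecutorService mainThread =
        Executors.newSingleThreadScheduledExecutor();

    // Like the overload above: normalize the delay to milliseconds, then schedule once.
    void scheduleRunAsync(Runnable runnable, long delay, TimeUnit unit) {
        mainThread.schedule(runnable, unit.toMillis(delay), TimeUnit.MILLISECONDS);
    }

    // Hypothetical helper: runs a self-rescheduling task `runs` times and
    // reports whether all runs completed within ten seconds.
    static boolean runPeriodically(int runs, long delayMillis) {
        ScheduleRunAsyncSketch endpoint = new ScheduleRunAsyncSketch();
        CountDownLatch remaining = new CountDownLatch(runs);

        // Array holder so the lambda can refer to itself.
        Runnable[] task = new Runnable[1];
        task[0] = () -> {
            remaining.countDown();
            if (remaining.getCount() > 0) {
                // Re-schedule at the end of the run, as checkIdleSlot does.
                endpoint.scheduleRunAsync(task[0], delayMillis, TimeUnit.MILLISECONDS);
            }
        };
        endpoint.scheduleRunAsync(task[0], delayMillis, TimeUnit.MILLISECONDS);

        try {
            return remaining.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            endpoint.mainThread.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runPeriodically(3, 10L));
    }
}
```

    With this style of scheduling the next run starts a full delay after the previous run finishes, so the effective period is the delay plus the time each check takes.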

    Summary

    • slot.idle.timeout defaults to HeartbeatManagerOptions.HEARTBEAT_TIMEOUT.defaultValue(), i.e. 50000L milliseconds.
    • In its start method, SlotPool calls scheduleRunAsync to run checkIdleSlot after a delay of idleSlotTimeout. checkIdleSlot collects every entry in availableSlots whose idle time (current relative time minus slotAndTimestamp.timestamp) exceeds idleSlotTimeout, removes each one with availableSlots.tryRemove, releases it through TaskManagerGateway.freeSlot, and then calls scheduleRunAsync(this::checkIdleSlot, idleSlotTimeout) again to schedule the next check.
    • RpcEndpoint provides scheduleRunAsync, which ultimately calls rpcServer.scheduleRunAsync.
