A look at Flink TaskManager's jvm-exit-on-


Author: go4it | Published 2019-02-23 11:21

    This article looks at the jvm-exit-on-oom configuration option of Flink's TaskManager.

    taskmanager.jvm-exit-on-oom

    flink-1.7.2/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java

    @PublicEvolving
    public class TaskManagerOptions {
        //......
    
        /**
         * Whether to kill the TaskManager when the task thread throws an OutOfMemoryError.
         */
        public static final ConfigOption<Boolean> KILL_ON_OUT_OF_MEMORY =
                key("taskmanager.jvm-exit-on-oom")
                .defaultValue(false)
                .withDescription("Whether to kill the TaskManager when the task thread throws an OutOfMemoryError.");
    
        //......
    }
    
    • taskmanager.jvm-exit-on-oom defaults to false. It controls whether the TaskManager is killed when a task thread throws an OutOfMemoryError.
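    To make the "key plus default value" semantics concrete, here is a minimal plain-Java sketch that does not depend on Flink. BoolOption and KillOnOomOptionDemo are hypothetical names, and parsing the value with Boolean.parseBoolean is an assumption, not necessarily how Flink's Configuration parses booleans. In a real deployment the option would typically be set in flink-conf.yaml with a line such as `taskmanager.jvm-exit-on-oom: true`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a typed config key with a default value.
// It only illustrates "key + default false"; it is not Flink's ConfigOption class.
final class BoolOption {
    final String key;
    final boolean defaultValue;

    BoolOption(String key, boolean defaultValue) {
        this.key = key;
        this.defaultValue = defaultValue;
    }

    // Returns the parsed value if the key is present, otherwise the default.
    // Assumption: Boolean.parseBoolean semantics ("true", case-insensitive, means true).
    boolean readFrom(Map<String, String> conf) {
        String raw = conf.get(key);
        return raw == null ? defaultValue : Boolean.parseBoolean(raw.trim());
    }
}

class KillOnOomOptionDemo {
    // Mirrors key("taskmanager.jvm-exit-on-oom").defaultValue(false)
    static final BoolOption KILL_ON_OUT_OF_MEMORY =
            new BoolOption("taskmanager.jvm-exit-on-oom", false);

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println(KILL_ON_OUT_OF_MEMORY.readFrom(conf)); // false: key absent, default used

        conf.put("taskmanager.jvm-exit-on-oom", "true");
        System.out.println(KILL_ON_OUT_OF_MEMORY.readFrom(conf)); // true: explicitly enabled
    }
}
```

    Unless the key is set explicitly, an OutOfMemoryError in a task thread does not trigger a JVM exit through this option.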

    TaskManagerConfiguration

    flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java

    public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
    
        private static final Logger LOG = LoggerFactory.getLogger(TaskManagerConfiguration.class);
    
        private final int numberSlots;
    
        private final String[] tmpDirectories;
    
        private final Time timeout;
    
        // null indicates an infinite duration
        @Nullable
        private final Time maxRegistrationDuration;
    
        private final Time initialRegistrationPause;
        private final Time maxRegistrationPause;
        private final Time refusedRegistrationPause;
    
        private final UnmodifiableConfiguration configuration;
    
        private final boolean exitJvmOnOutOfMemory;
    
        private final FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder;
    
        private final String[] alwaysParentFirstLoaderPatterns;
    
        @Nullable
        private final String taskManagerLogPath;
    
        @Nullable
        private final String taskManagerStdoutPath;
    
        public TaskManagerConfiguration(
            int numberSlots,
            String[] tmpDirectories,
            Time timeout,
            @Nullable Time maxRegistrationDuration,
            Time initialRegistrationPause,
            Time maxRegistrationPause,
            Time refusedRegistrationPause,
            Configuration configuration,
            boolean exitJvmOnOutOfMemory,
            FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder,
            String[] alwaysParentFirstLoaderPatterns,
            @Nullable String taskManagerLogPath,
            @Nullable String taskManagerStdoutPath) {
    
            this.numberSlots = numberSlots;
            this.tmpDirectories = Preconditions.checkNotNull(tmpDirectories);
            this.timeout = Preconditions.checkNotNull(timeout);
            this.maxRegistrationDuration = maxRegistrationDuration;
            this.initialRegistrationPause = Preconditions.checkNotNull(initialRegistrationPause);
            this.maxRegistrationPause = Preconditions.checkNotNull(maxRegistrationPause);
            this.refusedRegistrationPause = Preconditions.checkNotNull(refusedRegistrationPause);
            this.configuration = new UnmodifiableConfiguration(Preconditions.checkNotNull(configuration));
            this.exitJvmOnOutOfMemory = exitJvmOnOutOfMemory;
            this.classLoaderResolveOrder = classLoaderResolveOrder;
            this.alwaysParentFirstLoaderPatterns = alwaysParentFirstLoaderPatterns;
            this.taskManagerLogPath = taskManagerLogPath;
            this.taskManagerStdoutPath = taskManagerStdoutPath;
        }
    
        public int getNumberSlots() {
            return numberSlots;
        }
    
        public Time getTimeout() {
            return timeout;
        }
    
        @Nullable
        public Time getMaxRegistrationDuration() {
            return maxRegistrationDuration;
        }
    
        public Time getInitialRegistrationPause() {
            return initialRegistrationPause;
        }
    
        @Nullable
        public Time getMaxRegistrationPause() {
            return maxRegistrationPause;
        }
    
        public Time getRefusedRegistrationPause() {
            return refusedRegistrationPause;
        }
    
        @Override
        public Configuration getConfiguration() {
            return configuration;
        }
    
        @Override
        public String[] getTmpDirectories() {
            return tmpDirectories;
        }
    
        @Override
        public boolean shouldExitJvmOnOutOfMemoryError() {
            return exitJvmOnOutOfMemory;
        }
    
        public FlinkUserCodeClassLoaders.ResolveOrder getClassLoaderResolveOrder() {
            return classLoaderResolveOrder;
        }
    
        public String[] getAlwaysParentFirstLoaderPatterns() {
            return alwaysParentFirstLoaderPatterns;
        }
    
        @Nullable
        public String getTaskManagerLogPath() {
            return taskManagerLogPath;
        }
    
        @Nullable
        public String getTaskManagerStdoutPath() {
            return taskManagerStdoutPath;
        }
    
        // --------------------------------------------------------------------------------------------
        //  Static factory methods
        // --------------------------------------------------------------------------------------------
    
        public static TaskManagerConfiguration fromConfiguration(Configuration configuration) {
            int numberSlots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
    
            if (numberSlots == -1) {
                numberSlots = 1;
            }
    
            final String[] tmpDirPaths = ConfigurationUtils.parseTempDirectories(configuration);
    
            final Time timeout;
    
            try {
                timeout = Time.milliseconds(AkkaUtils.getTimeout(configuration).toMillis());
            } catch (Exception e) {
                throw new IllegalArgumentException(
                    "Invalid format for '" + AkkaOptions.ASK_TIMEOUT.key() +
                        "'.Use formats like '50 s' or '1 min' to specify the timeout.");
            }
    
            LOG.info("Messages have a max timeout of " + timeout);
    
            final Time finiteRegistrationDuration;
    
            try {
                Duration maxRegistrationDuration = Duration.create(configuration.getString(TaskManagerOptions.REGISTRATION_TIMEOUT));
                if (maxRegistrationDuration.isFinite()) {
                    finiteRegistrationDuration = Time.milliseconds(maxRegistrationDuration.toMillis());
                } else {
                    finiteRegistrationDuration = null;
                }
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("Invalid format for parameter " +
                    TaskManagerOptions.REGISTRATION_TIMEOUT.key(), e);
            }
    
            final Time initialRegistrationPause;
            try {
                Duration pause = Duration.create(configuration.getString(TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF));
                if (pause.isFinite()) {
                    initialRegistrationPause = Time.milliseconds(pause.toMillis());
                } else {
                    throw new IllegalArgumentException("The initial registration pause must be finite: " + pause);
                }
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("Invalid format for parameter " +
                    TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF.key(), e);
            }
    
            final Time maxRegistrationPause;
            try {
                Duration pause = Duration.create(configuration.getString(
                    TaskManagerOptions.REGISTRATION_MAX_BACKOFF));
                if (pause.isFinite()) {
                    maxRegistrationPause = Time.milliseconds(pause.toMillis());
                } else {
                    throw new IllegalArgumentException("The maximum registration pause must be finite: " + pause);
                }
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("Invalid format for parameter " +
                    TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF.key(), e);
            }
    
            final Time refusedRegistrationPause;
            try {
                Duration pause = Duration.create(configuration.getString(TaskManagerOptions.REFUSED_REGISTRATION_BACKOFF));
                if (pause.isFinite()) {
                    refusedRegistrationPause = Time.milliseconds(pause.toMillis());
                } else {
                    throw new IllegalArgumentException("The refused registration pause must be finite: " + pause);
                }
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("Invalid format for parameter " +
                    TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF.key(), e);
            }
    
            final boolean exitOnOom = configuration.getBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY);
    
            final String classLoaderResolveOrder =
                configuration.getString(CoreOptions.CLASSLOADER_RESOLVE_ORDER);
    
            final String[] alwaysParentFirstLoaderPatterns = CoreOptions.getParentFirstLoaderPatterns(configuration);
    
            final String taskManagerLogPath = configuration.getString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, System.getProperty("log.file"));
            final String taskManagerStdoutPath;
    
            if (taskManagerLogPath != null) {
                final int extension = taskManagerLogPath.lastIndexOf('.');
    
                if (extension > 0) {
                    taskManagerStdoutPath = taskManagerLogPath.substring(0, extension) + ".out";
                } else {
                    taskManagerStdoutPath = null;
                }
            } else {
                taskManagerStdoutPath = null;
            }
    
            return new TaskManagerConfiguration(
                numberSlots,
                tmpDirPaths,
                timeout,
                finiteRegistrationDuration,
                initialRegistrationPause,
                maxRegistrationPause,
                refusedRegistrationPause,
                configuration,
                exitOnOom,
                FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder),
                alwaysParentFirstLoaderPatterns,
                taskManagerLogPath,
                taskManagerStdoutPath);
        }
    }
    
    • The static factory method TaskManagerConfiguration.fromConfiguration reads exitOnOom via configuration.getBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY) and passes it to the constructor, which stores it in the exitJvmOnOutOfMemory field. The shouldExitJvmOnOutOfMemoryError method exposes that field to callers.
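    The pattern of reading the flag once in a static factory and exposing it through a read-only accessor can be sketched without Flink as follows. TmConfigSketch and RuntimeInfoSketch are hypothetical, heavily trimmed stand-ins for TaskManagerConfiguration and TaskManagerRuntimeInfo; only the OOM flag is modeled, and the configuration is assumed to be a plain Map<String, String>.

```java
import java.util.Map;

// Hypothetical stand-in for TaskManagerRuntimeInfo; only the OOM flag is modeled.
interface RuntimeInfoSketch {
    boolean shouldExitJvmOnOutOfMemoryError();
}

// Hypothetical, trimmed-down analogue of TaskManagerConfiguration:
// the flag is read once in a static factory and kept in a final field.
final class TmConfigSketch implements RuntimeInfoSketch {
    static final String KILL_ON_OOM_KEY = "taskmanager.jvm-exit-on-oom";

    private final boolean exitJvmOnOutOfMemory;

    private TmConfigSketch(boolean exitJvmOnOutOfMemory) {
        this.exitJvmOnOutOfMemory = exitJvmOnOutOfMemory;
    }

    // Analogue of fromConfiguration(): read the option (default false), pass it to the constructor.
    static TmConfigSketch fromMap(Map<String, String> conf) {
        boolean exitOnOom = Boolean.parseBoolean(conf.getOrDefault(KILL_ON_OOM_KEY, "false"));
        return new TmConfigSketch(exitOnOom);
    }

    @Override
    public boolean shouldExitJvmOnOutOfMemoryError() {
        return exitJvmOnOutOfMemory;
    }
}

class TmConfigSketchDemo {
    public static void main(String[] args) {
        RuntimeInfoSketch defaults = TmConfigSketch.fromMap(Map.of());
        RuntimeInfoSketch enabled = TmConfigSketch.fromMap(Map.of(TmConfigSketch.KILL_ON_OOM_KEY, "true"));
        System.out.println(defaults.shouldExitJvmOnOutOfMemoryError()); // false
        System.out.println(enabled.shouldExitJvmOnOutOfMemoryError());  // true
    }
}
```

    In this sketch the flag lives in a final field, so it is fixed once the object is built, in the same spirit as the immutable TaskManagerConfiguration quoted above.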

    Task

    flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java

    public class Task implements Runnable, TaskActions, CheckpointListener {
        //......
    
        @Override
        public void run() {
    
            // ----------------------------
            //  Initial State transition
            // ----------------------------
            //......
    
            // all resource acquisitions and registrations from here on
            // need to be undone in the end
            Map<String, Future<Path>> distributedCacheEntries = new HashMap<>();
            AbstractInvokable invokable = null;
    
            try {
                //......
    
                // ----------------------------------------------------------------
                //  call the user code initialization methods
                // ----------------------------------------------------------------
    
                TaskKvStateRegistry kvStateRegistry = network.createKvStateTaskRegistry(jobId, getJobVertexId());
    
                Environment env = new RuntimeEnvironment(
                    jobId,
                    vertexId,
                    executionId,
                    executionConfig,
                    taskInfo,
                    jobConfiguration,
                    taskConfiguration,
                    userCodeClassLoader,
                    memoryManager,
                    ioManager,
                    broadcastVariableManager,
                    taskStateManager,
                    accumulatorRegistry,
                    kvStateRegistry,
                    inputSplitProvider,
                    distributedCacheEntries,
                    producedPartitions,
                    inputGates,
                    network.getTaskEventDispatcher(),
                    checkpointResponder,
                    taskManagerConfig,
                    metrics,
                    this);
    
                // now load and instantiate the task's invokable code
                invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);
    
                // ----------------------------------------------------------------
                //  actual task core work
                // ----------------------------------------------------------------
    
                // we must make strictly sure that the invokable is accessible to the cancel() call
                // by the time we switched to running.
                this.invokable = invokable;
    
                // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
                if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
                    throw new CancelTaskException();
                }
    
                // notify everyone that we switched to running
                taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));
    
                // make sure the user code classloader is accessible thread-locally
                executingThread.setContextClassLoader(userCodeClassLoader);
    
                // run the invokable
                invokable.invoke();
    
                // make sure, we enter the catch block if the task leaves the invoke() method due
                // to the fact that it has been canceled
                if (isCanceledOrFailed()) {
                    throw new CancelTaskException();
                }
    
                // ----------------------------------------------------------------
                //  finalization of a successful execution
                // ----------------------------------------------------------------
    
                // finish the produced partitions. if this fails, we consider the execution failed.
                for (ResultPartition partition : producedPartitions) {
                    if (partition != null) {
                        partition.finish();
                    }
                }
    
                // try to mark the task as finished
                // if that fails, the task was canceled/failed in the meantime
                if (!transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) {
                    throw new CancelTaskException();
                }
            }
            catch (Throwable t) {
    
                // unwrap wrapped exceptions to make stack traces more compact
                if (t instanceof WrappingRuntimeException) {
                    t = ((WrappingRuntimeException) t).unwrap();
                }
    
                // ----------------------------------------------------------------
                // the execution failed. either the invokable code properly failed, or
                // an exception was thrown as a side effect of cancelling
                // ----------------------------------------------------------------
    
                try {
                    // check if the exception is unrecoverable
                    if (ExceptionUtils.isJvmFatalError(t) ||
                            (t instanceof OutOfMemoryError && taskManagerConfig.shouldExitJvmOnOutOfMemoryError())) {
    
                        // terminate the JVM immediately
                        // don't attempt a clean shutdown, because we cannot expect the clean shutdown to complete
                        try {
                            LOG.error("Encountered fatal error {} - terminating the JVM", t.getClass().getName(), t);
                        } finally {
                            Runtime.getRuntime().halt(-1);
                        }
                    }
    
                    // transition into our final state. we should be either in DEPLOYING, RUNNING, CANCELING, or FAILED
                    // loop for multiple retries during concurrent state changes via calls to cancel() or
                    // to failExternally()
                    while (true) {
                        ExecutionState current = this.executionState;
    
                        if (current == ExecutionState.RUNNING || current == ExecutionState.DEPLOYING) {
                            if (t instanceof CancelTaskException) {
                                if (transitionState(current, ExecutionState.CANCELED)) {
                                    cancelInvokable(invokable);
                                    break;
                                }
                            }
                            else {
                                if (transitionState(current, ExecutionState.FAILED, t)) {
                                    // proper failure of the task. record the exception as the root cause
                                    failureCause = t;
                                    cancelInvokable(invokable);
    
                                    break;
                                }
                            }
                        }
                        else if (current == ExecutionState.CANCELING) {
                            if (transitionState(current, ExecutionState.CANCELED)) {
                                break;
                            }
                        }
                        else if (current == ExecutionState.FAILED) {
                            // in state failed already, no transition necessary any more
                            break;
                        }
                        // unexpected state, go to failed
                        else if (transitionState(current, ExecutionState.FAILED, t)) {
                            LOG.error("Unexpected state in task {} ({}) during an exception: {}.", taskNameWithSubtask, executionId, current);
                            break;
                        }
                        // else fall through the loop and
                    }
                }
                catch (Throwable tt) {
                    String message = String.format("FATAL - exception in exception handler of task %s (%s).", taskNameWithSubtask, executionId);
                    LOG.error(message, tt);
                    notifyFatalError(message, tt);
                }
            }
            finally {
                //......
            }
        }
    
        //......
    }
    
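    • Task implements the Runnable interface. Its run method wraps the task setup and the call to invokable.invoke() in a try/catch. In the catch block, if ExceptionUtils.isJvmFatalError(t) is true, or if (t instanceof OutOfMemoryError && taskManagerConfig.shouldExitJvmOnOutOfMemoryError()) is true, it logs the error and then, in a finally block, calls Runtime.getRuntime().halt(-1) to terminate the JVM immediately without attempting a clean shutdown. Otherwise it falls through to the state-transition loop and marks the task as FAILED or CANCELED.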
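    The decision in the catch block can be isolated and exercised without killing the JVM by injecting the halt action. This is a hypothetical sketch: TaskFailureSketch, runTask and shouldHalt are illustrative names, not Flink APIs, and the state-transition loop is reduced to returning a "FAILED" status. In production the halter would be Runtime.getRuntime()::halt.

```java
import java.util.function.IntConsumer;

// Hypothetical sketch of the decision made in Task.run()'s catch block (not Flink code).
// The halt action is injected so the logic can be exercised without terminating the JVM.
final class TaskFailureSketch {

    // Same condition as ExceptionUtils.isJvmFatalError in Flink 1.7.2.
    static boolean isJvmFatalError(Throwable t) {
        return (t instanceof InternalError) || (t instanceof UnknownError) || (t instanceof ThreadDeath);
    }

    // True if the JVM should be halted: always for "JVM fatal" errors,
    // and for OutOfMemoryError only when the exit-on-OOM flag is enabled.
    static boolean shouldHalt(Throwable t, boolean exitJvmOnOutOfMemoryError) {
        return isJvmFatalError(t)
                || (t instanceof OutOfMemoryError && exitJvmOnOutOfMemoryError);
    }

    // Runs the task body and returns a status; on a halting error it invokes the halter with -1.
    static String runTask(Runnable body, boolean exitJvmOnOutOfMemoryError, IntConsumer halter) {
        try {
            body.run();
            return "FINISHED";
        } catch (Throwable t) {
            if (shouldHalt(t, exitJvmOnOutOfMemoryError)) {
                halter.accept(-1);   // Flink calls Runtime.getRuntime().halt(-1) at this point
                return "HALTED";     // only reachable when the halter is a non-terminating stub
            }
            // Simplified: Flink would transition the task to FAILED (or CANCELED) here.
            return "FAILED: " + t.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        IntConsumer stubHalter = status -> System.out.println("halt(" + status + ") requested");
        Runnable oomTask = () -> { throw new OutOfMemoryError("simulated"); };

        // Flag off (the default): the OOM only fails the task.
        System.out.println(runTask(oomTask, false, stubHalter)); // FAILED: OutOfMemoryError
        // Flag on: the halter is invoked with -1.
        System.out.println(runTask(oomTask, true, stubHalter));  // halt(-1) requested, then HALTED
    }
}
```

    Injecting the halter keeps the branch logic testable; the real code cannot return from the halting branch because halt() never returns normally.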

    ExceptionUtils.isJvmFatalError

    flink-1.7.2/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java

    @Internal
    public final class ExceptionUtils {
        //......
    
        /**
         * Checks whether the given exception indicates a situation that may leave the
         * JVM in a corrupted state, meaning a state where continued normal operation can only be
         * guaranteed via clean process restart.
         *
         * <p>Currently considered fatal exceptions are Virtual Machine errors indicating
         * that the JVM is corrupted, like {@link InternalError}, {@link UnknownError},
         * and {@link java.util.zip.ZipError} (a special case of InternalError).
         * The {@link ThreadDeath} exception is also treated as a fatal error, because when
         * a thread is forcefully stopped, there is a high chance that parts of the system
         * are in an inconsistent state.
         *
         * @param t The exception to check.
         * @return True, if the exception is considered fatal to the JVM, false otherwise.
         */
        public static boolean isJvmFatalError(Throwable t) {
            return (t instanceof InternalError) || (t instanceof UnknownError) || (t instanceof ThreadDeath);
        }
    
        //......
    }
    
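    • isJvmFatalError returns true if the Throwable is an InternalError, an UnknownError or a ThreadDeath. OutOfMemoryError is not on this list, so by default an OOM in a task thread only fails the task; the JVM is halted for an OOM only when taskmanager.jvm-exit-on-oom is true.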
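    The check is easy to probe in isolation. The class below copies the condition quoted above (FatalErrorCheck is a hypothetical name) and prints the result for a few sample errors. It shows that java.util.zip.ZipError is matched through its InternalError superclass, while OutOfMemoryError and StackOverflowError, although also VirtualMachineErrors, are not treated as fatal by this method. ThreadDeath is deprecated in recent JDKs, so compiling this may emit a deprecation warning.

```java
import java.util.List;
import java.util.zip.ZipError;

// Copy of the condition from ExceptionUtils.isJvmFatalError, for experimentation outside Flink.
final class FatalErrorCheck {

    static boolean isJvmFatalError(Throwable t) {
        return (t instanceof InternalError) || (t instanceof UnknownError) || (t instanceof ThreadDeath);
    }

    public static void main(String[] args) {
        List<Throwable> samples = List.of(
                new InternalError("internal"),
                new ZipError("zip"),              // subclass of InternalError
                new UnknownError("unknown"),
                new OutOfMemoryError("oom"),      // a VirtualMachineError, but not matched
                new StackOverflowError(),         // likewise not matched
                new IllegalStateException("ise"));
        for (Throwable t : samples) {
            System.out.println(t.getClass().getSimpleName() + " -> " + isJvmFatalError(t));
        }
    }
}
```

    Running main prints true for InternalError, ZipError and UnknownError, and false for OutOfMemoryError, StackOverflowError and IllegalStateException.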

    Runtime.getRuntime().halt

    java.base/java/lang/Runtime.java

    public class Runtime {
        //......
    
        private static final Runtime currentRuntime = new Runtime();
    
        /**
         * Returns the runtime object associated with the current Java application.
         * Most of the methods of class {@code Runtime} are instance
         * methods and must be invoked with respect to the current runtime object.
         *
         * @return  the {@code Runtime} object associated with the current
         *          Java application.
         */
        public static Runtime getRuntime() {
            return currentRuntime;
        }
    
        /**
         * Forcibly terminates the currently running Java virtual machine.  This
         * method never returns normally.
         *
         * <p> This method should be used with extreme caution.  Unlike the
         * {@link #exit exit} method, this method does not cause shutdown
         * hooks to be started.  If the shutdown sequence has already been
         * initiated then this method does not wait for any running
         * shutdown hooks to finish their work.
         *
         * @param  status
         *         Termination status. By convention, a nonzero status code
         *         indicates abnormal termination. If the {@link Runtime#exit exit}
         *         (equivalently, {@link System#exit(int) System.exit}) method
         *         has already been invoked then this status code
         *         will override the status code passed to that method.
         *
         * @throws SecurityException
         *         If a security manager is present and its
         *         {@link SecurityManager#checkExit checkExit} method
         *         does not permit an exit with the specified status
         *
         * @see #exit
         * @see #addShutdownHook
         * @see #removeShutdownHook
         * @since 1.3
         */
        public void halt(int status) {
            SecurityManager sm = System.getSecurityManager();
            if (sm != null) {
                sm.checkExit(status);
            }
            Shutdown.beforeHalt();
            Shutdown.halt(status);
        }
    
        //......
    }
    
    • If a SecurityManager is installed, halt first calls SecurityManager.checkExit; it then calls Shutdown.beforeHalt() and Shutdown.halt(status) to stop the JVM. Unlike exit, halt does not start shutdown hooks.
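    The log-then-halt sequence in Task.run() (log inside try, halt inside finally) can be sketched as below. HaltSketch and logAndHalt are hypothetical names; the halt action is passed in as an IntConsumer so the pattern can be tested with a stub, while main passes Runtime.getRuntime()::halt to show that a registered shutdown hook does not run.

```java
import java.util.function.Consumer;
import java.util.function.IntConsumer;

// Hypothetical sketch of the "log in try, halt in finally" pattern (not Flink code).
final class HaltSketch {

    // Logs the fatal error, then invokes the halter with -1 even if logging itself throws.
    static void logAndHalt(Throwable t, Consumer<Throwable> logger, IntConsumer halter) {
        try {
            logger.accept(t);
        } finally {
            halter.accept(-1);
        }
    }

    public static void main(String[] args) {
        // Shutdown hooks run on System.exit, but Runtime.halt does not start them.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> System.out.println("shutdown hook ran")));

        logAndHalt(new InternalError("simulated fatal error"),
                t -> System.err.println("Encountered fatal error " + t.getClass().getName() + " - terminating the JVM"),
                Runtime.getRuntime()::halt);

        System.out.println("never printed"); // halt() never returns normally
    }
}
```

    Running main prints the error line to stderr and then terminates; neither "shutdown hook ran" nor "never printed" appears. On most Unix-like systems the shell reports the status -1 as 255, since only the low 8 bits of the exit status are kept.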

    Summary

    • taskmanager.jvm-exit-on-oom defaults to false. It controls whether the TaskManager is killed when a task thread throws an OutOfMemoryError.
    • TaskManagerConfiguration.fromConfiguration reads exitOnOom via configuration.getBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY) and passes it to the constructor, which stores it in the exitJvmOnOutOfMemory field; shouldExitJvmOnOutOfMemoryError exposes that field.
    • Task implements Runnable. Its run method wraps invokable.invoke() in a try/catch; if ExceptionUtils.isJvmFatalError(t) is true, or (t instanceof OutOfMemoryError && taskManagerConfig.shouldExitJvmOnOutOfMemoryError()) is true, it calls Runtime.getRuntime().halt(-1) to stop the JVM.
    • isJvmFatalError returns true if the Throwable is an InternalError, an UnknownError or a ThreadDeath.
    • If a SecurityManager is installed, halt first calls SecurityManager.checkExit; it then calls Shutdown.beforeHalt() and Shutdown.halt(status) to stop the JVM, without running shutdown hooks.


          Article link: https://www.haomeiwen.com/subject/sbguyqtx.html