A look at flink StreamOperator's initializ

Author: go4it | Published: 2018-12-08 14:20

    This post examines the initializeState method of flink's StreamOperator.

    Task.run

    flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java

    public class Task implements Runnable, TaskActions, CheckpointListener {
    
        public void run() {
    
            // ----------------------------
            //  Initial State transition
            // ----------------------------
            while (true) {
                ExecutionState current = this.executionState;
                if (current == ExecutionState.CREATED) {
                    if (transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING)) {
                        // success, we can start our work
                        break;
                    }
                }
                else if (current == ExecutionState.FAILED) {
                    // we were immediately failed. tell the TaskManager that we reached our final state
                    notifyFinalState();
                    if (metrics != null) {
                        metrics.close();
                    }
                    return;
                }
                else if (current == ExecutionState.CANCELING) {
                    if (transitionState(ExecutionState.CANCELING, ExecutionState.CANCELED)) {
                        // we were immediately canceled. tell the TaskManager that we reached our final state
                        notifyFinalState();
                        if (metrics != null) {
                            metrics.close();
                        }
                        return;
                    }
                }
                else {
                    if (metrics != null) {
                        metrics.close();
                    }
                    throw new IllegalStateException("Invalid state for beginning of operation of task " + this + '.');
                }
            }
    
            // all resource acquisitions and registrations from here on
            // need to be undone in the end
            Map<String, Future<Path>> distributedCacheEntries = new HashMap<>();
            AbstractInvokable invokable = null;
    
            try {
                // ----------------------------
                //  Task Bootstrap - We periodically
                //  check for canceling as a shortcut
                // ----------------------------
    
                //......
    
                // ----------------------------------------------------------------
                //  call the user code initialization methods
                // ----------------------------------------------------------------
    
                TaskKvStateRegistry kvStateRegistry = network.createKvStateTaskRegistry(jobId, getJobVertexId());
    
                Environment env = new RuntimeEnvironment(
                    jobId,
                    vertexId,
                    executionId,
                    executionConfig,
                    taskInfo,
                    jobConfiguration,
                    taskConfiguration,
                    userCodeClassLoader,
                    memoryManager,
                    ioManager,
                    broadcastVariableManager,
                    taskStateManager,
                    accumulatorRegistry,
                    kvStateRegistry,
                    inputSplitProvider,
                    distributedCacheEntries,
                    producedPartitions,
                    inputGates,
                    network.getTaskEventDispatcher(),
                    checkpointResponder,
                    taskManagerConfig,
                    metrics,
                    this);
    
                // now load and instantiate the task's invokable code
                invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);
    
                // ----------------------------------------------------------------
                //  actual task core work
                // ----------------------------------------------------------------
    
                // we must make strictly sure that the invokable is accessible to the cancel() call
                // by the time we switched to running.
                this.invokable = invokable;
    
                // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
                if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
                    throw new CancelTaskException();
                }
    
                // notify everyone that we switched to running
                taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));
    
                // make sure the user code classloader is accessible thread-locally
                executingThread.setContextClassLoader(userCodeClassLoader);
    
                // run the invokable
                invokable.invoke();
    
                // make sure, we enter the catch block if the task leaves the invoke() method due
                // to the fact that it has been canceled
                if (isCanceledOrFailed()) {
                    throw new CancelTaskException();
                }
    
                // ----------------------------------------------------------------
                //  finalization of a successful execution
                // ----------------------------------------------------------------
    
                // finish the produced partitions. if this fails, we consider the execution failed.
                for (ResultPartition partition : producedPartitions) {
                    if (partition != null) {
                        partition.finish();
                    }
                }
    
                // try to mark the task as finished
                // if that fails, the task was canceled/failed in the meantime
                if (!transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) {
                    throw new CancelTaskException();
                }
            }
            catch (Throwable t) {
                //......
            }
            finally {
                //......
            }
        }
        
        //......
    }
    
    • Task's run method calls invokable.invoke(); in this walkthrough the invokable is a StreamTask.
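
    The state handling in Task.run can be pictured as a small state machine. The sketch below is a simplified stand-in, not Flink's Task class: the class name TaskStateSketch, the reduced State enum and the cancelBeforeStart helper are all hypothetical, and it assumes transitionState behaves like an atomic compare-and-set that fails when another thread has already changed the state.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified model of the lifecycle in Task.run; not Flink's Task class.
final class TaskStateSketch {

    enum State { CREATED, DEPLOYING, RUNNING, FINISHED, CANCELING, CANCELED }

    private final AtomicReference<State> state = new AtomicReference<>(State.CREATED);

    // Assumption: a transition is an atomic compare-and-set that fails
    // if another thread changed the state in the meantime.
    boolean transitionState(State expected, State next) {
        return state.compareAndSet(expected, next);
    }

    State current() {
        return state.get();
    }

    // Simplified cancel request: only handles a task that has not started yet.
    void cancelBeforeStart() {
        transitionState(State.CREATED, State.CANCELING);
    }

    // Returns true only if the task went CREATED -> DEPLOYING -> RUNNING -> FINISHED.
    boolean run(Runnable invokable) {
        if (!transitionState(State.CREATED, State.DEPLOYING)) {
            // Canceled before we started: settle into the final state and stop.
            transitionState(State.CANCELING, State.CANCELED);
            return false;
        }
        if (!transitionState(State.DEPLOYING, State.RUNNING)) {
            return false; // canceled or failed while deploying
        }
        invokable.run(); // stands in for invokable.invoke()
        return transitionState(State.RUNNING, State.FINISHED);
    }

    public static void main(String[] args) {
        TaskStateSketch task = new TaskStateSketch();
        boolean finished = task.run(() -> System.out.println("invoke() called"));
        System.out.println(finished + " " + task.current());
    }
}
```

    Because every step is guarded by the expected previous state, a cancel request that arrives first makes the next transition fail, which is why the excerpt above throws CancelTaskException whenever transitionState returns false.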

    StreamTask.invoke

    flink-streaming-java_2.11/1.7.0/flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/StreamTask.java

    @Internal
    public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
            extends AbstractInvokable
            implements AsyncExceptionHandler {
    
        @Override
        public final void invoke() throws Exception {
    
            boolean disposed = false;
            try {
                // -------- Initialize ---------
                LOG.debug("Initializing {}.", getName());
    
                asyncOperationsThreadPool = Executors.newCachedThreadPool();
    
                CheckpointExceptionHandlerFactory cpExceptionHandlerFactory = createCheckpointExceptionHandlerFactory();
    
                synchronousCheckpointExceptionHandler = cpExceptionHandlerFactory.createCheckpointExceptionHandler(
                    getExecutionConfig().isFailTaskOnCheckpointError(),
                    getEnvironment());
    
                asynchronousCheckpointExceptionHandler = new AsyncCheckpointExceptionHandler(this);
    
                stateBackend = createStateBackend();
                checkpointStorage = stateBackend.createCheckpointStorage(getEnvironment().getJobID());
    
                // if the clock is not already set, then assign a default TimeServiceProvider
                if (timerService == null) {
                    ThreadFactory timerThreadFactory = new DispatcherThreadFactory(TRIGGER_THREAD_GROUP,
                        "Time Trigger for " + getName(), getUserCodeClassLoader());
    
                    timerService = new SystemProcessingTimeService(this, getCheckpointLock(), timerThreadFactory);
                }
    
                operatorChain = new OperatorChain<>(this, streamRecordWriters);
                headOperator = operatorChain.getHeadOperator();
    
                // task specific initialization
                init();
    
                // save the work of reloading state, etc, if the task is already canceled
                if (canceled) {
                    throw new CancelTaskException();
                }
    
                // -------- Invoke --------
                LOG.debug("Invoking {}", getName());
    
                // we need to make sure that any triggers scheduled in open() cannot be
                // executed before all operators are opened
                synchronized (lock) {
    
                    // both the following operations are protected by the lock
                    // so that we avoid race conditions in the case that initializeState()
                    // registers a timer, that fires before the open() is called.
    
                    initializeState();
                    openAllOperators();
                }
    
                // final check to exit early before starting to run
                if (canceled) {
                    throw new CancelTaskException();
                }
    
                // let the task do its work
                isRunning = true;
                run();
    
                // if this left the run() method cleanly despite the fact that this was canceled,
                // make sure the "clean shutdown" is not attempted
                if (canceled) {
                    throw new CancelTaskException();
                }
    
                LOG.debug("Finished task {}", getName());
    
                // make sure no further checkpoint and notification actions happen.
                // we make sure that no other thread is currently in the locked scope before
                // we close the operators by trying to acquire the checkpoint scope lock
                // we also need to make sure that no triggers fire concurrently with the close logic
                // at the same time, this makes sure that during any "regular" exit where still
                synchronized (lock) {
                    // this is part of the main logic, so if this fails, the task is considered failed
                    closeAllOperators();
    
                    // make sure no new timers can come
                    timerService.quiesce();
    
                    // only set the StreamTask to not running after all operators have been closed!
                    // See FLINK-7430
                    isRunning = false;
                }
    
                // make sure all timers finish
                timerService.awaitPendingAfterQuiesce();
    
                LOG.debug("Closed operators for task {}", getName());
    
                // make sure all buffered data is flushed
                operatorChain.flushOutputs();
    
                // make an attempt to dispose the operators such that failures in the dispose call
                // still let the computation fail
                tryDisposeAllOperators();
                disposed = true;
            }
            finally {
                //......
            }
        }
    
        private void initializeState() throws Exception {
    
            StreamOperator<?>[] allOperators = operatorChain.getAllOperators();
    
            for (StreamOperator<?> operator : allOperators) {
                if (null != operator) {
                    operator.initializeState();
                }
            }
        }
    
        //......
    }
    
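    • StreamTask's invoke method calls its private initializeState method, which iterates over all operators (StreamOperator) in the operatorChain and calls initializeState on each of them; in this walkthrough the operator is, for example, a StreamSource.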
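
    The ordering guarantee in invoke (every operator gets initializeState before any operator gets open, both inside one lock) can be sketched in isolation. Everything below is a hypothetical stand-in: InvokeOrderSketch, its Operator interface and the recording helper are illustration names, not Flink classes, and the iteration order is simply array order.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for StreamTask and StreamOperator; not Flink classes.
final class InvokeOrderSketch {

    interface Operator {
        void initializeState();
        void open();
    }

    private final Object lock = new Object();
    private final Operator[] allOperators;

    InvokeOrderSketch(Operator... allOperators) {
        this.allOperators = allOperators;
    }

    void invoke() {
        // Both phases run under the same lock, so nothing that also takes
        // this lock can run between initializeState() and open().
        synchronized (lock) {
            for (Operator op : allOperators) {
                if (op != null) {
                    op.initializeState();
                }
            }
            for (Operator op : allOperators) {
                if (op != null) {
                    op.open();
                }
            }
        }
    }

    // Builds an operator that records each call in the shared log.
    static Operator recording(String name, List<String> log) {
        return new Operator() {
            @Override
            public void initializeState() {
                log.add(name + ".initializeState");
            }

            @Override
            public void open() {
                log.add(name + ".open");
            }
        };
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        // The null entry mirrors the null check in the excerpt above.
        new InvokeOrderSketch(recording("source", log), null, recording("map", log)).invoke();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```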

    StreamOperator.initializeState

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/StreamOperator.java

    @PublicEvolving
    public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Disposable, Serializable {
        /**
         * Provides a context to initialize all state in the operator.
         */
        void initializeState() throws Exception;
    
        //......
    }
    
    • The StreamOperator interface declares the initializeState method, which initializes the operator's state.

    StreamSource.initializeState

    flink-streaming-java_2.11/1.7.0/flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/StreamSource.java

    @Internal
    public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
            extends AbstractUdfStreamOperator<OUT, SRC> implements StreamOperator<OUT> {
    
            //......
    }
    
    • StreamSource extends AbstractUdfStreamOperator and does not override the no-argument initializeState; AbstractUdfStreamOperator does not override it either, so the call resolves to initializeState() in AbstractStreamOperator, the parent class of AbstractUdfStreamOperator.

    AbstractStreamOperator.initializeState

    flink-streaming-java_2.11/1.7.0/flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java

    @PublicEvolving
    public abstract class AbstractStreamOperator<OUT>
            implements StreamOperator<OUT>, Serializable {
    
        @Override
        public final void initializeState() throws Exception {
    
            final TypeSerializer<?> keySerializer = config.getStateKeySerializer(getUserCodeClassloader());
    
            final StreamTask<?, ?> containingTask =
                Preconditions.checkNotNull(getContainingTask());
            final CloseableRegistry streamTaskCloseableRegistry =
                Preconditions.checkNotNull(containingTask.getCancelables());
            final StreamTaskStateInitializer streamTaskStateManager =
                Preconditions.checkNotNull(containingTask.createStreamTaskStateInitializer());
    
            final StreamOperatorStateContext context =
                streamTaskStateManager.streamOperatorStateContext(
                    getOperatorID(),
                    getClass().getSimpleName(),
                    this,
                    keySerializer,
                    streamTaskCloseableRegistry,
                    metrics);
    
            this.operatorStateBackend = context.operatorStateBackend();
            this.keyedStateBackend = context.keyedStateBackend();
    
            if (keyedStateBackend != null) {
                this.keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, getExecutionConfig());
            }
    
            timeServiceManager = context.internalTimerServiceManager();
    
            CloseableIterable<KeyGroupStatePartitionStreamProvider> keyedStateInputs = context.rawKeyedStateInputs();
            CloseableIterable<StatePartitionStreamProvider> operatorStateInputs = context.rawOperatorStateInputs();
    
            try {
                StateInitializationContext initializationContext = new StateInitializationContextImpl(
                    context.isRestored(), // information whether we restore or start for the first time
                    operatorStateBackend, // access to operator state backend
                    keyedStateStore, // access to keyed state backend
                    keyedStateInputs, // access to keyed state stream
                    operatorStateInputs); // access to operator state stream
    
                initializeState(initializationContext);
            } finally {
                closeFromRegistry(operatorStateInputs, streamTaskCloseableRegistry);
                closeFromRegistry(keyedStateInputs, streamTaskCloseableRegistry);
            }
        }
    
        /**
         * Stream operators with state which can be restored need to override this hook method.
         *
         * @param context context that allows to register different states.
         */
        public void initializeState(StateInitializationContext context) throws Exception {
    
        }
    
        //......
    }
    
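    • AbstractStreamOperator implements the initializeState method declared by the StreamOperator interface. That method builds a StateInitializationContext and then calls initializeState(initializationContext), a hook with an empty default body that the subclass AbstractUdfStreamOperator overrides.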
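
    This is the template-method pattern: a final entry point prepares the context, then hands it to an overridable hook. A minimal sketch of the dispatch follows, using hypothetical classes (BaseOperator, UdfOperator, SourceOperator and Context are illustration names that only play the roles of AbstractStreamOperator, AbstractUdfStreamOperator, StreamSource and StateInitializationContext).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical classes that only mimic the shape of the Flink hierarchy.
final class TemplateMethodSketch {

    // Stand-in for StateInitializationContext.
    static final class Context {
        final boolean restored;

        Context(boolean restored) {
            this.restored = restored;
        }
    }

    // Plays the role of AbstractStreamOperator.
    abstract static class BaseOperator {
        final List<String> log = new ArrayList<>();

        // Final entry point: subclasses cannot change how the context is built.
        final void initializeState() {
            Context context = new Context(false);
            log.add("base: context built");
            initializeState(context);
        }

        // Hook with an empty default body, meant to be overridden.
        void initializeState(Context context) {
        }
    }

    // Plays the role of AbstractUdfStreamOperator: overrides only the hook.
    static class UdfOperator extends BaseOperator {
        @Override
        void initializeState(Context context) {
            super.initializeState(context);
            log.add("udf: restore user function, restored=" + context.restored);
        }
    }

    // Plays the role of StreamSource: overrides neither method.
    static final class SourceOperator extends UdfOperator {
    }

    public static void main(String[] args) {
        SourceOperator source = new SourceOperator();
        source.initializeState();
        System.out.println(source.log);
    }
}
```

    Calling the no-argument method on SourceOperator therefore runs the base-class setup first and the UdfOperator hook second, which matches the call path traced in the excerpts.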

    AbstractUdfStreamOperator.initializeState(initializationContext)

    flink-streaming-java_2.11/1.7.0/flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java

    @PublicEvolving
    public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
            extends AbstractStreamOperator<OUT>
            implements OutputTypeConfigurable<OUT> {
    
        @Override
        public void initializeState(StateInitializationContext context) throws Exception {
            super.initializeState(context);
            StreamingFunctionUtils.restoreFunctionState(context, userFunction);
        }
        
        //......
    }
    
    • This override of initializeState(initializationContext) calls the super implementation and then StreamingFunctionUtils.restoreFunctionState with the userFunction.

    StreamingFunctionUtils.restoreFunctionState

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/util/functions/StreamingFunctionUtils.java

        public static void restoreFunctionState(
                StateInitializationContext context,
                Function userFunction) throws Exception {
    
            Preconditions.checkNotNull(context);
    
            while (true) {
    
                if (tryRestoreFunction(context, userFunction)) {
                    break;
                }
    
                // inspect if the user function is wrapped, then unwrap and try again if we can restore the inner function
                if (userFunction instanceof WrappingFunction) {
                    userFunction = ((WrappingFunction<?>) userFunction).getWrappedFunction();
                } else {
                    break;
                }
            }
        }
    
        private static boolean tryRestoreFunction(
                StateInitializationContext context,
                Function userFunction) throws Exception {
    
            if (userFunction instanceof CheckpointedFunction) {
                ((CheckpointedFunction) userFunction).initializeState(context);
    
                return true;
            }
    
            if (context.isRestored() && userFunction instanceof ListCheckpointed) {
                @SuppressWarnings("unchecked")
                ListCheckpointed<Serializable> listCheckpointedFun = (ListCheckpointed<Serializable>) userFunction;
    
                ListState<Serializable> listState = context.getOperatorStateStore().
                        getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
    
                List<Serializable> list = new ArrayList<>();
    
                for (Serializable serializable : listState.get()) {
                    list.add(serializable);
                }
    
                try {
                    listCheckpointedFun.restoreState(list);
                } catch (Exception e) {
    
                    throw new Exception("Failed to restore state to function: " + e.getMessage(), e);
                }
    
                return true;
            }
    
            return false;
        }
    
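    • restoreFunctionState mainly calls tryRestoreFunction, unwrapping WrappingFunction layers until one call succeeds or nothing is left to unwrap. tryRestoreFunction checks the userFunction: if it implements CheckpointedFunction, its initializeState method is called; otherwise, if it implements ListCheckpointed and context.isRestored() is true, the ListState is fetched from the OperatorStateStore, its values are copied into a List, and ListCheckpointed.restoreState is called with that list.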
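
    The unwrap-and-retry loop is easier to see without the Flink types. The sketch below uses hypothetical stand-ins: Checkpointed, ListRestorable and Wrapper only mimic CheckpointedFunction, ListCheckpointed and WrappingFunction, the context is reduced to a restored flag plus a list of saved values, and restore returns a boolean purely so the outcome can be observed (the original method returns void).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins; none of these types are Flink's.
final class RestoreSketch {

    interface Function {
    }

    // Mimics CheckpointedFunction: always called, restored or not.
    interface Checkpointed extends Function {
        void initializeState(boolean restored);
    }

    // Mimics ListCheckpointed: only called when restoring.
    interface ListRestorable extends Function {
        void restoreState(List<String> state);
    }

    // Mimics WrappingFunction: holds an inner function.
    static final class Wrapper implements Function {
        final Function wrapped;

        Wrapper(Function wrapped) {
            this.wrapped = wrapped;
        }
    }

    static boolean tryRestore(Function f, boolean restored, List<String> saved) {
        if (f instanceof Checkpointed) {
            ((Checkpointed) f).initializeState(restored);
            return true;
        }
        if (restored && f instanceof ListRestorable) {
            // Copy the saved values into a fresh list before handing them over.
            ((ListRestorable) f).restoreState(new ArrayList<>(saved));
            return true;
        }
        return false;
    }

    // Tries the function itself, then each wrapped layer in turn.
    static boolean restore(Function f, boolean restored, List<String> saved) {
        while (true) {
            if (tryRestore(f, restored, saved)) {
                return true;
            }
            if (f instanceof Wrapper) {
                f = ((Wrapper) f).wrapped;
            } else {
                return false;
            }
        }
    }

    public static void main(String[] args) {
        ListRestorable inner = state -> System.out.println("restored " + state);
        Function wrapped = new Wrapper(new Wrapper(inner));
        System.out.println(restore(wrapped, true, Arrays.asList("a", "b")));
    }
}
```

    Note the asymmetry the sketch preserves: the Checkpointed branch runs on a fresh start as well as on a restore, while the ListRestorable branch is skipped entirely when nothing is being restored.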

    Summary

    • Task's run method triggers invokable.invoke(), where the invokable is a StreamTask. StreamTask's invoke method calls initializeState, which iterates over all operators (StreamOperator) in the operatorChain and calls initializeState on each; the operator here is, for example, a StreamSource, which extends AbstractUdfStreamOperator.
    • The StreamOperator interface declares initializeState for initializing the operator's state. Its abstract subclass AbstractStreamOperator implements that method and, inside it, calls initializeState(initializationContext), which the subclass AbstractUdfStreamOperator overrides.
    • AbstractUdfStreamOperator's initializeState(initializationContext) calls StreamingFunctionUtils.restoreFunctionState. If userFunction implements CheckpointedFunction, its initializeState method is called; if userFunction implements ListCheckpointed and context.isRestored() is true, the ListState is fetched from the OperatorStateStore, its values are copied into a List, and ListCheckpointed.restoreState is called.
