A Look at Flink's SpoutWrapper


Author: go4it | Published 2018-11-24 21:56

    This article takes a look at Flink's SpoutWrapper.

    SpoutWrapper

    flink-storm_2.11-1.6.2-sources.jar!/org/apache/flink/storm/wrappers/SpoutWrapper.java

    /**
     * A {@link SpoutWrapper} wraps an {@link IRichSpout} in order to execute it within a Flink Streaming program. It
     * takes the spout's output tuples and transforms them into Flink tuples of type {@code OUT} (see
     * {@link SpoutCollector} for supported types).<br>
     * <br>
     * Per default, {@link SpoutWrapper} calls the wrapped spout's {@link IRichSpout#nextTuple() nextTuple()} method in
     * an infinite loop.<br>
     * Alternatively, {@link SpoutWrapper} can call {@link IRichSpout#nextTuple() nextTuple()} for a finite number of
     * times and terminate automatically afterwards (for finite input streams). The number of {@code nextTuple()} calls can
     * be specified as a certain number of invocations or can be undefined. In the undefined case, {@link SpoutWrapper}
     * terminates if no record was emitted to the output collector for the first time during a call to
     * {@link IRichSpout#nextTuple() nextTuple()}.<br>
     * If the given spout implements {@link FiniteSpout} interface and {@link #numberOfInvocations} is not provided or
     * is {@code null}, {@link SpoutWrapper} calls {@link IRichSpout#nextTuple() nextTuple()} method until
     * {@link FiniteSpout#reachedEnd()} returns true.
     */
    public final class SpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> implements StoppableFunction {
        //......
    
        /** The number of {@link IRichSpout#nextTuple()} calls. */
        private Integer numberOfInvocations; // do not use int -> null indicates an infinite loop
    
        /**
         * Instantiates a new {@link SpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()} method of
         * the given {@link IRichSpout spout} a finite number of times. The output type will be one of {@link Tuple0} to
         * {@link Tuple25} depending on the spout's declared number of attributes.
         *
         * @param spout
         *            The {@link IRichSpout spout} to be used.
         * @param numberOfInvocations
         *            The number of calls to {@link IRichSpout#nextTuple()}. If value is negative, {@link SpoutWrapper}
         *            terminates if no tuple was emitted for the first time. If value is {@code null}, finite invocation is
         *            disabled.
         * @throws IllegalArgumentException
         *             If the number of declared output attributes is not with range [0;25].
         */
        public SpoutWrapper(final IRichSpout spout, final Integer numberOfInvocations)
                throws IllegalArgumentException {
            this(spout, (Collection<String>) null, numberOfInvocations);
        }
    
        /**
         * Instantiates a new {@link SpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()} method of
         * the given {@link IRichSpout spout} in an infinite loop. The output type will be one of {@link Tuple0} to
         * {@link Tuple25} depending on the spout's declared number of attributes.
         *
         * @param spout
         *            The {@link IRichSpout spout} to be used.
         * @throws IllegalArgumentException
         *             If the number of declared output attributes is not with range [0;25].
         */
        public SpoutWrapper(final IRichSpout spout) throws IllegalArgumentException {
            this(spout, (Collection<String>) null, null);
        }
    
        @Override
        public final void run(final SourceContext<OUT> ctx) throws Exception {
            final GlobalJobParameters config = super.getRuntimeContext().getExecutionConfig()
                    .getGlobalJobParameters();
            StormConfig stormConfig = new StormConfig();
    
            if (config != null) {
                if (config instanceof StormConfig) {
                    stormConfig = (StormConfig) config;
                } else {
                    stormConfig.putAll(config.toMap());
                }
            }
    
            final TopologyContext stormTopologyContext = WrapperSetupHelper.createTopologyContext(
                    (StreamingRuntimeContext) super.getRuntimeContext(), this.spout, this.name,
                    this.stormTopology, stormConfig);
    
            SpoutCollector<OUT> collector = new SpoutCollector<OUT>(this.numberOfAttributes,
                    stormTopologyContext.getThisTaskId(), ctx);
    
            this.spout.open(stormConfig, stormTopologyContext, new SpoutOutputCollector(collector));
            this.spout.activate();
    
            if (numberOfInvocations == null) {
                if (this.spout instanceof FiniteSpout) {
                    final FiniteSpout finiteSpout = (FiniteSpout) this.spout;
    
                    while (this.isRunning && !finiteSpout.reachedEnd()) {
                        finiteSpout.nextTuple();
                    }
                } else {
                    while (this.isRunning) {
                        this.spout.nextTuple();
                    }
                }
            } else {
                int counter = this.numberOfInvocations;
                if (counter >= 0) {
                    while ((--counter >= 0) && this.isRunning) {
                        this.spout.nextTuple();
                    }
                } else {
                    do {
                        collector.tupleEmitted = false;
                        this.spout.nextTuple();
                    } while (collector.tupleEmitted && this.isRunning);
                }
            }
        }
    
        /**
         * {@inheritDoc}
         *
         * <p>Sets the {@link #isRunning} flag to {@code false}.
         */
        @Override
        public void cancel() {
            this.isRunning = false;
        }
    
        /**
         * {@inheritDoc}
         *
         * <p>Sets the {@link #isRunning} flag to {@code false}.
         */
        @Override
        public void stop() {
            this.isRunning = false;
        }
    
        @Override
        public void close() throws Exception {
            this.spout.close();
        }
    }
    
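    • SpoutWrapper extends RichParallelSourceFunction and implements the stop method of the StoppableFunction interface.
    • SpoutWrapper's run method creates a Flink SpoutCollector and passes it to the constructor of Storm's SpoutOutputCollector. It then calls the spout's open method, handing the spout that SpoutOutputCollector (which wraps the Flink SpoutCollector) so that the tuples the spout emits are collected.
    • After that, it calls spout.nextTuple() according to numberOfInvocations to emit data. numberOfInvocations controls how many times nextTuple is called and can be set through the constructor when the SpoutWrapper is created. With a constructor that takes no numberOfInvocations argument the value is null, which means an infinite loop.
    • Flink's Storm compatibility layer provides the FiniteSpout interface, whose reachedEnd method reports whether all data has been sent, so that a Storm spout can run in finite mode. When numberOfInvocations is null and the spout is a plain Storm spout, nextTuple is simply called in a loop until the wrapper is cancelled or stopped.
    • If numberOfInvocations is set and is greater than or equal to 0, nextTuple is called that many times. If it is negative, the loop ends as soon as a call to nextTuple emits no tuple, which is tracked through collector.tupleEmitted.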
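
    The loop selection described above can be sketched without any Flink or Storm dependency. Everything in the block below (InvocationModesSketch, CountingSpout, drive) is a hypothetical stand-in written for illustration, not code from flink-storm; the isRunning check is left out, and the null case is only modelled for a finite spout so that the sketch terminates.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the loop modes in SpoutWrapper.run(); not Flink code. */
final class InvocationModesSketch {

    /** Minimal spout model: emits one number per nextTuple() call until the limit is reached. */
    static final class CountingSpout {
        private final int limit;
        private int next = 0;
        final List<Integer> emitted = new ArrayList<>();
        boolean tupleEmitted; // plays the role of collector.tupleEmitted
        int calls = 0;

        CountingSpout(int limit) {
            this.limit = limit;
        }

        void nextTuple() {
            calls++;
            if (next < limit) {
                emitted.add(next++);
                tupleEmitted = true;
            }
        }

        /** FiniteSpout-style end check. */
        boolean reachedEnd() {
            return next >= limit;
        }
    }

    /** Picks the loop the same way run() does and returns the number of nextTuple() calls. */
    static int drive(CountingSpout spout, Integer numberOfInvocations, boolean finiteSpout) {
        if (numberOfInvocations == null) {
            if (!finiteSpout) {
                throw new IllegalArgumentException("infinite loop is not modelled in this sketch");
            }
            while (!spout.reachedEnd()) {
                spout.nextTuple();
            }
        } else {
            int counter = numberOfInvocations;
            if (counter >= 0) {
                // fixed number of invocations
                while (--counter >= 0) {
                    spout.nextTuple();
                }
            } else {
                // negative value: stop after the first call that emits nothing
                do {
                    spout.tupleEmitted = false;
                    spout.nextTuple();
                } while (spout.tupleEmitted);
            }
        }
        return spout.calls;
    }

    public static void main(String[] args) {
        System.out.println("fixed count:   " + drive(new CountingSpout(5), 3, false));
        System.out.println("until empty:   " + drive(new CountingSpout(5), -1, false));
        System.out.println("until the end: " + drive(new CountingSpout(5), null, true));
    }
}
```

    Note that in the negative case the spout is called one extra time: the call that emits nothing is the one that ends the loop.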

    SpoutCollector

    flink-storm_2.11-1.6.2-sources.jar!/org/apache/flink/storm/wrappers/SpoutCollector.java

    /**
     * A {@link SpoutCollector} is used by {@link SpoutWrapper} to provided an Storm
     * compatible output collector to the wrapped spout. It transforms the emitted Storm tuples into
     * Flink tuples and emits them via the provide {@link SourceContext} object.
     */
    class SpoutCollector<OUT> extends AbstractStormCollector<OUT> implements ISpoutOutputCollector {
    
        /** The Flink source context object. */
        private final SourceContext<OUT> flinkContext;
    
        /**
         * Instantiates a new {@link SpoutCollector} that emits Flink tuples to the given Flink source context. If the
         * number of attributes is specified as zero, any output type is supported. If the number of attributes is between 0
         * to 25, the output type is {@link Tuple0} to {@link Tuple25}, respectively.
         *
         * @param numberOfAttributes
         *            The number of attributes of the emitted tuples.
         * @param taskId
         *            The ID of the producer task (negative value for unknown).
         * @param flinkContext
         *            The Flink source context to be used.
         * @throws UnsupportedOperationException
         *             if the specified number of attributes is greater than 25
         */
        SpoutCollector(final HashMap<String, Integer> numberOfAttributes, final int taskId,
                final SourceContext<OUT> flinkContext) throws UnsupportedOperationException {
            super(numberOfAttributes, taskId);
            assert (flinkContext != null);
            this.flinkContext = flinkContext;
        }
    
        @Override
        protected List<Integer> doEmit(final OUT flinkTuple) {
            this.flinkContext.collect(flinkTuple);
            // TODO
            return null;
        }
    
        @Override
        public void reportError(final Throwable error) {
            // not sure, if Flink can support this
        }
    
        @Override
        public List<Integer> emit(final String streamId, final List<Object> tuple, final Object messageId) {
            return this.tansformAndEmit(streamId, tuple);
        }
    
        @Override
        public void emitDirect(final int taskId, final String streamId, final List<Object> tuple, final Object messageId) {
            throw new UnsupportedOperationException("Direct emit is not supported by Flink");
        }
    
        public long getPendingCount() {
            return 0;
        }
    
    }
    
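    • SpoutCollector implements Storm's ISpoutOutputCollector interface and provides the emit, emitDirect, getPendingCount, and reportError methods. Flink currently does not support emitDirect (it throws UnsupportedOperationException), getPendingCount always returns 0, and reportError is a no-op.
    • doEmit calls flinkContext.collect(flinkTuple) to emit the data. The method is protected and is mainly meant to be called by tansformAndEmit (the spelling used in the quoted source).
    • tansformAndEmit is provided by the parent class AbstractStormCollector.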
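
    The relationship between the two classes is a template method: the parent owns the transformation step and the subclass only decides where the result goes. The block below is a minimal sketch of that shape under simplifying assumptions. BaseCollector, SinkCollector, and the list used in place of SourceContext are hypothetical names, and the transformation is reduced to passing through a single raw value.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch of the template-method relationship; not the flink-storm classes. */
final class CollectorSketch {

    /** Plays the role of AbstractStormCollector: owns the transform step, defers the emit. */
    abstract static class BaseCollector<OUT> {
        boolean tupleEmitted = false;

        @SuppressWarnings("unchecked")
        final List<Integer> transformAndEmit(List<Object> tuple) {
            // Simplified: a single-field tuple is passed through as its raw value.
            List<Integer> taskIds = doEmit((OUT) tuple.get(0));
            tupleEmitted = true;
            return taskIds;
        }

        protected abstract List<Integer> doEmit(OUT value);
    }

    /** Plays the role of SpoutCollector: forwards values to a list standing in for SourceContext. */
    static final class SinkCollector<OUT> extends BaseCollector<OUT> {
        final List<OUT> sink = new ArrayList<>();

        @Override
        protected List<Integer> doEmit(OUT value) {
            sink.add(value);
            return null; // the quoted doEmit also returns null
        }

        List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
            return transformAndEmit(tuple); // messageId is ignored, as in the quoted emit
        }

        void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
            throw new UnsupportedOperationException("Direct emit is not supported");
        }
    }

    public static void main(String[] args) {
        SinkCollector<String> collector = new SinkCollector<>();
        collector.emit("default", Collections.<Object>singletonList("hello"), null);
        System.out.println(collector.sink + " tupleEmitted=" + collector.tupleEmitted);
    }
}
```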

    AbstractStormCollector.tansformAndEmit

    flink-storm_2.11-1.6.2-sources.jar!/org/apache/flink/storm/wrappers/AbstractStormCollector.java

        /**
         * Transforms a Storm tuple into a Flink tuple of type {@code OUT} and emits this tuple via {@link #doEmit(Object)}
         * to the specified output stream.
         *
         * @param The
         *            The output stream id.
         * @param tuple
         *            The Storm tuple to be emitted.
         * @return the return value of {@link #doEmit(Object)}
         */
        @SuppressWarnings("unchecked")
        protected final List<Integer> tansformAndEmit(final String streamId, final List<Object> tuple) {
            List<Integer> taskIds;
    
            int numAtt = this.numberOfAttributes.get(streamId);
            int taskIdIdx = numAtt;
            if (this.taskId >= 0 && numAtt < 0) {
                numAtt = 1;
                taskIdIdx = 0;
            }
            if (numAtt >= 0) {
                assert (tuple.size() == numAtt);
                Tuple out = this.outputTuple.get(streamId);
                for (int i = 0; i < numAtt; ++i) {
                    out.setField(tuple.get(i), i);
                }
                if (this.taskId >= 0) {
                    out.setField(this.taskId, taskIdIdx);
                }
                if (this.split) {
                    this.splitTuple.streamId = streamId;
                    this.splitTuple.value = out;
    
                    taskIds = doEmit((OUT) this.splitTuple);
                } else {
                    taskIds = doEmit((OUT) out);
                }
    
            } else {
                assert (tuple.size() == 1);
                if (this.split) {
                    this.splitTuple.streamId = streamId;
                    this.splitTuple.value = tuple.get(0);
    
                    taskIds = doEmit((OUT) this.splitTuple);
                } else {
                    taskIds = doEmit((OUT) tuple.get(0));
                }
            }
            this.tupleEmitted = true;
    
            return taskIds;
        }
    
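    • AbstractStormCollector.tansformAndEmit mainly handles the split case, in which a spout declares more than one stream. In every case the data is finally emitted through the subclass method SpoutCollector.doEmit.
    • If split is true, doEmit receives splitTuple, a SplitStreamType that records the streamId together with the value.
    • If split is false, doEmit receives the Tuple itself (or, when numAtt is negative, the single raw value). This is what would be the value inside a SplitStreamType, without the streamId.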
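
    As a rough illustration of the two branches, the sketch below decides what would be handed to doEmit. Tagged is a hypothetical, simplified stand-in for SplitStreamType, and a plain field list stands in for a Flink Tuple. The task id field and the reuse of a single mutable splitTuple instance in the quoted code are deliberately left out.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of the split / non-split decision; not the flink-storm classes. */
final class SplitEmitSketch {

    /** Simplified stand-in for SplitStreamType: a value tagged with its stream id. */
    static final class Tagged {
        final String streamId;
        final Object value;

        Tagged(String streamId, Object value) {
            this.streamId = streamId;
            this.value = value;
        }

        @Override
        public String toString() {
            return "<" + streamId + ":" + value + ">";
        }
    }

    /**
     * Returns what would be passed to doEmit. rawOutput models a negative attribute count
     * (single raw value); otherwise the field list stands in for a Flink Tuple.
     */
    static Object toEmit(boolean split, boolean rawOutput, String streamId, List<Object> tuple) {
        Object value = rawOutput ? tuple.get(0) : tuple;
        return split ? new Tagged(streamId, value) : value;
    }

    public static void main(String[] args) {
        List<Object> fields = Arrays.<Object>asList("word", 3);
        System.out.println(toEmit(true, false, "counts", fields));  // tagged with the stream id
        System.out.println(toEmit(false, false, "counts", fields)); // plain fields, no stream id
    }
}
```

    A downstream consumer of the split variant can then select records by comparing the streamId field, which is the information the non-split variant does not carry.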

    Task.run

    flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java

    /**
     * The Task represents one execution of a parallel subtask on a TaskManager.
     * A Task wraps a Flink operator (which may be a user function) and
     * runs it, providing all services necessary for example to consume input data,
     * produce its results (intermediate result partitions) and communicate
     * with the JobManager.
     *
     * <p>The Flink operators (implemented as subclasses of
     * {@link AbstractInvokable} have only data readers, -writers, and certain event callbacks.
     * The task connects those to the network stack and actor messages, and tracks the state
     * of the execution and handles exceptions.
     *
     * <p>Tasks have no knowledge about how they relate to other tasks, or whether they
     * are the first attempt to execute the task, or a repeated attempt. All of that
     * is only known to the JobManager. All the task knows are its own runnable code,
     * the task's configuration, and the IDs of the intermediate results to consume and
     * produce (if any).
     *
     * <p>Each Task is run by one dedicated thread.
     */
    public class Task implements Runnable, TaskActions, CheckpointListener {
        //......
    
        /**
         * The core work method that bootstraps the task and executes its code.
         */
        @Override
        public void run() {
                //......
                // now load and instantiate the task's invokable code
                invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);
    
                // ----------------------------------------------------------------
                //  actual task core work
                // ----------------------------------------------------------------
    
                // we must make strictly sure that the invokable is accessible to the cancel() call
                // by the time we switched to running.
                this.invokable = invokable;
    
                // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
                if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
                    throw new CancelTaskException();
                }
    
                // notify everyone that we switched to running
                notifyObservers(ExecutionState.RUNNING, null);
                taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));
    
                // make sure the user code classloader is accessible thread-locally
                executingThread.setContextClassLoader(userCodeClassLoader);
    
                // run the invokable
                invokable.invoke();
    
                //......
        }
    }
    
    • Task's run method calls invokable.invoke(); here the invokable is a StreamTask.

    StreamTask

    flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/StreamTask.java

    /**
     * Base class for all streaming tasks. A task is the unit of local processing that is deployed
     * and executed by the TaskManagers. Each task runs one or more {@link StreamOperator}s which form
     * the Task's operator chain. Operators that are chained together execute synchronously in the
     * same thread and hence on the same stream partition. A common case for these chains
     * are successive map/flatmap/filter tasks.
     *
     * <p>The task chain contains one "head" operator and multiple chained operators.
     * The StreamTask is specialized for the type of the head operator: one-input and two-input tasks,
     * as well as for sources, iteration heads and iteration tails.
     *
     * <p>The Task class deals with the setup of the streams read by the head operator, and the streams
     * produced by the operators at the ends of the operator chain. Note that the chain may fork and
     * thus have multiple ends.
     *
     * <p>The life cycle of the task is set up as follows:
     * <pre>{@code
     *  -- setInitialState -> provides state of all operators in the chain
     *
     *  -- invoke()
     *        |
     *        +----> Create basic utils (config, etc) and load the chain of operators
     *        +----> operators.setup()
     *        +----> task specific init()
     *        +----> initialize-operator-states()
     *        +----> open-operators()
     *        +----> run()
     *        +----> close-operators()
     *        +----> dispose-operators()
     *        +----> common cleanup
     *        +----> task specific cleanup()
     * }</pre>
     *
     * <p>The {@code StreamTask} has a lock object called {@code lock}. All calls to methods on a
     * {@code StreamOperator} must be synchronized on this lock object to ensure that no methods
     * are called concurrently.
     *
     * @param <OUT>
     * @param <OP>
     */
    @Internal
    public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
            extends AbstractInvokable
            implements AsyncExceptionHandler {
    
            //......
    
        @Override
        public final void invoke() throws Exception {
    
            boolean disposed = false;
            try {
                //......
    
                // let the task do its work
                isRunning = true;
                run();
    
                // if this left the run() method cleanly despite the fact that this was canceled,
                // make sure the "clean shutdown" is not attempted
                if (canceled) {
                    throw new CancelTaskException();
                }
    
                LOG.debug("Finished task {}", getName());
    
                //......
            }
            finally {
                // clean up everything we initialized
                isRunning = false;
    
                //......
            }
        }
    }
    
    • StreamTask's invoke method calls the run method of its subclass; here the subclass is StoppableSourceStreamTask.

    StoppableSourceStreamTask

    flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java

    /**
     * Stoppable task for executing stoppable streaming sources.
     *
     * @param <OUT> Type of the produced elements
     * @param <SRC> Stoppable source function
     */
    public class StoppableSourceStreamTask<OUT, SRC extends SourceFunction<OUT> & StoppableFunction>
        extends SourceStreamTask<OUT, SRC, StoppableStreamSource<OUT, SRC>> implements StoppableTask {
    
        private volatile boolean stopped;
    
        public StoppableSourceStreamTask(Environment environment) {
            super(environment);
        }
    
        @Override
        protected void run() throws Exception {
            if (!stopped) {
                super.run();
            }
        }
    
        @Override
        public void stop() {
            stopped = true;
            if (this.headOperator != null) {
                this.headOperator.stop();
            }
        }
    }
    
    • StoppableSourceStreamTask extends SourceStreamTask and mainly adds the stop method of StoppableTask. Unless stop has already been called, its run method delegates to its direct parent SourceStreamTask.
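
    The stop path is cooperative: the task records that it was stopped and forwards the call, and the source loop exits once it observes its flag. The sketch below models that with hypothetical classes (LoopingSource, StoppableTask). It assumes the running flag is volatile so that the looping thread sees the update; the quoted SpoutWrapper excerpt does not show the field declaration.

```java
/** Hypothetical sketch of cooperative stopping; not the Flink classes. */
final class StopFlagSketch {

    /** Source role (SpoutWrapper): loops until the flag is cleared. */
    static final class LoopingSource {
        private volatile boolean isRunning = true; // assumed volatile, see the lead-in
        private volatile long iterations = 0;

        void run() {
            while (isRunning) {
                iterations++; // stands in for spout.nextTuple()
            }
        }

        void stop() {
            isRunning = false;
        }

        long iterations() {
            return iterations;
        }
    }

    /** Task role (StoppableSourceStreamTask): remembers the stop and forwards it. */
    static final class StoppableTask {
        private volatile boolean stopped;
        final LoopingSource source = new LoopingSource();

        void run() {
            if (!stopped) {
                source.run();
            }
        }

        void stop() {
            stopped = true;
            source.stop();
        }
    }

    /** Runs the task on its own thread, stops it, and reports whether the thread ended. */
    static boolean runAndStop() {
        StoppableTask task = new StoppableTask();
        Thread worker = new Thread(task::run);
        worker.start();
        try {
            Thread.sleep(20);
            task.stop();
            worker.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            task.stop();
            return false;
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("worker finished: " + runAndStop());
    }
}
```

    Checking the stopped flag before entering the loop covers the case where stop arrives before run has started.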

    SourceStreamTask

    flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java

    /**
     * {@link StreamTask} for executing a {@link StreamSource}.
     *
     * <p>One important aspect of this is that the checkpointing and the emission of elements must never
     * occur at the same time. The execution must be serial. This is achieved by having the contract
     * with the StreamFunction that it must only modify its state or emit elements in
     * a synchronized block that locks on the lock Object. Also, the modification of the state
     * and the emission of elements must happen in the same block of code that is protected by the
     * synchronized block.
     *
     * @param <OUT> Type of the output elements of this source.
     * @param <SRC> Type of the source function for the stream source operator
     * @param <OP> Type of the stream source operator
     */
    @Internal
    public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends StreamSource<OUT, SRC>>
        extends StreamTask<OUT, OP> {
    
        //......
    
        @Override
        protected void run() throws Exception {
            headOperator.run(getCheckpointLock(), getStreamStatusMaintainer());
        }
    }
    
    • SourceStreamTask's run method mainly calls the run method of StreamSource, its head operator.

    StreamSource

    flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/StreamSource.java

    /**
     * {@link StreamOperator} for streaming sources.
     *
     * @param <OUT> Type of the output elements
     * @param <SRC> Type of the source function of this stream source operator
     */
    @Internal
    public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
            extends AbstractUdfStreamOperator<OUT, SRC> implements StreamOperator<OUT> {
    
        //......    
    
        public void run(final Object lockingObject, final StreamStatusMaintainer streamStatusMaintainer) throws Exception {
            run(lockingObject, streamStatusMaintainer, output);
        }
    
        public void run(final Object lockingObject,
                final StreamStatusMaintainer streamStatusMaintainer,
                final Output<StreamRecord<OUT>> collector) throws Exception {
    
            final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();
    
            final Configuration configuration = this.getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration();
            final long latencyTrackingInterval = getExecutionConfig().isLatencyTrackingConfigured()
                ? getExecutionConfig().getLatencyTrackingInterval()
                : configuration.getLong(MetricOptions.LATENCY_INTERVAL);
    
            LatencyMarksEmitter<OUT> latencyEmitter = null;
            if (latencyTrackingInterval > 0) {
                latencyEmitter = new LatencyMarksEmitter<>(
                    getProcessingTimeService(),
                    collector,
                    latencyTrackingInterval,
                    this.getOperatorID(),
                    getRuntimeContext().getIndexOfThisSubtask());
            }
    
            final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
    
            this.ctx = StreamSourceContexts.getSourceContext(
                timeCharacteristic,
                getProcessingTimeService(),
                lockingObject,
                streamStatusMaintainer,
                collector,
                watermarkInterval,
                -1);
    
            try {
                userFunction.run(ctx);
    
                // if we get here, then the user function either exited after being done (finite source)
                // or the function was canceled or stopped. For the finite source case, we should emit
                // a final watermark that indicates that we reached the end of event-time
                if (!isCanceledOrStopped()) {
                    ctx.emitWatermark(Watermark.MAX_WATERMARK);
                }
            } finally {
                // make sure that the context is closed in any case
                ctx.close();
                if (latencyEmitter != null) {
                    latencyEmitter.close();
                }
            }
        }
    
    • StreamSource.run calls userFunction.run(ctx); here userFunction is the SpoutWrapper, and this is what triggers the spout's nextTuple calls.
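
    The whole delegation chain traced in the sections above can be condensed into a small model. The class names in the block below only mirror the roles of the Flink classes and are hypothetical; setup, state transitions, checkpoint locks, and watermarks are all omitted, and the trace strings are there only to make the call order visible.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the delegation chain; names only mirror the roles in Flink. */
final class CallChainSketch {

    /** User function role (SpoutWrapper). */
    interface SourceFunctionRole {
        void run(List<String> trace);
    }

    /** StreamSource role: the head operator that invokes the user function. */
    static final class StreamSourceRole {
        private final SourceFunctionRole userFunction;

        StreamSourceRole(SourceFunctionRole userFunction) {
            this.userFunction = userFunction;
        }

        void run(List<String> trace) {
            trace.add("StreamSource.run");
            userFunction.run(trace);
        }
    }

    /** StreamTask role: invoke() wraps the subclass-specific run(). */
    abstract static class StreamTaskRole {
        final List<String> trace = new ArrayList<>();

        final void invoke() {
            trace.add("StreamTask.invoke");
            run();
        }

        protected abstract void run();
    }

    /** SourceStreamTask role: run() hands over to the head operator. */
    static final class SourceStreamTaskRole extends StreamTaskRole {
        private final StreamSourceRole headOperator;

        SourceStreamTaskRole(StreamSourceRole headOperator) {
            this.headOperator = headOperator;
        }

        @Override
        protected void run() {
            trace.add("SourceStreamTask.run");
            headOperator.run(trace);
        }
    }

    /** Task role: creates the invokable and invokes it, returning the recorded call order. */
    static List<String> runTask() {
        SourceStreamTaskRole invokable = new SourceStreamTaskRole(
                new StreamSourceRole(t -> t.add("SpoutWrapper.run -> spout.nextTuple")));
        invokable.trace.add("Task.run");
        invokable.invoke();
        return invokable.trace;
    }

    public static void main(String[] args) {
        runTask().forEach(System.out::println);
    }
}
```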

    Summary

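    • Flink uses SpoutWrapper to wrap an original Storm spout. In its run method it creates a Flink SpoutCollector, passes it to the constructor of Storm's SpoutOutputCollector, and then calls the spout's open method with that SpoutOutputCollector so that the tuples the spout emits are collected. It then calls spout.nextTuple() according to numberOfInvocations, which controls the number of nextTuple calls and can be set through the SpoutWrapper constructor. With a constructor that has no numberOfInvocations parameter the value is null, which means an infinite loop (or, for a FiniteSpout, a loop until reachedEnd() returns true).
    • SpoutCollector's emit method calls AbstractStormCollector.tansformAndEmit, which in turn calls SpoutCollector.doEmit to emit the data. When there are multiple streams, the tuple is wrapped in a SplitStreamType before it is passed to doEmit; with a single stream, the plain tuple is passed.
    • Flink's Task.run calls StreamTask.invoke, which calls the run method of the subclass (here StoppableSourceStreamTask). That run method is implemented by the direct parent SourceStreamTask and mainly calls StreamSource.run, which calls userFunction.run(ctx). Here userFunction is the SpoutWrapper, so the spout's nextTuple logic runs and its output is emitted through Flink's SpoutCollector.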


          Article link: https://www.haomeiwen.com/subject/fpfuqqtx.html