Flink Source Code Notes — What Happens After execute()?


Author: 飞不高的老鸟 | Published 2020-01-06 18:15

    Preface

        We know that a Flink program only actually starts running once we call env.execute(). When we write our business code, we are really just sketching out the DAG of the program; that logic only goes to work once execute is called. So what exactly happens after we call execute?
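        As a quick illustration of this lazy-evaluation model, consider the following minimal sketch (a standalone example, independent of the program in the next section): the operator calls only record transformations, and nothing is processed until execute() is invoked.

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // These calls only record transformations (a OneInputTransformation for map,
    // a sink transformation for print); no data flows yet.
    env.fromElements("a", "b", "c")
            .map(String::toUpperCase)
            .print();

    // Only this call builds the StreamGraph / JobGraph and actually runs the pipeline.
    env.execute("lazy-evaluation-demo");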

    Execution Flow

    Building the topology
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
            // Read data from Kafka
            String brokers;
            String groupId;
            String topic;
    
            ParameterTool param = ParameterTool.fromArgs(args);
            if (param.getNumberOfParameters() > 0){
                brokers = param.get("brokers");
                groupId = param.get("groupId");
                topic   = param.get("topic");
            } else {
                brokers = "";
                groupId = "";
                topic = "";
            }
    
            // Consume from Kafka as the data source
            DataStream<String> dataStream = env.addSource(KafkaConsumer.consumer(brokers, groupId, topic));
    
            SingleOutputStreamOperator<ActionStat> userStat = dataStream.map(new MyMap())
                    .filter(user -> (user.userId != null && user.articleId != null && "AppClick".equals(user.action)))
                    .keyBy("userId")
                    .timeWindow(Time.milliseconds(5000))
                    .aggregate(new AggDiY());
    
            userStat.print();
            
            env.execute("filnk-test");
    

        First, we have to initialize the execution environment. When debugging locally this is usually a LocalStreamEnvironment, while in production it is usually a RemoteStreamEnvironment; these are two subclasses of StreamExecutionEnvironment:

    public class LocalStreamEnvironment extends StreamExecutionEnvironment {...}
    public class RemoteStreamEnvironment extends StreamExecutionEnvironment {...}
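
        Besides relying on getExecutionEnvironment() to pick the environment automatically, both flavors can also be created explicitly; a small sketch (the host, port and jar path below are placeholders):

    // Local environment backed by an embedded mini cluster, handy for IDE debugging.
    StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();

    // Remote environment that submits the job to an existing cluster;
    // "jobmanager-host", 8081 and the jar path are placeholders.
    StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment(
            "jobmanager-host", 8081, "/path/to/my-flink-job.jar");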
    

        Next, let's trace the execution flow locally:

    public JobExecutionResult execute(String jobName) throws Exception {
            // transform the streaming program into a JobGraph
            StreamGraph streamGraph = getStreamGraph();
            streamGraph.setJobName(jobName);
    
            JobGraph jobGraph = streamGraph.getJobGraph();
            jobGraph.setAllowQueuedScheduling(true);
    
            Configuration configuration = new Configuration();
            configuration.addAll(jobGraph.getJobConfiguration());
            configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");
    
            // add (and override) the settings with what the user defined
            configuration.addAll(this.configuration);
    
            if (!configuration.contains(RestOptions.BIND_PORT)) {
                configuration.setString(RestOptions.BIND_PORT, "0");
            }
    
            int numSlotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
    
            MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
                .setConfiguration(configuration)
                .setNumSlotsPerTaskManager(numSlotsPerTaskManager)
                .build();
    
            if (LOG.isInfoEnabled()) {
                LOG.info("Running job on local embedded Flink mini cluster");
            }
    
            MiniCluster miniCluster = new MiniCluster(cfg);
    
            try {
                miniCluster.start();
                configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().get().getPort());
    
                return miniCluster.executeJobBlocking(jobGraph);
            }
            finally {
                transformations.clear();
                miniCluster.close();
            }
        }
    

        When we call execute to run the program, a StreamGraph is generated first. The StreamExecutionEnvironment internally maintains a list called transformations that records every operation:

    // the list of transformations
    protected final List<StreamTransformation<?>> transformations = new ArrayList<>();
    
    // iterate over the transformations to generate the StreamGraph
    public StreamGraph getStreamGraph() {
            if (transformations.size() <= 0) {
                throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
            }
            return StreamGraphGenerator.generate(this, transformations);
        }
    
    private StreamGraph generateInternal(List<StreamTransformation<?>> transformations) {
            for (StreamTransformation<?> transformation: transformations) {
                transform(transformation);
            }
            return streamGraph;
        }
    
    private Collection<Integer> transform(StreamTransformation<?> transform) {
    
            if (alreadyTransformed.containsKey(transform)) {
                return alreadyTransformed.get(transform);
            }
    
            LOG.debug("Transforming " + transform);
    
            if (transform.getMaxParallelism() <= 0) {
    
                // if the max parallelism hasn't been set, then first use the job wide max parallelism
                // from the ExecutionConfig.
                int globalMaxParallelismFromConfig = env.getConfig().getMaxParallelism();
                if (globalMaxParallelismFromConfig > 0) {
                    transform.setMaxParallelism(globalMaxParallelismFromConfig);
                }
            }
    
            // call at least once to trigger exceptions about MissingTypeInfo
            transform.getOutputType();
    
            Collection<Integer> transformedIds;
            if (transform instanceof OneInputTransformation<?, ?>) {
                transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
            } else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
                transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
            } else if (transform instanceof SourceTransformation<?>) {
                transformedIds = transformSource((SourceTransformation<?>) transform);
            } else if (transform instanceof SinkTransformation<?>) {
                transformedIds = transformSink((SinkTransformation<?>) transform);
            } else if (transform instanceof UnionTransformation<?>) {
                transformedIds = transformUnion((UnionTransformation<?>) transform);
            } else if (transform instanceof SplitTransformation<?>) {
                transformedIds = transformSplit((SplitTransformation<?>) transform);
            } else if (transform instanceof SelectTransformation<?>) {
                transformedIds = transformSelect((SelectTransformation<?>) transform);
            } else if (transform instanceof FeedbackTransformation<?>) {
                transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
            } else if (transform instanceof CoFeedbackTransformation<?>) {
                transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
            } else if (transform instanceof PartitionTransformation<?>) {
                transformedIds = transformPartition((PartitionTransformation<?>) transform);
            } else if (transform instanceof SideOutputTransformation<?>) {
                transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
            } else {
                throw new IllegalStateException("Unknown transformation: " + transform);
            }
    
            // need this check because the iterate transformation adds itself before
            // transforming the feedback edges
            if (!alreadyTransformed.containsKey(transform)) {
                alreadyTransformed.put(transform, transformedIds);
            }
    
            if (transform.getBufferTimeout() >= 0) {
                streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
            }
            if (transform.getUid() != null) {
                streamGraph.setTransformationUID(transform.getId(), transform.getUid());
            }
            if (transform.getUserProvidedNodeHash() != null) {
                streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
            }
    
            if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
                streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
            }
    
            return transformedIds;
        }
    

        While iterating over the List<StreamTransformation> to build the StreamGraph, the transform method of StreamGraphGenerator is called recursively. Each StreamTransformation is converted into a StreamNode in the StreamGraph, and StreamEdges are added between upstream and downstream nodes. Different transformation types are handled differently; let's take the first branch, OneInputTransformation, as an example to walk through the flow:

    private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {
    
            Collection<Integer> inputIds = transform(transform.getInput());
    
            // the recursive call might have already transformed this
            if (alreadyTransformed.containsKey(transform)) {
                return alreadyTransformed.get(transform);
            }
    
            String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);
    
            streamGraph.addOperator(transform.getId(),
                    slotSharingGroup,
                    transform.getCoLocationGroupKey(),
                    transform.getOperator(),
                    transform.getInputType(),
                    transform.getOutputType(),
                    transform.getName());
    
            if (transform.getStateKeySelector() != null) {
                TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(env.getConfig());
                streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
            }
    
            streamGraph.setParallelism(transform.getId(), transform.getParallelism());
            streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());
    
            for (Integer inputId: inputIds) {
                streamGraph.addEdge(inputId, transform.getId(), 0);
            }
    
            return Collections.singleton(transform.getId());
        }
    

        This involves the following steps:

    • Call transform on the input first, so that the upstream transformation has already been processed.
    • Call determineSlotSharingGroup to determine the slot sharing group; if none is specified, it defaults to "default".
    • Call addOperator to add the operator to the StreamGraph; this creates the corresponding StreamNode:
    protected StreamNode addNode(Integer vertexID,
            String slotSharingGroup,
            @Nullable String coLocationGroup,
            Class<? extends AbstractInvokable> vertexClass,
            StreamOperator<?> operatorObject,
            String operatorName) {
    
            if (streamNodes.containsKey(vertexID)) {
                throw new RuntimeException("Duplicate vertexID " + vertexID);
            }
    
            StreamNode vertex = new StreamNode(environment,
                vertexID,
                slotSharingGroup,
                coLocationGroup,
                operatorObject,
                operatorName,
                new ArrayList<OutputSelector<?>>(),
                vertexClass);
    
            streamNodes.put(vertexID, vertex);
    
            return vertex;
        }
    
    • Call addEdge to connect the upstream and downstream StreamNodes with a StreamEdge; note how addEdgeInternal picks the partitioner (see the note after the code):
    public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber) {
            addEdgeInternal(upStreamVertexID,
                    downStreamVertexID,
                    typeNumber,
                    null,
                    new ArrayList<String>(),
                    null);
    
        }
    
        private void addEdgeInternal(Integer upStreamVertexID,
                Integer downStreamVertexID,
                int typeNumber,
                StreamPartitioner<?> partitioner,
                List<String> outputNames,
                OutputTag outputTag) {
    
            if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
                int virtualId = upStreamVertexID;
                upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
                if (outputTag == null) {
                    outputTag = virtualSideOutputNodes.get(virtualId).f1;
                }
                addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag);
            } else if (virtualSelectNodes.containsKey(upStreamVertexID)) {
                int virtualId = upStreamVertexID;
                upStreamVertexID = virtualSelectNodes.get(virtualId).f0;
                if (outputNames.isEmpty()) {
                    // selections that happen downstream override earlier selections
                    outputNames = virtualSelectNodes.get(virtualId).f1;
                }
                addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag);
            } else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
                int virtualId = upStreamVertexID;
                upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
                if (partitioner == null) {
                    partitioner = virtualPartitionNodes.get(virtualId).f1;
                }
                addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag);
            } else {
                StreamNode upstreamNode = getStreamNode(upStreamVertexID);
                StreamNode downstreamNode = getStreamNode(downStreamVertexID);
    
                // If no partitioner was specified and the parallelism of upstream and downstream
                // operator matches use forward partitioning, use rebalance otherwise.
                if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
                    partitioner = new ForwardPartitioner<Object>();
                } else if (partitioner == null) {
                    partitioner = new RebalancePartitioner<Object>();
                }
    
                if (partitioner instanceof ForwardPartitioner) {
                    if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
                        throw new UnsupportedOperationException("Forward partitioning does not allow " +
                                "change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +
                                ", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +
                                " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
                    }
                }
    
                StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag);
    
                getStreamNode(edge.getSourceId()).addOutEdge(edge);
                getStreamNode(edge.getTargetId()).addInEdge(edge);
            }
        }
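
        As addEdgeInternal shows, when no partitioner has been specified Flink uses a ForwardPartitioner if the upstream and downstream parallelism match, and a RebalancePartitioner otherwise. You can also pick a partitioning strategy explicitly in user code, which reaches this point as a PartitionTransformation; a small sketch reusing the MyMap from the example above:

    // Each of these inserts an explicit partitioner on the edge to the next operator.
    dataStream.rebalance().map(new MyMap());   // round-robin across downstream subtasks
    dataStream.shuffle().map(new MyMap());     // random downstream subtask
    dataStream.broadcast().map(new MyMap());   // every record goes to every downstream subtask
    dataStream.global().map(new MyMap());      // all records go to the first downstream subtask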
    

        Through the operations above, Flink has sketched out an initial topology.
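
        If you want to inspect the topology that was just built, the execution environment can dump the plan as JSON (a small aside; the println is just for illustration), and the output can be rendered with Flink's plan visualizer:

    // getExecutionPlan() builds the StreamGraph from the recorded transformations
    // and returns it as a JSON string (e.g. for https://flink.apache.org/visualizer/).
    System.out.println(env.getExecutionPlan());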

    Converting the StreamGraph to a JobGraph

        In the execute function we saw JobGraph jobGraph = streamGraph.getJobGraph(); this generates the corresponding JobGraph from the StreamGraph. The core code:

    private JobGraph createJobGraph() {
    
            // make sure that all vertices start immediately
            jobGraph.setScheduleMode(ScheduleMode.EAGER);
    
            // Generate deterministic hashes for the nodes in order to identify them across
            // submission iff they didn't change.
            Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);
    
            // Generate legacy version hashes for backwards compatibility
            List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
            for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
                legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
            }
    
            Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>();
    
            setChaining(hashes, legacyHashes, chainedOperatorHashes);
    
            setPhysicalEdges();
    
            setSlotSharingAndCoLocation();
    
            configureCheckpointing();
    
            JobGraphGenerator.addUserArtifactEntries(streamGraph.getEnvironment().getCachedFiles(), jobGraph);
    
            // set the ExecutionConfig last when it has been finalized
            try {
                jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
            }
            catch (IOException e) {
                throw new IllegalConfigurationException("Could not serialize the ExecutionConfig." +
                        "This indicates that non-serializable types (like custom serializers) were registered");
            }
    
            return jobGraph;
        }
    
    • Call setScheduleMode to make sure that all vertices are scheduled to start immediately.
    • Call traverseStreamGraphAndGenerateHashes to generate deterministic hashes for the nodes.
    • Call setExecutionConfig to attach the finalized ExecutionConfig.
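
        The setChaining call in the code above is also worth noting: operators connected by a forward edge with matching parallelism can be chained into a single JobVertex, so they later run inside the same task. Chaining can be influenced from the user API; a small sketch based on the operators from the example above:

    // Disable chaining for the whole job.
    env.disableOperatorChaining();

    // Or control it per operator:
    dataStream.map(new MyMap())
            .startNewChain()                        // start a new chain at this operator
            .filter(user -> user.userId != null)
            .disableChaining();                     // never chain this operator with its neighbours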
    Executing the JobGraph
    miniCluster.executeJobBlocking(jobGraph);
    
    public JobExecutionResult executeJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
            checkNotNull(job, "job is null");
    
            final CompletableFuture<JobSubmissionResult> submissionFuture = submitJob(job);
    
            final CompletableFuture<JobResult> jobResultFuture = submissionFuture.thenCompose(
                (JobSubmissionResult ignored) -> requestJobResult(job.getJobID()));
    
            final JobResult jobResult;
    
            try {
                jobResult = jobResultFuture.get();
            } catch (ExecutionException e) {
                throw new JobExecutionException(job.getJobID(), "Could not retrieve JobResult.", ExceptionUtils.stripExecutionException(e));
            }
    
            try {
                return jobResult.toJobExecutionResult(Thread.currentThread().getContextClassLoader());
            } catch (IOException | ClassNotFoundException e) {
                throw new JobExecutionException(job.getJobID(), e);
            }
        }
    

        The code above produces the final JobExecutionResult, and with that the whole execution flow is complete.
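
        In blocking mode the JobExecutionResult also carries the job's net runtime and any accumulator values, which you can read after execute() returns; a small sketch ("my-counter" is a hypothetical accumulator name):

    // java.util.concurrent.TimeUnit is used for the runtime conversion.
    JobExecutionResult result = env.execute("flink-test");

    long runtimeMs = result.getNetRuntime(TimeUnit.MILLISECONDS);

    // Value of a user-defined accumulator; "my-counter" is a hypothetical name.
    Long processedRecords = result.getAccumulatorResult("my-counter");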

    Summary

        This brief walk through the source gives us an overall picture of how a Flink job executes and helps us better understand the workflow of a Flink program.
