美文网首页flink
flink从 Hello,World WordCount开始

flink从 Hello,World WordCount开始

作者: tracy_668 | 来源:发表于2020-12-22 08:14 被阅读0次

    [TOC]
    Flink是大数据处理领域最近很火的一个开源的分布式、高性能的流式处理框架,其对数据的处 理可以达到毫秒级别。本文以一个来自官网的WordCount例子为引,全面阐述flink的核心架 构及执行流程,希望读者可以借此更加深入的理解Flink逻辑。

    首先,我们把WordCount的例子再放一遍:

    public class Main {
        public static void main(String[] args) throws Exception {
            //创建流运行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.getConfig().setGlobalJobParameters(ParameterTool.fromArgs(args));
    
            env.fromElements(WORDS)
                    .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                        @Override
                        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                            String[] splits = value.toLowerCase().split("\\W+");
    
                            for (String split : splits) {
                                if (split.length() > 0) {
                                    out.collect(new Tuple2<>(split, 1));
                                }
                            }
                        }
                    })
                    .keyBy(0)
                    .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                        @Override
                        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
                            return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
                        }
                    })
                    .print();
    
            //Streaming 程序必须加这个才能启动程序,否则不会有结果
            env.execute("zhisheng —— word count streaming demo");
        }
    
    

    1.1 flink执行环境

    程序的启动,从这句开始。

    finalStreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
    

    这行代码会返回一个可用的执行环境。执行环境是整个flink程序执行的上下文,记录了相关配置(如并行度等),并提供了一系列方法,如读取输入流的方法,以及真正开始运行整个代码的execute方法等。对于分布式流处理程序来说,我们在代码中定义的flatMap,keyBy等等操作,事实上可以理解为一种声明,告诉整个程序我们采用了什么样的算子,而真正开启计算的代码不在此处。由于我们是在本地运行flink程序,因此这行代码会返回一个LocalStreamEnvironment,最后我们要调用它的execute方法来开启真正的任务。我们先接着往下看。

    1.2 算子(Operator)的注册(声明)

    我们以flatMap为例,text.flatMap(new LineSplitter())这一句话跟踪进去是这样的

    image.png

    里面完成了两件事,一是用反射拿到了flatMap算子的输出类型,二是生成了一个Operator。flink流式计算的核心概念,就是将数据从输入流一个个传递给Operator进行链式处理,最后交给输出流的过程。对数据的每一次处理在逻辑上成为一个operator,并且为了本地化处理的效率起见,operator之间也可以串成一个chain一起处理(可以参考责任链模式帮助理解)。

    下面这张图表明了flink是如何看待用户的处理流程的:抽象化为一系列operator,以source开始,以sink结尾,中间的operator做的操作叫做transform,并且可以把几个操作串在一起执行。


    image.png

    我们也可以更改flink的设置,要求它不要对某个操作进行chain处理,或者从某个操作开启一个新chain等。 上面代码中的最后一行transform方法的作用是返回一个SingleOutputStreamOperator,它继承了Datastream类并且定义了一些辅助方法,方便对流的操作。在返回之前,transform方法还把它注册到了执行环境中(后面生成执行图的时候还会用到它)。其他的操作,包括keyBy,sum和print,都只是不同的算子,在这里出现都是一样的效果,即生成一个operator并注册给执行环境用于生成DAG。

    1.3 程序的执行

    程序执行即env.execute("Java WordCount from SocketTextStream Example")这行代码。

    1.3.1 本地模式下的execute方法

    这行代码主要做了以下事情:

    1. 生成StreamGraph。代表程序的拓扑结构,是从用户代码直接生成的图。
    2. 生成JobGraph。这个图是要交给flink去生成task的图。
    3. 生成一系列配置
    4. 将JobGraph和配置交给flink集群去运行。如果不是本地运行的话,还会把jar文件通过网络发给其他节点
    5. 以本地模式运行的话,可以看到启动过程,如启动性能度量、web模块、JobManager、ResourceManager、taskManager等等
    6. 启动任务。值得一提的是在启动任务之前,先启动了一个用户类加载器,这个类加载器可以用来做一些在运行时动态加载类的工作

    1.3.2 远程模式(RemoteEnvironment)的execute方法
    远程模式的程序执行更加有趣一点。第一步仍然是获取StreamGraph,然后调用executeRemotely方法进行远程执行。

    该方法首先创建一个用户代码加载器

    ClassLoader usercodeClassLoader =JobWithJars.buildUserCodeClassLoader(jarFiles, globalClasspaths,   getClass().getClassLoader());
    

    然后创建一系列配置,交给Client对象。Client这个词有意思,看见它就知道这里绝对是跟远程集群打交道的客户端。

    image.png

    client的run方法首先生成一个JobGraph,然后将其传递给JobClient。关于Client、JobClient、JobManager到底谁管谁,可以看这张图:

    image.png

    确切的说,JobClient负责以异步的方式和JobManager通信(Actor是scala的异步模块),具体的通信任务由JobClientActor完成。相对应的,JobManager的通信任务也由一个Actor完成。

    image.png

    可以看到,该方法阻塞在awaitJobResult方法上,并最终返回了一个JobListeningContext,透过这个Context可以得到程序运行的状态和结果。

    1.3.3 程序启动过程
    上面提到,整个程序真正意义上开始执行,是这里:

    "Java WordCount from SocketTextStream Example"
    

    远程模式和本地模式有一点不同,我们先按本地模式来调试。

    我们跟进源码,(在本地调试模式下)会启动一个miniCluster,然后开始执行代码:

    image.png

    这个方法里有一部分逻辑是与生成图结构相关的,我们放在第二章里讲;现在我们先接着往里 跟:

    image.png

    正如我在注释里写的,这一段代码核心逻辑就是调用那个 submitJob 方法。那么我们再接着看 这个方法:

    image.png

    这里的 Dispatcher 是一个接收job,然后指派JobMaster去启动任务的类,我们可以看看它的 类结构,有两个实现。在本地环境下启动的是 MiniDispatcher ,在集群上提交任务时,集群 上启动的是 StandaloneDispatcher 。

    那么这个Dispatcher又做了什么呢?它启动了一个 JobManagerRunner (这里我要吐槽Flink的 命名,这个东西应该叫做JobMasterRunner才对,flink里的JobMaster和JobManager不是 一个东西),委托JobManagerRunner去启动该Job的 JobMaster 。我们看一下对应的代码:


    image.png

    然后,JobMaster经过了一堆方法嵌套之后,执行到了这里:

    image.png

    我们知道,flink的框架里有三层图结构,其中ExecutionGraph就是真正被执行的那一层,所 以到这里为止,一个任务从提交到真正执行的流程就走完了,我们再回顾一下(顺便提一下远 程提交时的流程区别):

    1. 客户端代码的execute方法执行;
    2. 本地环境下,MiniCluster完成了大部分任务,直接把任务委派给了MiniDispatcher;
    3. 远程环境下,启动了一个 RestClusterClient ,这个类会以HTTP Rest的方式把用户代码 提交到集群上
    4. 远程环境下,请求发到集群上之后,必然有个handler去处理,在这里是 JobSubmitHandler 。这个类接手了请求后,委派StandaloneDispatcher启动job,到 这里之后,本地提交和远程提交的逻辑往后又统一了;
    5. Dispatcher接手job之后,会实例化一个 JobManagerRunner ,然后用这个runner启动 job;
    6. JobManagerRunner接下来把job交给了 JobMaster 去处理;
    7. JobMaster使用 ExecutionGraph 的方法启动了整个执行图;整个任务就启动起来了。

    相关文章

      网友评论

        本文标题:flink从 Hello,World WordCount开始

        本文链接:https://www.haomeiwen.com/subject/qztjnktx.html