美文网首页
从0到1成为Flink源码Contributor之自建版本Hel

从0到1成为Flink源码Contributor之自建版本Hel

作者: CodeRap | 来源:发表于2022-03-16 17:08 被阅读0次

    前置要求

    • Java技术体系
    • Junit单元测试
    • Maven依赖管理使用
    • Idea软件使用
    • Flink自建版本custom-test

    关于Flink版本自建可以参考上一篇文章

    新建flink-test项目测试自建Flink

    在 flink-in-depth 目录下新建一个 flink-test 目录,并添加 pom.xml 文件,添加自建Flink的custom-test版本依赖

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <flink.version>custom-test</flink.version>
        <scala.version>2.12.7</scala.version>
        <scala.binary.version>2.12</scala.binary.version>
        <calcite.version>1.26.0</calcite.version>
        <hadoop.version>2.7.3</hadoop.version>
    </properties>
      
    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <!--<scope>test</scope>-->
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.22</version>
            <scope>provided</scope>
        </dependency>
      
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
      
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
      
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime</artifactId>
            <version>${flink.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
      
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>${flink.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
    <dependencies>
    

    除了添加Flink依赖之外,我们还添加了junit单元测试、lombok快捷工具、scala语言包

    下面就开始我们的Flink自建版本本地Local MiniCluster的单元测试吧
    新建一个名为FlinkFrameworkTest的Java类,然后添加如下代码:

    private static final StreamExecutionEnvironment streamExecutionEnvironment;
      
    static {
        Configuration configuration = new Configuration();
        configuration.set(CoreOptions.DEFAULT_PARALLELISM, 1);
        configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
        configuration.set(TaskManagerOptions.NUM_TASK_SLOTS, 1);
      
        streamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
    }
      
    @Test
    public void testFlinkHelloWorld() throws Exception {
        DataStreamSource<String> lines = streamExecutionEnvironment.socketTextStream("localhost", 8080);
        lines.print();
      
        streamExecutionEnvironment.execute();
    }
    

    HelloWorld程序很简单,使用静态代码块创建Flink的流式执行环境StreamExecutionEnvironment,然后从本地的8080网络端口读取Socket链接数据并进行输出,最后是使用执行环境执行流式应用程序
    其中执行环境我们通过Configuration类配置了只有一个并行度,一个TaskManager,每个TaskManager只有一个Slot
    由于我们使用了带有Flink WebUI的流式执行环境,所以需要 flink-runtime-web 依赖

    由以上的HelloWorld可以看出,Flink的流式应用编程三步曲

    1) 创建流式执行环境StreamExecutionEnvironment
    2) 基于StreamExecutionEnvironment执行环境进行DataStream的算子操作(包括Source、Transformation、Sink)
    3) 使用StreamExecutionEnvironment执行环境执行程序
    

    本例子中只有Source与Sink,其实Source与Sink操作本质也是Transaformation操作,但更多与外部系统或集合交互

    接下来我们运行一下HelloWorld例子
    先通过nc程序打开8080网络端口

    nc -lk 8080
    
    image

    然后点击单元测试运行HelloWorld单元测试


    image

    可以看到我们的HelloWorld已经创建了本地Local MiniCluster集群并运行了起来

    通过浏览器找开 http://localhost:8081 页面(8081端口是Flink WebUI的默认端口)

    image
    从Flink WebUI概览图上,我们可以看出目前集群的配置是只有一个TaskManager,只有一个Slot,与我们代码里Configuration的配置一致,同时只有一个Job在运行,就是我们的HelloWorld程序,这个程序使用了默认的名称,叫做Flink Streaming Job

    我们点击 Flink Streaming Job 这个程序名称,就可以打开正在运行的HelloWorld的算子流图


    image

    从流图中我们可以看出目前有一个Souce为Socket Stream,一个Sink为Print to Std. Out,与我们的代码一致

    我们通过nc程序输入数据看一下效果


    image

    我们输入什么控制台上就输出什么

    以上就是我们使用自建Flink版本进行的HelloWorld流式程序测试

    相关文章

      网友评论

          本文标题:从0到1成为Flink源码Contributor之自建版本Hel

          本文链接:https://www.haomeiwen.com/subject/sibpdrtx.html