美文网首页flink简单使用教程
flink使用16-正确打包Flink程序并使用Cli提交任务

flink使用16-正确打包Flink程序并使用Cli提交任务

作者: CheckChe | 来源:发表于2019-11-15 11:44 被阅读0次

    本文的计划是使用正确的maven插件打包当前教程代码库batch模块下的WordCount代码,并通过命令行的方式提交到Flink来启动任务。WordCount类即为Flink主方法类,该部分代码是Flink官方example的简单修改,只是对map方法填加了一点sleep来方便观察运行情况。

    package.png

    项目的运行环境使用Docker来部署Flink, Flink镜像可以从Docker hub上拉去,其Docker-Compose文件如下:

    version: "2.1"
    services:
      jobmanager:
        image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
        expose:
          - "6123"
        ports:
          - "8081:8081"
        command: jobmanager
        environment:
          - JOB_MANAGER_RPC_ADDRESS=jobmanager
    
      taskmanager:
        image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
        expose:
          - "6121"
          - "6122"
        depends_on:
          - jobmanager
        command: taskmanager
        links:
          - "jobmanager:jobmanager"
        environment:
          - JOB_MANAGER_RPC_ADDRESS=jobmanager
    

    正确启动Flink之后,就可以在WebUI上看到我们的环境了。

    webUI.png

    下面就开始打包我们的应用程序了。

    官方推荐我们使用maven-shade-plugin插件,复制一下代码到POM中指定我们的主方法类即可。

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>my.programs</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    

    需要注意的是一般来说我们是不会将flink的一些相关的包直接打到项目里,通常有两种方案:

    • 将相关的jar包统一都放到flink/lib目录下

    • 构建一个单独的common模块,所以使用到的包都放在这个模块中打包并上传到集群,之后其他模块只需要引用该common模块即可

      具体的操作可以见这篇文章

    打包好后就可以直接是用 FLink Cli 提交到集群来开始job了 。

    Flink Cli 一般来讲主要作用有:提交并执行任务、取消任务、获取任务状态信息、列出正在运行和等待的任务、触发savepoint等。

    我们将已经打包好的jar包放到docker中

    docker cp /opt/flink/wordcount.jar flink_jobmanager_1:/opt/
    

    然后就可以通过命令行启动任务了,启动完成后我们可以在webUI上看到任务的执行情况。

    docker exec -ti flink_jobmanager_1 bash -c 'flink run /opt/wordcount.jar'
    
    runningJob.png

    Flink Cli 的命令有很多,具体的内容可以参考官网示例:

    Flink Cli Examples

    相关文章

      网友评论

        本文标题:flink使用16-正确打包Flink程序并使用Cli提交任务

        本文链接:https://www.haomeiwen.com/subject/mawjictx.html