Setting programArgs when running a jar in Flink

Author: OkGogogooo | Published 2022-04-13 16:19

    1. Environment

    • Flink 1.14.2 (Scala 2.12 build)
    • The Flink cluster runs on YARN in session mode

    2. How to set programArgs

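    The job is launched by calling Flink's REST API to run a jar that has already been uploaded.
    Below is the official documentation of the API for running a jar: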

    /jars/:jarid/run
    Verb: POST    Response code: 200 OK
    Submits a job by running a jar previously uploaded via '/jars/upload'. Program arguments can be passed both via the JSON request (recommended) or query parameters.
    Path parameters
    - jarid - String value that identifies a jar. When uploading the jar a path is returned, where the filename is the ID. This value is equivalent to the id field in the list of uploaded jars (/jars).
    Query parameters
    - allowNonRestoredState (optional): Boolean value that specifies whether the job submission should be rejected if the savepoint contains state that cannot be mapped back to the job.
    - savepointPath (optional): String value that specifies the path of the savepoint to restore the job from.
    - program-args (optional): Deprecated, please use 'programArg' instead. String value that specifies the arguments for the program or plan
    - programArg (optional): Comma-separated list of program arguments.
    - entry-class (optional): String value that specifies the fully qualified name of the entry point class. Overrides the class defined in the jar file manifest.
    - parallelism (optional): Positive integer value that specifies the desired parallelism for the job.
    • Request
    {
      "type" : "object",
      "id" : "urn:jsonschema:org:apache:flink:runtime:webmonitor:handlers:JarRunRequestBody",
      "properties" : {
        "allowNonRestoredState" : {
          "type" : "boolean"
        },
        "entryClass" : {
          "type" : "string"
        },
        "jobId" : {
          "type" : "any"
        },
        "parallelism" : {
          "type" : "integer"
        },
        "programArgs" : {
          "type" : "string"
        },
        "programArgsList" : {
          "type" : "array",
          "items" : {
            "type" : "string"
          }
        },
        "savepointPath" : {
          "type" : "string"
        }
      }
    }            
    
    • Response
    {
      "type" : "object",
      "id" : "urn:jsonschema:org:apache:flink:runtime:webmonitor:handlers:JarRunResponseBody",
      "properties" : {
        "jobid" : {
          "type" : "any"
        }
      }
    }  
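    For readers not using a custom client, the same call can be made with the JDK's own HTTP client (Java 11+). The sketch below only builds the request to show its shape: a POST to /jars/{jarid}/run with a JSON body. The class name, the base URL http://localhost:8081 (Flink's default REST port) and the jar ID are placeholder assumptions; on YARN, use the JobManager's actual REST address.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Minimal sketch using only the JDK HTTP client (Java 11+).
// Assumptions: REST endpoint at http://localhost:8081 and a placeholder jar ID;
// JarRunRequests is a hypothetical helper, not Flink API.
class JarRunRequests {

    // Builds (but does not send) POST {baseUrl}/jars/{jarId}/run with a JSON body.
    static HttpRequest runJar(String baseUrl, String jarId, String jsonBody) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/jars/" + jarId + "/run"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }

    public static void main(String[] args) {
        HttpRequest req = runJar("http://localhost:8081",
                "1a2b3c_myjob.jar", // placeholder for the ID returned by /jars/upload
                "{\"entryClass\":\"org.sailboat.flink.Test\"}");
        System.out.println(req.method() + " " + req.uri());
    }
}
```

    Sending it would be `HttpClient.newHttpClient().send(req, HttpResponse.BodyHandlers.ofString())`; a 200 response carries the jobid field from the Response schema.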
    

    Notes:

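    • The "Query parameters" and the Request body in the documentation are two separate things: the former are query parameters appended to the URL, the latter is the format of the HTTP body.
    • There are 3 ways to set programArgs:
      Method 1: use the programArg query parameter. Example code: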
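     // getClient(), Request, sPOST_runJar (presumably the /jars/:jarid/run path) and mMainJarId
     // come from a custom REST client wrapper, not from Flink.
     // Method 1: all arguments go into the URL as one comma-separated programArg value.
     getClient().ask(Request.POST().path(sPOST_runJar, mMainJarId)
            .queryParam("entry-class", "org.sailboat.flink.Test")
            .queryParam("programArg", "--paramA,valueA,--paramB,valueB,--paramC,valueC"));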
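    Method 1 puts everything into the URL. The sketch below builds that query string by hand, with URL encoding, and mimics the comma split the server applies, as I understand Flink's handling of list-valued query parameters. The class and method names are hypothetical. One consequence: an argument that itself contains a comma cannot be passed this way.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper illustrating Method 1; not Flink API.
class ProgramArgQuery {

    // Joins the arguments with commas and URL-encodes them for the query string.
    static String toQuery(String entryClass, List<String> args) {
        for (String a : args) {
            if (a.contains(",")) {
                // The server would split this argument in two, so refuse it here.
                throw new IllegalArgumentException("comma inside argument: " + a);
            }
        }
        return "entry-class=" + URLEncoder.encode(entryClass, StandardCharsets.UTF_8)
                + "&programArg=" + URLEncoder.encode(String.join(",", args), StandardCharsets.UTF_8);
    }

    // Assumed server-side behaviour for the decoded programArg value: split on commas.
    static List<String> splitLikeServer(String decoded) {
        return Arrays.asList(decoded.split(","));
    }
}
```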
    

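    Method 2: use the programArgs property in the HTTP body. Note that its type is "string": Flink splits this string on whitespace (quotes keep a value containing spaces together), so the arguments must be separated by spaces, not commas. Example code: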

     // Method 2: a single string in the JSON body; Flink splits it on whitespace.
     getClient().ask(Request.POST().path(sPOST_runJar, mMainJarId)
            .queryParam("entry-class", "org.sailboat.flink.Test")
            .setJsonEntity(new JSONObject().put("programArgs", "--paramA valueA --paramB valueB --paramC valueC")));
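    Why the separator matters for Method 2: the programArgs string is cut into arguments on whitespace. The following is a simplified re-implementation written for illustration (an assumption modelled on JarHandlerUtils.tokenizeArguments in Flink 1.14, not a copy of it; the class name is hypothetical). A comma-joined string comes back as one single argument.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Simplified sketch, assumed to behave like Flink's tokenizer for ordinary input:
// split on whitespace, keep a '...' or "..." run together. Escaped quotes are not handled.
class ProgramArgsTokenizer {

    private static final Pattern TOKEN = Pattern.compile("([^\"']\\S*|\".+?\"|'.+?')\\s*");

    static List<String> tokenize(String args) {
        List<String> tokens = new ArrayList<>();
        if (args == null || args.isEmpty()) {
            return tokens;
        }
        Matcher m = TOKEN.matcher(args);
        while (m.find()) {
            // Trim surrounding whitespace and remove quote characters.
            tokens.add(m.group().trim().replace("\"", "").replace("'", ""));
        }
        return tokens;
    }
}
```

    For example, `tokenize("--paramA,valueA,--paramB,valueB")` yields a one-element list, which the job's main() would see as a single, odd-looking argument.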
    

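    Method 3: use the programArgsList property in the HTTP body. Note that its type is "array"; each element is passed to main() as one argument. Example code: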

    // Method 3: a JSON array in the body; each element becomes one argument of main().
    getClient().ask(Request.POST().path(sPOST_runJar, mMainJarId)
            .queryParam("entry-class", "org.sailboat.flink.Test")
            .setJsonEntity(new JSONObject().put("programArgsList", new JSONArray()
                .put("--paramA").put("valueA")
                .put("--paramB").put("valueB")
                .put("--paramC").put("valueC"))));
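    Method 3 relies on org.json's JSONObject and JSONArray. If that library is not on the classpath, the body can be produced by hand; the sketch below (a hypothetical helper with minimal JSON escaping only) shows the exact JSON that gets sent. Since each element is sent as-is, values containing spaces or commas need no special treatment, which arguably makes programArgsList the least surprising of the three options.

```java
import java.util.List;

// Hypothetical helper that builds {"programArgsList":[...]} without a JSON library.
class RunBody {

    // Minimal JSON string escaping: quotes, backslashes and control characters.
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"': sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }

    // One JSON array element per program argument.
    static String body(List<String> args) {
        StringBuilder sb = new StringBuilder("{\"programArgsList\":[");
        for (int i = 0; i < args.size(); i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append(quote(args.get(i)));
        }
        return sb.append("]}").toString();
    }
}
```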
    

    3. Understanding argument handling from the Flink client code

    • org.apache.flink.client.cli.CliFrontend
      org.apache.flink.client.cli.CliFrontend.main()
      org.apache.flink.client.cli.CliFrontend.parseAndRun()
      org.apache.flink.client.cli.CliFrontend.runApplication()
    • org.apache.flink.client.cli.ProgramOptions
      org.apache.flink.client.cli.ProgramOptions.<init>() (constructor)
      org.apache.flink.client.cli.ProgramOptions.extractProgramArgs()
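    The gist of extractProgramArgs(), as I read it and ignoring its option handling, is that for `flink run [options] <jar> <args...>` the first positional argument is the jar path and everything after it is handed to main(String[]) unchanged. The sketch below is a simplified, hypothetical model of that idea, not Flink's code. Because the shell has already split the command line, the CLI behaves like programArgsList rather than like the programArgs string.

```java
import java.util.Arrays;

// Simplified model of the CLI behaviour (assumption, not Flink's implementation).
class CliArgsSketch {

    // positional: arguments left over after option parsing, starting with the jar path.
    static String[] programArgs(String[] positional) {
        if (positional.length == 0) {
            return new String[0];
        }
        // Drop the jar path; the rest reaches main() exactly as the shell split it.
        return Arrays.copyOfRange(positional, 1, positional.length);
    }
}
```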

    4. Related Flink server-side classes

    • org.apache.flink.runtime.webmonitor.handlers.JarRequestBody
    • org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils
    • org.apache.flink.runtime.webmonitor.handlers.JarRunHandler
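    Putting the server-side pieces together: the sketch below is a simplified model, based on my reading of JarHandlerUtils in Flink 1.14 and worth checking against the source, of how the handler arrives at the final argument list. If programArgsList (body) or programArg (query) yields a list, it is used as-is; otherwise the programArgs string is split on whitespace; supplying both kinds in one request appears to be rejected as a bad request. The class name is hypothetical.

```java
import java.util.List;

// Simplified model of the argument resolution (assumption, not Flink's code).
class ArgResolutionSketch {

    // argsList stands for programArgsList / programArg; argsString for programArgs.
    static List<String> resolve(List<String> argsList, String argsString) {
        boolean hasList = argsList != null && !argsList.isEmpty();
        boolean hasString = argsString != null && !argsString.isBlank();
        if (hasList && hasString) {
            throw new IllegalArgumentException("use either programArgs or programArgsList, not both");
        }
        if (hasList) {
            return argsList;
        }
        if (!hasString) {
            return List.of();
        }
        // Plain whitespace split; the real tokenizer also honours quotes.
        return List.of(argsString.trim().split("\\s+"));
    }
}
```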
