美文网首页
Flink远程提交job jar到standalone集群

Flink远程提交job jar到standalone集群

作者: 老羊_肖恩 | 来源:发表于2022-02-15 11:19 被阅读0次
        public static void main(String[] args) {
            String jarFilePath = "D:\\Workspace\\JavaProjects\\FlinkJobs\\target\\FlinkJobs-1.0.1.jar";
            RestClusterClient<StandaloneClusterId> client;
            try {
                //配置standalone集群信息
                Configuration config = new Configuration();
                config.setString(JobManagerOptions.ADDRESS, "192.168.2.200");
                config.setInteger(JobManagerOptions.PORT,6123);
                config.setInteger(RestOptions.PORT,8081);
    //            config.setString(PipelineOptions.NAME,"Filter Adults Job");
                client = new RestClusterClient<StandaloneClusterId>(config, StandaloneClusterId.getInstance());
    
                //Job运行的配置
                int parallelism = 1;
                SavepointRestoreSettings savePoint = SavepointRestoreSettings.none();
    
                //设置job的入口和参数
                File jarFile = new File(jarFilePath);
                PackagedProgram program = PackagedProgram
                        .newBuilder()
                        .setConfiguration(config)
                        .setJarFile(jarFile)
                        .setEntryPointClassName("com.quan.graph.VC_SSSP")
                        .setSavepointRestoreSettings(savePoint)
                        .build();
    
                JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, config, parallelism, false);
                CompletableFuture<JobID> result = client.submitJob(jobGraph);
                JobID jobId = result.get();
                System.out.println("job: [" + jobId.toHexString() + "] 提交完成!");
                System.out.println("job: [" + jobId.toHexString() + "] 是否执行完成:" + result.isDone());
                System.out.println("job: [" + jobId.toHexString() + "] 是否异常结束:" + result.isCompletedExceptionally());
                System.out.println("job: [" + jobId.toHexString() + "] 是否取消:" + result.isCancelled());
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    

    相关文章

      网友评论

          本文标题:Flink远程提交job jar到standalone集群

          本文链接:https://www.haomeiwen.com/subject/zrnklrtx.html