/**
 * Submits a packaged Flink job to a standalone cluster through its REST endpoint.
 *
 * <p>The jar location, JobManager address/ports, entry-point class and parallelism are
 * hard-coded below. The method builds a {@code PackagedProgram} from the jar, turns it
 * into a {@code JobGraph}, submits it with a {@code RestClusterClient}, blocks until the
 * submission is acknowledged, and prints the resulting job id. Any failure is printed to
 * standard error; nothing is rethrown.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    String jarFilePath = "D:\\Workspace\\JavaProjects\\FlinkJobs\\target\\FlinkJobs-1.0.1.jar";

    // Connection settings for the standalone cluster.
    Configuration config = new Configuration();
    config.setString(JobManagerOptions.ADDRESS, "192.168.2.200");
    config.setInteger(JobManagerOptions.PORT, 6123);
    config.setInteger(RestOptions.PORT, 8081);
    // Optional (left disabled as in the original):
    // config.setString(PipelineOptions.NAME,"Filter Adults Job");

    // The client is opened in try-with-resources so it is closed on every exit path,
    // including a failed submission.
    try (RestClusterClient<StandaloneClusterId> client =
            new RestClusterClient<>(config, StandaloneClusterId.getInstance())) {
        // Runtime settings for the job.
        int parallelism = 1;
        SavepointRestoreSettings savePoint = SavepointRestoreSettings.none();

        // Entry point and jar of the job to submit.
        File jarFile = new File(jarFilePath);
        PackagedProgram program = PackagedProgram
                .newBuilder()
                .setConfiguration(config)
                .setJarFile(jarFile)
                .setEntryPointClassName("com.quan.graph.VC_SSSP")
                .setSavepointRestoreSettings(savePoint)
                .build();

        JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, config, parallelism, false);
        CompletableFuture<JobID> result = client.submitJob(jobGraph);

        // Blocks until the submission future completes (or throws if it failed).
        JobID jobId = result.get();
        System.out.println("job: [" + jobId.toHexString() + "] 提交完成!");

        // NOTE: the three flags below are read from the *submission* future after get()
        // has already returned, so they describe the submit call, not the job itself.
        System.out.println("job: [" + jobId.toHexString() + "] 是否执行完成:" + result.isDone());
        System.out.println("job: [" + jobId.toHexString() + "] 是否异常结束:" + result.isCompletedExceptionally());
        System.out.println("job: [" + jobId.toHexString() + "] 是否取消:" + result.isCancelled());
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers/thread owners can still observe it.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
网友评论