一、构建JobGraph
JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, flinkConfiguration, parallelism, false);
在这个入口中,构建program
PackagedProgram program = PackagedProgram.newBuilder()
.setJarFile(runJarFile)
.setArguments(execArgs)
.setUserClassPaths(classPaths)
.setSavepointRestoreSettings(savepointRestoreSettings)
.build();
classPaths中需指定用户资源:
List<URL> classPaths = new ArrayList<URL>();
classPaths.add(new URL("file://D:\\tmp\\flink\\jar\\udf_test-1.0-SNAPSHOT.jar"));//jar包classpath为jar包本身
classPaths.add(new URL("file://D:\\tmp\\flink\\jar\\"));//非jar文件classpath为文件目录
二、添加文件
1、standalone模式
public static void fillDependFilesJobGraph(JobGraph jobGraph, String[] dependFiles) {
Arrays.stream(dependFiles).forEach(path -> jobGraph.addJar(new Path("file://" + path)));
}
将user资源通过jobGraph.addJar()方法添加到jobGraph中
2、yarn-cluster模式
YarnClusterDescriptor clusterDescriptor = (YarnClusterDescriptor) YarnClusterClientFactory.INSTANCE
.createClusterDescriptor(jobParamsInfo.getYarnConfDir(), flinkConfiguration);
clusterDescriptor.addShipFiles(shipFiles);
通过addShipFiles()方法将用户资源添加到yarn所需的资源列表中
网友评论