启动任务和更新任务
/**
 * Deploys the data-processing jar as a Flink application cluster on Kubernetes and waits
 * until the new cluster reports its first job.
 *
 * <p>The same entry point serves both "start" and "update": when
 * {@code ifWithSavePoint.getFirst()} is {@code true}, the job is restored from the savepoint
 * path carried in {@code ifWithSavePoint.getSecond()}.
 *
 * <p>Any failure during deployment or while waiting for the job (including a timeout or an
 * interrupt) is logged and reported by throwing the shared {@code APPLICATION_CLUSTER_FAILED}
 * exception.
 *
 * @param dataprocessParam supplies the Kafka settings, Kubernetes namespace, container image
 *                         and job jar used for the deployment
 * @param target           supplies the address, port and credentials forwarded to the job as
 *                         program arguments
 * @param ifWithSavePoint  first: whether to restore from a savepoint; second: the savepoint path
 */
public void start(DataprocessParam dataprocessParam, DataMap target, Pair<Boolean, String> ifWithSavePoint) {
    Kafka kafka = dataprocessParam.getKafka();

    // Cluster configuration
    Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration()
            .set(DeploymentOptions.TARGET, KubernetesDeploymentTarget.APPLICATION.getName())
            .set(KubernetesConfigOptions.NAMESPACE, dataprocessParam.getNameSpace())
            // A timestamp suffix gives every deployment its own cluster id
            .set(KubernetesConfigOptions.CLUSTER_ID, "jobmanager-" + Instant.now().toEpochMilli())
            .set(KubernetesConfigOptions.CONTAINER_IMAGE, dataprocessParam.getDataProcessImage())
            .set(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE, KubernetesConfigOptions.ServiceExposedType.ClusterIP)
            .set(KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY, KubernetesConfigOptions.ImagePullPolicy.IfNotPresent)
            .set(DeploymentOptions.ATTACHED, false)
            .set(PipelineOptions.JARS, Collections.singletonList(dataprocessParam.getDataProcessJar()))
            .set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true)
            .set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024M"))
            .set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024M"))
            .set(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, 2);

    // When getFirst() is true this is the update path: restore from the savepoint that was
    // written when the job was last stopped.
    if (ifWithSavePoint.getFirst()) {
        flinkConfiguration.set(SavepointConfigOptions.SAVEPOINT_PATH, ifWithSavePoint.getSecond());
    }

    // Program arguments handed to the job's main method.
    // NOTE(review): the target password travels as a plain program argument and may therefore
    // show up in the deployed configuration/logs — consider a Kubernetes secret instead.
    String[] execArgs = new String[]{
            "-kafkaBroker", StringUtils.join(kafka.getBroker().toArray(), ","),
            "-kafkaSchemaTopic", kafka.getSchemaTopic(),
            "-kafkaDataTopics", StringUtils.join(kafka.getDataTopics().toArray(), ","),
            "-targetAddress", target.getAddress(),
            "-targetPort", target.getPort(),
            "-targetUserName", target.getDbUsername(),
            "-targetPassWord", target.getDbPassword()
    };
    ApplicationConfiguration applicationConfiguration = new ApplicationConfiguration(execArgs, null);

    // Polling bounds for the job id: pause between attempts instead of spinning, and give up
    // after a fixed time instead of blocking the caller forever.
    final long pollIntervalMillis = 500L;
    final long maxWaitMillis = 10 * 60 * 1000L;

    // Build the Flink cluster
    // NOTE(review): clusterClient is not closed in this method; presumably the full code keeps
    // it for later stop/cancel calls — confirm, otherwise it should be closed as well.
    ClusterClient<String> clusterClient;
    JobID flinkJobId;
    // The descriptor (and the Kubernetes client it wraps) is only needed for the deployment
    // itself, so it is released as soon as the cluster client has been obtained and used.
    try (KubernetesClusterDescriptor kubernetesClusterDescriptor = new KubernetesClusterDescriptor(
            flinkConfiguration,
            new Fabric8FlinkKubeClient(flinkConfiguration, new DefaultKubernetesClient(), () -> Executors.newFixedThreadPool(2)))) {
        ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification();
        ClusterClientProvider<String> clusterClientProvider = kubernetesClusterDescriptor.deployApplicationCluster(clusterSpecification, applicationConfiguration);
        clusterClient = clusterClientProvider.getClusterClient();

        // Fetch the job id: try once immediately, then keep polling until a job is listed.
        final long deadlineNanos = System.nanoTime() + maxWaitMillis * 1_000_000L;
        Iterator<JobStatusMessage> iterator = clusterClient.listJobs().get().iterator();
        while (!iterator.hasNext()) {
            if (System.nanoTime() - deadlineNanos >= 0) {
                throw new IllegalStateException("no Flink job was listed within " + maxWaitMillis + " ms");
            }
            Thread.sleep(pollIntervalMillis);
            iterator = clusterClient.listJobs().get().iterator();
        }
        flinkJobId = iterator.next().getJobId();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers further up can still observe it.
        Thread.currentThread().interrupt();
        log.error("flink application 任务启动失败", e);
        throw APPLICATION_CLUSTER_FAILED;
    } catch (Exception e) {
        // Pass the exception as the throwable argument so the stack trace is logged.
        log.error("flink application 任务启动失败", e);
        throw APPLICATION_CLUSTER_FAILED;
    }

    String clusterId = clusterClient.getClusterId();
    if (clusterId != null && flinkJobId != null) {
        log.info("flink application 任务启动成功");
    }
}
更新任务
// Stop the job first, taking a savepoint it can be resumed from.
// advanceToEndOfEventTime is false on purpose: draining the pipeline (true) is meant for
// jobs that are terminated permanently; this job is restarted from the savepoint afterwards,
// and draining before a resume can lead to incorrect results.
CompletableFuture<String> completableFuture = clusterClient.stopWithSavepoint(new JobID(bytes), false, dataProcessConfig.getSavepointPath());
// Block until the stop has completed; the future yields the path of the written savepoint.
String savepointPath = completableFuture.get();
// With savepointPath in hand, call the start method above again to update the task
删除任务
// Cancel the Flink job whose id is stored (hex-encoded) on originalJob, using the first
// element of the jobMap entry for this id (presumably the cluster client — confirm), and
// block until the cancel call completes.
Object acknowledge = jobMap.get(id)
        .getFirst()
        .cancel(new JobID(StringUtils.hexStringToByte(originalJob.getFlinkJobId())))
        .get();
// acknowledge holds the result of the delete (cancel) call
关注作者公众号 HEY DATA,一起讨论更多
网友评论