美文网首页Flink
k8s application模式自动部署flink任务

k8s application模式自动部署flink任务

作者: 另存為 | 来源:发表于2021-08-23 18:21 被阅读0次

    启动任务和更新任务

    /**
     * Deploys a Flink job to Kubernetes in application mode and waits until the new cluster
     * lists a job.
     *
     * <p>Serves both the initial start and the update path: when
     * {@code ifWithSavePoint.getFirst()} is true, the savepoint path in
     * {@code ifWithSavePoint.getSecond()} is set so the job restores from it.
     *
     * <p>Throws the shared {@code APPLICATION_CLUSTER_FAILED} exception when deployment fails,
     * when no job is listed before the polling deadline, or when the calling thread is
     * interrupted while waiting.
     *
     * @param dataprocessParam supplies the namespace, container image, job jar and Kafka settings
     * @param target supplies the address, port and credentials forwarded to the job as arguments
     * @param ifWithSavePoint first: whether to restore from a savepoint; second: the savepoint path
     */
    public void start(DataprocessParam dataprocessParam, DataMap target, Pair<Boolean, String> ifWithSavePoint) {
        Kafka kafka = dataprocessParam.getKafka();

        // Cluster configuration: start from the global Flink config and override deployment options.
        Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration()
                .set(DeploymentOptions.TARGET, KubernetesDeploymentTarget.APPLICATION.getName())
                .set(KubernetesConfigOptions.NAMESPACE, dataprocessParam.getNameSpace())
                // A timestamp suffix gives every deployment its own cluster id.
                .set(KubernetesConfigOptions.CLUSTER_ID, "jobmanager-" + Instant.now().toEpochMilli())
                .set(KubernetesConfigOptions.CONTAINER_IMAGE, dataprocessParam.getDataProcessImage())
                .set(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE, KubernetesConfigOptions.ServiceExposedType.ClusterIP)
                .set(KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY, KubernetesConfigOptions.ImagePullPolicy.IfNotPresent)
                .set(DeploymentOptions.ATTACHED, false)
                .set(PipelineOptions.JARS, Collections.singletonList(dataprocessParam.getDataProcessJar()))
                .set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true)
                .set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024M"))
                .set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024M"))
                .set(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, 2);

        // On the update path, restore from the savepoint written when the previous job was stopped.
        if (ifWithSavePoint.getFirst()) {
            flinkConfiguration.set(SavepointConfigOptions.SAVEPOINT_PATH, ifWithSavePoint.getSecond());
        }

        // Program arguments handed to the job's main method.
        // NOTE(review): the target DB password travels as a plain program argument and may be
        // visible in the deployment's stored configuration; consider a Kubernetes Secret instead.
        String[] execArgs = new String[]{
                "-kafkaBroker", StringUtils.join(kafka.getBroker().toArray(), ","),
                "-kafkaSchemaTopic", kafka.getSchemaTopic(),
                "-kafkaDataTopics", StringUtils.join(kafka.getDataTopics().toArray(), ","),
                "-targetAddress", target.getAddress(),
                "-targetPort", target.getPort(),
                "-targetUserName", target.getDbUsername(),
                "-targetPassWord", target.getDbPassword()
        };

        ApplicationConfiguration applicationConfiguration = new ApplicationConfiguration(execArgs, null);

        // Build the Flink cluster. The descriptor is closed once the cluster client has been
        // obtained so that the Kubernetes client and its executor are not leaked on every call.
        ClusterClient<String> clusterClient;
        JobID flinkJobId;
        try (KubernetesClusterDescriptor kubernetesClusterDescriptor = new KubernetesClusterDescriptor(
                flinkConfiguration,
                new Fabric8FlinkKubeClient(flinkConfiguration, new DefaultKubernetesClient(), () -> Executors.newFixedThreadPool(2)))) {
            ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification();
            ClusterClientProvider<String> clusterClientProvider = kubernetesClusterDescriptor.deployApplicationCluster(clusterSpecification, applicationConfiguration);
            // NOTE(review): clusterClient is left open here; confirm whether it is meant to be
            // retained for later stop/cancel calls or should be closed by this method.
            clusterClient = clusterClientProvider.getClusterClient();
            flinkJobId = awaitFirstJobId(clusterClient);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag so callers can still observe the interruption.
                Thread.currentThread().interrupt();
            }
            // Pass the exception as the throwable argument so the stack trace is logged.
            log.error("flink application 任务启动失败", e);
            throw APPLICATION_CLUSTER_FAILED;
        }
        String clusterId = clusterClient.getClusterId();

        if (clusterId != null && flinkJobId != null) {
            log.info("flink application 任务启动成功");
        }
    }

    /**
     * Polls the cluster until it lists at least one job and returns the id of the first one.
     *
     * <p>Sleeps between polls instead of spinning, and gives up after a fixed deadline so a job
     * that never appears cannot block the caller forever.
     *
     * @param clusterClient client of the freshly deployed cluster
     * @return the id of the first job the cluster lists
     * @throws Exception if listing jobs fails, the thread is interrupted, or the deadline passes
     */
    private static JobID awaitFirstJobId(ClusterClient<String> clusterClient) throws Exception {
        final long pollIntervalMillis = 1_000L;
        final long timeoutMillis = 5L * 60L * 1_000L;
        final long deadlineNanos = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (true) {
            Iterator<JobStatusMessage> jobs = clusterClient.listJobs().get().iterator();
            if (jobs.hasNext()) {
                return jobs.next().getJobId();
            }
            if (System.nanoTime() - deadlineNanos >= 0) {
                throw new IllegalStateException(
                        "no job listed by cluster " + clusterClient.getClusterId() + " within " + timeoutMillis + " ms");
            }
            Thread.sleep(pollIntervalMillis);
        }
    }
    

    更新任务

    // Stop the running job first, taking a savepoint.
    // advanceToEndOfEventTime is false on purpose: draining (true) emits a final MAX_WATERMARK and
    // is intended for jobs that are terminated permanently; a job that will be resumed from this
    // savepoint must not be drained, or results after the restart can be incorrect.
    CompletableFuture<String> completableFuture = clusterClient.stopWithSavepoint(new JobID(bytes), false, dataProcessConfig.getSavepointPath());
    String savepointPath = completableFuture.get();
    // With savepointPath available, call the start method again to redeploy the updated job from it.
    

    删除任务

    // Cancel the job: look up the entry stored for this id, rebuild the JobID from its stored hex
    // string, issue the cancel call and block until it completes.
    Object acknowledge = jobMap.get(id).getFirst().cancel(new JobID(StringUtils.hexStringToByte(originalJob.getFlinkJobId()))).get();
    // acknowledge holds the result of the cancel call.
    

    关注作者公众号 HEY DATA,一起讨论更多

    相关文章

      网友评论

        本文标题:k8s application模式自动部署flink任务

        本文链接:https://www.haomeiwen.com/subject/ejvliltx.html