[Flink on k8s] Key High-Availability Mechanisms and ConfigMaps


Author: 熊本极客 | Published: 2022-05-25 22:52

    1. Key high-availability mechanisms

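    Source walkthrough: DefaultCompletedCheckpointStore.addCheckpoint / tryRemoveCompletedCheckpoint
    Step 1: Derive the checkpoint path from the checkpoint ID. This is the name used as the key in the ConfigMap, for example checkpointID-0000000000000102688.
    Step 2: Write the checkpoint data to the S3 path, then update the checkpoint entry in the ConfigMap, i.e. the key checkpointID-0000000000000102688 in flink-161511ce1fe78368bc659597e472fb7d-jobmanager-leader.
    Step 3: Append the checkpoint to the in-memory queue, then delete the surplus completed checkpoints so that only the configured number (cluster setting state.checkpoints.num-retained) is retained.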

    public class DefaultCompletedCheckpointStore<R extends ResourceVersion<R>>
            implements CompletedCheckpointStore {
        
        // Local cache of the retained CompletedCheckpoint instances
        private final ArrayDeque<CompletedCheckpoint> completedCheckpoints;
    
        @Override
        public void addCheckpoint(
                final CompletedCheckpoint checkpoint,
                CheckpointsCleaner checkpointsCleaner,
                Runnable postCleanup)
                throws Exception {
            // omitted...
    
            // 1. First, derive the checkpoint path (the ConfigMap key) from the checkpoint ID
            final String path = completedCheckpointStoreUtil.checkpointIDToName(checkpoint.getCheckpointID());
            // 2. Then write the checkpoint data to the S3 path and update the checkpoint entry in the ConfigMap
            checkpointStateHandleStore.addAndLock(path, checkpoint);
            
            // 3. Finally, add the checkpoint to the queue and remove the surplus beyond the number of completed checkpoints to retain
            completedCheckpoints.addLast(checkpoint);
            CheckpointSubsumeHelper.subsume(
                    completedCheckpoints,
                    maxNumberOfCheckpointsToRetain,
                    completedCheckpoint ->
                            tryRemoveCompletedCheckpoint(
                                    completedCheckpoint,
                                    completedCheckpoint.shouldBeDiscardedOnSubsume(),
                                    checkpointsCleaner,
                                    postCleanup));
            // omitted...
        }
    
        
    
        private void tryRemoveCompletedCheckpoint(
                CompletedCheckpoint completedCheckpoint,
                boolean shouldDiscard,
                CheckpointsCleaner checkpointsCleaner,
                Runnable postCleanup)
                throws Exception {
            if (tryRemove(completedCheckpoint.getCheckpointID())) {
                checkpointsCleaner.cleanCheckpoint(
                        completedCheckpoint, shouldDiscard, postCleanup, ioExecutor);
            }
        }
    }
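Step 3 is easier to follow in isolation. The following is a minimal sketch of the add-then-subsume pattern, assuming checkpoints are represented by their IDs only. The class RetainedCheckpointsSketch and its methods are hypothetical stand-ins, not Flink classes, and the real CheckpointSubsumeHelper applies additional rules that are left out here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the queue handling in addCheckpoint; not Flink code.
final class RetainedCheckpointsSketch {
    // Oldest checkpoint at the head, newest at the tail.
    private final ArrayDeque<Long> completedCheckpoints = new ArrayDeque<>();
    private final List<Long> discarded = new ArrayList<>();
    private final int maxNumberOfCheckpointsToRetain;

    RetainedCheckpointsSketch(int maxNumberOfCheckpointsToRetain) {
        if (maxNumberOfCheckpointsToRetain < 1) {
            throw new IllegalArgumentException("Must retain at least one checkpoint");
        }
        this.maxNumberOfCheckpointsToRetain = maxNumberOfCheckpointsToRetain;
    }

    // Append the new checkpoint, then drop the oldest ones beyond the limit.
    void addCheckpoint(long checkpointId) {
        completedCheckpoints.addLast(checkpointId);
        while (completedCheckpoints.size() > maxNumberOfCheckpointsToRetain) {
            // In Flink this is where the subsumed checkpoint would be cleaned up.
            discarded.add(completedCheckpoints.removeFirst());
        }
    }

    List<Long> retained() {
        return new ArrayList<>(completedCheckpoints);
    }

    List<Long> discarded() {
        return new ArrayList<>(discarded);
    }

    public static void main(String[] args) {
        RetainedCheckpointsSketch store = new RetainedCheckpointsSketch(2);
        for (long id = 102686; id <= 102688; id++) {
            store.addCheckpoint(id);
        }
        // prints: retained=[102687, 102688] discarded=[102686]
        System.out.println("retained=" + store.retained() + " discarded=" + store.discarded());
    }
}
```

With a limit of 2, adding a third checkpoint pushes the oldest one out of the queue, which mirrors what state.checkpoints.num-retained controls.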
    
    

    2. High-availability data in detail

    2.1 High-availability configuration

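    ① Use S3 as the storage for the state backend

    Set S3 paths as the savepoint and checkpoint directories, i.e. s3://bucket01/flink/savepoints and s3://bucket01/flink/checkpoints, and configure access to the S3-compatible cluster through s3.endpoint, s3.access-key, and s3.secret-key.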

    ② Configure high availability on Kubernetes

    Set high-availability to org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.
    kubernetes.namespace is the Kubernetes namespace, kubernetes.service-account is the Kubernetes service account, and high-availability.storageDir points to an S3 location. Finally, kubernetes.cluster-id sets the prefix of the high-availability ConfigMaps, for example flink-dispatcher-leader and flink-161511ce1fe78368bc659597e472fb7d-jobmanager-leader.

    $kubectl get cm |grep flink
    flink-config                                              5      4d19h
    flink-161511ce1fe78368bc659597e472fb7d-jobmanager-leader   4      24d
    flink-dispatcher-leader                                    4      28d
    flink-resourcemanager-leader                               2      28d
    flink-restserver-leader                                    2      28d
    $kubectl describe cm flink-config
    Name:         flink-config
    Namespace:    default
    Labels:       <none>
    Annotations:  <none>
    
    Data
    ====
    flink-conf.yaml:
    ----
    (omitted)...
    # Shared file system: S3
    s3.endpoint: http://service-minio:9000
    s3.path.style.access: true
    s3.access-key: admin
    s3.secret-key: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
    # State backend configuration
    state.backend: filesystem
    state.checkpoints.dir: s3://bucket01/flink/checkpoints
    state.savepoints.dir: s3://bucket01/flink/savepoints
    # HA and Kubernetes parameters
    kubernetes.namespace: default
    kubernetes.cluster-id: flink
    kubernetes.service-account: serviceaccount-flink
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: s3://bucket01/flink/ha
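The listing above suggests a naming pattern for the high-availability ConfigMaps: the kubernetes.cluster-id value, then a component name or job ID, then a -leader suffix. A minimal sketch of that pattern follows. The class HaConfigMapNames and its methods are hypothetical helpers, and the pattern is inferred only from the names shown in this article, so other Flink versions may name their ConfigMaps differently.

```java
// Hypothetical helper that rebuilds the ConfigMap names seen in the kubectl listing.
final class HaConfigMapNames {
    // e.g. cluster-id "flink" + component "dispatcher" -> "flink-dispatcher-leader"
    static String componentLeaderName(String clusterId, String component) {
        return clusterId + "-" + component + "-leader";
    }

    // e.g. cluster-id "flink" + job ID -> "flink-<jobId>-jobmanager-leader"
    static String jobManagerLeaderName(String clusterId, String jobId) {
        return clusterId + "-" + jobId + "-jobmanager-leader";
    }

    public static void main(String[] args) {
        String clusterId = "flink"; // value of kubernetes.cluster-id in flink-conf.yaml
        for (String component : new String[] {"dispatcher", "resourcemanager", "restserver"}) {
            System.out.println(componentLeaderName(clusterId, component));
        }
        System.out.println(jobManagerLeaderName(clusterId, "161511ce1fe78368bc659597e472fb7d"));
    }
}
```

Because every name starts with the cluster ID, a single `kubectl get cm | grep <cluster-id>` is enough to find all HA ConfigMaps of one cluster.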
    

    2.2 High-availability data of the cluster dispatcher

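    The dispatcher is the master component that manages jobs. Its high-availability data consists of the address of the dispatcher leader, the status of jobs that have not finished, and the storage location of each job graph, where the job graph location is stored as a Base64-encoded value. As shown below, the dispatcher leader is akka.tcp://flink@10.244.0.246:8123/user/rpc/dispatcher_1, job 161511ce1fe78368bc659597e472fb7d is in the RUNNING state, and its job graph jobGraph-161511ce1fe78368bc659597e472fb7d is stored at s3://bucket01/flink/ha/default/submittedJobGraph307e2d6a5be8.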

    Note: the value can be encoded and decoded with the operating system's Base64 tools. For example, echo "mmsc" | openssl base64 -e encodes, and echo "bW1zYwo=" | openssl base64 -d decodes.

    $kubectl describe cm flink-dispatcher-leader
    Name:         flink-dispatcher-leader
    Namespace:    default
    Labels:       app=flink
                  configmap-type=high-availability
                  type=flink-native-kubernetes
    Annotations:  control-plane.alpha.kubernetes.io/leader:
                    {"holderIdentity":"f7978fe5-962d-4037-aa23-19ff522afbff","leaseDuration":15.000000000,"acquireTime":"2022-05-19T15:09:39.272000Z","renewTi...
    
    Data
    ====
    runningJobsRegistry-161511ce1fe78368bc659597e472fb7d:
    ----
    RUNNING
    sessionId:
    ----
    942d4a50-c31f-47fb-939b-94b14a1121fc
    address:
    ----
    akka.tcp://flink@10.244.0.246:8123/user/rpc/dispatcher_1
    jobGraph-161511ce1fe78368bc659597e472fb7d:
    ----
    rO0ABXNyADtvcmcuYXBhY2hlLmZsaW5rLnJ1bnRpbWUuc3RhdGUuUmV0cmlldmFibGVTdHJlYW1TdGF0ZUhhbmRsZQABHhjxVZcrAgABTAAYd3JhcHBlZFN0cmVhbVN0YXRlSGFuZGxldAAyTG9yZy9hcGFjaGUvZmxpbmsvcnVudGltZS9zdGF0ZS9TdHJlYW1TdGF0ZUhhbmRsZTt4cHNyADlvcmcuYXBhY2hlLmZsaW5rLnJ1bnRpbWUuc3RhdGUuZmlsZXN5c3RlbS5GaWxlU3RhdGVIYW5kbGUE3HXYYr0bswIAAkoACXN0YXRlU2l6ZUwACGZpbGVQYXRodAAfTG9yZy9hcGFjaGUvZmxpbmsvY29yZS9mcy9QYXRoO3hwAAAAAAADcrNzcgAdb3JnLmFwYWNoZS5mbGluay5jb3JlLmZzLlBhdGgAAAAAAAAAAQIAAUwAA3VyaXQADkxqYXZhL25ldC9VUkk7eHBzcgAMamF2YS5uZXQuVVJJrAF4LkOeSasDAAFMAAZzdHJpbmd0ABJMamF2YS9sYW5nL1N0cmluZzt4cHQAN3MzOi8vcnRhL2ZsaW5rL2hhL2RlZmF1bHQvc3VibWl0dGVkSm9iR3JhcGgzMDdlMmQ2YTViZTh4
    Events:  <none>
    $echo "rO0ABXNyADtvcmcuYXBhY2hlLmZsaW5rLnJ1bnRpbWUuc3RhdGUuUmV0cmlldmFibGVTdHJlYW1TdGF0ZUhhbmRsZQABHhjxVZcrAgABTAAYd3JhcHBlZFN0cmVhbVN0YXRlSGFuZGxldAAyTG9yZy9hcGFjaGUvZmxpbmsvcnVudGltZS9zdGF0ZS9TdHJlYW1TdGF0ZUhhbmRsZTt4cHNyADlvcmcuYXBhY2hlLmZsaW5rLnJ1bnRpbWUuc3RhdGUuZmlsZXN5c3RlbS5GaWxlU3RhdGVIYW5kbGUE3HXYYr0bswIAAkoACXN0YXRlU2l6ZUwACGZpbGVQYXRodAAfTG9yZy9hcGFjaGUvZmxpbmsvY29yZS9mcy9QYXRoO3hwAAAAAAADcrNzcgAdb3JnLmFwYWNoZS5mbGluay5jb3JlLmZzLlBhdGgAAAAAAAAAAQIAAUwAA3VyaXQADkxqYXZhL25ldC9VUkk7eHBzcgAMamF2YS5uZXQuVVJJrAF4LkOeSasDAAFMAAZzdHJpbmd0ABJMamF2YS9sYW5nL1N0cmluZzt4cHQAN3MzOi8vcnRhL2ZsaW5rL2hhL2RlZmF1bHQvc3VibWl0dGVkSm9iR3JhcGgzMDdlMmQ2YTViZTh4" | openssl base64 -d
    ▒▒sr;org.apache.flink.runtime.state.RetrievableStreamStateHandle▒U▒+LwrappedStreamStateHandlet2Lorg/apache/flink/runtime/state/StreamStateHandle;xpsr9org.apache.flink.runtime.state.filesystem.FileStateHandle▒u▒b▒J  stateSizefilePathtLorg/apache/flink/core/fs/Path;xpr▒srorg.apache.flink.core.fs.PathLuritLjava/net/URI;xpsr
    java.net.URI▒x.C▒I▒LstringtLjava/lang/String;xpt7s3://bucket01/flink/ha/default/submittedJobGraph307e2d6a5be8
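The decoded output is mostly binary because the ConfigMap value is a Java-serialized object, and only the trailing URI is of interest. The sketch below does the same Base64 round trip in Java and then pulls the URI out of the decoded bytes. It relies on the Java serialization stream format, in which a string is written as a 2-byte big-endian length followed by its bytes. The class HaPointerDecoder and the method extractUri are hypothetical names, and scanning for a prefix is a shortcut, not a full deserialization with Flink's classes.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper: finds a serialized string by prefix instead of deserializing the object.
final class HaPointerDecoder {
    // Returns the first serialized string that starts with the prefix, or null if none is found.
    static String extractUri(byte[] serialized, String prefix) {
        byte[] needle = prefix.getBytes(StandardCharsets.US_ASCII);
        for (int i = 2; i + needle.length <= serialized.length; i++) {
            if (!matchesAt(serialized, i, needle)) {
                continue;
            }
            // The two bytes before the string hold its length (unsigned, big-endian).
            int length = ((serialized[i - 2] & 0xFF) << 8) | (serialized[i - 1] & 0xFF);
            if (length >= needle.length && i + length <= serialized.length) {
                // ASCII URIs are identical in UTF-8 and Java's modified UTF-8.
                return new String(serialized, i, length, StandardCharsets.UTF_8);
            }
        }
        return null;
    }

    private static boolean matchesAt(byte[] data, int offset, byte[] needle) {
        for (int j = 0; j < needle.length; j++) {
            if (data[offset + j] != needle[j]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Same round trip as the openssl commands; echo appends a newline, hence "mmsc\n".
        byte[] plain = "mmsc\n".getBytes(StandardCharsets.US_ASCII);
        String encoded = Base64.getEncoder().encodeToString(plain);
        System.out.println(encoded); // prints: bW1zYwo=

        // Pass a ConfigMap value as the first argument to print the URI it points to.
        if (args.length > 0) {
            byte[] raw = Base64.getDecoder().decode(args[0]);
            System.out.println(extractUri(raw, "s3://"));
        }
    }
}
```

Reading the length prefix avoids picking up the serialization end marker that follows the string in the stream.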
    

    2.3 High-availability data of a job's JobManager

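    A job's high-availability data consists of the address of the job's JobManager and the storage location of the job's latest checkpoint, where the checkpoint location is stored as a Base64-encoded value. As shown below, the JobManager is akka.tcp://flink@10.244.0.246:8123/user/rpc/jobmanager_2, the job's latest checkpoint is checkpointID-0000000000000102688, and it is stored at s3://bucket01/flink/ha/default/completedCheckpointf07724c0946a.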

    $kubectl describe cm flink-161511ce1fe78368bc659597e472fb7d-jobmanager-leader
    Name:         flink-161511ce1fe78368bc659597e472fb7d-jobmanager-leader
    Namespace:    default
    Labels:       app=flink
                  configmap-type=high-availability
                  type=flink-native-kubernetes
    Annotations:  control-plane.alpha.kubernetes.io/leader:
                    {"holderIdentity":"f7978fe5-962d-4037-aa23-19ff522afbff","leaseDuration":15.000000000,"acquireTime":"2022-05-19T15:09:39.988000Z","renewTi...
    
    Data
    ====
    address:
    ----
    akka.tcp://flink@10.244.0.246:8123/user/rpc/jobmanager_2
    checkpointID-0000000000000102688:
    ----
    rO0ABXNyADtvcmcuYXBhY2hlLmZsaW5rLnJ1bnRpbWUuc3RhdGUuUmV0cmlldmFibGVTdHJlYW1TdGF0ZUhhbmRsZQABHhjxVZcrAgABTAAYd3JhcHBlZFN0cmVhbVN0YXRlSGFuZGxldAAyTG9yZy9hcGFjaGUvZmxpbmsvcnVudGltZS9zdGF0ZS9TdHJlYW1TdGF0ZUhhbmRsZTt4cHNyADlvcmcuYXBhY2hlLmZsaW5rLnJ1bnRpbWUuc3RhdGUuZmlsZXN5c3RlbS5GaWxlU3RhdGVIYW5kbGUE3HXYYr0bswIAAkoACXN0YXRlU2l6ZUwACGZpbGVQYXRodAAfTG9yZy9hcGFjaGUvZmxpbmsvY29yZS9mcy9QYXRoO3hwAAAAAAAAFyhzcgAdb3JnLmFwYWNoZS5mbGluay5jb3JlLmZzLlBhdGgAAAAAAAAAAQIAAUwAA3VyaXQADkxqYXZhL25ldC9VUkk7eHBzcgAMamF2YS5uZXQuVVJJrAF4LkOeSasDAAFMAAZzdHJpbmd0ABJMamF2YS9sYW5nL1N0cmluZzt4cHQAOXMzOi8vcnRhL2ZsaW5rL2hhL2RlZmF1bHQvY29tcGxldGVkQ2hlY2twb2ludGYwNzcyNGMwOTQ2YXg=
    counter:
    ----
    102689
    sessionId:
    ----
    766ea025-af00-4b6b-8700-a80c9fa2a4e5
    Events:  <none>
    $echo "rO0ABXNyADtvcmcuYXBhY2hlLmZsaW5rLnJ1bnRpbWUuc3RhdGUuUmV0cmlldmFibGVTdHJlYW1TdGF0ZUhhbmRsZQABHhjxVZcrAgABTAAYd3JhcHBlZFN0cmVhbVN0YXRlSGFuZGxldAAyTG9yZy9hcGFjaGUvZmxpbmsvcnVudGltZS9zdGF0ZS9TdHJlYW1TdGF0ZUhhbmRsZTt4cHNyADlvcmcuYXBhY2hlLmZsaW5rLnJ1bnRpbWUuc3RhdGUuZmlsZXN5c3RlbS5GaWxlU3RhdGVIYW5kbGUE3HXYYr0bswIAAkoACXN0YXRlU2l6ZUwACGZpbGVQYXRodAAfTG9yZy9hcGFjaGUvZmxpbmsvY29yZS9mcy9QYXRoO3hwAAAAAAAAFyhzcgAdb3JnLmFwYWNoZS5mbGluay5jb3JlLmZzLlBhdGgAAAAAAAAAAQIAAUwAA3VyaXQADkxqYXZhL25ldC9VUkk7eHBzcgAMamF2YS5uZXQuVVJJrAF4LkOeSasDAAFMAAZzdHJpbmd0ABJMamF2YS9sYW5nL1N0cmluZzt4cHQAOXMzOi8vcnRhL2ZsaW5rL2hhL2RlZmF1bHQvY29tcGxldGVkQ2hlY2twb2ludGYwNzcyNGMwOTQ2YXg=" | openssl base64 -d
    ▒▒sr;org.apache.flink.runtime.state.RetrievableStreamStateHandle▒U▒+LwrappedStreamStateHandlet2Lorg/apache/flink/runtime/state/StreamStateHandle;xpsr9org.apache.flink.runtime.state.filesystem.FileStateHandle▒u▒b▒J  stateSizefilePathtLorg/apache/flink/core/fs/Path;xp(srorg.apache.flink.core.fs.PathLuritLjava/net/URI;xpsr
    java.net.URI▒x.C▒I▒LstringtLjava/lang/String;xpt9s3://bucket01/flink/ha/default/completedCheckpointf07724c0946a
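The key checkpointID-0000000000000102688 above looks like a fixed prefix followed by the checkpoint ID zero-padded to 19 digits. The sketch below converts between a checkpoint ID and such a key. The class CheckpointKeySketch and its methods are hypothetical, and the 19-digit width is an assumption read off the key shown in this ConfigMap, not something taken from Flink's source.

```java
import java.util.Locale;

// Hypothetical helper mirroring the key format seen in the jobmanager-leader ConfigMap.
final class CheckpointKeySketch {
    static final String PREFIX = "checkpointID-";

    // 102688 -> "checkpointID-0000000000000102688" (ID zero-padded to 19 digits)
    static String checkpointIdToName(long checkpointId) {
        return PREFIX + String.format(Locale.ROOT, "%019d", checkpointId);
    }

    // "checkpointID-0000000000000102688" -> 102688
    static long nameToCheckpointId(String key) {
        if (!key.startsWith(PREFIX)) {
            throw new IllegalArgumentException("Not a checkpoint key: " + key);
        }
        return Long.parseLong(key.substring(PREFIX.length()));
    }

    public static void main(String[] args) {
        String key = checkpointIdToName(102688L);
        System.out.println(key);                     // prints: checkpointID-0000000000000102688
        System.out.println(nameToCheckpointId(key)); // prints: 102688
    }
}
```

Zero-padding keeps the keys in the same order whether they are sorted as strings or as numbers. In the ConfigMap shown above, the counter value 102689 is one greater than this latest checkpoint ID.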
    
