美文网首页
flink 1.12 native on k8s

flink 1.12 native on k8s

作者: ZYvette | 来源:发表于2021-04-07 20:13 被阅读0次

    目录:

    1. native flink on k8s 部署方式:session && application
    2. natvie flink on k8s高可用方式: k8s HA && ZK HA

    一、部署

    下载flink1.12版本,添加hadoop依赖包,flink-shaded-hadoop-3-uber-3.1.1.7.2.1.0-327-9.0.jar,commons-cli-1.4.jar

    注:hadoop包可以网上找或者自己打包:mvn clean install -DskipTests -Dhadoop.version=3.1.1 ,找到对应flink-shaded-hadoop包即可。

    二、执行

    1.在k8s上创建serviceaccount:

    kubectl create serviceaccount flink
    kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=default:flink
    

    在有kubectl环境提交命令

    a. application mode:

    (1) dockerfile:

    FROM flink:1.12.0-scala_2.12
    RUN mkdir -p $FLINK_HOME/usrlib
    COPY flink-clickhousetest-1.0-SNAPSHOT.jar $FLINK_HOME/usrlib/
    

    注:目前只能使用本地包,不能使用外部hdfs之类的jar包路径,所以在打包的时候将执行的jar包打到镜像里。

    (2)执行

    ./bin/flink run-application -p 16 -t kubernetes-application \
    -c test.datawarehouse.DwSplitDataJob \
    -Dkubernetes.jobmanager.service-account=flink \
    -Dkubernetes.cluster-id=flink-k8s-application-cluster \
    -Dtaskmanager.memory.process.size=6g \
    -Dkubernetes.taskmanager.cpu=5 \
    -Dtaskmanager.numberOfTaskSlots=16 \
    -Dkubernetes.container.image=flink-test12:v1 \
    -Dkubernetes.rest-service.exposed.type=NodePort \
    local:///opt/flink/usrlib/flink-clickhousetest-1.0-SNAPSHOT.jar
    

    b. session mode:

    ./bin/kubernetes-session.sh \
    -Dkubernetes.cluster-id=flink-k8s-session-cluster \
    -Dkubernetes.jobmanager.service-account=flink \
    -Dkubernetes.rest-service.exposed.type=NodePort \
    -Dtaskmanager.numberOfTaskSlots=16 \
    -Dtaskmanager.memory.process.size=6g \
    -Dkubernetes.taskmanager.cpu=5
    

    session mode任务提交和standalone模式相同

    三、相关问题

    1. hadoop依赖问题
      参考:https://www.jianshu.com/p/cd5f8e2e2e9c

    2. jobmanager重启,日志报错Too old resource version
      参考:https://www.jianshu.com/p/bddda290e0a8

    四、JM高可用

    背景

    高可用目的:JobManager 对任务起到调度协调作用,他管理这任务调度和资源管理。默认是一个flink集群有单个JM,当JM挂了会导致单点失败,其他任务无法提交。JM高可用的目的是为了消除单点问题。


    image.png

    问题1:对比hdfs和flink高可用区别:

    HDFS的HA切换,主要保证的是数据请求处理的正常服务。而Flink要让所有的失败任务能够快速恢复。我们可以从更高层面来理解这样的差异:一个是存储系统的HA实现,一个是计算框架的HA实现。

    问题2:flink高可用需要什么?

    FlinkJobMnager在服务发生切换的时候要及时地通知外界事物。这里的外界事物包括:

    • JobManager管理的TaskManager
    • 在跑的所有Job
    • 在请求的JobClient客户端
      然后这些Job,JobClient收到新的leader信息后,能够主动重新连接新的JobManager地址,保证任务的正常执行。

    高可用实现方式

    1. k8s HA service方式

    a.配置高可用参数

    kubernetes.cluster-id: <cluster-id>
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: hdfs:///flink/recovery
    

    b. 提交任务执行


    configmap image.png

    k8s HA service方式,是当JM挂了之后,会重新创建新的JM,从hdfs 数据恢复,重新执行任务。

    实际存储:


    image.png

    1)blob:存储用户jar之类的
    2)checkpoint:存储任务上次完成的checkpoint指针
    3) jobgraph:存储任务jobgraph

    测试现象:使用 k8s configmap实现高可用,当JM挂了之后,会自动新建一个JM,然后整体任务重启。

    参考:https://zhuanlan.zhihu.com/p/89537466

    2. ZK HA services

    high-availability.storageDir: hdfs:///user/flink/zk/recovery
    high-availability: zookeeper
    high-availability.zookeeper.quorum: ip:2181
    high-availability.zookeeper.path.root: /flink
    

    测试现象:使用zk 实现高可用,当JM挂了之后,会自动新建JM同时新建对应TM,旧的TM会在任务正常后删除。

    其他

    flink 高可用局限:JM挂了之后任务需要重启而不是无缝连接
    参考文章:https://zhuanlan.zhihu.com/p/89537466

    相关文章

      网友评论

          本文标题:flink 1.12 native on k8s

          本文链接:https://www.haomeiwen.com/subject/voqekltx.html