Flink JobManager HA (High Availability)

Author: it_zzy | Source: published 2019-03-07 23:28



    Overview

    This article covers JobManager high availability (HA) for Flink in standalone mode and for Flink on YARN.

    JobManager High Availability (HA)

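    • The JobManager coordinates the deployment of every Flink job. It is responsible for task scheduling and resource management.

    • By default, each Flink cluster has a single JobManager, which creates a single point of failure (SPOF): if the JobManager fails, no new jobs can be submitted and running programs fail.

    • With JobManager HA, a cluster can recover from a JobManager failure and thus avoid the SPOF. HA can be configured for both standalone and YARN clusters.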

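    JobManager HA configuration

    • Standalone cluster HA

      The basic idea of JobManager HA in standalone mode is that at any time there is one master (leader) JobManager and several standby JobManagers. If the master fails, a standby takes over the cluster and becomes the new master. This removes the single point of failure: as soon as a standby JobManager has taken over, programs can continue to run. There is no explicit difference between standby and master JobManager instances; every JobManager can act as either master or standby.

    • YARN cluster HA

      HA for Flink on YARN mainly relies on YARN's own application recovery mechanism.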
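    To make the master/standby idea concrete, here is a toy Python model of the classic ZooKeeper leader-election recipe: each contender holds an increasing sequence number and the live contender with the lowest number leads. It is an illustration under that assumption, not Flink's implementation, and the names `ToyElection`, `jm-63` and `jm-64` are hypothetical. It also mirrors what the standalone test in this article observes: a restarted JobManager rejoins as a standby instead of taking leadership back.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class ToyElection:
    """Toy model of ZooKeeper-style leader election (hypothetical, not Flink code)."""

    next_seq: int = 0
    # contender name -> sequence number of its (ephemeral) election node
    members: dict[str, int] = field(default_factory=dict)

    def join(self, name: str) -> None:
        # A contender that starts or restarts always receives a fresh, higher number.
        self.members[name] = self.next_seq
        self.next_seq += 1

    def crash(self, name: str) -> None:
        # When a process dies its ZooKeeper session expires and its node disappears.
        self.members.pop(name, None)

    def leader(self) -> str | None:
        # The live contender with the lowest sequence number is the leader.
        if not self.members:
            return None
        return min(self.members, key=self.members.__getitem__)


if __name__ == "__main__":
    election = ToyElection()
    election.join("jm-63")
    election.join("jm-64")
    print(election.leader())  # jm-63 is the master, jm-64 is a standby
    election.crash("jm-63")
    print(election.leader())  # jm-64 takes over
    election.join("jm-63")
    print(election.leader())  # the restarted jm-63 is now a standby
```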

    Flink standalone cluster HA configuration

    1. HA cluster planning

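    Two nodes are used to build a cluster with two masters and two workers.
    Note:
    To enable JobManager HA, the high-availability mode must be set to zookeeper,
    a ZooKeeper quorum must be configured, and a masters file must list every
    JobManager hostname together with its web UI port.
    Flink uses ZooKeeper for distributed coordination between the running JobManager
    instances. ZooKeeper is a service independent of Flink that provides highly
    reliable distributed coordination through leader election and lightweight,
    consistent state storage.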
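    The masters file from the note is not shown in this article's listings. The sketch below parses a file in the one `host:webUIPort` entry per line form that the Flink documentation describes for conf/masters. The port 18081 in the sample and the function name `parse_masters` are assumptions, and treating the port as optional is a choice of this sketch.

```python
from __future__ import annotations


def parse_masters(text: str) -> list[tuple[str, int | None]]:
    """Parse conf/masters: one `host[:webUIPort]` entry per line (hypothetical helper)."""
    masters: list[tuple[str, int | None]] = []
    for raw in text.splitlines():
        line = raw.split("#", 1)[0].strip()  # drop comments and surrounding whitespace
        if not line:
            continue
        host, _, port = line.partition(":")
        masters.append((host, int(port) if port else None))
    return masters


# Sample for the two-node layout used here; the ports are assumed, not from the article.
SAMPLE_MASTERS = """\
data-hadoop-50-63.xxx.com:18081
data-hadoop-50-64.xxx.com:18081
"""

if __name__ == "__main__":
    masters = parse_masters(SAMPLE_MASTERS)
    # HA only makes sense with at least two JobManagers (one master, one or more standbys).
    print(len(masters) >= 2, masters)
```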

    2. Configuration and startup

    The configuration is identical on every node in the cluster, so start with the first machine, 50.63, and then 50.64.
    ssh data-hadoop-50-63.xxx.com

    Standalone configuration
    # First set the same parameters as for a plain standalone cluster
    vi conf/flink-conf.yaml
    jobmanager.rpc.address: data-hadoop-50-63.xxx.com
    vi conf/slaves
    data-hadoop-50-63.xxx.com
    data-hadoop-50-64.xxx.com
    
    HA configuration
    [iknow@data-hadoop-50-63 flink-1.7.2]$ cat conf/slaves
    #localhost
    data-hadoop-50-63.xxx.com
    data-hadoop-50-64.xxx.com
    
    # conf/flink-conf.yaml
    #jobmanager.rpc.address: localhost
    jobmanager.rpc.address: data-hadoop-50-63.xxx.com
    #jobmanager.rpc.address: 192.168.50.63
    
    # high-availability: zookeeper
    high-availability: zookeeper
    # ZooKeeper root node; the namespaces of all clusters are placed under it
    high-availability.zookeeper.path.root: /flink-standalone-ha
    
    # high-availability.storageDir: hdfs:///flink/ha/
    # Cluster id (a ZooKeeper node) holding all coordination data this cluster needs
    high-availability.cluster-id: /cluster_one
    # Use the full HDFS URI: on a Flink node without an HDFS configuration, a path without it cannot be resolved
    # storageDir stores all metadata needed to recover a JobManager.
    high-availability.storageDir: hdfs://data-hadoop-50-63.xxx.com:9000/flink/flink-standalone-ha
    
    # high-availability.zookeeper.quorum: localhost:2181
    high-availability.zookeeper.quorum: data-hadoop-50-63.xxx.com:2181
    
    #rest.port: 8081
    rest.port: 18081
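    As a quick sanity check of the entries above, this Python sketch reads `flink-conf.yaml` as flat `key: value` lines (an assumption that holds for this file; it is not a full YAML parser) and reports missing HA settings. The required keys follow the note in the planning step plus `high-availability.storageDir`, and the `hdfs:///` check encodes the advice above to use the full HDFS URI. `parse_flink_conf` and `check_ha` are hypothetical names.

```python
from __future__ import annotations


def parse_flink_conf(text: str) -> dict[str, str]:
    """Read flat `key: value` lines; comment lines and blank lines are skipped."""
    conf: dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition(":")
        if sep:
            conf[key.strip()] = value.strip()  # a later duplicate overrides an earlier one
    return conf


def check_ha(conf: dict[str, str]) -> list[str]:
    """Return a list of problems with the ZooKeeper HA settings (empty if none found)."""
    problems = []
    if conf.get("high-availability", "").lower() != "zookeeper":
        problems.append("high-availability is not 'zookeeper'")
    for key in ("high-availability.zookeeper.quorum", "high-availability.storageDir"):
        if not conf.get(key):
            problems.append(f"{key} is missing")
    if conf.get("high-availability.storageDir", "").startswith("hdfs:///"):
        problems.append("storageDir lacks the namenode host; use the full HDFS URI")
    return problems


SAMPLE = """
high-availability: zookeeper
high-availability.zookeeper.path.root: /flink-standalone-ha
high-availability.cluster-id: /cluster_one
high-availability.storageDir: hdfs://data-hadoop-50-63.xxx.com:9000/flink/flink-standalone-ha
high-availability.zookeeper.quorum: data-hadoop-50-63.xxx.com:2181
rest.port: 18081
"""

if __name__ == "__main__":
    print(check_ha(parse_flink_conf(SAMPLE)))  # [] means no problems were found
```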
    
    Starting the services

    Start Hadoop first:

    sbin/start-all.sh

    Then start ZooKeeper:

    bin/zkServer.sh start

    Start the Flink standalone HA cluster by running the following command on node 50.63:

    bin/start-cluster.sh

    Check the state in ZooKeeper:
    ./bin/zkCli.sh


    Starting the HA cluster

    [iknow@data-hadoop-50-63 flink-1.7.2]$ ./bin/start-cluster.sh
    Starting HA cluster with 2 masters.
    Starting standalonesession daemon on host data-hadoop-50-63.bjrs.zybang.com.
    Starting standalonesession daemon on host data-hadoop-50-64.bjrs.zybang.com.
    Starting taskexecutor daemon on host data-hadoop-50-63.bjrs.zybang.com.
    Starting taskexecutor daemon on host data-hadoop-50-64.bjrs.zybang.com.
    
    3. Verify the HA cluster processes

    Processes started on 50.63



    Processes started on 50.64


    The Flink web UI did not start on port 18081.


    The 50.63 log shows that the Flink web UI actually started on port 45141:

    2019-02-25 10:48:48,787 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Rest endpoint listening at data-hadoop-50-63.bjrs.com:45141
    
    2019-02-25 10:48:48,813 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Web frontend listening at http://data-hadoop-50-63.bjrs.zybang.com:45141.
    2019-02-25 10:48:48,851 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - http://data-hadoop-50-63.bjrs.zybang.com:45141 was granted leadership with leaderSessionID=3a3047aa-bdb8-4234-9e1a-d3900081f169
    2019-02-25 10:48:48,878 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.resourcemanager.StandaloneResourceManager at akka://flink/user/resourcemanager .
    2019-02-25 10:48:48,911 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at akka://flink/user/dispatcher
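    Because the UI came up on an unexpected port, the address can be pulled out of the JobManager log automatically. A minimal Python sketch, assuming the log keeps the `Web frontend listening at ...` and `... was granted leadership ...` wording shown above; `find_web_ui` and `leader_url` are hypothetical names.

```python
from __future__ import annotations

import re

# Patterns based on the DispatcherRestEndpoint lines in the log excerpt above.
WEB_UI_RE = re.compile(r"Web frontend listening at (http://[^\s:]+:(\d+))")
LEADER_RE = re.compile(r"(http://\S+) was granted leadership")


def find_web_ui(log_text: str) -> tuple[str, int] | None:
    """Return (url, port) of the web UI, or None if the line is absent."""
    match = WEB_UI_RE.search(log_text)
    if match is None:
        return None
    return match.group(1), int(match.group(2))


def leader_url(log_text: str) -> str | None:
    """Return the REST URL that was granted leadership, if the log says so."""
    match = LEADER_RE.search(log_text)
    return match.group(1) if match else None


SAMPLE_LOG = (
    "... - Web frontend listening at http://data-hadoop-50-63.bjrs.zybang.com:45141.\n"
    "... - http://data-hadoop-50-63.bjrs.zybang.com:45141 was granted leadership with "
    "leaderSessionID=3a3047aa-bdb8-4234-9e1a-d3900081f169\n"
)

if __name__ == "__main__":
    print(find_web_ui(SAMPLE_LOG))
    print(leader_url(SAMPLE_LOG))
```

    Applied to the 50.64 excerpt, `find_web_ui` would report port 41511 and `leader_url` would return None, since that log contains no leadership line, which fits a standby.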
    

    The JobManager started on 50.63.


    50.64 log:


    2019-02-25 10:48:46,421 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Rest endpoint listening at data-hadoop-50-64.bjrs.zybang.com:41511
    2019-02-25 10:48:46,421 INFO  org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService  - Starting ZooKeeperLeaderElectionService ZooKeeperLeaderElectionService{leaderPath='/leader/rest_server_lock'}.
    2019-02-25 10:48:46,444 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Web frontend listening at http://data-hadoop-50-64.bjrs.zybang.com:41511.
    2019-02-25 10:48:46,502 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.resourcemanager.StandaloneResourceManager at akka://flink/user/resourcemanager .
    2019-02-25 10:48:46,527 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at akka://flink/user/dispatcher .
    
    4. Simulate a JobManager failure

    The JobManager on 50.63 is currently active. Kill this process manually to simulate
    a crash and verify that the standby JobManager on 50.64 switches to active.

    [iknow@data-hadoop-50-63 flink-1.7.2]$ jps
    428021 DataNode
    431794 TaskManagerRunner
    428320 SecondaryNameNode
    429073 NodeManager
    76966 Worker
    437470 Jps
    427770 NameNode
    431177 StandaloneSessionClusterEntrypoint
    429833 QuorumPeerMain
    [iknow@data-hadoop-50-63 flink-1.7.2]$ kill -9 431177
    [iknow@data-hadoop-50-63 flink-1.7.2]$ jps
    428021 DataNode
    431794 TaskManagerRunner
    428320 SecondaryNameNode
    429073 NodeManager
    76966 Worker
    427770 NameNode
    437867 Jps
    429833 QuorumPeerMain
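    Reading the PID off the `jps` output works, but the lookup can also be scripted. A minimal Python sketch with hypothetical helper names that picks the PID of a given main class from `jps` output. `run_jps` shells out to the real `jps` tool and assumes a JDK `jps` binary on the PATH.

```python
from __future__ import annotations

import subprocess


def find_pid(jps_output: str, main_class: str) -> int | None:
    """Return the PID that `jps` reports for main_class, or None if it is not listed."""
    for line in jps_output.splitlines():
        parts = line.split()
        if len(parts) == 2 and parts[1] == main_class:
            return int(parts[0])
    return None


def run_jps() -> str:
    # Requires a JDK `jps` binary on the PATH.
    return subprocess.run(["jps"], capture_output=True, text=True, check=True).stdout


BEFORE_KILL = """\
428021 DataNode
431794 TaskManagerRunner
431177 StandaloneSessionClusterEntrypoint
429833 QuorumPeerMain
"""

if __name__ == "__main__":
    # The standalone JobManager runs as StandaloneSessionClusterEntrypoint.
    print(find_pid(BEFORE_KILL, "StandaloneSessionClusterEntrypoint"))
```

    On a live node, `find_pid(run_jps(), "StandaloneSessionClusterEntrypoint")` gives the PID to pass to `kill -9`. For the YARN test later in this article, the class to look for is YarnSessionClusterEntrypoint.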
    
    5. Verify the HA failover

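    After the JobManager on 50.63 has been killed, it can no longer be reached on 50.63.
    Once that process is gone, the JobManager on 50.64 automatically becomes active;
    the switch takes a short while, so wait a moment.
    After killing the JobManager on 50.63, access 50.64 instead.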
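    Because the takeover is not instantaneous, a scripted test should poll rather than check once. The Python sketch below retries a pluggable check a fixed number of times. `port_open` only proves that something is listening; the standby's endpoint on 50.64 was already listening on 41511 before the failover, so a stronger check would look for the `was granted leadership` line in the 50.64 log. Host, port, attempt count and the function names are assumptions for illustration.

```python
from __future__ import annotations

import socket
import time
from typing import Callable


def port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """True if a TCP connection to host:port can be established."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def wait_until(check: Callable[[], bool], attempts: int, interval_s: float = 2.0,
               sleep: Callable[[float], None] = time.sleep) -> bool:
    """Call check() up to `attempts` times, sleeping between tries; True on first success."""
    for i in range(attempts):
        if check():
            return True
        if i < attempts - 1:
            sleep(interval_s)
    return False
```

    For example, `wait_until(lambda: port_open("data-hadoop-50-64.bjrs.zybang.com", 41511), attempts=30)` waits roughly a minute for the endpoint seen in the 50.64 log. The injectable `sleep` parameter exists so the retry logic can be tested without real delays.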

    6. Restart the killed JobManager

    Restart the previously killed JobManager on 50.63:

    [iknow@data-hadoop-50-63 flink-1.7.2]$ jps
    428021 DataNode
    431794 TaskManagerRunner
    428320 SecondaryNameNode
    76966 Worker
    427770 NameNode
    445947 Jps
    429833 QuorumPeerMain
    [iknow@data-hadoop-50-63 flink-1.7.2]$ ./bin/jobmanager.sh start
    Starting standalonesession daemon on host data-hadoop-50-63.bjrs.zybang.com.
    [iknow@data-hadoop-50-63 flink-1.7.2]$ jps
    446533 StandaloneSessionClusterEntrypoint
    428021 DataNode
    431794 TaskManagerRunner
    428320 SecondaryNameNode
    76966 Worker
    427770 NameNode
    446667 Jps
    429833 QuorumPeerMain
    

    At this point the active JobManager is still the one on 50.64.


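    Checking HDFS shows that the /flink/flink-standalone-ha/cluster_one directory has been created. (What the blob directory is for is left for later investigation; the JobManager log confirms that a blob directory was created.)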


    2019-02-25 10:48:47,747 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils         - Actor system started at akka.tcp://flink@data-hadoop-50-63.bjrs.zybang.com:36169
    2019-02-25 10:48:48,042 INFO  org.apache.flink.runtime.blob.FileSystemBlobStore             - Creating highly available BLOB storage directory at hdfs://data-hadoop-50-63.bjrs.zybang.com:9000/flink/flink-standalone-ha//cluster_one/blob
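    The double slash in `flink-standalone-ha//cluster_one` appears to come from the cluster id being configured with a leading `/`. The Python sketch below reproduces the observed layout by plain string concatenation. That concatenation rule, and the ZooKeeper namespace being `path.root` followed by `cluster-id` as the configuration comments describe, are inferences from this log, not taken from Flink's source; `ha_layout` is a hypothetical name. The sample uses the host from the configuration above.

```python
from __future__ import annotations


def ha_layout(conf: dict[str, str]) -> dict[str, str]:
    """Derive the HA locations from flink-conf.yaml values (inferred rule, see lead-in)."""
    root = conf["high-availability.zookeeper.path.root"]
    cluster_id = conf["high-availability.cluster-id"]
    storage = conf["high-availability.storageDir"]
    return {
        # ZooKeeper namespace holding this cluster's coordination data
        "zookeeper": root + cluster_id,
        # BLOB directory as it appears in the JobManager log (note the resulting "//")
        "blob": storage + "/" + cluster_id + "/blob",
    }


CONF = {
    "high-availability.zookeeper.path.root": "/flink-standalone-ha",
    "high-availability.cluster-id": "/cluster_one",
    "high-availability.storageDir": "hdfs://data-hadoop-50-63.xxx.com:9000/flink/flink-standalone-ha",
}

if __name__ == "__main__":
    for name, path in ha_layout(CONF).items():
        print(name, path)
```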
    

    Flink on YARN cluster HA

    1. HA cluster planning

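    HA for Flink on YARN relies on YARN's own recovery mechanism.
    ZooKeeper is still required: although Flink-on-YARN cluster HA depends on YARN's own recovery, a recovering Flink job needs the snapshots produced by checkpoints, and while those snapshots are stored in HDFS, their metadata is kept in ZooKeeper. The ZooKeeper settings therefore have to be configured as well.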

    First, edit yarn-site.xml in Hadoop to set the maximum number of application master attempts:

    <property>
    <name>yarn.resourcemanager.am.max-attempts</name>
    <value>8</value>
    <description>
    The maximum number of application master execution attempts.
    </description>
    </property>

    Configure the number of YARN application attempts in Flink:

    vi conf/flink-conf.yaml
    yarn.application-attempts: 8
    

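    This parameter is the maximum number of times a Flink job (an "application" in YARN terms) may be restarted when its JobManager (the Application Master) is recovered.
    Note that in Flink on YARN, when the JobManager (Application Master) fails, YARN tries to restart it, and after the restart the Flink job (application) is started again. Therefore yarn.application-attempts should not exceed yarn.resourcemanager.am.max-attempts.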
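    The constraint above can be checked before deploying. A minimal Python sketch, assuming `yarn-site.xml` uses the standard `<configuration><property><name>/<value>` layout and falling back to YARN's default of 2 (from yarn-default.xml) when the property is absent. The helper names are hypothetical.

```python
from __future__ import annotations

import xml.etree.ElementTree as ET

YARN_KEY = "yarn.resourcemanager.am.max-attempts"


def yarn_max_am_attempts(yarn_site_xml: str, default: int = 2) -> int:
    """Read yarn.resourcemanager.am.max-attempts from the text of yarn-site.xml."""
    root = ET.fromstring(yarn_site_xml)
    for prop in root.iter("property"):
        if (prop.findtext("name") or "").strip() == YARN_KEY:
            return int((prop.findtext("value") or "").strip())
    return default


def attempts_ok(flink_attempts: int, yarn_max: int) -> bool:
    # yarn.application-attempts should not exceed the ResourceManager's limit.
    return flink_attempts <= yarn_max


YARN_SITE = """<configuration>
  <property>
    <name>yarn.resourcemanager.am.max-attempts</name>
    <value>8</value>
  </property>
</configuration>"""

if __name__ == "__main__":
    yarn_max = yarn_max_am_attempts(YARN_SITE)
    # 8 is the yarn.application-attempts value set in flink-conf.yaml above.
    print(yarn_max, attempts_ok(8, yarn_max))
```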

    2. Start Flink on YARN and test HA

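    Start ZooKeeper and Hadoop on 50.63, then start the Flink YARN session. The session client output follows the commands: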

    bin/zkServer.sh start
    sbin/start-all.sh
    
    2019-02-25 11:48:20,914 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Initiating client connection, connectString=data-hadoop-50-63.bjrs.zybang.com:2181 sessionTimeout=60000 watcher=org.apache.flink.shaded.curator.org.apache.curator.ConnectionState@182f1e9a
    2019-02-25 11:48:20,926 WARN  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/tmp/jaas-7156394372293067519.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it.
    2019-02-25 11:48:20,929 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Opening socket connection to server data-hadoop-50-63.bjrs.zybang.com/192.168.50.63:2181
    2019-02-25 11:48:20,930 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Socket connection established to data-hadoop-50-63.bjrs.zybang.com/192.168.50.63:2181, initiating session
    2019-02-25 11:48:20,930 ERROR org.apache.flink.shaded.curator.org.apache.curator.ConnectionState  - Authentication failed
    2019-02-25 11:48:20,938 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session establishment complete on server data-hadoop-50-63.bjrs.zybang.com/192.168.50.63:2181, sessionid = 0x169228cb2e10006, negotiated timeout = 40000
    2019-02-25 11:48:20,940 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager  - State change: CONNECTED
    2019-02-25 11:48:21,093 INFO  org.apache.flink.runtime.rest.RestClient                      - Rest client endpoint started.
    Flink JobManager is now running on data-hadoop-50-63.bjrs.zybang.com:38695 with leader id 45d62639-aac4-4903-ae50-a4d69f987e4a.
    JobManager Web Interface: http://data-hadoop-50-63.bjrs.zybang.com:34621
    2019-02-25 11:48:21,155 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - The Flink YARN client has been started in detached mode. In order to stop Flink on YARN, use the following command or a YARN web interface to stop it:
    yarn application -kill application_1551066475117_0001
    

    The HA-related directories are created on HDFS.


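    Look at the logs under YARN_CONF_DIR (or HADOOP_CONF_DIR if it is not set); the current AM logs are under $HADOOP_CONF_DIR/userlogs//. They show YARN restarting the YarnApplicationMasterRunner process and resubmitting the Flink job after the restart.
    Hadoop does have a logs/userlogs/ directory.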

    The JobManager runs inside the YarnSessionClusterEntrypoint process on its node,
    so to test JobManager HA it is enough to target this process.
    Run the following commands to kill the JobManager (YarnSessionClusterEntrypoint) manually:

    [iknow@data-hadoop-50-63 flink-1.7.2]$ jps
    428021 DataNode
    428320 SecondaryNameNode
    76966 Worker
    458095 NodeManager
    729 YarnSessionClusterEntrypoint
    427770 NameNode
    457803 ResourceManager
    1581 Jps
    457275 FlinkYarnSessionCli
    429833 QuorumPeerMain
    [iknow@data-hadoop-50-63 flink-1.7.2]$ kill -9 729
    [iknow@data-hadoop-50-63 flink-1.7.2]$ jps
    428021 DataNode
    3540 Jps
    428320 SecondaryNameNode
    76966 Worker
    458095 NodeManager
    427770 NameNode
    457803 ResourceManager
    457275 FlinkYarnSessionCli
    3438 YarnSessionClusterEntrypoint
    429833 QuorumPeerMain
    

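    As shown in the figure, the Attempt ID changes from 000001 to 000002, 000003, …, and the process changes as well (YarnSessionClusterEntrypoint comes back with PID 3438 instead of 729), which shows that the HA failover succeeded.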
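    The attempt IDs follow YARN's `appattempt_<cluster timestamp>_<application sequence>_<attempt>` naming, with the attempt number zero-padded to six digits. A small Python sketch with hypothetical helper names that builds and parses them for the application started above, for example to script the check that the attempt number has increased:

```python
from __future__ import annotations

import re

ATTEMPT_RE = re.compile(r"appattempt_(\d+)_(\d+)_(\d+)")


def attempt_id(application_id: str, attempt: int) -> str:
    """application_<ts>_<seq> plus an attempt number -> appattempt_<ts>_<seq>_<NNNNNN>."""
    prefix, ts, seq = application_id.split("_")
    if prefix != "application":
        raise ValueError(f"not an application id: {application_id}")
    return f"appattempt_{ts}_{seq}_{attempt:06d}"


def attempt_number(attempt_id_str: str) -> int:
    """Extract the attempt counter, e.g. 2 from ..._000002."""
    match = ATTEMPT_RE.fullmatch(attempt_id_str)
    if match is None:
        raise ValueError(f"not an attempt id: {attempt_id_str}")
    return int(match.group(3))


if __name__ == "__main__":
    app = "application_1551066475117_0001"  # from the yarn-session output above
    first, second = attempt_id(app, 1), attempt_id(app, 2)
    print(first, second)
    # A failover happened if the attempt number went up.
    print(attempt_number(second) > attempt_number(first))
```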
    Inside ZooKeeper, the flink-yarn-ha node has been created.


    Use ./bin/zkCli.sh to enter the ZooKeeper CLI and quit to leave it.

    HA configuration (only the High Availability section is listed; the other settings are omitted):

    #==============================================================================
    # High Availability
    #==============================================================================
    
    # The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
    #
    # high-availability: zookeeper
    high-availability: zookeeper
    high-availability.zookeeper.path.root: /flink-yarn-ha
    # The path where metadata for master recovery is persisted. While ZooKeeper stores
    # the small ground truth for checkpoint and leader election, this location stores
    # the larger objects, like persisted dataflow graphs.
    #
    # Must be a durable file system that is accessible from all nodes
    # (like HDFS, S3, Ceph, nfs, ...)
    #
    # high-availability.storageDir: hdfs:///flink/ha/
    #high-availability.cluster-id: /cluster_one
    high-availability.storageDir: hdfs://data-hadoop-50-63.xxx.com:9000/flink/flink-yarn-ha
    
    # The list of ZooKeeper quorum peers that coordinate the high-availability
    # setup. This must be a list of the form:
    # "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
    #
    # high-availability.zookeeper.quorum: localhost:2181
    high-availability.zookeeper.quorum: data-hadoop-50-63.xxx.com:2181
    yarn.application-attempts: 8
    high-availability.jobmanager.port: 18085
    
    # ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes
    # It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" (ZOO_OPEN_ACL_UNSAFE)
    # The default value is "open" and it can be changed to "creator" if ZK security is enabled
    #
    # high-availability.zookeeper.client.acl: open
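    The quorum string has the `host1:clientPort,host2:clientPort,...` form described in the configuration comment. A minimal Python sketch that splits it into (host, port) pairs, assuming ZooKeeper's conventional client port 2181 when a port is omitted; `parse_quorum` is a hypothetical helper. It also makes one property of this setup visible: the quorum here lists a single peer, so the ZooKeeper service itself has no redundancy.

```python
from __future__ import annotations


def parse_quorum(quorum: str, default_port: int = 2181) -> list[tuple[str, int]]:
    """Split 'host1:port1,host2:port2,...' into (host, port) pairs."""
    peers: list[tuple[str, int]] = []
    for entry in quorum.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate trailing commas
        host, _, port = entry.partition(":")
        peers.append((host, int(port) if port else default_port))
    return peers


if __name__ == "__main__":
    peers = parse_quorum("data-hadoop-50-63.xxx.com:2181")
    print(peers, "single ZooKeeper peer" if len(peers) == 1 else f"{len(peers)} peers")
```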
    


        Article link: https://www.haomeiwen.com/subject/lqxauqtx.html