美文网首页Big Data
flink的状态与容错

flink的状态与容错

作者: 盗梦者_56f2 | 来源:发表于2019-07-03 12:01 被阅读0次

状态

状态性的函数和操作通过处理单个(元素/事件)存储数据,使任何类型的state可以构建更复杂的操作。
flink可以使用checkpoints对statue进行容错管理,并且允许对流应用程序执行savepoint。

按基本类型划分:Flink有两种基本的状态:

  • Keyed State:和Key有关的状态类型,它只能被基于KeyedStream(通过stream.keyBy(…)创建)之上的操作或方法所使用。
  • Operator State:它是和Key无关的一种状态类型。它相当于一个并行度实例对应一份状态数据。

按组织形式划分:又可以分为一下两类:

  • Managed State,这类State的内部结构完全由Flink runtime内部来控制,包括如何将它们编码写入到checkpoint中等等。
  • Raw State,这类State就比较显得灵活一些,它们被保留在操作运行实例内部的数据结构中。从Flink系统角度来观察,在checkpoint时,它只知道的是这些状态数据是以连续字节的形式被写入checkpoint中。等待进行状态恢复时,又从字节数据反序列化为状态对象。

Managed State可以在所有的data stream相关方法中被使用,官方也是推荐优先使用这类State,因为它能被Flink runtime内部做自动重分布而且能被更好地进行内存管理。

在Flink内部,我们能够对State设置TTL,使其状态过期然后被系统清理掉。

Broadcast State 是 Flink 支持的另一种扩展方式。用来支持将某一个流的数据广播到所有下游任务,数据被存储在本地,接受到广播的流在操作时可以使用这些数据。

容错

为了使状态容错,Flink需要对状态进行checkpointcheckpoint允许Flink恢复流中的状态和位置,从而为应用程序提供与无故障执行相同的语义。
检查点机制:检查点定期触发,产生快照,快照中记录了(1)当前检查点开始时数据源(例如Kafka)中消息的offset,(2)记录了所有有状态的operator当前的状态信息(例如sum中的数值)。
Flink的检查点机制实现了标准的Chandy-Lamport算法,并用来实现分布式快照。在分布式快照当中,有一个核心的元素:Barrier。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//默认checkpoint功能是disabled的,想要使用的时候需要先启用
// 每隔1000 ms进行启动一个检查点【设置checkpoint的周期】
env.enableCheckpointing(1000);
//checkpoint的checkPointMode有两种,Exactly-once和At-least-once
// 设置模式为exactly-once (这是默认值)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 确保检查点之间有至少500 ms的间隔【checkpoint最小间隔】
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// 检查点必须在一分钟内完成,或者被丢弃【checkpoint的超时时间】
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 同一时间只允许进行一个检查点
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint【详细解释见备注】
//ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint
//ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 表示一旦Flink处理程序被cancel后,会删除Checkpoint数据,只有job执行失败的时候才会保存checkpoint
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

Flink提供了不同的状态后端,用于设置状态的存储方式和位置

rocksdb(RocksDBStateBackend)
val env = StreamExecutionEnvironment.getExecutionEnvironment()
//env.setStateBackend(new MemoryStateBackend());
//env.setStateBackend(new FsStateBackend("hdfs://hadoop100:9000/flink/checkpoints"));
//env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints",true));
env.setStateBackend(...)
//或者修改flink-conf.yaml
//提供三种方式:jobmanager(MemoryStateBackend), filesystem(FsStateBackend), 

Flink内部支持定制化的State序列化器/反序列化实现。这里的序列化过程指的是将状态数据序列为字节数据写到checkpoint中,再从checkpoint文件字节数据反序列为状态对象数据。针对不同类型的State数据,可以定义各自不同的序列化/反序列的实现。

DataSet API中程序的容错性是通过重试失败的执行来实现的。

val env = ExecutionEnvironment.getExecutionEnvironment()
//重试次数
env.setNumberOfExecutionRetries(3)
//重试间隔时长
env.getConfig.setExecutionRetryDelay(5000) // 5000 milliseconds delay
//或者在flink-conf.yaml中配置
execution-retries.default: 3
execution-retries.delay: 10 s

相关文章

  • Flink状态与容错

    一致性检查点 1.什么是一致性检查点 Flink故障恢复机制的核心,就是应用的一致性检查点。有状态应用的一致性检查...

  • flink的状态与容错

    状态 状态性的函数和操作通过处理单个(元素/事件)存储数据,使任何类型的state可以构建更复杂的操作。flink...

  • flink状态容错

    什么是State(状态)? 某task/operator在某时刻的一个中间结果 快照(shapshot) 在fli...

  • Flink检查点机制与状态管理

    1 检查点机制 1.1 CheckPoints 为了使 Flink 的状态具有良好的容错性,Flink 提供了检查...

  • Flink Streaming状态与容错概述

    概述 状态函数(State Function)和操作符可以跨单个元素/事件处理存储数据,State是任何类型操作的...

  • Flink状态管理与容错机制

  • flink状态和容错

    flink是有状态的计算,可以存储一些中间过程和结果在内部存储里。 状态有三种存储方案MemoryStateBac...

  • Flink 2.2CheckPoint

    CheckPointCheckPoint用于flink的故障恢复/容错机制,保存任务的状态(所有任务(source...

  • Flink Checkpoint机制解析-代码走读

    Flink的Checkpoint机制是Flink容错能力的基本保证,能够对流处理运行时的状态进行保存,当故障发生时...

  • Checkpoints

    原文链接 概述 Checkpoint通过允许从状态和相应流的位置进行恢复,从而使Flink中的状态具备容错能力,从...

网友评论

    本文标题:flink的状态与容错

    本文链接:https://www.haomeiwen.com/subject/gjvacctx.html