Flink Notes: Saving State


Author: zachary_1db5 | Published 2018-04-19 20:48

Official documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/

All functions and operators in Flink can be stateful. Stateful functions store data across the processing of individual elements/events, making state a critical building block for more elaborate operations.

To make state fault tolerant, Flink needs to checkpoint the state. Checkpoints allow Flink to recover state and positions in the streams, giving the application the same semantics as a failure-free execution.

There are two kinds of state:

  • Keyed state: Keyed State is always relative to keys and can only be used in functions and operators on a KeyedStream (a short sketch follows this list).
  • Operator state: With Operator State (or non-keyed state), each operator state is bound to one parallel operator instance. The Kafka connector is a good motivating example for the use of Operator State in Flink: each parallel instance of the Kafka consumer maintains a map of topic partitions and offsets as its Operator State.
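
A minimal sketch of keyed state, assuming the Flink streaming Java API (the class name RunningSum and the field names are illustrative, not from the original post): a RichFlatMapFunction used on a KeyedStream can register a ValueState, which Flink scopes to the current key and includes in checkpoints.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Keeps a running sum per key; the ValueState is scoped to the current key of the KeyedStream.
public class RunningSum extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

    private transient ValueState<Long> sumState;

    @Override
    public void open(Configuration parameters) {
        // Register the keyed state under the name "sum"
        sumState = getRuntimeContext().getState(new ValueStateDescriptor<>("sum", Long.class));
    }

    @Override
    public void flatMap(Tuple2<String, Long> input, Collector<Tuple2<String, Long>> out) throws Exception {
        Long current = sumState.value();                 // null for the first element of a key
        long sum = (current == null ? 0L : current) + input.f1;
        sumState.update(sum);                            // checkpointed together with the rest of the job state
        out.collect(Tuple2.of(input.f0, sum));
    }
}

It would typically be applied as stream.keyBy(0).flatMap(new RunningSum()).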

Checkpoints

https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/checkpoints.html#overview

Enabling and Configuring Checkpointing

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);

// advanced options:

// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);

// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// enable externalized checkpoints which are retained after job cancellation
// (i.e., the checkpoint is not deleted when the job is cancelled)
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

Related configuration options

Some related parameters are configured in conf/flink-conf.yaml:

  • state.backend: if checkpointing is enabled, this is the backend used to store operator state checkpoints. Supported backends:
    • jobmanager: In-memory state, backed up to the JobManager's/ZooKeeper's memory. Should be used only for minimal state (e.g. Kafka offsets) or for testing and local debugging.
    • filesystem: State is kept in memory on the TaskManagers, and state snapshots are stored in a file system. Supported are all file systems supported by Flink, for example HDFS, S3, …
  • state.backend.fs.checkpointdir: the directory for storing checkpoints, in a file system supported by Flink. Note: the state backend must be accessible from the JobManager; use file:// only for a local setup.
  • state.backend.rocksdb.checkpointdir
  • state.checkpoints.dir
  • state.checkpoints.num-retained
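
For illustration only, a minimal sketch of how these keys might look in conf/flink-conf.yaml (the backend choice and all paths are placeholders, not taken from the original post):

state.backend: filesystem
state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
state.checkpoints.dir: hdfs:///flink/external-checkpoints
state.checkpoints.num-retained: 3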

Resuming from an externalized checkpoint

A job may be resumed from an externalized checkpoint just as from a savepoint, by using the checkpoint's meta data file instead (see the savepoint restore guide). Note that if the meta data file is not self-contained, the JobManager needs to have access to the data files it refers to (see Directory Structure below).

$ bin/flink run -s :checkpointMetaDataPath [:runArgs]
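
For illustration, assuming the externalized checkpoint meta data was written under hdfs:///flink/external-checkpoints (a placeholder path, matching the config sketch above) and the job jar is my-job.jar (also a placeholder), resuming might look like:

# placeholder paths and file names, for illustration only
$ bin/flink run -s hdfs:///flink/external-checkpoints/<checkpoint-meta-data-file> my-job.jar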

Note:
This directory will then contain the checkpoint meta data required to restore the checkpoint. For the MemoryStateBackend, this meta data file will be self-contained and no further files are needed.

FsStateBackend and RocksDBStateBackend write separate data files and only write the paths to these files into the meta data file. These data files are stored at the path given to the state backend during construction.

Directory Structure

The target directory for the externalized checkpoint's meta data is configured via state.checkpoints.dir.

For example:
state.checkpoints.dir: hdfs:///checkpoints/

The data files themselves are stored at the path given to the state backend during construction; this path can also be set in code when constructing the backend (verified to work):

env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints-data/"));
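
The FsStateBackend can be pointed at a checkpoint directory the same way; a sketch with a placeholder path (state stays on the TaskManager heap and is snapshotted to the configured file system on checkpoints):

env.setStateBackend(new FsStateBackend("hdfs:///checkpoints-data/"));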

Differences between checkpoints and savepoints

Compared to savepoints, checkpoints:

  • use a state backend specific (low-level) data format,
  • may be incremental,
  • do not support Flink specific features like rescaling.

Savepoints

Triggering Savepoints

When triggering a savepoint, a new savepoint directory beneath the target directory is created. In there, the data as well as the meta data will be stored. For example with a FsStateBackend or RocksDBStateBackend:

# Savepoint target directory
/savepoints/

# Savepoint directory
/savepoints/savepoint-:shortjobid-:savepointid/

# Savepoint file contains the checkpoint meta data
/savepoints/savepoint-:shortjobid-:savepointid/_metadata

# Savepoint state
/savepoints/savepoint-:shortjobid-:savepointid/...
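
A savepoint is typically triggered from the command line; a minimal sketch (the job ID and target directory are placeholders to be filled in):

$ bin/flink savepoint :jobId [:targetDirectory]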
