美文网首页
Flink知识点

Flink知识点

作者: 虎不知 | 来源:发表于2020-07-08 15:30 被阅读0次

    1savepoint checkpoint savepoint是checkpoint的一种特殊方式,手动保存,其实就是指针。savepoint不会过期 不会覆盖 除非手动删除

    2.jobmanager相当于resource manager 一般生产会有2个 做ha

    3.OperatorChain的优点:
    (1)减少线程切换
    (2)减少序列化与反序列化
    (3)减少数据在缓冲区的交换
    (4)减少延迟并且提高吞吐能力

    OperatorChain组成条件:
    (1)没有禁用Chain
    (2)上下游算子并行度一致
    (3)下游算子的入度为1(也就是说下游节点没有其他节点的输入)
    (4)上下游算子在同一个slot group
    (5)下游节点的chain策略为always(可以与上下游链接,map、flatmap、filter等默认是always)
    (6)上有节点的chain策略为always或head(只能与下游链接,不能与上有链接,source默认是head)
    (7)上下游算子之间没有数据shuffle(数据分区方式是forward)

    禁用OperatorChain几种方式:
    (1)DataStream的算子操作后调用startNewChain算子
    (2)DataStream调用disableChaining来关闭Chain
    (3)StreamExecutionEnvironment.getExecutionEnvironment.disableOperatorChaining() 全局关闭
    (4) DataStream.slotSharingGroup("name") 设置新的slotgrop名称
    (5)改变并行度

    4.共享slot
    (1)flink集群需要的任务槽与作业中使用的最高并行度正好相同(前提,保持默认SlotSharingGroup)。也就是说我们不需要再去
    计算一个程序总共会起多少个task了
    (2)适当设置sltSharingGroup可以减少每个slot运行的线程数,从而整体上减少机器的负载

    5.slot与parallelism的关系
    (1)默认task slot数与join中task的最高并行度一致

    6.累加器和计数器
    (1)计数器是最简单的累加器
    (2)内置累加器有IntCounter,LongCounter,DoubleCounter
    (3)Histogram 柱状图

    7.控制延迟
    默认情况下,流中的元素并不会一个一个的在网络中传输(这会导致不必要的网络流量消耗)
    ,而是缓存起来,缓存的大小可以在Flink的配置文件、ExecutinEnvironment、设置某个算子
    进行配置(默认100ms)
    这样控制的 好处:提高吞吐 坏处:增加了延迟

    如何把握平衡
    (1)为了最大吞吐量,可以设置setBufferTimeout(-1),这会移出timeout机制,缓存中的数据
    一满就会被发送,不建议用,假如一条信息4 5个小时才来这时候延迟会非常高,会等整个buffer满了再处理
    (2)为了最小延迟,可以将超时设置为接近0的数(例如5或者10ms)
    (3)缓存的超时不要设置0,因为会带来一些性能的损耗

    8.min minby max maxby
    min和minby的区别是min返回一个最小值,而minby返回的是其字段中包含的最小元素

    9.interval join
    在给定周期内,按照指定key对两个KeyedStream进行join操作,把符合join条件的两个
    event拉倒一起,然后怎么处理由用户自己定义
    场景:把一定时间内的相关的分组数据拉成一个宽表

    10connect 和union
    connect之后是connectedStreams,会对两个流的数据应用不同的处理方法,并且双流之间可以
    共享状态(比如计数)。这在第一个流的输入会影响第二个流时会非常有用。
    union合并多个流,新的流包含所有流的数据
    union是DataStream->DataStream
    connect只能连接两个流,而union可以连接多余两个流
    connect两个流类型可以不一致,而union连接的流类型必须一致

    11.assignTimestampsAndWatermarks
    含义:提取记录中的时间戳作为Event time,主要在window操作中发挥作用,不设置默认就是
    Processing time
    限制: 只有基于event time构建window时才有用
    使用场景:当你需要使用event time来创建window时,用来指定如何获取event的时间戳

    12.算子之间传递数据的方式
    (1)One-to-one streams 保持元素的分区和顺序
    (2)重新分区的方式 ,重新分区策略取决于使用的算子 keyby、broadcast、rebalance

    dataStream.shuffle() 按均匀分布随机划分元素,网络开销往往比较大
    dataStream.rebalance() 循环对元素进行分区,为每各分区创建相等负载,解决数据倾斜时非常有用
    dataStream.rescale() 跟rebalance类似,但不是全局的,通过轮询调度将元素从上游的task一个子集发送到下游task的一个子集
    dataStream.broadcast() 将元素广播到每个分区上

    13.三个时间的比较
    一、EventTime
    (1)事件生成的时间,在进入Flink之气就存在,可以从event的字段中抽取
    (2)必须指定watermarks的生产方式
    (3)优势:确定性,乱序、延时、或者数据重放等情况,都能给出正确结果
    (4)弱点:处理无序事件时性能和延迟受到影响

    二、IngerstTime
    (1)事件进入flink的时间,即source里获取的当前系统时间,后续统一使用该时间
    (2)不需要指定watermarks的生产范式(自动生成)
    (3)弱点:不能处理无序事件和延迟数据

    三、ProcessingTime
    (1)执行操作的机器的当前系统时间(每个算子都不一样)
    (2)不需要流和机器之间的协调
    (3)优势:最佳的性能和最低的延迟
    (4)弱点:不确定性,容易受到各种因素影响(event产生的速度、到达flink的速度、算子之间传输速度),压根不管顺序和延迟

    比较
    性能:ProcessingTime>IngestTime>EventTime
    延迟:ProcessingTime<IngestTime<EventTime
    确定性:EventTime>IngestTime>ProcessIngTime

    不设置time类型,默认是processingTime
    通过 env.setStreamTimeCharacteristic()方法设置time类型

    14.watermark
    (1)通常情况下,watermark在source函数中生成,但也可以在source后任何阶段,如果指定多次
    后吗指定的会覆盖前面的值。source的每个sub task独立生成水位线。
    (2)watermark通过操作时会推进算子操作时的event time,同时会为下游生成一个新的watermark
    (3)多输入operator(union、keyby、partition)的当前event time是其输入流event time最小值

    15.timestamp/watermark两种生成方式
    方式1:直接在source function中生成
    方式2:timestamp assigner / watermark generator

    timestamp和watermark都是采用毫秒

    16.两种watermark
    一、周期性 watermark
    (1)基于时间
    (2)ExecutionConfig.setAutoWatermarkInterval(msec) (默认200ms,设置watermarker发生的周期)
    (3)实现AssignerWithPeriodicWatermarks接口

    二、间断的watermark
    (1)基于某些时间出发watermark的生产和发送(由用户代码实现,例如遇到特殊情况)
    (2)实现AssignerWithPeriodicWatermarks接口

    17.处理延迟数据
    方式一:allowedLateness(),设定最大延迟时间,触发被延迟,不宜设置太大
    方式二:sideOutputTag,提供了延迟数据获取的一种方式,这样就不会丢弃数据了,延迟数据单独处理。

    18.windows分类(是否keyby决定了大分类)
    一.Keyed Windows(在已经安装keyby分组的基础上(KeyedStream),再构建多任务并行window)
    stream.keyBy().window()
    二.Non-Keyed Windwos(在未分组的DataStream上构建单任务Window,并行度是1,API都带ALL后缀)
    stream.windowAll()

    19.window窗口生命周期
    创建:当属于第一个元素到达时就会创建该窗口
    销毁:当时间(event/process time)超过窗口的结束时间戳+用户指定的延迟时(allowedLateness<time>),窗口将会移除

    20.触发器
    (1)触发器决定了一个窗口何时可以被窗口函数处理(条件满足时触发并发出信号)
    (2)每一个WindowAssigner都有一个默认的触发器,如果默认触发器不满足需要可以通过trigger()来指定

    触发器有5个方法来允许触发器处理不同的事件(trigger)
    onElement()方法每个元素被添加到窗口是调用
    onEvenTime() 当一个已注册的事件时间计时器启动时调用
    onProcessingTime 当一个已注册的处理时间计时器启动时调用
    onMerge 与状态触发器相关, 当使用session window时两个触发器对应的窗口合并,合并两个触发器的状态
    clear相应窗口被清除时触发

    21驱逐器
    evictor是可选的,WindowAssigner默认没有evictor
    evictor能够在Trigger触发之后以及在应用窗口函数执行前和/或后从窗口中删除无用的元素,类似filter作用
    evictBefore在窗口之前应用
    evictAfter在窗口后应用

    22.如何允许延迟
    (1)当处理event-time的windwo时,可能会出现元素晚到的情况,即flink用来跟踪event-time进度的
    watermark已经过了元素所属窗口的最后时间,属于当前窗口的数据才到达)
    (2)默认情况下,当watermark已经过了窗口的最后时间时,晚到的元素会被丢弃
    (3)Flink允许为窗口操作指定一个最大允许延时时长,Allowed lateness指定,默认情况是0
    (4)水位线已过了窗口最后时间才来的元素,如果还在未到窗口最后时间加延迟时间,任然可以在窗口中计算

    特例:在使用GlobalWindows(全局window),不会考虑延迟,因为窗口的结束时间戳是Long.MAX_VALUE

    23.CoGROUP、Join、Connect
    一、cogroup:
    (1)侧重于group,是对同一个key上的两组集合进行操作
    (2)CoGroup的作用和join基本相同,但有一点不一样的是,如果未能找到新到来的数据与另一个流
    在window中存在的匹配数据,仍会将其输出
    (3)只能在window中用

    二 、join
    (1)对同一个key上的每对元素进行操作
    (2)类似inner join
    (3)按照条件分别取出两个流能匹配的元素,返回个下游处理
    (4)join是cogroup的特例
    (5)只能在window中用

    三、connect
    (1)没有匹配条件,两个流分别处理流各种的逻辑。

    24.Process Function
    Flink提供三层API
    (1)SQL/Table Api
    (2)DataStream API
    (3)ProcessFunction

    不要跟ProcessWindowFunction混为一谈
    ProcessFunction是一个低阶的流处理做操,它可以访问流处理程序的基础构建模块
    (1)事件(event 流元素)
    (2)状态(state容错性,一致性,仅在keyed stream中)
    (3)定时器(event time和processint time,仅在keyed stream中)

    25.Connectors
    Connectors是数据进出flink一套接口和实现,是source和sink的统称
    数据进出flink方式不止connectors,还有:(1)Async I/O 异步IO (2)Queryable State

    自定义source
    (1)实现SourceFunction 非并行,并行度为1
    (2)实现ParallelSourceFunction
    (3)继承RichParallelSourceFunction

    kafka-connector:
    (1)基于Kafka的partition机制,Flink实现了并行化数据切分
    (2)Flink可以效仿Kafka的topic,和sink数据到kafka
    (3)出现失败,flink协调kafka来恢复应用(通过设置kafka的offset)

    FlinkKafkaConsumer消费模式
    (1)setStartFromEarliest 从队列头开始消费,最早的记录
    (2)setStartFromLatest 从队列为开始消费,最新的记录
    (3)setStartFromGroupOffsets 默认值,从当前消费组记录偏移量开始消费
    (4)setStartFromSpecificOffsets(Map<TopicPartition,Long>)从指定位置开始消费
    (5)setStartFromTimestamp(Long) 从指定时间戳爱是消费

    FlinkKafkaSource的容错性
    (1)env.enableCheckpointing() 启动检查点
    (2)如果flink启用了检查点,将会周期性的checkpoint其kafka的偏移量
    (3)保证了仅一次消费
    (4)如果作业失败,flink将程序恢复到最新检查点的状态,并从检查点中存储的偏移量开始消费Kafka中的记录
    (5)内部实现checkpointFunction
    (6)内部保存ListState<Tuple2<KafkaTopicPartition,Long>>

    不同情况下消费位置分析
    (1)第一次启动,无savepoint(常规情况) 。 由消费者模式决定
    (2)通过savepoint启动(应用升级,比如加大并行度)。 由savepoint记录的offset决定
    (3)有checkpoint,失败后,job恢复的情况。 由checkpoint的snapshot中记录的offset决定
    (4)无checkpoint,失败后,job恢复的情况。 由消费者模式决定

    动态Partition discovery
    (1)Flink Kafka Consumer支持动态发现Kafka分区,且能保证exactly-once
    (2)默认禁止动态发现分区,把flink.partition-discover.interval-milllis设置大于0即可开启
    properties.setProperty("flink.partition-discover.interval-milllis","30000")

    26.Sate 状态
    Flink的状态:一般指一个具体的task/operator某时刻在内存中的的状态(例如某属性的值)
    注意:State和checkpointing不要搞混
    checkpoint 则表示了一个flink job ,在一个特定时一份全局状态快照,即包含了一个job下所有task/operator某时刻的状态

    状态的错用
    一.增量计算
    (1)聚合操作
    (2)机器学习训练模型迭代运算时保持当前模型
    二.容错
    (1)job故障重启
    (2)flink程序升级

    状态分类
    (1)Operator State 每个流普通的Operator的状态
    (2)Keyed State Keyed Streaming的状态
    (3)特殊的:Broadcast State(1.5开始)

    Keyed State支持的数据结构
    (1)ValueState
    (2)ListState
    (3)ReducingState
    (4)AggregatingState
    (5)FoldingState
    (6)MapState

    注意:
    (1)状态不一定存储在内部,可能驻留在磁盘或其他地方
    (2)状态是使用RunntimContext方法的,因此只能在Rich函数中访问

    Keyed State和Operator State,可以以两种形式存在:原始状态和托管状态
    managerd(托管状态): 如ValueState,ListState,MapState等,通过框架接口来更新和管理,不需要序列化
    raw(原始状态):原始状态是由用户自行管理的具体数据结构,如checkpoint的时候,使用byte[]来读写状态内容,需要序列化

    通常在DataStream上的状态推荐使用托管状态,当用户自定义operator时,会使用到原始状态

    Keyed State TTL
    任何类型的keyed都可以设置TTL。如果设置TTL已配置,且状态值已过期,则将以最佳方式清理
    所有State collection都支持条目级别TTL,即list、map中的条目独立expire
    用法:
    val ttlConfig=StateTtlConfig.newBuilder(Time.seconds(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build()
    val descripe = new ValueStateDescriptor("avgState", TypeInformation.of(new TypeHint[(Long, Long)] {}))
    descripe.enableTimeToLive(ttlConfig)

    TTL相关配置
    一.Refresh策略(默认onCreateAndWrite):设置如何更新keyedState的最后访问时间
    StateTtlConfig.UpdateType.Disabled 禁用ttl
    StateTtlConfig.UpdateType.OnCreateAndWrite 每次写操作均更新State的最后访问时间(Create、Update)
    StateTtlConfig.UpdateType.OnReadAndWrite 每次读写操作均更新State的最后访问时间
    二.状可见性(默认是NeverReturnExpired):设置是否返回过期的值(过期尚未处理,此时正好被访问)
    StateTtlConfig.StateVisibility.NeverReturnExpired 用不返回过期的值
    StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp 可以返回过期但尚未清理的状态值

    TTL注意事项
    (1)启用TTL增加后端状态存储消耗
    (2)原来没启用TTL,后来启用TTL做恢复会将导致兼容性失败和statmigrationexception(反之一样)
    (3)TTL配置不是检查或保存点的一部分

    26.Broadcast state使用套路(三步)
    (1)创建dataStream
    (2)创建BroadcastedStream:创建规则流/配置流(低吞吐)并广播
    (3)连接两个流进行计算 connect,proccess(BroadcastProcessFunction and keyedBroadcastProcessFunction)

    27.checkpoint状态容错
    (1)有了状态自然需要状态容错,否则就失去意义了,flink状态容错机制就是checkpoint
    (2)checkpoint是通过分布式snapshot实现的,没有特殊声明时snapshot和checkpoint和back-up是一个意思

    特点:(1)异步 (2)全量和增量都可以设置 (3)Barrier机制 (4)失败情况下可回滚到最近成功一次的checkpoint (5)周期性

    使用checkpoint前置条件:
    (1)在一定时间内可回溯的datasource 例如:kafka、rabiitma、hdfs
    (2)可持久化存储state的存储系统,通常使用分布式文件系统,一般是hdfs,s3,nfs

    checkmode:一般选择EXACTLY_ONCE,除非场景要求极低会选择AT_LEAST_ONCE(几毫秒)

    checkpoint高级选项值保留策略
    默认情况下检查点不会被保留,仅用于从故障中恢复作业。可以启用外部持久化检查点,同事指定保留策略
    checkpointConfg.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    (1)CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION 在作业被取消时保留检查点。这种情况取消后必须手动清除检查点
    (2)CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION 在作业被取消(cancel)时会删除检查点,等于不启用。

    setCheckpointTimeout 设置超时时间,超过时间没有完成checkpoint则被终止
    setMinPauseBetweenCheckpoints 最小间隔,上一个checkpoint完成最少等待多久发出下一个checkpoint请求
    setMaxConcurrentCheckpoints 指定运行中多少并行度进行checkpoint

    使用checkpoint第二部:选择合适的State Backed
    (1)默认State保存在taskmanager的内存中
    (2)checkpoint机制会持久化所有状态的一致性快照
    快照保存由State Backend来决定,目前flink自带三个State Backed:
    (1)MemoryStateBackend(默认)
    (2)FsStateBackend
    (3)RocksDBStateBackend

    一、MemoryStateBackend
    (1)MemoryStateBackend是一个内部状态backend,用于维护Java堆上的状态。Key/value状态和窗口运算符包含存储值和计时器的哈希表
    (2)Checkpoint时,MemoryStateBackend会对state做一次快照,并像jobManager发送checkpoint确认完成的消息中带上此快照数据,然后快照会存储在JobManager的堆内存中
    (3)MemoeyStateBackend默认开启异步方式进行快照,推荐使用异步避免阻塞。如果要阻塞可以传false,如下
    val memoryStateBackend:StateBackend=new MemoryStateBackend(1010241024,false)
    env.setStateBackend(memoryStateBackend)
    (4)限制:单个state默认5mb,可以在MemoryStateBackend的构造函数指定。不论如何设置,State大小无法大于akka.framesize(JobManager和TaskManager之间发送的最大消息的大小默认10mb)。Job Manager必须有足够内存
    (5)适用场景:本地开发和测试 小状态job,如只使用Map FlatMap Fliter或Kaka Consumer

    二、FsStateBackend
    (1)FsStateBackend需要配置一个文件系统URL来,如hdfs://namenode:8080/flink/checkpoint
    (2)FsStateBackend在TaskManager的内存中持有正在处理的数据。checkpoint时将state snapshot写入文件系统目录下的文件中。
    (3)FsStateBackend默认开启异步方式进行快照,构造方法如下
    val stateBackend:StateBackend=new FsStateBackend("hdfs://namenode:9000/flink/checkpoint",false)
    env.setStateBackend(stateBackend)
    (4)适用场景:大状态、长窗口、大键/值状态的job

    三、RocksDBStateBackend
    (1)RocksDBStateBackend需要配置一个文件系统的URL。如hdfs://namenode:8080/flink/checkpoint
    (2)RocksDBStateBackend运行中的数据保存在RockDB数据库中,默认情况下存储在TaskManager数据目录中。
    在Checkpoint时,整个RocksDB数据库将被checkpointed到配置的文件系统和目录中
    (3)RocksDBSateBackend 始终是异步
    (4)RocksDB JNI API是基于Byte[],因此key和value最大支持2^31个字节(2GB)
    (5)适用场景:超大窗口,超大状态,大键/值状态的job
    (6)只有RockDBStateBackend支持增量checkpoint
    (7)状态保存在数据块中,只受可用磁盘空间量限制,但开销更大(读/写需要反序列化与序列化),吞吐收到限制
    使用需要导包:
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_{scala.binary.version}</artifactId> <version>{flink.version}</version>
    </dependency>
    val stateBackend:StateBackend=new RocksDBStateBackend("hdfs://namenode:9000/flink/checkpoint",true)
    env.setStateBackend(stateBackend)

    配置重启策略
    Flink支持不同的重启策略,这些策略控制在出现故障时如何重新启动job
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)))
    (1)如果没用启动checkpoint,则使用无重启方案
    (2)如果启用了checkpoint,但是没有配重启方案,则使用固定延迟策略,尝试次数是Integer.MAX_VALUE

    28.Savepoint
    概念:手工触发,通过checkpointing机制创建的streaming job的一致性快照称之为Savepoint
    Savepoint由两部分组成:
    (1)数据目录:稳定存储上的目录
    (2)元数据文件:指向数据目录属于当前Savepoint的数据文件的指针

    直接触发Savepoint(想象你要为数据库做个备份)
    bin/flink savepoint :jobId [ 目录]

    直接触发savepoint(flink on yarn)
    bin/flink savepoint :jobid [目录] -yid :yarnApplicationId

    Cancel Job with Savepoint
    bin/flink cncel -s [目录] :jobid

    从指定Savepoint恢复job
    bin/flink run -s :savepointPath

    从指定Savepoint恢复job
    bin/flin run -s:savepointPath -n

    相关文章

      网友评论

          本文标题:Flink知识点

          本文链接:https://www.haomeiwen.com/subject/nlsccktx.html