Flink 流计算算子函数详解

作者: Tim在路上 | 来源:发表于2019-12-03 09:31 被阅读0次

Flink 的算子函数和spark的大致一样,但是由于其是流处理的模式,所有还要有需要加强理解的地方

Flink 中 和spark算子一致的算子

  1. Map, FlaMap 做一对一,一对多映射
  2. Reuce 多对一进行聚合
  3. 聚合函数,sum,min,minBy,MaxBy 等
  4. keyBy 按Key进行分组 名字不一样但是操作一样。

Flink 特有的或需要重新理解的算子

  1. 窗口函数: 窗口函数用于对每一个key开窗口,windowsAll 全体元素开窗口
text.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5)))
text.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))

窗口函数实际上分为滚动时间窗口,滑动时间窗口,会话窗口

滚动时间窗口不会发生重叠, 滑动时间窗口,当步长小于窗口大小,就会重叠。

会话窗口是根据相邻时间间隔确定窗口边界

全局窗口必须定义触发器

在窗口内也可以进行其他的操作

  1. 窗口连接

两个数据源相同窗口内的连接

text.join(windowCounts)
        .where()
        .equalTo()
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .apply((e1,e2) => e1 + "," + e2)
0 1      2
0 1   2  3

0,1 0,1 1,0 1,0      2,2  3,2   一个窗口一个窗口内连接

间隔连接,以一个元素去连接一个窗口形成锥形

  text.keyBy(0).intervalJoin(windowCounts.keyBy(0))
      .between(Time.milliseconds(-2),Time.milliseconds(2))
        .process(new ProcessJoinFunction[Integer,Integer,String] {
          override def processElement(in1: Integer, in2: Integer, context: ProcessJoinFunction[Integer, Integer, String]#Context, collector: Collector[String]): Unit = {
            collector.collect(in1 + "," + in2)
          }
        })
    ```
    ```
0  1         6

0     2     3

以2 进行聚合 2,0 2,1 
  1. 数据分区

数据分区的好处是,如果分区数和算子数一致,则他们会直接运行到一个节点,通过内存进行传输,减少网络带宽的压力

自定义分区 :

text.partitionCustom(partitioner,"key")

使用shuffle() 进行均匀分区

text.shuffle()`

使用负载均衡的轮询调度算法进行数据分区

text.rebalance

可伸缩动态分区,使数据尽可能在一个slot内流转,减少网络开销

dataStream.rescale()

广播分区,每一个元素广播到下一个节点

text.broadcast()
  1. 资源共享

Flink 将多个任务连接成一个任务在一个线程中执行,以实现资源共享

(1) 创建链, 开启作业优化

dataStream.map(..).map(...).startNewChain().map(...)

(2) Slot共享组

在同一个组所有任务在同一个实例中运行

dataStream.map(...).slotSharingGroup("name")

(3) 关闭作业优化

dataStream.map(...).disableChaining()
  1. RichFunction函数

处理函数生命周期和获取函数上下文能力的算子

@Public
public abstract class AbstractRichFunction implements RichFunction, Serializable {
    private static final long serialVersionUID = 1L;
    private transient RuntimeContext runtimeContext;

    public AbstractRichFunction() {
    }
    
    public void setRuntimeContext(RuntimeContext t) {
        this.runtimeContext = t;
    }
    // 得到上下文函数 
    public RuntimeContext getRuntimeContext() {
        if (this.runtimeContext != null) {
            return this.runtimeContext;
        } else {
            throw new IllegalStateException("The runtime context has not been initialized.");
        }
    }

    public IterationRuntimeContext getIterationRuntimeContext() {
        if (this.runtimeContext == null) {
            throw new IllegalStateException("The runtime context has not been initialized.");
        } else if (this.runtimeContext instanceof IterationRuntimeContext) {
            return (IterationRuntimeContext)this.runtimeContext;
        } else {
            throw new IllegalStateException("This stub is not part of an iteration step function.");
        }
    }
    // 自定义初始化和销毁函数
    public void open(Configuration parameters) throws Exception {
    }

    public void close() throws Exception {
    }
}
  1. 触发器

基于事件的触发器

(1)onElement 窗口没收到一个元素,调用该方法

(2)onProcessingTime 根据注册处理时间进行触发,定时可以参数设定

(3)onEventTime 根据注册事件时间进行触发,定时可以参数设定

(4)onMerge 两个窗口合并时触发

  1. 清除器

在触发器后函数执行窗口前或者后执行清除的操作

evictor()可以在触发器后,窗口执行前或者后都可以触发

  1. 状态分类
val env = StreamExecutionEnvironment.getExecutionEnvironment()

env.setStateBackend(...)

设置状态后端,内存,JVM堆内存,JVM堆外内存,

9.检查点

检查点是Flink实现 exactly-once 语义的核心机制,启用检测点,需要:

(1) 支持时空穿梭的外部数据源, kafka 和 分布式文件系统

(2) 可持久化状态的外部存储, 如分布式文件系统。

检查点默认是关闭的,启用检查点需要配置 一致性级别, exactly-once

检测超时时间,

Kafka进行流计算实例

  1. 创建连接器

添加kafka source

  // 设置配置文件
          val properties = new Properties()
          properties.setProperty("bootstraps.servers","localhost:9092")
          properties.setProperty("zookeeper.connect","localhost:2181")

          // 设置消费组
          properties.setProperty("group.id","test")
          val myConsumer = new FlinkKafkaConsumer09[String]("topic",
                           new SimpleStringSchema(),properties)
          stream = env.addSource(myConsumer)
  1. 创建反序列化器
// https://mvnrepository.com/artifact/org.apache.flink/flink-avro
compile group: 'org.apache.flink', name: 'flink-avro', version: '1.7.1'
  1. 设置消息起始位置的偏移
    设置 据上一次的偏移位置
myConsumer.setStartFromGroupOffsets()

// 从最早和最晚开始记录

myConsumer.setStartFromEarliest()

myConsumer.setStartFromLatest()

// 从固定的时间点开始

myConsumer.setStartFromTimestamp(23L)

// 设置分区的起始位置

val specificStartOffsets = new java.util.HashMap[]
  1. 设置检查点
env.enableCheckpointing(5000)

相关文章

  • Flink 流计算算子函数详解

    Flink 的算子函数和spark的大致一样,但是由于其是流处理的模式,所有还要有需要加强理解的地方 Flink ...

  • Flink SQL 性能调优--MiniBatch提升吞吐率

    Flink 在设计和实现流计算算子时,把“面向状态编程”作为第一准则。因为在流计算中,为了保证状态(State)的...

  • Flink DataSet 迭代

    机器学习和图计算应用,都会使用到迭代计算,Flink 通过在迭代算子中定义 Step 函数来实现迭代算法,迭代算法...

  • Flink流式计算WordCountTopN

    Flink流式计算WordCountTopN可以采用流处理编程和FlinkSql自定义UDTF函数的方式 流处理编...

  • Flink基本原理

    1.支持流处理的引擎:Spark和Flink Spark:基于批处理来模拟流的计算;Flink:基于流计算来模拟批...

  • 大数据开发:Flink框架的State状态简介

    Flink的出现,主攻流计算场景,提供有状态的流计算,尤其是在面对大规模实时流计算上,性能值得称赞。而Flink的...

  • MATLAB滤波器

    fspecial 函数详解 Fspecial 函数用于创建预定义的滤波算子,h = fspecial(type,p...

  • 大数据开发:Flink中Checkpoint和Savepoint

    Flink作为实时流计算现行的代表框架之一,强大的计算性能,也非常符合当下的实时数据流计算的需求。在Flink内部...

  • Spark架构模式与Flink的对比

    Spark架构模式与Flink的对比 Spark和Flink都属于流批一体的分布式计算引擎。Flink属于流处理框...

  • 02-flink基本架构

    02-flink基本架构 flink基本组件栈 API&Libraries ​ 同时提供了流计算和批计算的接口...

网友评论

    本文标题:Flink 流计算算子函数详解

    本文链接:https://www.haomeiwen.com/subject/rryfgctx.html