美文网首页
第五章 DataStream API (基础篇)

第五章 DataStream API (基础篇)

作者: 井底蛙蛙呱呱呱 | 来源:发表于2022-11-19 19:53 被阅读0次

    一个Flink程序,其实就是对DataStream的各种转换。具体来说,代码基本上由以下几部分构成:

    • 获取执行环境(Execution Environment);
    • 读取数据源(Source);
    • 定义基于数据的转换操作(Transformations);
    • 定义计算结果的输出位置(Sink);
    • 触发执行程序;


    5.1 执行环境

    5.1.1 创建执行环境

    创建执行环境,通过调用StreamExecutionEnviroment类的的静态方法。具体有三种:

    • StreamExecutionEnvironment.getExecutionEnvironment,它会根据当前运行的上下文
      直接得到正确的结果;也就是说,这个方法会根据当前运行的方式,自行决定该返回什么样的
      运行环境;
    • StreamExecutionEnvironment.createLocalEnvironment, 这个方法返回一个本地执行环境;
    • StreamExecutionEnvironment.createRemoteEnvironment, 这个方法返回集群执行环境,调用时需要指定JobManager的主机号和端口号,并指定要运行的jar包;

    5.1.2 执行模式

    • 流执行模式(streaming);
    • 批执行模式(batch),有两种方式进行配置:
      • 命令行配置:bin/flink run -Dexecution.runtime-mode=BATCH ...;
      • 代码中进行配置:env.setRuntimeMode(RuntimeExcutionMode.BATCH);
    • 自动模式(automatic),在这种模式下,将由程序根据输入数据源是否有界,来自动选择执行模式。

    5.2 数据源算子(SOURCE)

    Flink可以从各种来源获取数据,然后构建DataStream进行转换处理。一般将数据的输入来源称为数据源,而读取数据的算子就是源算子(Source)。因此,Source就是整个处理程序的输入端。

    Flink有多种读取源数据的方式:

    // 定义一个模拟的用户行为样例类
    case class Event(user:String, url:String, timestamp:Long)
    
    // 创建执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    
    // 1、从集合读取数据
    val clicks = List(Event("Mary", "/.home", 1000L), Event("Bob", "/.cart", 2000L))
    val stream1 = env.fromColletctions(clicks)
    // 也可以直接将元素列举出来通过fromElements进行读取数据
    val stream1 = env.fromElements(Event("Mary", "/.home", 1000L), Event("Bob", "/.cart", 2000L))
    
    // 2、从文件读取数据:可以是目录/文件,可以是hdfs文件,也可以是本地文件
    val stream2 = env.readTextFile("clicks.csv")
    
    // 3、从socket读取数据
    val stream3 = env.socketTextStream("localhost", 777)
    
    // 4、从kafka读取数据。需要添加依赖 连接工具 flink-connector-kafka
    // 创建 FlinkKafkaConsumer 时需要传入三个参数:
    // (1) topic,定义了从哪些主题中读取数据;
    // (2) 第二个参数是一个 DeserializationSchema 或者 KeyedDeserializationSchema, 反序列化方式;
    // (3) Properties 对象,设置了 Kafka 客户端的一些属性;
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
    // 创建kafka相关配置
    val properties = new Properties();
    properties.setProperty("bootstrap.servers", "hadoop102:9092")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")
    //创建一个 FlinkKafkaConsumer 对象,传入必要参数,从 Kafka 中读取数据
    val stream = env.addSource(new FlinkKafkaConsumer[String](
      "clicks",
      new SimpleStringSchema(),
      properties
    ))
    

    上面介绍的是直接通过API读取数据源。另一种比较复杂的方式是自定义数据源,然后通过env.addSource进行读取。

    自定义数据源需要实现SourceFunction接口。主要需要重写两个关键方法:

    • run()方法,使用运行时上下文对象(SourceContext)向下游发送数据;
    • cancel()方法,通过标识位控制退出循环,来达到中断数据源的效果;
    package com.whu.chapter05
    
    import org.apache.flink.streaming.api.functions.source.SourceFunction
    import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
    
    import java.util.Calendar
    import scala.util.Random
    
    
    // 调用
    // val stream = env.addSource(new ClickSource)
    
    
    case class Event(user: String, url: String, timestamp: Long)
    
    // 实现 SourceFunction 接口,接口中的泛型是自定义数据源中的类型
    class ClickSource(sleepTime:Long=1000L) extends SourceFunction[Event] {
      // 标志位,用来控制循环的退出
      var running = true
    
      // 重写run方法,使用上下文对象sourceContext调用collect方法
      override def run(ctx: SourceContext[Event]): Unit = {
        // 实例化一个随机数发生器
        val random = new Random()
        // 供随机选择的用户名数组
        val users = Array("Marry", "Bob", "Jack", "Cary")
        // 供选择的url数组
        val urls = Array("./home", "./cart", "./fav", "./prod?id=1", "./prod?id=2")
    
        // 通过while循环发送数据,running默认为true,所以会一直发送数据
        while (running) {
          // 调用collect方法向下游发送数据
          ctx.collect(Event(
            users(random.nextInt(users.length)),
            urls(random.nextInt(urls.length)),
            Calendar.getInstance.getTimeInMillis // 当前时间戳
          ))
          // 每隔一秒生成一个点击事件,方便观测
          Thread.sleep(sleepTime)
        }
      }
    
      override def cancel(): Unit = {
        // 通过将running设置为false来终止数据发送
        running = false
      }
    }
    

    5.3 转换算子(Transformation)

    数据源读入数据之后,我们就可以使用各种转换算子,讲一个或多个DataStream转换为新的DataStream。

    5.3.1 基本转换算子

    • map, 一个个进行数据转换;
    • filter, 对数据进行过滤;
    • flatmap, 扁平映射,可以理解为先map然后进行flatten;

    5.3.2 聚合算子(Aggregation)

    • keyBy, 按键分区。对于Flink来说,DataStream是没有直接进行觉得API的。要做聚合需要先进行分区,这个操作就是通过keyBy来完成的。keyBy()方法需要传入一个参数,这个参数指定了一个或一组 key。有很多不同的方法来指定 key:比如对于 Tuple 数据类型,可以指定字段的位置或者多个位置的组合。对于 POJO 类型或 Scala 的样例类,可以指定字段的名称(String);另外,还可以传入 Lambda 表达式或者实现一个键选择器(KeySelector),用于说明从数据中提取 key 的逻辑。
    • 简单聚合,sum、min、max、minBy、maxBy等。都是在指定字段上进行聚合操作。min()只计算指定字段的最小值,其他字段会保留最初第一个数据的值;而 minBy()则会返回包含字段最小值的整条数据。

    指定字段的方式有两种:指定位置,和指定名称。元组通过位置,样例类通过字段名称。

    keyBy得到的数据流一般称为KeyedStream。而聚合操作则会将KeyedStream转换为DataStream。

    规约聚合(reduce)

    与简单聚合类似,reduce操作也会将KeyedStream转换为DataStream。他不会改变流的元素数据类型,输入输出是一致的。

    reduce方法来自ReduceFunction接口,该方法接收两个输入事件,经过处理后输出一个相同数据类型的事件。

    一个简单的栗子:

    import org.apache.flink.streaming.api.scala._
    
    object TransformationDemo {
      def main(args:Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
    
        // 添加自定义数据源
        env.addSource(new ClickSource)
          .map(r => (r.user, 1L))
          // 按照用户进行分组
          .keyBy(_._1)
          // 计算每个用户的访问频次
          .reduce((r1, r2) => (r1._1, r1._2+r2._2))
          // 将所有数据分到同一个分区
          .keyBy(_ => true)
          // 通过reduce实现max功能,计算访问频次最高的用户
          .reduce((r1, r2)=> if(r1._2>r2._2) r1 else r2)
          .print()
        
        // 更简单的方法是直接keyBy然后sum然后maxBy就行了,这里只是为了演示reduce用法
        env.execute()
      }
    }
    

    5.3.3 用户自定义函数(UDF)

    Flink的DataStream API编程风格其实是一致的:基本都是基于DataStream调用一个方法,表示要做一个转换操作;方法需要传入一个参数,这个参数都是需要实现一个接口。

    这个接口有一个共同特定:全部都以算子操作名称 + Function命名,如数据源算子需要实现SourceFunction接口,map算子需要实现MapFunction接口。我们可以通过三种方式来实现接口。这就是所谓的用户自定义函数(UDF)。

    • 自定义函数类;
    • 匿名类;
    • lambda表达式;

    接下来对这三种编程方式做一个梳理。

    函数类(Function Classes)
    package com.whu.chapter05
    
    import org.apache.flink.api.common.functions.FilterFunction
    import org.apache.flink.streaming.api.scala._
    
    object TransformationUDFDemo {
     def main(args:Array[String]): Unit = {
    
       // 自定义filterFunction类, 并接受额外的参数
       class MyFilter(key:String) extends FilterFunction[Event] {
         override def filter(t: Event): Boolean = {
           t.url.contains(key)
         }
       }
    
       val env = StreamExecutionEnvironment.getExecutionEnvironment
       env.setParallelism(1)
    
       // 通过自定义函数类
       val stream1 = env.addSource(new ClickSource)
         .filter(new MyFilter("home"))
    
       // 通过匿名类
       val stream2 = env.addSource(new ClickSource)
         .filter(new FilterFunction[Event]{
           override def filter(t: Event): Boolean = {
             t.url.contains("home")
           }
         })
    
       // 最简单的lambda 表达式
       val stream3 = env.addSource(new ClickSource)
         .filter(_.url.contains("home"))
       
       stream1.print("stream1")
       stream2.print("stream2")
       stream3.print("stream3")
       
       env.execute()
     }
    }
    
    富函数类(Rich Function Classes)

    富函数类也是DataStream API提供的一个函数类的接口,所有的Flink函数类都有其Rich版本。富函数类一般是已抽象类的形式出现的。例如:RichMapFunction,RichFilterFunction,RichReduceFunction等。

    与常规函数类的不同主要在于富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。

    典型的生命周期方法有:

    • open方法,是RichFunction的初始化方法,会开启一个算子的生命周期。当一个算子的实际工作方法如map、filter等方法被调用之前,open会首先被调用。所以像文件IO流、数据库连接、配置文件读取等等这样一次性的工作,都适合在open方法中完成;
    • close方法,是生命周期中最后一个调用的方法,类似于解构方法。一般用来做一些清理工作。

    open、close等生命周期方法对于一个并行子任务来说只会调用一次;而对应的,实际工作方法,如map,对于每一条数据都会调用一次。

    package com.whu.chapter05
    
    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.scala._
    
    object RichFunctionDemo {
      def main(args:Array[String]) : Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(2)
    
        env.addSource(new ClickSource(10000))
          .map(new RichMapFunction[Event, Long] {
            // 在任务生命周期开始时会执行open方法,在控制台打印对应语句
            override def open(parameters: Configuration): Unit = {
              println(s"索引为 ${getRuntimeContext.getIndexOfThisSubtask} 的任务开始")
            }
            override def map(in: Event): Long = {
              in.timeStamp
            }
    
            override def close(): Unit = {
              println(s"索引为 ${getRuntimeContext.getIndexOfThisSubtask} 的任务结束")
            }
          }).print()
        
        env.execute()
      }
    }
    

    在上面的例子中可以看到,富函数类提供了getRuntimeContex方法,可以获取运行时上下文信息,如程序执行的并行度,任务名称,任务状态等。

    5.3.4 物理分区(Physical Partitioning)

    分区(partitioning)操作就是要将数据进行重新分布,传递到不同的流分区去进行下一步计算。keyBy是一种逻辑分区(logic partitioning)操作。

    Flink 对于经过转换操作之后的 DataStream,提供了一系列的底层操作算子,能够帮我们实现数据流的手动重分区。为了同 keyBy()相区别,我们把这些操作统称为“物理分区”操作。

    常见的物理分区策略有随机分区、轮询分区、重缩放和广播,还有一种特殊的分区策略— —全局分区,并且 Flink 还支持用户自定义分区策略,下边我们分别来做了解。

    随机分区(shuffle)

    最简单的重分区方式就是直接“洗牌”。通过调用 DataStream 的 shuffle()方法,将数据随机地分配到下游算子的并行任务中去。

    随机分区服从均匀分布(uniform distribution),所以可以把流中的数据随机打乱,均匀地传递到下游任务分区。

    轮询分区(Round-Robin)

    轮询也是一种常见的重分区方式。简单来说就是“发牌”,按照先后顺序将数据做依次分发。通过调用 DataStream的.rebalance()方法,就可以实现轮询重分区。rebalance()使用的是 Round-Robin 负载均衡算法,可以将输入流数据平均分配到下游的并行任务中去。


    重缩放分区(rescale)

    重缩放分区和轮询分区非常相似。当调用 rescale()方法时,其实底层也是使用 Round-Robin算法进行轮询,但是只会将数据轮询发送到下游并行任务的一部分中,也就是说,“发牌人”如果有多个,那么 rebalance()的方式是每个发牌人都面向所有人发牌;而rescale()的做法是分成小团体,发牌人只给自己团体内的所有人轮流发牌。



    当下游任务(数据接收方)的数量是上游任务(数据发送方)数量的整数倍时,rescale()的效率明显会更高。比如当上游任务数量是 2,下游任务数量是 6 时,上游任务其中一个分区的数据就将会平均分配到下游任务的 3 个分区中。

    广播(broadcast)

    这种方式其实不应该叫作“重分区”,因为经过广播之后,数据会在不同的分区都保留一份,可能进行重复处理。可以通过调用 DataStream 的 broadcast()方法,将输入数据复制并发送到下游算子的所有并行任务中去。

    全局分区(global)

    全局分区也是一种特殊的分区方式。这种做法非常极端,通过调用.global()方法,会将所有的输入流数据都发送到下游算子的第一个并行子任务中去。这就相当于强行让下游任务并行度变成了 1,所以使用这个操作需要非常谨慎,可能对程序造成很大的压力。

    自定义分区

    当 Flink 提 供 的 所 有 分 区 策 略 都 不 能 满 足 用 户 的 需 求 时 , 我 们 可 以 通 过 使 用partitionCustom()方法来自定义分区策略。
    在调用时,方法需要传入两个参数,第一个是自定义分区器(Partitioner)对象,第二个是应用分区器的字段,它的指定方式与 keyBy 指定 key 基本一样:可以通过字段名称指定,也可以通过字段位置索引来指定,还可以实现一个 KeySelector 接口。

    栗子:

    package com.whu.chapter05
    
    import org.apache.flink.api.common.functions.Partitioner
    import org.apache.flink.streaming.api.scala._
    
    object PartitioningDemo {
      def main(args:Array[String]) : Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        // 读取数据源
        val stream = env.addSource(new ClickSource())
    
        // 随机分区(shuffle)
        stream.shuffle.print("shuffle").setParallelism(4)
    
        // 轮询分区(rebalance, Round-Robin)
        stream.rebalance.print("rebalance").setParallelism(4)
    
        // 重缩放分区(rescale)
        stream.rescale.print("rescale").setParallelism(4)
    
        // 广播 (broadcast)
        stream.broadcast.print("broadcast").setParallelism(4)
    
        // 全局分区(global)
        stream.global.print("global").setParallelism(4)
    
        // 自定义分区
        stream.partitionCustom(new Partitioner[Event] {
          // 根据 key 的奇偶性计算出数据将被发送到哪个分区
          override def partition(k: Event, i: Int): Int = {
            k.timeStamp.toInt % 2
          }
        }, "user"
        ).print()
        
        env.execute()
      }
    }
    

    5.4 输出算子(Sink)

    5.4.1 连接到外部系统

    Flink的DataStream API专门提供了向外部写入数据的方法:addSink。与addSource类似,addSink方法对应着一个Sink算子,主要就是用来实现与外部系统链接、并将数据提交写入的;Flink程序中所有对外的输出操作,一般都是利用Sink算子完成的。

    与addSource类似,addSink也支持自定义sink算子SinkFunction。在这个接口中只需要重写一个方法invoke(),用来将指定的值写入到外部系统中。这个方法在每条数据记录到来时都会调用。Flink官方提供了诸多第三方系统连接器:


    除 Flink 官方之外,Apache Bahir 作为给 Spark 和 Flink 提供扩展支持的项目,也实现了一
    些其他第三方系统与 Flink 的连接器:


    5.4.2 输出到文件

    Flink有一些非常简单粗暴的输出到文件的预实现方法,如writeAsCsv等,目前这些简单的方法已经要被弃用。

    Flink专门提供了一个流式文件系统连接器:StreamingFileSink,它继承自抽象类RichSinkFunction,而且继承了Flink的检查点机制,用来确保精确一次(exactly)的一致性语义。

    StreamingFileSink支持行编码(row-encoded)和批量编码(bulk-encoded,比如parquet)格式。这两种不同的方式都有各自的构建器(builder),调用方法如下:

    • 行编码:StreamingFileSink.forRowFormat (basePath, rowEncoder);
    • 批量编码:StreamingFileSink.forBulkFormat (basePath,bulkWriterFactory);

    在创建行或批量Sink时,我们需要传入两个参数,用来指定存储桶的基本路径和数据的编码逻辑。

    package com.whu.chapter05
    
    import org.apache.flink.api.common.serialization.SimpleStringEncoder
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.core.fs.Path
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
    import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
    
    import java.util.concurrent.TimeUnit
    
    
    object SinkToFileDemo {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        
        val stream = env.addSource(new ClickSource())
        
        val fileSink = StreamingFileSink.forRowFormat(
          new Path("./output"),
          new SimpleStringEncoder[String]("UTF-8")
        )
          // 通过.withRollingPolicy()方法指定滚动逻辑
          .withRollingPolicy(
            DefaultRollingPolicy.builder()
              .withMaxPartSize(1024*1024*1024)
              .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
              .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
              .build()
          ).build()
        
        stream.map(_.toString).addSink(fileSink)
      }
    }
    

    上面创建了一个简单的文件 Sink,通过 withRollingPolicy()方法指定了一个“滚动策略”。上面的代码设置了在以下 3 种情况下,我们就会滚动分区文件:

    • 至少包含 15 分钟的数据;
    • 最近 5 分钟没有收到新的数据;
    • 文件大小已达到1GB;

    输出到其他系统

    略。

    参考:
    FLink教程

    相关文章

      网友评论

          本文标题:第五章 DataStream API (基础篇)

          本文链接:https://www.haomeiwen.com/subject/gfpbxdtx.html