一个Flink程序,其实就是对DataStream的各种转换。具体来说,代码基本上由以下几部分构成:
- 获取执行环境(Execution Environment);
- 读取数据源(Source);
- 定义基于数据的转换操作(Transformations);
- 定义计算结果的输出位置(Sink);
-
触发执行程序;
5.1 执行环境
5.1.1 创建执行环境
创建执行环境,通过调用StreamExecutionEnviroment
类的的静态方法。具体有三种:
-
StreamExecutionEnvironment.getExecutionEnvironment
,它会根据当前运行的上下文
直接得到正确的结果;也就是说,这个方法会根据当前运行的方式,自行决定该返回什么样的
运行环境; -
StreamExecutionEnvironment.createLocalEnvironment
, 这个方法返回一个本地执行环境; -
StreamExecutionEnvironment.createRemoteEnvironment
, 这个方法返回集群执行环境,调用时需要指定JobManager的主机号和端口号,并指定要运行的jar包;
5.1.2 执行模式
- 流执行模式(streaming);
- 批执行模式(batch),有两种方式进行配置:
- 命令行配置:
bin/flink run -Dexecution.runtime-mode=BATCH ...
; - 代码中进行配置:
env.setRuntimeMode(RuntimeExcutionMode.BATCH)
;
- 命令行配置:
- 自动模式(automatic),在这种模式下,将由程序根据输入数据源是否有界,来自动选择执行模式。
5.2 数据源算子(SOURCE)
Flink可以从各种来源获取数据,然后构建DataStream进行转换处理。一般将数据的输入来源称为数据源,而读取数据的算子就是源算子(Source)。因此,Source就是整个处理程序的输入端。
Flink有多种读取源数据的方式:
// 定义一个模拟的用户行为样例类
case class Event(user:String, url:String, timestamp:Long)
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 1、从集合读取数据
val clicks = List(Event("Mary", "/.home", 1000L), Event("Bob", "/.cart", 2000L))
val stream1 = env.fromColletctions(clicks)
// 也可以直接将元素列举出来通过fromElements进行读取数据
val stream1 = env.fromElements(Event("Mary", "/.home", 1000L), Event("Bob", "/.cart", 2000L))
// 2、从文件读取数据:可以是目录/文件,可以是hdfs文件,也可以是本地文件
val stream2 = env.readTextFile("clicks.csv")
// 3、从socket读取数据
val stream3 = env.socketTextStream("localhost", 777)
// 4、从kafka读取数据。需要添加依赖 连接工具 flink-connector-kafka
// 创建 FlinkKafkaConsumer 时需要传入三个参数:
// (1) topic,定义了从哪些主题中读取数据;
// (2) 第二个参数是一个 DeserializationSchema 或者 KeyedDeserializationSchema, 反序列化方式;
// (3) Properties 对象,设置了 Kafka 客户端的一些属性;
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
// 创建kafka相关配置
val properties = new Properties();
properties.setProperty("bootstrap.servers", "hadoop102:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")
//创建一个 FlinkKafkaConsumer 对象,传入必要参数,从 Kafka 中读取数据
val stream = env.addSource(new FlinkKafkaConsumer[String](
"clicks",
new SimpleStringSchema(),
properties
))
上面介绍的是直接通过API读取数据源。另一种比较复杂的方式是自定义数据源,然后通过env.addSource
进行读取。
自定义数据源需要实现SourceFunction
接口。主要需要重写两个关键方法:
-
run()
方法,使用运行时上下文对象(SourceContext)向下游发送数据; -
cancel()
方法,通过标识位控制退出循环,来达到中断数据源的效果;
package com.whu.chapter05
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import java.util.Calendar
import scala.util.Random
// 调用
// val stream = env.addSource(new ClickSource)
case class Event(user: String, url: String, timestamp: Long)
// 实现 SourceFunction 接口,接口中的泛型是自定义数据源中的类型
class ClickSource(sleepTime:Long=1000L) extends SourceFunction[Event] {
// 标志位,用来控制循环的退出
var running = true
// 重写run方法,使用上下文对象sourceContext调用collect方法
override def run(ctx: SourceContext[Event]): Unit = {
// 实例化一个随机数发生器
val random = new Random()
// 供随机选择的用户名数组
val users = Array("Marry", "Bob", "Jack", "Cary")
// 供选择的url数组
val urls = Array("./home", "./cart", "./fav", "./prod?id=1", "./prod?id=2")
// 通过while循环发送数据,running默认为true,所以会一直发送数据
while (running) {
// 调用collect方法向下游发送数据
ctx.collect(Event(
users(random.nextInt(users.length)),
urls(random.nextInt(urls.length)),
Calendar.getInstance.getTimeInMillis // 当前时间戳
))
// 每隔一秒生成一个点击事件,方便观测
Thread.sleep(sleepTime)
}
}
override def cancel(): Unit = {
// 通过将running设置为false来终止数据发送
running = false
}
}
5.3 转换算子(Transformation)
数据源读入数据之后,我们就可以使用各种转换算子,讲一个或多个DataStream转换为新的DataStream。
5.3.1 基本转换算子
-
map
, 一个个进行数据转换; -
filter
, 对数据进行过滤; -
flatmap
, 扁平映射,可以理解为先map然后进行flatten;
5.3.2 聚合算子(Aggregation)
-
keyBy
, 按键分区。对于Flink来说,DataStream是没有直接进行觉得API的。要做聚合需要先进行分区,这个操作就是通过keyBy来完成的。keyBy()方法需要传入一个参数,这个参数指定了一个或一组 key。有很多不同的方法来指定 key:比如对于 Tuple 数据类型,可以指定字段的位置或者多个位置的组合。对于 POJO 类型或 Scala 的样例类,可以指定字段的名称(String);另外,还可以传入 Lambda 表达式或者实现一个键选择器(KeySelector),用于说明从数据中提取 key 的逻辑。 - 简单聚合,sum、min、max、minBy、maxBy等。都是在指定字段上进行聚合操作。min()只计算指定字段的最小值,其他字段会保留最初第一个数据的值;而 minBy()则会返回包含字段最小值的整条数据。
指定字段的方式有两种:指定位置,和指定名称。元组通过位置,样例类通过字段名称。
keyBy得到的数据流一般称为KeyedStream。而聚合操作则会将KeyedStream转换为DataStream。
规约聚合(reduce)
与简单聚合类似,reduce操作也会将KeyedStream转换为DataStream。他不会改变流的元素数据类型,输入输出是一致的。
reduce方法来自ReduceFunction
接口,该方法接收两个输入事件,经过处理后输出一个相同数据类型的事件。
一个简单的栗子:
import org.apache.flink.streaming.api.scala._
object TransformationDemo {
def main(args:Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// 添加自定义数据源
env.addSource(new ClickSource)
.map(r => (r.user, 1L))
// 按照用户进行分组
.keyBy(_._1)
// 计算每个用户的访问频次
.reduce((r1, r2) => (r1._1, r1._2+r2._2))
// 将所有数据分到同一个分区
.keyBy(_ => true)
// 通过reduce实现max功能,计算访问频次最高的用户
.reduce((r1, r2)=> if(r1._2>r2._2) r1 else r2)
.print()
// 更简单的方法是直接keyBy然后sum然后maxBy就行了,这里只是为了演示reduce用法
env.execute()
}
}
5.3.3 用户自定义函数(UDF)
Flink的DataStream API编程风格其实是一致的:基本都是基于DataStream调用一个方法,表示要做一个转换操作;方法需要传入一个参数,这个参数都是需要实现一个接口。
这个接口有一个共同特定:全部都以算子操作名称 + Function命名,如数据源算子需要实现SourceFunction
接口,map算子需要实现MapFunction
接口。我们可以通过三种方式来实现接口。这就是所谓的用户自定义函数(UDF)。
- 自定义函数类;
- 匿名类;
- lambda表达式;
接下来对这三种编程方式做一个梳理。
函数类(Function Classes)
package com.whu.chapter05
import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.streaming.api.scala._
object TransformationUDFDemo {
def main(args:Array[String]): Unit = {
// 自定义filterFunction类, 并接受额外的参数
class MyFilter(key:String) extends FilterFunction[Event] {
override def filter(t: Event): Boolean = {
t.url.contains(key)
}
}
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// 通过自定义函数类
val stream1 = env.addSource(new ClickSource)
.filter(new MyFilter("home"))
// 通过匿名类
val stream2 = env.addSource(new ClickSource)
.filter(new FilterFunction[Event]{
override def filter(t: Event): Boolean = {
t.url.contains("home")
}
})
// 最简单的lambda 表达式
val stream3 = env.addSource(new ClickSource)
.filter(_.url.contains("home"))
stream1.print("stream1")
stream2.print("stream2")
stream3.print("stream3")
env.execute()
}
}
富函数类(Rich Function Classes)
富函数类也是DataStream API提供的一个函数类的接口,所有的Flink函数类都有其Rich版本。富函数类一般是已抽象类的形式出现的。例如:RichMapFunction,RichFilterFunction,RichReduceFunction等。
与常规函数类的不同主要在于富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。
典型的生命周期方法有:
-
open
方法,是RichFunction的初始化方法,会开启一个算子的生命周期。当一个算子的实际工作方法如map、filter等方法被调用之前,open会首先被调用。所以像文件IO流、数据库连接、配置文件读取等等这样一次性的工作,都适合在open方法中完成; -
close
方法,是生命周期中最后一个调用的方法,类似于解构方法。一般用来做一些清理工作。
open、close等生命周期方法对于一个并行子任务来说只会调用一次;而对应的,实际工作方法,如map,对于每一条数据都会调用一次。
package com.whu.chapter05
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
object RichFunctionDemo {
def main(args:Array[String]) : Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(2)
env.addSource(new ClickSource(10000))
.map(new RichMapFunction[Event, Long] {
// 在任务生命周期开始时会执行open方法,在控制台打印对应语句
override def open(parameters: Configuration): Unit = {
println(s"索引为 ${getRuntimeContext.getIndexOfThisSubtask} 的任务开始")
}
override def map(in: Event): Long = {
in.timeStamp
}
override def close(): Unit = {
println(s"索引为 ${getRuntimeContext.getIndexOfThisSubtask} 的任务结束")
}
}).print()
env.execute()
}
}
在上面的例子中可以看到,富函数类提供了getRuntimeContex
方法,可以获取运行时上下文信息,如程序执行的并行度,任务名称,任务状态等。
5.3.4 物理分区(Physical Partitioning)
分区(partitioning)操作就是要将数据进行重新分布,传递到不同的流分区去进行下一步计算。keyBy是一种逻辑分区(logic partitioning)操作。
Flink 对于经过转换操作之后的 DataStream,提供了一系列的底层操作算子,能够帮我们实现数据流的手动重分区。为了同 keyBy()相区别,我们把这些操作统称为“物理分区”操作。
常见的物理分区策略有随机分区、轮询分区、重缩放和广播,还有一种特殊的分区策略— —全局分区,并且 Flink 还支持用户自定义分区策略,下边我们分别来做了解。
随机分区(shuffle)
最简单的重分区方式就是直接“洗牌”。通过调用 DataStream 的 shuffle()方法,将数据随机地分配到下游算子的并行任务中去。
随机分区服从均匀分布(uniform distribution),所以可以把流中的数据随机打乱,均匀地传递到下游任务分区。
轮询分区(Round-Robin)
轮询也是一种常见的重分区方式。简单来说就是“发牌”,按照先后顺序将数据做依次分发。通过调用 DataStream的.rebalance()方法,就可以实现轮询重分区。rebalance()使用的是 Round-Robin 负载均衡算法,可以将输入流数据平均分配到下游的并行任务中去。
重缩放分区(rescale)
重缩放分区和轮询分区非常相似。当调用 rescale()方法时,其实底层也是使用 Round-Robin算法进行轮询,但是只会将数据轮询发送到下游并行任务的一部分中,也就是说,“发牌人”如果有多个,那么 rebalance()的方式是每个发牌人都面向所有人发牌;而rescale()的做法是分成小团体,发牌人只给自己团体内的所有人轮流发牌。
当下游任务(数据接收方)的数量是上游任务(数据发送方)数量的整数倍时,rescale()的效率明显会更高。比如当上游任务数量是 2,下游任务数量是 6 时,上游任务其中一个分区的数据就将会平均分配到下游任务的 3 个分区中。
广播(broadcast)
这种方式其实不应该叫作“重分区”,因为经过广播之后,数据会在不同的分区都保留一份,可能进行重复处理。可以通过调用 DataStream 的 broadcast()方法,将输入数据复制并发送到下游算子的所有并行任务中去。
全局分区(global)
全局分区也是一种特殊的分区方式。这种做法非常极端,通过调用.global()方法,会将所有的输入流数据都发送到下游算子的第一个并行子任务中去。这就相当于强行让下游任务并行度变成了 1,所以使用这个操作需要非常谨慎,可能对程序造成很大的压力。
自定义分区
当 Flink 提 供 的 所 有 分 区 策 略 都 不 能 满 足 用 户 的 需 求 时 , 我 们 可 以 通 过 使 用partitionCustom()方法来自定义分区策略。
在调用时,方法需要传入两个参数,第一个是自定义分区器(Partitioner)对象,第二个是应用分区器的字段,它的指定方式与 keyBy 指定 key 基本一样:可以通过字段名称指定,也可以通过字段位置索引来指定,还可以实现一个 KeySelector 接口。
栗子:
package com.whu.chapter05
import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.streaming.api.scala._
object PartitioningDemo {
def main(args:Array[String]) : Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 读取数据源
val stream = env.addSource(new ClickSource())
// 随机分区(shuffle)
stream.shuffle.print("shuffle").setParallelism(4)
// 轮询分区(rebalance, Round-Robin)
stream.rebalance.print("rebalance").setParallelism(4)
// 重缩放分区(rescale)
stream.rescale.print("rescale").setParallelism(4)
// 广播 (broadcast)
stream.broadcast.print("broadcast").setParallelism(4)
// 全局分区(global)
stream.global.print("global").setParallelism(4)
// 自定义分区
stream.partitionCustom(new Partitioner[Event] {
// 根据 key 的奇偶性计算出数据将被发送到哪个分区
override def partition(k: Event, i: Int): Int = {
k.timeStamp.toInt % 2
}
}, "user"
).print()
env.execute()
}
}
5.4 输出算子(Sink)
5.4.1 连接到外部系统
Flink的DataStream API专门提供了向外部写入数据的方法:addSink。与addSource类似,addSink方法对应着一个Sink算子,主要就是用来实现与外部系统链接、并将数据提交写入的;Flink程序中所有对外的输出操作,一般都是利用Sink算子完成的。
与addSource类似,addSink也支持自定义sink算子SinkFunction。在这个接口中只需要重写一个方法invoke(),用来将指定的值写入到外部系统中。这个方法在每条数据记录到来时都会调用。Flink官方提供了诸多第三方系统连接器:
除 Flink 官方之外,Apache Bahir 作为给 Spark 和 Flink 提供扩展支持的项目,也实现了一
些其他第三方系统与 Flink 的连接器:
5.4.2 输出到文件
Flink有一些非常简单粗暴的输出到文件的预实现方法,如writeAsCsv等,目前这些简单的方法已经要被弃用。
Flink专门提供了一个流式文件系统连接器:StreamingFileSink,它继承自抽象类RichSinkFunction,而且继承了Flink的检查点机制,用来确保精确一次(exactly)的一致性语义。
StreamingFileSink支持行编码(row-encoded)和批量编码(bulk-encoded,比如parquet)格式。这两种不同的方式都有各自的构建器(builder),调用方法如下:
- 行编码:StreamingFileSink.forRowFormat (basePath, rowEncoder);
- 批量编码:StreamingFileSink.forBulkFormat (basePath,bulkWriterFactory);
在创建行或批量Sink时,我们需要传入两个参数,用来指定存储桶的基本路径和数据的编码逻辑。
package com.whu.chapter05
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.streaming.api.scala._
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import java.util.concurrent.TimeUnit
object SinkToFileDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.addSource(new ClickSource())
val fileSink = StreamingFileSink.forRowFormat(
new Path("./output"),
new SimpleStringEncoder[String]("UTF-8")
)
// 通过.withRollingPolicy()方法指定滚动逻辑
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withMaxPartSize(1024*1024*1024)
.withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
.withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
.build()
).build()
stream.map(_.toString).addSink(fileSink)
}
}
上面创建了一个简单的文件 Sink,通过 withRollingPolicy()方法指定了一个“滚动策略”。上面的代码设置了在以下 3 种情况下,我们就会滚动分区文件:
- 至少包含 15 分钟的数据;
- 最近 5 分钟没有收到新的数据;
- 文件大小已达到1GB;
输出到其他系统
略。
参考:
FLink教程
网友评论