Flink 原理架构
下图是官网的一个架构图,有以下特点:
1:数据源有实时数据和非实时数据,比如数据库、文件系统等
2:
用来做什么
-
处理有边界和无边界的数据
image.png
1:无边界数据流(Unbounded streams)
2:有边界数据流(bounded Streams)
部署
可以和各种流行的分布式资源管理工具整合,比如: Hadoop YARN, Apache Mesos, and Kubernetes
利用内存计算
Flink 主要使用内存来计算。
因此延迟低,
Flink 周期性的异步的把内存中的状态持久化,来保证出现故障的时候能够保证 exactly-onece 的效果。
image.png
Flink的一些入门操作
Flink是什么
Flink 作为新一代流式大数据处理框架,已获得阿里、美团等诸多大厂的青睐
流处理并不是一个新概念,但是要做好并不是一件容易的事情。提到流处理,我们最先想到的可能是金融交易、信号检测以及地图导航等领域的应用。但是近年来随着信息技术的发展,除了前面提到的三个领域,其它方向对数据时效性的要求也越来越高。随着 Hadoop 生态的崛起,Storm、Spark Streaming、Samza、MillWheel 等一众流处理技术开始走入大众视野,但是我们最熟悉的应该还是 Storm 和 Spark Steaming。
“高吞吐”、“低延迟”和”exactly-once“是衡量一个流处理框架的重要指标。 Storm 虽然提供了低延迟的流处理,但是在高吞吐方面的表现并不算佳,可以说基本满足不了日益暴涨的数据量,而且也没办法保证精准一次消费。Spark Streaming 中通过微批次的批处理来模拟流处理,只要当批处理的批次分的足够小,那么从宏观上来看就是流处理,这也是 Spark Steaming 的核心思想。通过微观批处理的方式,Spark Streaming 也实现了高吞吐和 exactly-once 语义,时效性也有了大幅提升,在很长一段时间里占据流处理榜首。但是受限于其实现方式,依然存在几秒的延迟,对于那些实时性要求较高的领域来说依然不够完美。 在这样的背景下,Flink 应用而生。
Apache Flink 是为分布式、高性能、随时可用以及准确的流处理应用程序打造的开源流处理框架,用于对无界和有界数据流进行有状态计算。Flink 最早起源于在 2010 ~ 2014 年,由 3 所地处柏林的大学和欧洲的一些其它大学共同进行研究的名为 Stratosphere 的项目。2014 年 4 月 Stratosphere 将其捐赠给 Apache 软件基 金会, 初始成员是 Stratosphere 系统的核心开发人员,2014 年 12 月,Flink 一跃成为 Apache 软件基金会的顶级项目。在 2015 年,阿里也加入到了 Flink 的开发工作中,并贡献了至少 150 万行代码。
Flink 一词在德语中有着“灵巧”、“快速”的意思,它的 logo 原型也是柏林常见的一种松鼠,以身材娇小、灵活著称,为该项目取这样的名字和选定这样的 logo 也正好符合 Flink 的特点和愿景。
注意,虽然我们说 Flink 是一个流处理框架,但是它同样可以进行批处理。因为在 Flink 的世界观里,批处理是流处理的一种特殊形式,这和 Spark 不同,在 Spark 中,流处理是通过大批量的微批处理实现的。
Flink入门的一些知识点
image.pngFlink本地安装及启动和任务提交
下载flink,到flink官网下载
然后:
$ tar -xzf flink-1.13.2-bin-scala_2.11.tgz
$ cd flink-1.13.2-bin-scala_2.11
启动flink
$ ./bin/start-cluster.sh
提交任务
$ ./bin/flink run examples/streaming/WordCount.jar
$ tail log/flink-*-taskexecutor-*.out
(nymph,1)
(in,3)
(thy,1)
(orisons,1)
(be,4)
(all,2)
(my,1)
(sins,1)
(remember,1)
(d,4)
关闭集群
$ ./bin/stop-cluster.sh
Flink界面
http://localhost:8081/
搭建Flink开发环境
建立IDEA maven项目
image.png
image.png
在Java同级目录下新建scala目录,并设置为source folder
image.png
右键点击项目,增加Scala支持
image.png
接下来建立两个 Scala的 Object:
image.png
然后配置maven以来和build内容:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>kenian.test</groupId>
<artifactId>FlinkLearning</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>1.11.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.11.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- 该插件用于将 Scala 代码编译成 class 文件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.4.6</version>
<executions>
<execution>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
注意,高版本的flink,需要增加下面的依赖才能够在本地启动
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.11.0</version>
</dependency>
注意,如果要连接kafka,可以加上
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.9.1</version>
</dependency>
编写批处理代码并启动
package test.kenian
import org.apache.flink.api.scala._
object BatchWordCount {
def main(args: Array[String]): Unit = {
// 创建执行环境
val env = ExecutionEnvironment.getExecutionEnvironment
// 从文本读取数据
val inputPath = "D:/ubuntu_linux_base/words.txt"
val inputDS: DataSet[String] = env.readTextFile(inputPath)
// 计算逻辑
val wordCountDS: AggregateDataSet[(String, Int)] = inputDS
.flatMap(_.split(" "))
.map((_, 1))
.groupBy(0)
.sum(1)
// 打印输出
wordCountDS.print()
}
}
直接IDEA启动,得到下面的结果
image.png
编写流代码并且启动
package test.kenian
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
object StreamWordCount {
def main(args: Array[String]): Unit = {
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 监控Socket数据
val textDstream: DataStream[String] = env.socketTextStream("localhost", 9999)
// 导入隐式转换
import org.apache.flink.api.scala._
// 计算逻辑
val dataStream: DataStream[(String, Int)] = textDstream
.flatMap(_.split(" "))
.filter(_.nonEmpty)
.map((_, 1))
.keyBy(0)
.sum(1)
// 设置并行度
dataStream.print().setParallelism(1)
// 执行
env.execute("Socket stream word count")
}
}
STEP1
执行 下面命令:
nc -l -p 9999
STEP2:
启动Steaming代码
STEP3
在nc 命令下数据数据,然后查看结果
hello world
hello flink
hello spark
hello java
输出结果如下:
(hello,1)
(flink,1)
(hello,2)
(spark,1)
(java,1)
(hello,3)
Flink算子
基础算子
Map
Filter
Flatmap
image.png
package test.kenian.operator
import org.apache.flink.api.scala._
object BaseOperator {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val data = env.fromCollection(List(1, 2, 3, 4, 5))
println("--------- map --------------")
//map 算子
val data2 = data.map( x => x * 10)
data2.print()
println("--------- filter --------------")
// filter 算子
val data3 = data.filter( x => x >= 3)
data3.print()
// flatmap 算子
println("--------- flatmap --------------")
// flatmap 最终会导致数据行数的改变,但是如果是map不会导致这儿问题
val dataStr = env.fromCollection(List("a b","c d","e f"))
val dataStr2 = dataStr.flatMap(_.split(" "))
dataStr2.print()
println("--------- map --------------")
val dataStr1 = dataStr.map(_.split(" "))
dataStr1.print()
}
}
结果如下:
--------- map --------------
10
20
30
40
50
--------- filter --------------
3
4
5
--------- flatmap --------------
a
b
c
d
e
f
--------- map --------------
[Ljava.lang.String;@5ec46cdd
[Ljava.lang.String;@2324bfe7
[Ljava.lang.String;@112d1c8e
基于Key的算子
基于Key的算子分为三个大类:
• KeyBy
• Rolling Aggregation
o sum
o min
o max
o minBy
o maxBy
• reduce
KeyBy算子
该算子和其他算子一般一起使用,比如和Sum,看下面代码
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
object KeyByOperator {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val data: DataStream[String] = env.readTextFile("D:/ubuntu_linux_base/userlog.txt")
val userLogStream: DataStream[UserLog] = data.map(fun = log => {
val arr: Array[String] = log.split(" ")
UserLog(arr(0), arr(1), arr(2).toInt)
})
userLogStream
.keyBy("city")
.sum("duration")
.print()
env.execute("Key By Operator")
}
case class UserLog(name: String, city: String, duration: Int)
}
执行结果如下:
6> UserLog(Jack,Beijing,100)
16> UserLog(Joker,Shanghai,200)
18> UserLog(William,Chengdu,600)
18> UserLog(William,Chengdu,900)
6> UserLog(Jack,Beijing,500)
16> UserLog(Joker,Shanghai,400)
原始数据如下:
Jack Beijing 100
Bob Chengdu 300
William Chengdu 600
Lily Shanghai 200
Loius Beijing 400
Joker Shanghai 200
你可能会疑惑,输出的不应该是有两列,city 和 sum(duration)吗?请注意,我们这里的计算是流处理,而不是离线的批处理,我们创建的环境是 StreamExecutionEnvironment。程序从上往下一行一行读取文本,然后按照 city 字段分组,当执行到第一行的时候,只有它自己,所以输出自己本身。当执行到第二行的时候,city为 Chengdu 的也只有一行,所以也输出了它自己。当程序执行到第三行的时候,第二个 Chengdu 出现了,所以 sum 的结果是 900(300 + 600)。当程序执行到第四行的时候,Shanghai 第一次出现,所以也只有它自己。当程序执行到第 5 行的时候,第二个 Beijing 出现了,所以输出的是 500(100 + 400)。当程序执行到第 6 行,第二个 Shanghai 出现了,所以输出的是 400(200 + 200)。
maxBy算子
和max的区别是,max只会展示对应值的最大值。但是maxBy会替换整行的值。
有点类似于,max是属于开窗,独立出来一个值用于存储最大值。
MaxBy 是直接找到最大的那行并保留下来。
Reduce算子
代码如下:
userLogStream
.keyBy("city")
.reduce((x, y) => {
UserLog(y.name, y.city, x.duration + y.duration)
}).print
可以对数据做出聚合的效果。
多流转换算子
Union 合并数据流
作用是把多个流的数据合并到一起,比如从两个系统取的同一种交易信息,则可以合并到一起进行处理。
样例代码,健康两个端口 9999 9998 ,使用nc命令发送消息,然后把消息展示出来。查看效果
ackage test.kenian.operator
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
object UnionOperator {
def main(args: Array[String]): Unit = {
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 接收淘宝的订单信息
val taobaoOrder: DataStream[String] = env.socketTextStream("localhost", 9998)
// 接收天猫的订单信息
val tianmaoOrder: DataStream[String] = env.socketTextStream("localhost", 9999)
// 把两个环境的信息合并
val dataStream: DataStream[String] = taobaoOrder.union(tianmaoOrder)
// 设置并行度
// 设置并行度
dataStream.print().setParallelism(1)
// 执行
env.execute("Socket stream word count")
}
}
Split select 分离数据流
Split 和 Select是完全想法的一种做法,
Split 可以把现有的一个数据流分为多个,然后通过select 可以单独的查询分裂出来的某个数据流。
这里有个小DEMO,就是根据温度把不同的人放到不同的数据流里面,比如说检查出高危新冠人群。
package test.kenian.operator
import org.apache.flink.streaming.api.scala._
object SelectOperator {
def main(args: Array[String]): Unit = {
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 接收天猫的订单信息
val dataStream: DataStream[String] = env.socketTextStream("localhost", 9999)
val splitStream = dataStream.map(line => {
val arr: Array[String] = line.split(" ")
People(arr(0), arr(1).toFloat)
}).split(people => {
if (people.temperature > 36) Seq("fever") else Seq("normal")
})
val normal: DataStream[People] = splitStream.select("normal")
val fever: DataStream[People] = splitStream.select("fever")
// normal.print()
fever.print()
// 执行
env.execute("Socket Stream Split and Select")
}
case class People(name: String, temperature: Float)
}
以上代码,使用nc发送的数据个人的温度大于36都会被上报
Source 和 Sink
Source
Flink自带常见source有:
• env.readTextFile()
• env.socketTextStream()
• env.fromCollection()
• env.fromElement()
• env.generateSequence()
自定义source
但是实际工作中,更加常见的是使用一些第三方的工具作为source,比如kafka
Kafka其实是自定义source的一种。自定义source的原理如下:
package test.kenian.source
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
object Source {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val data = env.addSource(new MySource())
data.setParallelism(1).print()
env.execute()
}
/**
* 自定义source,该source实现两个方法
* 分别是断开方法和搜集数据方法
*
*/
class MySource extends SourceFunction[Integer] {
var flag = true
override def run(ctx: SourceFunction.SourceContext[Integer]): Unit = {
var count = 0
val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
while (flag) {
for (elem <- list) {
ctx.collect(elem) // 如果把数据导入到flink中,这是个桥梁。如果自定义source,需要在这个地方进行控制,如何把数据放入到 sourceContext中
count += 1
if (count == 300) cancel()
}
}
}
override def cancel(): Unit = {
flag = false
}
}
}
下面代码中自定义了一个source,该source被flink流式处理。一个会发送300个数字过去。
然后在代码中使用了该source。通过该source接收数据。
Kafka source
启动kafka
到kafka的安装目录
nohup bin/zookeeper-server-start.sh config/zookeeper.properties > zookeeper.log 2>&1 &
nohup bin/kafka-server-start.sh config/server.properties > kafka.log 2>&1 &
在kafka上建立topic
bin/kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic shiyanlou
在topic中写入数据
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic shiyanlou
然后可以手写的方式随便写一些
实验是否可以通过命令读取数据
bin/kafka-console-consumer.sh --topic shiyanlou --from-beginning --bootstrap-server localhost:9092
通过Java代码读取kafka
这里需要注意一个非常重要的点,因为我的本地没有配置远程机器的别名什么的,导致没法连接上他们的环境。因此需要配置kafka的这两个参数:
advertised.host.name=127.0.0.1
advertised.port=9092
这两个参数要根据自己的实际情况重新配置,我因为是本地因此配置的就是127.0.0.1
Java代码如下:
import java.util.Properties
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer}
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.kafka.common.serialization.StringDeserializer
object KafkaSource {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.streaming.api.scala._
val prop = new Properties()
prop.setProperty("bootstrap.servers", "localhost:9092")
prop.setProperty("zookeeper.connect", "localhost:2181")
prop.setProperty("key.deserializer", classOf[StringDeserializer].getName)
prop.setProperty("value.deserializer", classOf[StringDeserializer].getName)
prop.setProperty("auto.offset.reset", "latest")
// 添加 Kafka Source
val data = env.addSource(new FlinkKafkaConsumer[String]("shiyanlou", new SimpleStringSchema(), prop))
data.print()
env.execute()
}
}
Sink
常见flink自带sink
• writeAsText
• writeAsCsv
• writeToSocket
注意,生成文件的个数和并行度分区数相关
自定义sink
代码如下,类似source,flink和sink对象的沟通通过的是 SinkFunction.Context
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object FlinkSink {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val data = env.readTextFile("D:/ubuntu_linux_base/score.txt")
data.addSink(new MySink())
env.execute()
}
class MySink extends SinkFunction[String] {
override def invoke(value: String, context: SinkFunction.Context[_]): Unit = {
val time = context.currentProcessingTime()
val waterMark = context.currentWatermark()
println(s"$value : $time : $waterMark")
}
}
}
我们可以从该对象中取出对应的结果值,然后打印出来
Flink状态管理
什么叫flink的状态
有状态的计算是流处理框架中的重要功能之一,因为很多复杂的业务场景都会涉及到数据前后状态。Flink 本身也是带状态的流处理引擎,如果你在此之前有看过 Flink 的官方文档,应该有注意到Stateful这个关键词。本节实验我们就重点学习 Flink 中的状态(State)相关知识点。
状态相关知识点
- State 分类
- Keyed State
- Operator State
- Checkpoint
- StateBackend
State的两种类型:Keyed State 和 Operate State
Operator State:Operator State 可以作用在所有算子上,每个算子中并行的 Task 都可以共享一个状态,或者说同⼀个算⼦中的多个 Task 的状态是相同的。但是请注意,算子状态不能由相同或不同算子的另一个实例访问。Operator State 支持三种基本数据结构,分别是:
o ListState:存储列表类型的状态。
o UnionListState:存储列表类型的状态。和 ListState 的区别是,如果发生故障,ListState 会将该算子的所有并发的状态实例进行汇总,然后均分给新的 Task;而 UnionListState 只是将所有并发的状态实例汇总起来,具体的划分行为则由用户进行定义。
BroadcastState:用于广播的算子状态。如果一个算子有多项任务,并且它的每项任务状态又都相同,这种情况就可以使用广播状态。
• Keyed State:Keyed State 是作用在 KeyedStream 上的。从名称中就可以看出来,它的特点是和 Key 强相关的。在任务处理中,Flink 为每个 Key 维护一个状态实例,而且相同 Key 所对应的数据都会被分配到同一个任务中执行。Keyed State 支持五种基本数据结构,分别是:
o ValueState:保存单个 Value,可以针对该 Value 进行 get/set 操作。
o ListState:保存一个列表,列表中可以存储多个 Value。可以针对列表进行 add、get、update 操作。
o MapState:保存 Key-Value 类型的值。可以针对其进行 get、put、remove 操作,还可以使用 contains 判断某个 key 是否存在。
o ReducingState:保存一个单一值,该值是添加到状态的所有值聚合的结果。
o AggregatingState:保存一个单一值,该值是添加到状态的所有值聚合的结果。与 ReducingState 有些不同,聚合类型可能不同于添加到状态的元素的类型。接口和 ListState 相同,但是使用 add(IN)添加的元素本质是通过使用指定的 AggregateFunction 进行聚合。
Flink 架构
主要进程
JobManager 作业管理器
控制一个应用程序执行的主进程,也就是说,每个应用程序都会被一个不同的 JobManager 所控制执行。JobManager 会先接收到要执行的应用程序,这个应用程序会包括:作业图(JobGraph)、逻辑数据流图(logical dataflow graph)和打包了所有的类、库和其它资源的 JAR 包。JobManager 会把 JobGraph 转换成一个物理层面的数据流图,这个图被叫做“执行图”(ExecutionGraph),包含了所有可以并发执行的任务。JobManager 会向资源管理器(ResourceManager)请求执行任务必要的资源,也就是任务管理器(TaskManager)上的插槽(slot)。一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的 TaskManager 上。而在运行过程中,JobManager 会负责所有需要中央协调的操作,比如说检查点(checkpoints)的协调。
ResourceManager 资源管理器
主要负责管理任务管理器(TaskManager)的插槽(slot),TaskManger 插槽是 Flink 中定义的处理资源单元。Flink 为不同的环境和资源管理工具提供了不同资源管理器,比如 YARN、Mesos、K8s,以及 standalone 部署。当 JobManager 申请插槽资源时,ResourceManager 会将有空闲插槽的 TaskManager 分配给 JobManager。如果 ResourceManager 没有足够的插槽来满足 JobManager 的请求,它还可以向资源提供平台发起会话,以提供启动 TaskManager 进程的容器。另外,ResourceManager 还负责终止空闲的 TaskManager,释放计算资源。
TaskManager 任务管理器
Flink 中的工作进程。通常在 Flink 中会有多个 TaskManager 运行,每一个 TaskManager 都包含了一定数量的插槽(slots)。插槽的数量限制了 TaskManager 能够执行的任务数量。启动之后,TaskManager 会向资源管理器注册它的插槽;收到资源管理器的指令后,TaskManager 就会将一个或者多个插槽提供给 JobManager 调用。JobManager 就可以向插槽分配任务(tasks)来执行了。在执行过程中,一个 TaskManager 可以跟其它运行同一应用程序的 TaskManager 交换数据。
Dispatcher 以及分发器
可以跨作业运行,它为应用提交提供了 REST 接口。当一个应用被提交执行时,分发器就会启动并将应用移交给一个 JobManager。由于是 REST 接口,所以 Dispatcher 可以作为集群的一个 HTTP 接入点,这样就能够不受防火墙阻挡。Dispatcher 也会启动一个 Web UI,用来方便地展示和监控作业执行的信息。Dispatcher 在架构中可能并不是必需的,这取决于应用提交运行的方式。
Slots
每⼀个 TaskManager(worker)是⼀个 JVM 进程,它可能会在独⽴的线程上执⾏⼀个或多个 subtask。为了控制⼀个 worker 能接收多少个 task,worker 通过 task slot 来进⾏控制(⼀个 worker ⾄少有⼀个 task slot)。每个 task slot 表示 TaskManager 拥有资源的⼀个固定⼤⼩的⼦集。假如⼀个 TaskManager 有三个 slot,那么它会将其管理的内存分成三份给各个 slot。资源 slot 化意味着⼀个 subtask 将不需要跟来⾃其 他 job 的 subtask 竞争被管理的内存,取⽽代之的是它将拥有⼀定数量的内存储备。需要注意的是,这⾥ 不会涉及到 CPU 的隔离,slot ⽬前仅仅⽤来隔离 task 的受管理的内存。
通过调整 task slot 的数量,允许⽤户定义 subtask 之间如何互相隔离。如果⼀个 TaskManager ⼀个 slot,那将意味着每个 task group 运⾏在独⽴的 JVM 中(该 JVM 可能是通过⼀个特定的容器启动的),⽽ ⼀个 TaskManager 多个 slot 意味着更多的 subtask 可以共享同⼀个 JVM。⽽在同⼀个 JVM 进程中的 task 将 共享 TCP 连接(基于多路复⽤)和⼼跳消息。它们也可能共享数据集和数据结构,因此这减少了每个 task 的负载。
Task Slot 是静态的概念,是指 TaskManager 具有的并发执⾏能⼒,可以通过参数 taskmanager.numberOfTaskSlots 进⾏配置,⽽并⾏度 parallelism 是动态概念,即 TaskManager 运⾏ 程序时实际使⽤的并发能⼒,可以通过参数 parallelism.default 进⾏配置。也就是说,假设⼀共有 3 个 TaskManager,每⼀个 TaskManager 中分配 3 个 TaskSlot,也就是每个 TaskManager 可以接收 3 个 task,⼀共 9 个 TaskSlot,如果我们设置 parallelism.default=1,即运⾏程序默认的并⾏度为 1,9 个 TaskSlot 只⽤了 1 个,有 8 个空闲,因此,设置合适的并⾏度才能提⾼效率。
窗口
Window分类
根据上游数据集的类型可以分为:
Keyed Window
Global Window
根据业务场景来分,又可以分为:
Count Window
Time Window。
TIME WINDOW
滚动窗口(Tumbling Window)
滚动窗口是按照固定时间进行切分,而且所有窗口之间的数据不会重叠,使用时只需要指定一个窗口长度即可。
image.png
代码样例:
滚动窗口的窗口大小(window size)是固定的,而且相邻窗口之间是连续的。现在有这样的业务场:某公司要求每 10 秒统计一次最近 10 秒内各个电商平台的订单数量并输出到大屏幕,这时候就需要用到滚动窗口了,我们只需要将窗口大小设置为 10 秒就可以。我们使用 netcat 发送 Socket 数据来模拟订单流量。
// 监控Socket数据
val textDstream: DataStream[String] = env.socketTextStream("localhost", 9999)
val dataStream: DataStream[(String, Int)] = textDstream
.filter(_.nonEmpty)
.map((_, 1))
.keyBy(0)
.timeWindow(Time.seconds(10))
.sum(1)
滑动窗口(Sliding Window)
滑动窗口(Sliding Window):滑动窗口有两个参数,分别是窗口大小和窗口滑动时间,它是允许不同窗口的元素重叠的(同一个元素可以出现在不同的窗口中)。窗口大小指定数据统计的时间跨度,而滑动时间指定的是相邻两个窗口时间的时间偏移量。当滑动时间小于窗口大小的时候,数据会发生重叠;当滑动窗口大于窗口大小的时候,窗口会出现不连续的情况(部分元素不会纳入统计);当滑动时间和窗口大小相等的时候,滑动窗口就是滚动窗口,从这个角度来看,滚动窗口是滑动窗口的一个特殊存在。
image.png简单来说,会有数据的重复统计。不同的窗口里面,会有交叉的数据。
滚动窗口属于特殊滑动窗口
代码和滚动窗口类似:多了个滑动时间
// 监控Socket数据
val textDstream: DataStream[String] = env.socketTextStream("localhost", 9999)
val dataStream: DataStream[(String, Int)] = textDstream
.filter(_.nonEmpty)
.map((_, 1))
.keyBy(0)
.timeWindow(Time.seconds(10), Time.seconds(5))
.sum(1)
会话窗口(Session Window)
与前滚动窗口和滑动窗口不同的是,会话窗口没有固定的滑动时间和窗口大小,而是通过一个 session gap 来指定窗口间隔。如果在 session gap 规定的时间内没有活跃数据进入的话,则认为当前窗口结束,下一个窗口开始。session gap 可以理解为相邻元素的最大时间差。
简单理解下,类似Tomcat的session,只要在当前session下有数据过来,则session继续保持;超过session规定等待时间,则session失效;当前session失效后,如果有新的数据进来,则建立新的session。
把一个session内的数据进行统计
image.png代码如下:
val dataStream: DataStream[(String, Int)] = textDstream
.filter(_.nonEmpty)
.map((_, 1))
.keyBy(0)
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
.sum(1)
CountWindow
基于输入数据量定义,与时间无关。Count Window 也可以细分为滚动窗口和滑动窗口,逻辑和 Time Window 中的滚动窗口和滑动窗口的逻辑类似,只是窗口大小和触发条件由时间换成了相同 Key 元素的数量。窗口大小是由相同 Key 元素的数量来触发执行,执行时只计算元素数量达到窗口大小的 key 对应的结果。
简单来说,key重复次数到达阈值,则触发计算。
适合业务那种规定某个消息出现多少次则计算或者上报一次
val dataStream: DataStream[(String, Int)] = textDstream
.filter(_.nonEmpty)
.map((_, 1))
.keyBy(0)
// .countWindow(3) // 一个key出现了三次,则会触发一次 针对该key的 计算。且只计算这3个key
.countWindow(3,2) // 一个key出现了两次,则触发一次针对该key的计算。且会计算该key最近3个key。也就是说,会有重复统计的情况
.sum(1)
Time和WaterMark
在流式数据处理中,如何保证数据的全局有序和 Exactly Once(精准一次消费)是非常重要的。虽然数据在上游产生的时候是唯一并且有序的,但是数据从产生到进入 Flink 的过程中,中间可能会由于负载均衡、网络传输、分区等等原因造成数据乱序,为了应对这种情况,所以我们引入了时间语义和 WaterMark 的概念
内容范围
• Time
o Event Time (事件生成时间)
o Ingestion Time (事件接入时间)
o Processing Time (事件处理时间)
• Watermark
o Watermark 的概念
o Watermark 的使用
TIME时间语义
image.png• Event Time:指的是事件产生的时间。通常由事件中的某个时间戳字段来表示,比如用户登录日志中所携带的时间字段、天气信号检测系统中采集到的天气数据中所携带的时间字段。
• Ingestion Time:指的是事件进入 Flink 中的时间。
• Processing Time:指的是时间被处理时的当前系统时间。比如某条数据进入 Flink 之后,我们运行到某个算子时的系统时间,就是 Processing Time。Processing Time 是 Flink 中的默认时间属性。
也就是说,我们窗口默认处理的时间其实是 Processing Time。
但是有时候希望是 Ingestion Time。。。。。而且从业务角度来说,我们更加想使用 Event Time 。。。。。。。。。。。,可以参考下面代码:
// 设置使用EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 设置使用IngestionTime
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
Watermark 原理
一般业务中以Event Time作为处理时间,但是,Event 产生的比较早,到达flink的时间有可能比较迟。这个时候需要flink进行等待。比如12:00:00 的event Time,Flink等到12:00:05
WaterMark 代码样例
// 指定使用waterMark的方式处理处理时间,也就是指定EventTime,该时间是事件的业务日期
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val data: DataStream[UserLog] = env.socketTextStream("localhost", 9999)
.map(line => {
val arr = line.split(",")
UserLog(arr(0).toLong, arr(1), arr(2), arr(3), arr(4).toLong)
})
// 指定DataStream的WaterMark字段,并且以该字段作为基础来处理数据
// 指定WaterMark字段以后,同时指定具体的等待时间
data.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[UserLog](Time.seconds(2)) {
override def extractTimestamp(element: UserLog): Long = {
element.time
}
}).print()
Table API 和 SQL
image.png首先是TableAPI,其实就是一种声明式编程的方式,类似于Spark的DataFrame和DataSet。
这是SQL的底层。
其实还是要加一些POM的依赖,当然具体的版本根据自己的情况来
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>1.11.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>1.11.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
<version>1.11.0</version>
</dependency>
TableAPI
看下面代码样例,在Flink批处理中,使用对Table的一些简单SELECT 操作。
//初始化Table API的上下文环境
val tableEnv = BatchTableEnvironment.create(env)
tableEnv
.connect(new FileSystem().path("D:/ubuntu_linux_base/userlog.log"))
.withFormat(new OldCsv())
.withSchema(new Schema()
.field("time", DataTypes.BIGINT())
.field("action", DataTypes.STRING())
.field("city", DataTypes.STRING())
.field("IP", DataTypes.STRING())
.field("user_id", DataTypes.BIGINT())
)
.createTemporaryTable("temp_userlog")
tableEnv.from("temp_userlog").printSchema()
val res = tableEnv.from("temp_userlog").select("city")
tableEnv.toDataSet[Row](res).print();
这里面读取以逗号分隔的文件,且最终定义了多个列。
然后读取文件的schema信息,和展示文件的某个列的数据。
如果是流式处理,其实代码差不多,看下面代码:
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types._
object StreamTableTest {
def main(args: Array[String]): Unit = {
import org.apache.flink.streaming.api.scala._
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(streamEnv)
val data = streamEnv.socketTextStream("localhost", 9999)
.map(line => {
val arr = line.split(",")
UserLog(arr(0).toLong, arr(1), arr(2), arr(3), arr(4).toLong)
})
val table: Table = tableEnv.fromDataStream(data)
table.printSchema()
val res = table
.where('city === "北京" || 'city === "成都")
.groupBy('city)
.select('city, 'user_id.count as 'cnt )
tableEnv
.toRetractStream[Row](res)
.print()
streamEnv.execute()
}
case class UserLog(time: Long, action: String, city: String, ip: String, user_id: Long)
区别不是很大,TableAPI 声明式编程适合批处理和流式处理两种情况。且处理的过程中,可以使用各种查询条件等,非常方便。
Flink SQL
创建 MyFlinkSql object。还是针对”过滤出城市为北京和成都的用户,并分别统计这两个城市中的用户数量“这个业务逻辑,对应到 Flink SQL 中的语法为:
object MyFlinkSql {
def main(args: Array[String]): Unit = {
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(streamEnv)
val data = streamEnv.socketTextStream("localhost", 9999)
.map(line => {
val arr = line.split(",")
UserLog(arr(0).toLong, arr(1), arr(2), arr(3), arr(4))
})
val table = tableEnv.fromDataStream(data)
//tableEnv.registerDataStream("temp_userlog", data) // 也可以直接在SQL中使用指定临时表的名字的方式
val res = tableEnv.sqlQuery(
s"""
|select
| city, count(user_id) as cnt
|from
| $table
|where
| city = '北京' or city = '成都'
|group by
| city
|""".stripMargin)
tableEnv
.toRetractStream[Row](res)
.filter(_._1 == true)
.print()
tableEnv.execute("Table API")
}
case class UserLog(time: Long, action: String, city: String, ip: String, user_id: String)
}
可以通过sql的方式对流式的数据进行统计。
输入下面数据:
20210301121533,login,北京,118.128.11.31,0001
20210301121536,login,上海,10.90.113.150,0002
20210301121544,login,成都,112.112.31.33,0003
20210301121559,login,成都,101.132.93.24,0004
20210301121612,login,上海,189.112.89.78,0005
20210301121638,login,北京,113.52.101.50,0006
20210301121533,login,北京,118.128.11.31,0001
20210301121536,login,上海,10.90.113.150,0002
20210301121544,login,成都,112.112.31.33,0003
20210301121559,login,成都,101.132.93.24,0004
20210301121612,login,上海,189.112.89.78,0005
20210301121638,login,北京,113.52.101.50,0006
20210301121533,login,北京,118.128.11.31,0001
20210301121536,login,上海,10.90.113.150,0002
20210301121544,login,成都,112.112.31.33,0003
20210301121559,login,成都,101.132.93.24,0004
20210301121612,login,上海,189.112.89.78,0005
20210301121638,login,北京,113.52.101.50,0006
Flink处理结果如下:
13> (true,成都,1)
16> (true,北京,1)
13> (false,成都,1)
13> (true,成都,2)
16> (false,北京,1)
16> (true,北京,2)
16> (false,北京,2)
16> (true,北京,3)
13> (false,成都,2)
13> (true,成都,3)
13> (false,成都,3)
13> (true,成都,4)
16> (false,北京,3)
16> (true,北京,4)
16> (false,北京,4)
16> (true,北京,5)
13> (false,成都,4)
13> (true,成都,5)
13> (false,成都,5)
13> (true,成都,6)
16> (false,北京,5)
16> (true,北京,6)
网友评论