1.Flink API介绍
Flink提供了不同的抽象级别以开发流式或者批处理应用程序
分层API
-
Stateful Stream Processing 最低级的抽象接口是状态化的数据流接口(stateful
streaming)。这个接口是通过 ProcessFunction 集成到 DataStream API 中的。该接口允许用户
自由的处理来自一个或多个流中的事件,并使用一致的容错状态。另外,用户也可以通过注册
event time 和 processing time 处理回调函数的方法来实现复杂的计算 -
DataStream/DataSet API 是 Flink 提供的核心 API ,DataSet 处理
有界的数据集,DataStream 处理有界或者无界的数据流。用户可以通过各种方法(map /
flatmap / window / keyby / sum / max / min / avg / join 等)将数据进行转换 / 计算 -
Table API 提供了例如 select、project、join、group-by、aggregate 等操作,使用起
来却更加简洁,可以在表与 DataStream/DataSet 之间无缝切换,也允许程序将 Table API 与
DataStream 以及 DataSet 混合使用 -
SQL Flink 提供的最高层级的抽象是 SQL 。这一层抽象在语法与表达能力上与 Table API 类似。
SQL 抽象与 Table API 交互密切,同时 SQL 查询可以直接在 Table API 定义的表上执行
2.Dataflows 数据流
在Flink的世界观中,一切都是数据流,所以对于批计算来说,那只是流计算的一个特例而已
Flink Dataflows是由三部分组成,分别是:Source、Transformation、Sink结束
- Source负责读取数据源
- Transformation利用各种算子进行处理加工
-
Sink最终输出到外部(console、kafka、redis、DB......)
DataFlow
当source数据源的数量比较大或计算逻辑相对比较复杂的情况下,需要提高并行度来处理数据,采用并行数据流
通过设置不同算子的并行度 source并行度设置为2 map也是2.... 代表会启动多个并行的线程来处理数据
parallelized view
在运行时,Flink上运行的程序会被映射成“逻辑数据流”(dataflows),它包含了这三部分。每一个dataflow以一个或多个sources开始以一个或多个sinks结束。dataflow类似于任意的有向无环图(DAG)。在大部分情况下,程序中的转换运算(transformations)跟dataflow中的算子(operator)是一一对应的关系,但有时候,一个transformation可能对应多个operator
3.Flink DataStream API
3.1 Environment
StreamExecutionEnvironment是所有Flink程序的基础。您可以使用以下静态方法获得一个StreamExecutionEnvironment:
#创建一个执行环境,表示当前执行程序的上下文。
#如果程序是独立调用的,则此方法返回本地执行环境;
#如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境
#也就是说,getExecutionEnvironment会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。
getExecutionEnvironment()
#返回本地执行环境
createLocalEnvironment()
#返回集群执行环境,将Jar提交到远程服务器。
#需要在调用时指定JobManager的IP和端口号,并指定要在集群中运行的Jar包。
createRemoteEnvironment(host: String, port: Int, jarFiles: String*)
3.2 Data Sources
Flink内嵌支持的数据源非常多,比如HDFS、Socket、Kafka、Collections Flink也提供了addSource方式,可以自定义数据源,本小节将讲解Flink所有内嵌数据源及自定义数据源的原理及API
3.2.1 File-based:
通过读取本地、HDFS文件创建一个数据源
env.readTextFile(path)
env.readTextFile(path)
env.readFile(fileInputFormat, path, watchType, interval, pathFilter)
readTextFile底层调用的就是readFile方法,readFile是一个更加底层的方式,使用起来会更加的灵活
3.2.2 Socket-based:
接受Socket Server中的数据
env.socketTextStream("node09",8888)
3.2.3 Collection-based:
env.fromCollection(Seq)
env.fromCollection(Iterator)
env.fromElements(elements: _*)
env.fromParallelCollection(SplittableIterator)
env.generateSequence(from, to)
3.2.4 Kafka Source
Flink的Kafka 消费类FlinkKafkaConsumer 这个是通用的kafka连接器适用0.10 以上的版本(或FlinkKafkaConsumer011 对应的是Kafka 0.11.x或FlinkKafkaConsumer010 对应Kafka 0.10.x)
kafka 作为source的例子:
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
DataStream<String> stream = env.addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties));
上面使用Flink已经定义好的反序列化shema SimpleStringSchema 但是返回的结果只有Kafka的value,而没有其它信息
如果需要获得Kafka的消息的key、value 和元数据,就需要通过实现KafkaDeserializationSchema接口方法deserialize 来实现
flink 对kafka的连接比较重要,后面会专门研究kafka connector
3.2.5 Custom Source
我们可以通过实现 Flink 的SourceFunction 来实现单个或者多个并行度的 Source。具体调用如下:
val stream = env.addSource( new MySensorSource() )
我们希望可以随机生成传感器数据,MySensorSource具体的代码实现如下:
class MySensorSource extends SourceFunction[SensorReading]{
// flag: 表示数据源是否还在正常运行
var running: Boolean = true
override def cancel(): Unit = {
running = false
}
override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
// 初始化一个随机数发生器
val rand = new Random()
var curTemp = 1.to(10).map(
i => ( "sensor_" + i, 65 + rand.nextGaussian() * 20 )
)
while(running){
// 更新温度值
curTemp = curTemp.map(
t => (t._1, t._2 + rand.nextGaussian() )
)
// 获取当前时间戳
val curTime = System.currentTimeMillis()
curTemp.foreach(
t => ctx.collect(SensorReading(t._1, curTime, t._2))
)
Thread.sleep(100)
}
}
}
3.3 Transformations
Transformations算子可以将一个或者多个算子转换成一个新的数据流,使用Transformations算子组合可以进行复杂的业务处理
3.3.1 简单的操作 DataStream
① Map
DataStream → DataStream
遍历数据流中的每一个元素,产生一个新的元素
② FlatMap
DataStream → DataStream
遍历数据流中的每一个元素,产生N个元素 N=0,1,2,......
③ Filter
DataStream → DataStream
过滤算子,根据数据流的元素计算出一个boolean类型的值,true代表保留,false代表过滤掉
3.3.2 分组流的操作 KeyedStream
① KeyBy
DataStream → KeyedStream
根据数据流中指定的字段来分区,相同指定字段值的数据一定是在同一个分区中,内部分区使用的是HashPartitioner
指定分区字段的方式有三种:
1、根据索引号或者field 指定 (最新版本已经不推荐使用这个方法指定分区)
2、通过匿名函数来指定
3、通过实现KeySelector接口 指定分区字段
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1, 100)
stream
.map(x => (x % 3, 1))
//根据索引号来指定分区字段
// .keyBy(0)
//通过传入匿名函数 指定分区字段
// .keyBy(x=>x._1)
//通过实现KeySelector接口 指定分区字段
.keyBy(new KeySelector[(Long, Int), Long] {
override def getKey(value: (Long, Int)): Long = value._1
})
.sum(1)
.print()
env.execute()
② Reduce
根据key聚合结果
注意: reduce是基于分区后的流对象进行聚合,也就是说,DataStream类型的对象无法调用reduce方法
.reduce((v1,v2) => (v1._1,v1._2 + v2._2))
④Fold
一个有初始值的分组数据流的滚动折叠操作. 合并当前元素和前一次折叠操作的结果,并产生一个新的值.
下面的fold函数就是当我们输入一个 (1,2,3,4,5)的序列, 将会产生一下面的句子:"start-1", "start-1-2", "start-1-2-3", ...
val result: DataStream[String] = windowedStream.fold("start", (str, i) => { str + "-" + i })
⑤ Aggregations
KeyedStream → DataStream
Aggregations代表的是一类聚合算子,具体算子如下:
keyedStream.sum(0)
keyedStream.sum("key")
keyedStream.min(0)
keyedStream.min("key")
keyedStream.max(0)
keyedStream.max("key")
keyedStream.minBy(0)
keyedStream.minBy("key")
keyedStream.maxBy(0)
keyedStream.maxBy("key")
3.2.3 连接算子
① Union
DataStream → DataStream
合并两个或者更多的数据流产生一个新的数据流,这个新的数据流中包含了所合并的数据流的元素
注意:需要保证数据流中元素类型一致
val env = StreamExecutionEnvironment.getExecutionEnvironment
val ds1 = env.fromCollection(List(("a",1),("b",2),("c",3)))
val ds2 = env.fromCollection(List(("d",4),("e",5),("f",6)))
val ds3 = env.fromCollection(List(("g",7),("h",8)))
// val ds3 = env.fromCollection(List((1,1),(2,2)))
val unionStream = ds1.union(ds2,ds3)
unionStream.print()
env.execute()
② Connect
DataStream,DataStream → ConnectedStreams
合并两个数据流并且保留两个数据流的数据类型,能够共享两个流的状态
val ds1 = env.socketTextStream("node01", 8888)
val ds2 = env.socketTextStream("node01", 9999)
val wcStream1 = ds1.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1)
val wcStream2 = ds2.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1)
val restStream: ConnectedStreams[(String, Int), (String, Int)] = wcStream2.connect(wcStream1)
ConnectedStreams 可以通过下面的算子来操作最后输出DataStream,具体看ConnectedStreams
ConnectedStreams
Connect与 Union 区别:
- Union之前两个流的类型必须是一样,Connect可以不一样,在之后的coMap中再去调整成为一样的。
- Connect只能操作两个流,Union可以操作多个。
3.2.4 连接流的操作算子 ConnectedStreams
① CoMap, CoFlatMap
ConnectedStreams → DataStream
CoMap, CoFlatMap并不是具体算子名字,而是一类操作名称
凡是基于ConnectedStreams数据流做map遍历,这类操作叫做CoMap
凡是基于ConnectedStreams数据流做flatMap遍历,这类操作叫做CoFlatMap
CoMap第一种实现方式:
connectedStream.map(new CoMapFunction[(String,Int),(String,Int),(String,Int)] {
//对第一个数据流做计算
override def map1(value: (String, Int)): (String, Int) = {
(value._1+":first",value._2+100)
}
//对第二个数据流做计算
override def map2(value: (String, Int)): (String, Int) = {
(value._1+":second",value._2*100)
}
}).print()
CoMap第二种实现方式:
connectedStream.map(
//对第一个数据流做计算
x=>{(x._1+":first",x._2+100)}
//对第二个数据流做计算
,y=>{(y._1+":second",y._2*100)}
).print()
CoFlatMap第一种实现方式:
ds1.connect(ds2).flatMap((x,c:Collector[String])=>{
//对第一个数据流做计算
x.split(" ").foreach(w=>{
c.collect(w)
})
}
//对第二个数据流做计算
,(y,c:Collector[String])=>{
y.split(" ").foreach(d=>{
c.collect(d)
})
}).print
CoFlatMap第二种实现方式:
ds1.connect(ds2).flatMap(
//对第一个数据流做计算
x=>{
x.split(" ")
}
//对第二个数据流做计算
,y=>{
y.split(" ")
}).print()
CoFlatMap第三种实现方式:
ds1.connect(ds2).flatMap(new CoFlatMapFunction[String,String,(String,Int)] {
//对第一个数据流做计算
override def flatMap1(value: String, out: Collector[(String, Int)]): Unit = {
val words = value.split(" ")
words.foreach(x=>{
out.collect((x,1))
})
}
//对第二个数据流做计算
override def flatMap2(value: String, out: Collector[(String, Int)]): Unit = {
val words = value.split(" ")
words.foreach(x=>{
out.collect((x,1))
})
}
}).print()
3.2.5 拆分流
① Split
DataStream → SplitStream
根据条件将一个流分成两个或者更多的流
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1,100)
val splitStream = stream.split(
d => {
d % 2 match {
case 0 => List("even")
case 1 => List("odd")
}
}
)
splitStream.select("even").print()
env.execute()
@deprecated Please use side output instead
② Select
SplitStream → DataStream
从SplitStream中选择一个或者多个数据流
splitStream.select("even").print()
③ side output侧输出流
流计算过程,可能遇到根据不同的条件来分隔数据流。filter分割造成不必要的数据复制
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.socketTextStream("node01",8888)
val gtTag = new OutputTag[String]("gt")
val processStream = stream.process(new ProcessFunction[String, String] {
override def processElement(value: String, ctx: ProcessFunction[String, String]#Context, out: Collector[String]): Unit = {
try {
val longVar = value.toLong
if (longVar > 100) {
out.collect(value)
} else {
ctx.output(gtTag, value)
}
} catch {
case e => e.getMessage
ctx.output(gtTag, value)
}
}
})
val sideStream = processStream.getSideOutput(gtTag)
sideStream.print("sideStream")
processStream.print("mainStream")
env.execute()
3.2.6 Iterate 迭代算子
DataStream → IterativeStream → DataStream
Iterate算子提供了对数据流迭代的支持,对于定义不断更新模型的算法特别有用
迭代由两部分组成:迭代体、终止迭代条件
不满足终止迭代条件的数据流会返回到stream流中,进行下一次迭代
满足终止迭代条件的数据流继续往下游发送
val env = StreamExecutionEnvironment.getExecutionEnvironment
val initStream = env.socketTextStream("node01",8888)
val stream = initStream.map(_.toLong)
stream.iterate {
iteration => {
//定义迭代逻辑
val iterationBody = iteration.map ( x => {
println(x)
if(x > 0) x - 1
else x
} )
//> 0 大于0的值继续返回到stream流中,当 <= 0 继续往下游发送
(iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
}
}.print()
env.execute()
3.2.7 Physical partitioning 分区算子 分区策略
① shuffle
场景:增大分区、提高并行度,解决数据倾斜
DataStream → DataStream
分区元素随机均匀分发到下游分区,网络开销比较大
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1,10).setParallelism(1)
println(stream.getParallelism)
stream.shuffle.print()
env.execute()
console result: 上游数据比较随意的分发到下游
② rebalance
场景:增大分区、提高并行度,解决数据倾斜
DataStream → DataStream
轮询分区元素,均匀的将元素分发到下游分区,下游每个分区的数据比较均匀,在发生数据倾斜时非常有用,网络开销比较大
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(3)
val stream = env.generateSequence(1,100)
val shuffleStream = stream.rebalance
shuffleStream.print()
env.execute()
console result:上游数据比较均匀的分发到下游
③ rescale
场景:减少分区 防止发生大量的网络传输 不会发生全量的重分区
DataStream → DataStream
通过轮询分区元素,将一个元素集合从上游分区发送给下游分区,发送单位是集合,而不是一个个元素
注意:rescale发生的是本地数据传输,而不需要通过网络传输数据,比如taskmanager的槽数。简单来说,上游的数据只会发送给本TaskManager中的下游
image.pngval env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1,10).setParallelism(2)
stream.writeAsText("./data/stream1").setParallelism(2)
stream.rescale.writeAsText("./data/stream2").setParallelism(4)
env.execute()
④ broadcast
场景:需要使用映射表、并且映射表会经常发生变动的场景
DataStream → DataStream
上游中每一个元素内容广播到下游每一个分区中
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1,10).setParallelism(2)
stream.writeAsText("./data/stream1").setParallelism(2)
stream.broadcast.writeAsText("./data/stream2").setParallelism(4)
env.execute()
⑤ global
场景:并行度降为1
DataStream → DataStream
上游分区的数据只分发给下游的第一个分区
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1,10).setParallelism(2)
stream.writeAsText("./data/stream1").setParallelism(2)
stream.global.writeAsText("./data/stream2").setParallelism(4)
env.execute()
⑥ forward
场景:一对一的数据分发,map、flatMap、filter 等都是这种分区策略
DataStream → DataStream
上游分区数据分发到下游对应分区中
partition1->partition1
partition2->partition2
注意:必须保证上下游分区数(并行度)一致,不然会有如下异常:
Forward partitioning does not allow change of parallelism
* Upstream operation: Source: Sequence Source-1 parallelism: 2,
* downstream operation: Sink: Unnamed-4 parallelism: 4
* stream.forward.writeAsText("./data/stream2").setParallelism(4)
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1,10).setParallelism(2)
stream.writeAsText("./data/stream1").setParallelism(2)
stream.forward.writeAsText("./data/stream2").setParallelism(2)
env.execute()
⑦ keyBy
场景:与业务场景匹配
DataStream → DataStream
根据上游分区元素的Hash值与下游分区数取模计算出,将当前元素分发到下游哪一个分区
MathUtils.murmurHash(keyHash)(每个元素的Hash值) % maxParallelism(下游分区数)
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1,10).setParallelism(2)
stream.writeAsText("./data/stream1").setParallelism(2)
stream.keyBy(0).writeAsText("./data/stream2").setParallelism(2)
env.execute()
根据元素Hash值分发到下游分区中
⑧ PartitionCustom 自定义分区
DataStream → DataStream
通过自定义的分区器,来决定元素是如何从上游分区分发到下游分区
object ShuffleOperator {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(2)
val stream = env.generateSequence(1,10).map((_,1))
stream.writeAsText("./data/stream1")
stream.partitionCustom(new customPartitioner(),0)
.writeAsText("./data/stream2").setParallelism(4)
env.execute()
}
class customPartitioner extends Partitioner[Long]{
override def partition(key: Long, numPartitions: Int): Int = {
key.toInt % numPartitions
}
}
}
3.4 Sink
Flink内置了大量sink,可以将Flink处理后的数据输出到HDFS、kafka、Redis、ES、MySQL等等。除此以外,需要用户自定义实现sink。
工程场景中,会经常消费kafka中数据,处理结果存储到Redis、HBase或者MySQL中
3.4.1 redis Sink
Flink处理的数据可以存储到Redis中,以便实时查询
Flink内嵌连接Redis的连接器,只需要导入连接Redis的依赖就可以
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
WordCount写入到Redis中,选择的是HSET数据类型
代码如下:
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.socketTextStream("node01",8888)
val result = stream.flatMap(_.split(" "))
.map((_, 1))
.keyBy(0)
.sum(1)
//若redis是单机
val config = new FlinkJedisPoolConfig.Builder().setDatabase(3).setHost("node01").setPort(6379).build()
//如果是 redis集群
/*val addresses = new util.HashSet[InetSocketAddress]()
addresses.add(new InetSocketAddress("node01",6379))
addresses.add(new InetSocketAddress("node01",6379))
val clusterConfig = new FlinkJedisClusterConfig.Builder().setNodes(addresses).build()*/
result.addSink(new RedisSink[(String,Int)](config,new RedisMapper[(String,Int)] {
override def getCommandDescription: RedisCommandDescription = {
new RedisCommandDescription(RedisCommand.HSET,"wc")
}
override def getKeyFromData(t: (String, Int)) = {
t._1
}
override def getValueFromData(t: (String, Int)) = {
t._2 + ""
}
}))
env.execute()
3.4.2 Kafka Sink
处理结果写入到kafka topic中,Flink也是默认支持,需要添加连接器依赖,跟读取kafka数据用的连接器依赖相同
跟读取kafka数据用的连接器依赖相同
之前添加过就不需要再次添加了
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>${flink-version}</version>
</dependency>
import java.lang
import java.util.Properties
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer, KafkaSerializationSchema}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
object KafkaSink {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.socketTextStream("node01",8888)
val result = stream.flatMap(_.split(" "))
.map((_, 1))
.keyBy(0)
.sum(1)
val props = new Properties()
props.setProperty("bootstrap.servers","node01:9092,node02:9092,node03:9092")
// props.setProperty("key.serializer",classOf[StringSerializer].getName)
// props.setProperty("value.serializer",classOf[StringSerializer].getName)
/**
public FlinkKafkaProducer(
FlinkKafkaProducer(defaultTopic: String, serializationSchema: KafkaSerializationSchema[IN], producerConfig: Properties, semantic: FlinkKafkaProducer.Semantic)
*/
result.addSink(new FlinkKafkaProducer[(String,Int)]("wc",new KafkaSerializationSchema[(String, Int)] {
override def serialize(element: (String, Int), timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
new ProducerRecord("wc",element._1.getBytes(),(element._2+"").getBytes())
}
},props,FlinkKafkaProducer.Semantic.EXACTLY_ONCE))
env.execute()
}
}
import java.lang
import java.util.Properties
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer, KafkaSerializationSchema}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
object KafkaSink {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.socketTextStream("node01",8888)
val result = stream.flatMap(_.split(" "))
.map((_, 1))
.keyBy(0)
.sum(1)
val props = new Properties()
props.setProperty("bootstrap.servers","node01:9092,node02:9092,node03:9092")
// props.setProperty("key.serializer",classOf[StringSerializer].getName)
// props.setProperty("value.serializer",classOf[StringSerializer].getName)
/**
public FlinkKafkaProducer(
FlinkKafkaProducer(defaultTopic: String, serializationSchema: KafkaSerializationSchema[IN], producerConfig: Properties, semantic: FlinkKafkaProducer.Semantic)
*/
result.addSink(new FlinkKafkaProducer[(String,Int)]("wc",new KafkaSerializationSchema[(String, Int)] {
override def serialize(element: (String, Int), timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
new ProducerRecord("wc",element._1.getBytes(),(element._2+"").getBytes())
}
},props,FlinkKafkaProducer.Semantic.EXACTLY_ONCE))
env.execute()
}
}
3.4.3 MySQL Sink(幂等性)
Flink处理结果写入到MySQL中,这并不是Flink默认支持的,需要添加MySQL的驱动依赖
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.44</version>
</dependency>
因为不是内嵌支持的,所以需要基于RichSinkFunction自定义sink
消费kafka中数据,统计各个卡口的流量,并且存入到MySQL中
注意:需要去重,操作MySQL需要幂等性
import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util.Properties
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringSerializer
object MySQLSink {
case class CarInfo(monitorId: String, carId: String, eventTime: String, Speed: Long)
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//设置连接kafka的配置信息
val props = new Properties()
props.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
props.setProperty("group.id", "flink-kafka-001")
props.setProperty("key.deserializer", classOf[StringSerializer].getName)
props.setProperty("value.deserializer", classOf[StringSerializer].getName)
//第一个参数 : 消费的topic名
val stream = env.addSource(new FlinkKafkaConsumer[(String, String)]("flink-kafka", new KafkaDeserializationSchema[(String, String)] {
//什么时候停止,停止条件是什么
override def isEndOfStream(t: (String, String)): Boolean = false
//要进行序列化的字节流
override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String) = {
val key = new String(consumerRecord.key(), "UTF-8")
val value = new String(consumerRecord.value(), "UTF-8")
(key, value)
}
//指定一下返回的数据类型 Flink提供的类型
override def getProducedType: TypeInformation[(String, String)] = {
createTuple2TypeInformation(createTypeInformation[String], createTypeInformation[String])
}
}, props))
stream.map(data => {
val value = data._2
val splits = value.split("\t")
val monitorId = splits(0)
(monitorId, 1)
}).keyBy(_._1)
.reduce(new ReduceFunction[(String, Int)] {
//t1:上次聚合完的结果 t2:当前的数据
override def reduce(t1: (String, Int), t2: (String, Int)): (String, Int) = {
(t1._1, t1._2 + t2._2)
}
}).addSink(new MySQLCustomSink)
env.execute()
}
//幂等性写入外部数据库MySQL
class MySQLCustomSink extends RichSinkFunction[(String, Int)] {
var conn: Connection = _
var insertPst: PreparedStatement = _
var updatePst: PreparedStatement = _
//每来一个元素都会调用一次
override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {
println(value)
updatePst.setInt(1, value._2)
updatePst.setString(2, value._1)
updatePst.execute()
println(updatePst.getUpdateCount)
if(updatePst.getUpdateCount == 0){
println("insert")
insertPst.setString(1, value._1)
insertPst.setInt(2, value._2)
insertPst.execute()
}
}
//thread初始化的时候执行一次
override def open(parameters: Configuration): Unit = {
conn = DriverManager.getConnection("jdbc:mysql://node01:3306/test", "root", "123123")
insertPst = conn.prepareStatement("INSERT INTO car_flow(monitorId,count) VALUES(?,?)")
updatePst = conn.prepareStatement("UPDATE car_flow SET count = ? WHERE monitorId = ?")
}
//thread关闭的时候 执行一次
override def close(): Unit = {
insertPst.close()
updatePst.close()
conn.close()
}
}
}
3.4.4 Socket Sink
Flink处理结果发送到套接字(Socket)
基于RichSinkFunction自定义sink
import java.io.PrintStream
import java.net.{InetAddress, Socket}
import java.util.Properties
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTuple2TypeInformation, createTypeInformation}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringSerializer
//sink 到 套接字 socket
object SocketSink {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//设置连接kafka的配置信息
val props = new Properties()
//注意 sparkstreaming + kafka(0.10之前版本) receiver模式 zookeeper url(元数据)
props.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
props.setProperty("group.id", "flink-kafka-001")
props.setProperty("key.deserializer", classOf[StringSerializer].getName)
props.setProperty("value.deserializer", classOf[StringSerializer].getName)
//第一个参数 : 消费的topic名
val stream = env.addSource(new FlinkKafkaConsumer[(String, String)]("flink-kafka", new KafkaDeserializationSchema[(String, String)] {
//什么时候停止,停止条件是什么
override def isEndOfStream(t: (String, String)): Boolean = false
//要进行序列化的字节流
override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String) = {
val key = new String(consumerRecord.key(), "UTF-8")
val value = new String(consumerRecord.value(), "UTF-8")
(key, value)
}
//指定一下返回的数据类型 Flink提供的类型
override def getProducedType: TypeInformation[(String, String)] = {
createTuple2TypeInformation(createTypeInformation[String], createTypeInformation[String])
}
}, props))
stream.map(data => {
val value = data._2
val splits = value.split("\t")
val monitorId = splits(0)
(monitorId, 1)
}).keyBy(_._1)
.reduce(new ReduceFunction[(String, Int)] {
//t1:上次聚合完的结果 t2:当前的数据
override def reduce(t1: (String, Int), t2: (String, Int)): (String, Int) = {
(t1._1, t1._2 + t2._2)
}
}).addSink(new SocketCustomSink("node01",8888))
env.execute()
}
class SocketCustomSink(host:String,port:Int) extends RichSinkFunction[(String,Int)]{
var socket: Socket = _
var writer:PrintStream = _
override def open(parameters: Configuration): Unit = {
socket = new Socket(InetAddress.getByName(host), port)
writer = new PrintStream(socket.getOutputStream)
}
override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {
writer.println(value._1 + "\t" +value._2)
writer.flush()
}
override def close(): Unit = {
writer.close()
socket.close()
}
}
}
3.4.5 HBase Sink
计算结果写入sink 两种实现方式:
- map算子写入 频繁创建hbase连接
- process写入 适合批量写入hbase
导入HBase依赖包
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
</dependency>
读取kafka数据,统计卡口流量保存至HBase数据库中
- HBase中创建对应的表
create 'car_flow',{NAME => 'count', VERSIONS => 1}
- 实现代码
import java.util.{Date, Properties}
import com.msb.stream.util.{DateUtils, HBaseUtil}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.kafka.common.serialization.StringSerializer
object HBaseSinkTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//设置连接kafka的配置信息
val props = new Properties()
//注意 sparkstreaming + kafka(0.10之前版本) receiver模式 zookeeper url(元数据)
props.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
props.setProperty("group.id", "flink-kafka-001")
props.setProperty("key.deserializer", classOf[StringSerializer].getName)
props.setProperty("value.deserializer", classOf[StringSerializer].getName)
val stream = env.addSource(new FlinkKafkaConsumer[String]("flink-kafka", new SimpleStringSchema(), props))
stream.map(row => {
val arr = row.split("\t")
(arr(0), 1)
}).keyBy(_._1)
.reduce((v1: (String, Int), v2: (String, Int)) => {
(v1._1, v1._2 + v2._2)
}).process(new ProcessFunction[(String, Int), (String, Int)] {
var htab: HTable = _
override def open(parameters: Configuration): Unit = {
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181")
val hbaseName = "car_flow"
htab = new HTable(conf, hbaseName)
}
override def close(): Unit = {
htab.close()
}
override def processElement(value: (String, Int), ctx: ProcessFunction[(String, Int), (String, Int)]#Context, out: Collector[(String, Int)]): Unit = {
// rowkey:monitorid 时间戳(分钟) value:车流量
val min = DateUtils.getMin(new Date())
val put = new Put(Bytes.toBytes(value._1))
put.addColumn(Bytes.toBytes("count"), Bytes.toBytes(min), Bytes.toBytes(value._2))
htab.put(put)
}
})
env.execute()
}
}
3.4.6 Elasticsearch Sink
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_2.11</artifactId>
<version>${flink-version}</version>
</dependency>
在主函数中调用:
val httpHosts = new util.ArrayList[HttpHost]()
httpHosts.add(new HttpHost("localhost", 9200))
val esSinkBuilder = new ElasticsearchSink.Builder[SensorReading]( httpHosts, new ElasticsearchSinkFunction[SensorReading] {
override def process(t: SensorReading, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
println("saving data: " + t)
val json = new util.HashMap[String, String]()
json.put("data", t.toString)
val indexRequest = Requests.indexRequest().index("sensor").`type`("readingData").source(json)
requestIndexer.add(indexRequest)
println("saved successfully")
}
} )
dataStream.addSink( esSinkBuilder.build() )
3.5 支持的数据类型
Flink流应用程序处理的是以数据对象表示的事件流。所以在Flink内部,我们需要能够处理这些对象。它们需要被序列化和反序列化,以便通过网络传送它们;或者从状态后端、检查点和保存点读取它们。为了有效地做到这一点,Flink需要明确知道应用程序所处理的数据类型。Flink使用类型信息的概念来表示数据类型,并为每个数据类型生成特定的序列化器、反序列化器和比较器。
Flink还具有一个类型提取系统,该系统分析函数的输入和返回类型,以自动获取类型信息,从而获得序列化器和反序列化器。但是,在某些情况下,例如lambda函数或泛型类型,需要显式地提供类型信息,才能使应用程序正常工作或提高其性能。
Flink支持Java和Scala中所有常见数据类型。使用最广泛的类型有以下几种。
3.5.1 基础数据类型
Flink支持所有的Java和Scala基础数据类型,Int, Double, Long, String, …
val numbers: DataStream[Long] = env.fromElements(1L, 2L, 3L, 4L)
numbers.map( n => n + 1 )
3.5.2 Java和Scala元组(Tuples)
val persons: DataStream[(String, Integer)] = env.fromElements(
("Adam", 17),
("Sarah", 23) )
persons.filter(p => p._2 > 18)
3.5.3 Scala样例类(case classes)
case class Person(name: String, age: Int)
val persons: DataStream[Person] = env.fromElements(
Person("Adam", 17),
Person("Sarah", 23) )
persons.filter(p => p.age > 18)
3.5.4 Java简单对象(POJOs)
public class Person {
public String name;
public int age;
public Person() {}
public Person(String name, int age) {
this.name = name;
this.age = age;
}
}
DataStream<Person> persons = env.fromElements(
new Person("Alex", 42),
new Person("Wendy", 23));
3.5.5 其它(Arrays, Lists, Maps, Enums, 等等)
Flink对Java和Scala中的一些特殊目的的类型也都是支持的,比如Java的ArrayList,HashMap,Enum等等。
3.6 实现UDF函数——更细粒度的控制流
3.6.1 函数类(Function Classes)
Flink暴露了所有udf函数的接口(实现方式为接口或者抽象类)。例如MapFunction, FilterFunction, ProcessFunction等等。
下面例子实现了FilterFunction接口:
class FilterFilter extends FilterFunction[String] {
override def filter(value: String): Boolean = {
value.contains("flink")
}
}
val flinkTweets = tweets.filter(new FlinkFilter)
还可以将函数实现成匿名类
val flinkTweets = tweets.filter(
new RichFilterFunction[String] {
override def filter(value: String): Boolean = {
value.contains("flink")
}
}
)
我们filter的字符串"flink"还可以当作参数传进去。
val tweets: DataStream[String] = ...
val flinkTweets = tweets.filter(new KeywordFilter("flink"))
class KeywordFilter(keyWord: String) extends FilterFunction[String] {
override def filter(value: String): Boolean = {
value.contains(keyWord)
}
}
3.6.2 匿名函数(Lambda Functions)
val tweets: DataStream[String] = ...
val flinkTweets = tweets.filter(_.contains("flink"))
3.6.3 富函数(Rich Functions)
“富函数”是DataStream API提供的一个函数类的接口,所有Flink函数类都有其Rich版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。
- RichMapFunction
- RichFlatMapFunction
- RichFilterFunction
- …
Rich Function有一个生命周期的概念。典型的生命周期方法有: - open()方法是rich function的初始化方法,当一个算子例如map或者filter被调用之前open()会被调用。
- close()方法是生命周期中的最后一个调用的方法,做一些清理工作。
- getRuntimeContext()方法提供了函数的RuntimeContext的一些信息,例如函数执行的并行度,任务的名字,以及state状态
class MyFlatMap extends RichFlatMapFunction[Int, (Int, Int)] {
var subTaskIndex = 0
override def open(configuration: Configuration): Unit = {
subTaskIndex = getRuntimeContext.getIndexOfThisSubtask
// 以下可以做一些初始化工作,例如建立一个和HDFS的连接
}
override def flatMap(in: Int, out: Collector[(Int, Int)]): Unit = {
if (in % 2 == subTaskIndex) {
out.collect((subTaskIndex, in))
}
}
override def close(): Unit = {
// 以下做一些清理工作,例如断开和HDFS的连接。
}
}
参考flink 官网 Flink DataStream API
参考flink 官网 Apache Kafka Connector 1.11
参考flink 官网 Apache Kafka Connector1.10
参考flink 官网 Operators
参考flink 官网 Connectors
网友评论