Flink API

作者: Rex_2013 | 来源:发表于2020-09-03 16:00 被阅读0次

    1.Flink API介绍

    Flink提供了不同的抽象级别以开发流式或者批处理应用程序


    分层API
    • Stateful Stream Processing 最低级的抽象接口是状态化的数据流接口(stateful
      streaming)。这个接口是通过 ProcessFunction 集成到 DataStream API 中的。该接口允许用户
      自由的处理来自一个或多个流中的事件,并使用一致的容错状态。另外,用户也可以通过注册
      event time 和 processing time 处理回调函数的方法来实现复杂的计算
    • DataStream/DataSet API 是 Flink 提供的核心 API ,DataSet 处理
      有界的数据集,DataStream 处理有界或者无界的数据流。用户可以通过各种方法(map /
      flatmap / window / keyby / sum / max / min / avg / join 等)将数据进行转换 / 计算
    • Table API 提供了例如 select、project、join、group-by、aggregate 等操作,使用起
      来却更加简洁,可以在表与 DataStream/DataSet 之间无缝切换,也允许程序将 Table API 与
      DataStream 以及 DataSet 混合使用
    • SQL Flink 提供的最高层级的抽象是 SQL 。这一层抽象在语法与表达能力上与 Table API 类似。
      SQL 抽象与 Table API 交互密切,同时 SQL 查询可以直接在 Table API 定义的表上执行

    2.Dataflows 数据流

    在Flink的世界观中,一切都是数据流,所以对于批计算来说,那只是流计算的一个特例而已

    Flink Dataflows是由三部分组成,分别是:SourceTransformationSink结束

    • Source负责读取数据源
    • Transformation利用各种算子进行处理加工
    • Sink最终输出到外部(console、kafka、redis、DB......)


      DataFlow

    当source数据源的数量比较大或计算逻辑相对比较复杂的情况下,需要提高并行度来处理数据,采用并行数据流

    通过设置不同算子的并行度 source并行度设置为2 map也是2.... 代表会启动多个并行的线程来处理数据


    parallelized view

    在运行时,Flink上运行的程序会被映射成“逻辑数据流”(dataflows),它包含了这三部分。每一个dataflow以一个或多个sources开始以一个或多个sinks结束。dataflow类似于任意的有向无环图(DAG)。在大部分情况下,程序中的转换运算(transformations)跟dataflow中的算子(operator)是一一对应的关系,但有时候,一个transformation可能对应多个operator


    3.Flink DataStream API

    3.1 Environment

    StreamExecutionEnvironment是所有Flink程序的基础。您可以使用以下静态方法获得一个StreamExecutionEnvironment:

    #创建一个执行环境,表示当前执行程序的上下文。 
    #如果程序是独立调用的,则此方法返回本地执行环境;
    #如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境
    #也就是说,getExecutionEnvironment会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。
    getExecutionEnvironment()
    
    #返回本地执行环境 
    createLocalEnvironment()
    
    #返回集群执行环境,将Jar提交到远程服务器。
    #需要在调用时指定JobManager的IP和端口号,并指定要在集群中运行的Jar包。
    createRemoteEnvironment(host: String, port: Int, jarFiles: String*)
    

    3.2 Data Sources

    Flink内嵌支持的数据源非常多,比如HDFS、Socket、Kafka、Collections Flink也提供了addSource方式,可以自定义数据源,本小节将讲解Flink所有内嵌数据源及自定义数据源的原理及API


    3.2.1 File-based:

    通过读取本地、HDFS文件创建一个数据源

    env.readTextFile(path) 
    env.readTextFile(path)
    env.readFile(fileInputFormat, path, watchType, interval, pathFilter)
    

    readTextFile底层调用的就是readFile方法,readFile是一个更加底层的方式,使用起来会更加的灵活

    3.2.2 Socket-based:

    接受Socket Server中的数据

    env.socketTextStream("node09",8888)
    

    3.2.3 Collection-based:

    env.fromCollection(Seq)
    env.fromCollection(Iterator)
    env.fromElements(elements: _*)
    env.fromParallelCollection(SplittableIterator)
    env.generateSequence(from, to)
    

    3.2.4 Kafka Source

    Flink的Kafka 消费类FlinkKafkaConsumer 这个是通用的kafka连接器适用0.10 以上的版本(或FlinkKafkaConsumer011 对应的是Kafka 0.11.x或FlinkKafkaConsumer010 对应Kafka 0.10.x)
    kafka 作为source的例子:

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("group.id", "test");
    DataStream<String> stream = env.addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties));
    

    上面使用Flink已经定义好的反序列化shema SimpleStringSchema 但是返回的结果只有Kafka的value,而没有其它信息
    如果需要获得Kafka的消息的key、value 和元数据,就需要通过实现KafkaDeserializationSchema接口方法deserialize 来实现

    flink 对kafka的连接比较重要,后面会专门研究kafka connector

    3.2.5 Custom Source

    我们可以通过实现 Flink 的SourceFunction 来实现单个或者多个并行度的 Source。具体调用如下:

    val stream = env.addSource( new MySensorSource() )
    

    我们希望可以随机生成传感器数据,MySensorSource具体的代码实现如下:

    class MySensorSource extends SourceFunction[SensorReading]{
    
        // flag: 表示数据源是否还在正常运行
        var running: Boolean = true
    
        override def cancel(): Unit = {
            running = false
        }
    
        override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
            // 初始化一个随机数发生器
            val rand = new Random()
    
            var curTemp = 1.to(10).map(
                i => ( "sensor_" + i, 65 + rand.nextGaussian() * 20 )
            )
    
            while(running){
            // 更新温度值
            curTemp = curTemp.map(
                t => (t._1, t._2 + rand.nextGaussian() )
            )
            // 获取当前时间戳
            val curTime = System.currentTimeMillis()
    
            curTemp.foreach(
                t => ctx.collect(SensorReading(t._1, curTime, t._2))
            )
            Thread.sleep(100)
            }
        }
    }
    

    3.3 Transformations

    Transformations算子可以将一个或者多个算子转换成一个新的数据流,使用Transformations算子组合可以进行复杂的业务处理


    3.3.1 简单的操作 DataStream


    ① Map

    DataStream → DataStream

    遍历数据流中的每一个元素,产生一个新的元素

    ② FlatMap

    DataStream → DataStream

    遍历数据流中的每一个元素,产生N个元素 N=0,1,2,......

    ③ Filter

    DataStream → DataStream

    过滤算子,根据数据流的元素计算出一个boolean类型的值,true代表保留,false代表过滤掉


    3.3.2 分组流的操作 KeyedStream


    ① KeyBy

    DataStream → KeyedStream

    根据数据流中指定的字段来分区,相同指定字段值的数据一定是在同一个分区中,内部分区使用的是HashPartitioner

    指定分区字段的方式有三种:
    1、根据索引号或者field 指定 (最新版本已经不推荐使用这个方法指定分区)
    2、通过匿名函数来指定
    3、通过实现KeySelector接口 指定分区字段

        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val stream = env.generateSequence(1, 100)
        stream
          .map(x => (x % 3, 1))
          //根据索引号来指定分区字段
          //      .keyBy(0)
          //通过传入匿名函数 指定分区字段
          //      .keyBy(x=>x._1)
          //通过实现KeySelector接口  指定分区字段
          .keyBy(new KeySelector[(Long, Int), Long] {
          override def getKey(value: (Long, Int)): Long = value._1
        })
          .sum(1)
          .print()
        env.execute()
    

    ② Reduce

    根据key聚合结果

    注意: reduce是基于分区后的流对象进行聚合,也就是说,DataStream类型的对象无法调用reduce方法

    .reduce((v1,v2) => (v1._1,v1._2 + v2._2))
    

    ④Fold

    一个有初始值的分组数据流的滚动折叠操作. 合并当前元素和前一次折叠操作的结果,并产生一个新的值.

    下面的fold函数就是当我们输入一个 (1,2,3,4,5)的序列, 将会产生一下面的句子:"start-1", "start-1-2", "start-1-2-3", ...

    val result: DataStream[String] = windowedStream.fold("start", (str, i) => { str + "-" + i })
    

    ⑤ Aggregations

    KeyedStream → DataStream

    Aggregations代表的是一类聚合算子,具体算子如下:

    keyedStream.sum(0)
    keyedStream.sum("key")
    keyedStream.min(0)
    keyedStream.min("key")
    keyedStream.max(0)
    keyedStream.max("key")
    keyedStream.minBy(0)
    keyedStream.minBy("key")
    keyedStream.maxBy(0)
    keyedStream.maxBy("key")
    

    3.2.3 连接算子


    ① Union

    DataStream → DataStream

    合并两个或者更多的数据流产生一个新的数据流,这个新的数据流中包含了所合并的数据流的元素

    注意:需要保证数据流中元素类型一致

    val env = StreamExecutionEnvironment.getExecutionEnvironment
        val ds1 = env.fromCollection(List(("a",1),("b",2),("c",3)))
        val ds2 = env.fromCollection(List(("d",4),("e",5),("f",6)))
        val ds3 = env.fromCollection(List(("g",7),("h",8)))
    //    val ds3 = env.fromCollection(List((1,1),(2,2)))
        val unionStream = ds1.union(ds2,ds3)
        unionStream.print()
        env.execute()
    

    ② Connect

    DataStream,DataStream → ConnectedStreams

    合并两个数据流并且保留两个数据流的数据类型,能够共享两个流的状态

    val ds1 = env.socketTextStream("node01", 8888)
    val ds2 = env.socketTextStream("node01", 9999)
    val wcStream1 = ds1.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1)
    val wcStream2 = ds2.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1)
    val restStream: ConnectedStreams[(String, Int), (String, Int)] = wcStream2.connect(wcStream1)
    

    ConnectedStreams 可以通过下面的算子来操作最后输出DataStream,具体看ConnectedStreams


    ConnectedStreams

    Connect与 Union 区别:

    1. Union之前两个流的类型必须是一样,Connect可以不一样,在之后的coMap中再去调整成为一样的。
    2. Connect只能操作两个流,Union可以操作多个。

    3.2.4 连接流的操作算子 ConnectedStreams

    ① CoMap, CoFlatMap

    ConnectedStreams → DataStream
    CoMap, CoFlatMap并不是具体算子名字,而是一类操作名称

    凡是基于ConnectedStreams数据流做map遍历,这类操作叫做CoMap
    凡是基于ConnectedStreams数据流做flatMap遍历,这类操作叫做CoFlatMap

    CoMap第一种实现方式:

    connectedStream.map(new CoMapFunction[(String,Int),(String,Int),(String,Int)] {
          //对第一个数据流做计算
          override def map1(value: (String, Int)): (String, Int) = {
            (value._1+":first",value._2+100)
          }
          //对第二个数据流做计算
          override def map2(value: (String, Int)): (String, Int) = {
            (value._1+":second",value._2*100)
          }
        }).print()
    

    CoMap第二种实现方式:

    connectedStream.map(
          //对第一个数据流做计算
          x=>{(x._1+":first",x._2+100)}
          //对第二个数据流做计算
          ,y=>{(y._1+":second",y._2*100)}
        ).print()
    

    CoFlatMap第一种实现方式:

    ds1.connect(ds2).flatMap((x,c:Collector[String])=>{
          //对第一个数据流做计算
          x.split(" ").foreach(w=>{
            c.collect(w)
          })
    
        }
          //对第二个数据流做计算
          ,(y,c:Collector[String])=>{
          y.split(" ").foreach(d=>{
            c.collect(d)
          })
        }).print
    

    CoFlatMap第二种实现方式:

     ds1.connect(ds2).flatMap(
          //对第一个数据流做计算
          x=>{
          x.split(" ")
        }
          //对第二个数据流做计算
          ,y=>{
            y.split(" ")
          }).print()
    

    CoFlatMap第三种实现方式:

    ds1.connect(ds2).flatMap(new CoFlatMapFunction[String,String,(String,Int)] {
        //对第一个数据流做计算 
        override def flatMap1(value: String, out: Collector[(String, Int)]): Unit = {
            val words = value.split(" ")
            words.foreach(x=>{
              out.collect((x,1))
            })
          }
    
        //对第二个数据流做计算
        override def flatMap2(value: String, out: Collector[(String, Int)]): Unit = {
            val words = value.split(" ")
            words.foreach(x=>{
              out.collect((x,1))
            })
          }
        }).print()
    

    3.2.5 拆分流


    ① Split

    DataStream → SplitStream

    根据条件将一个流分成两个或者更多的流

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.generateSequence(1,100)
    val splitStream = stream.split(
        d => {
            d % 2 match {
                case 0 => List("even")
                case 1 => List("odd")
            }
        }
    )
    splitStream.select("even").print()
    env.execute()
    
    @deprecated Please use side output instead
    

    ② Select

    SplitStream → DataStream

    从SplitStream中选择一个或者多个数据流

    splitStream.select("even").print()
    

    ③ side output侧输出流

    流计算过程,可能遇到根据不同的条件来分隔数据流。filter分割造成不必要的数据复制

        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val stream = env.socketTextStream("node01",8888)
        val gtTag = new OutputTag[String]("gt")
        val processStream = stream.process(new ProcessFunction[String, String] {
          override def processElement(value: String, ctx: ProcessFunction[String, String]#Context, out: Collector[String]): Unit = {
            try {
              val longVar = value.toLong
              if (longVar > 100) {
                out.collect(value)
              } else {
                ctx.output(gtTag, value)
              }
            } catch {
              case e => e.getMessage
                ctx.output(gtTag, value)
            }
          }
        })
        val sideStream = processStream.getSideOutput(gtTag)
        sideStream.print("sideStream")
        processStream.print("mainStream")
        env.execute()
    

    3.2.6 Iterate 迭代算子

    DataStream → IterativeStream → DataStream

    Iterate算子提供了对数据流迭代的支持,对于定义不断更新模型的算法特别有用

    迭代由两部分组成:迭代体、终止迭代条件

    不满足终止迭代条件的数据流会返回到stream流中,进行下一次迭代

    满足终止迭代条件的数据流继续往下游发送

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val initStream = env.socketTextStream("node01",8888)
    val stream = initStream.map(_.toLong)
    stream.iterate {
        iteration => {
            //定义迭代逻辑
            val iterationBody = iteration.map ( x => {
                println(x)
                if(x > 0) x - 1
                else x
            } )
            //> 0  大于0的值继续返回到stream流中,当 <= 0 继续往下游发送
            (iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
        }
    }.print()
    env.execute()
    

    3.2.7 Physical partitioning 分区算子 分区策略


    ① shuffle

    场景:增大分区、提高并行度,解决数据倾斜

    DataStream → DataStream

    分区元素随机均匀分发到下游分区,网络开销比较大

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.generateSequence(1,10).setParallelism(1)
    println(stream.getParallelism)
    stream.shuffle.print()
    env.execute()
    

    console result: 上游数据比较随意的分发到下游

    ② rebalance

    场景:增大分区、提高并行度,解决数据倾斜

    DataStream → DataStream

    轮询分区元素,均匀的将元素分发到下游分区,下游每个分区的数据比较均匀,在发生数据倾斜时非常有用,网络开销比较大

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(3)
    val stream = env.generateSequence(1,100)
    val shuffleStream = stream.rebalance
    shuffleStream.print()
    env.execute()
    

    console result:上游数据比较均匀的分发到下游

    ③ rescale

    场景:减少分区 防止发生大量的网络传输 不会发生全量的重分区

    DataStream → DataStream

    通过轮询分区元素,将一个元素集合从上游分区发送给下游分区,发送单位是集合,而不是一个个元素

    注意:rescale发生的是本地数据传输,而不需要通过网络传输数据,比如taskmanager的槽数。简单来说,上游的数据只会发送给本TaskManager中的下游

    image.png
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.generateSequence(1,10).setParallelism(2)
    stream.writeAsText("./data/stream1").setParallelism(2)
    stream.rescale.writeAsText("./data/stream2").setParallelism(4)
    env.execute()
    

    ④ broadcast

    场景:需要使用映射表、并且映射表会经常发生变动的场景

    DataStream → DataStream

    上游中每一个元素内容广播到下游每一个分区中

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.generateSequence(1,10).setParallelism(2)
    stream.writeAsText("./data/stream1").setParallelism(2)
    stream.broadcast.writeAsText("./data/stream2").setParallelism(4)
    env.execute()
    

    ⑤ global

    场景:并行度降为1

    DataStream → DataStream

    上游分区的数据只分发给下游的第一个分区

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.generateSequence(1,10).setParallelism(2)
    stream.writeAsText("./data/stream1").setParallelism(2)
    stream.global.writeAsText("./data/stream2").setParallelism(4)
    env.execute()
    

    ⑥ forward

    场景:一对一的数据分发,map、flatMap、filter 等都是这种分区策略

    DataStream → DataStream

    上游分区数据分发到下游对应分区中

    partition1->partition1

    partition2->partition2

    注意:必须保证上下游分区数(并行度)一致,不然会有如下异常:

    Forward partitioning does not allow change of parallelism
    * Upstream operation: Source: Sequence Source-1 parallelism: 2,
    * downstream operation: Sink: Unnamed-4 parallelism: 4
    * stream.forward.writeAsText("./data/stream2").setParallelism(4)
    
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.generateSequence(1,10).setParallelism(2)
    stream.writeAsText("./data/stream1").setParallelism(2)
    stream.forward.writeAsText("./data/stream2").setParallelism(2)
    env.execute()
    

    ⑦ keyBy

    场景:与业务场景匹配

    DataStream → DataStream

    根据上游分区元素的Hash值与下游分区数取模计算出,将当前元素分发到下游哪一个分区

    MathUtils.murmurHash(keyHash)(每个元素的Hash值) % maxParallelism(下游分区数)
    
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.generateSequence(1,10).setParallelism(2)
    stream.writeAsText("./data/stream1").setParallelism(2)
    stream.keyBy(0).writeAsText("./data/stream2").setParallelism(2)
    env.execute()
    

    根据元素Hash值分发到下游分区中

    ⑧ PartitionCustom 自定义分区

    DataStream → DataStream

    通过自定义的分区器,来决定元素是如何从上游分区分发到下游分区

    object ShuffleOperator {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(2)
        val stream = env.generateSequence(1,10).map((_,1))
        stream.writeAsText("./data/stream1")
        stream.partitionCustom(new customPartitioner(),0)
          .writeAsText("./data/stream2").setParallelism(4)
        env.execute()
      }
      class customPartitioner extends Partitioner[Long]{
        override def partition(key: Long, numPartitions: Int): Int = {
          key.toInt % numPartitions
        }
      }
    }
    

    3.4 Sink

    Flink内置了大量sink,可以将Flink处理后的数据输出到HDFS、kafka、Redis、ES、MySQL等等。除此以外,需要用户自定义实现sink。
    工程场景中,会经常消费kafka中数据,处理结果存储到Redis、HBase或者MySQL中


    3.4.1 redis Sink

    Flink处理的数据可以存储到Redis中,以便实时查询
    Flink内嵌连接Redis的连接器,只需要导入连接Redis的依赖就可以

            <dependency>
                <groupId>org.apache.bahir</groupId>
                <artifactId>flink-connector-redis_2.11</artifactId>
                <version>1.0</version>
            </dependency>
    

    WordCount写入到Redis中,选择的是HSET数据类型

    代码如下:

        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val stream = env.socketTextStream("node01",8888)
        val result = stream.flatMap(_.split(" "))
          .map((_, 1))
          .keyBy(0)
          .sum(1)
    
        //若redis是单机
        val config = new FlinkJedisPoolConfig.Builder().setDatabase(3).setHost("node01").setPort(6379).build()
        //如果是 redis集群
        /*val addresses = new util.HashSet[InetSocketAddress]()
        addresses.add(new InetSocketAddress("node01",6379))
        addresses.add(new InetSocketAddress("node01",6379))
       val clusterConfig = new FlinkJedisClusterConfig.Builder().setNodes(addresses).build()*/
    
        result.addSink(new RedisSink[(String,Int)](config,new RedisMapper[(String,Int)] {
    
          override def getCommandDescription: RedisCommandDescription = {
            new RedisCommandDescription(RedisCommand.HSET,"wc")
          }
    
          override def getKeyFromData(t: (String, Int))  = {
            t._1
          }
    
          override def getValueFromData(t: (String, Int))  = {
            t._2 + ""
          }
        }))
        env.execute()
    

    3.4.2 Kafka Sink

    处理结果写入到kafka topic中,Flink也是默认支持,需要添加连接器依赖,跟读取kafka数据用的连接器依赖相同
    跟读取kafka数据用的连接器依赖相同

    之前添加过就不需要再次添加了

            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka_2.11</artifactId>
                <version>${flink-version}</version>
            </dependency>
    
    import java.lang
    import java.util.Properties
    
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer, KafkaSerializationSchema}
    import org.apache.kafka.clients.producer.ProducerRecord
    import org.apache.kafka.common.serialization.StringSerializer
    
    object KafkaSink {
      def main(args: Array[String]): Unit = {
    
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val stream = env.socketTextStream("node01",8888)
        val result = stream.flatMap(_.split(" "))
          .map((_, 1))
          .keyBy(0)
          .sum(1)
    
        val props = new Properties()
        props.setProperty("bootstrap.servers","node01:9092,node02:9092,node03:9092")
    //    props.setProperty("key.serializer",classOf[StringSerializer].getName)
    //    props.setProperty("value.serializer",classOf[StringSerializer].getName)
    
    
        /**
        public FlinkKafkaProducer(
         FlinkKafkaProducer(defaultTopic: String, serializationSchema: KafkaSerializationSchema[IN], producerConfig: Properties, semantic: FlinkKafkaProducer.Semantic)
          */
        result.addSink(new FlinkKafkaProducer[(String,Int)]("wc",new KafkaSerializationSchema[(String, Int)] {
          override def serialize(element: (String, Int), timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
            new ProducerRecord("wc",element._1.getBytes(),(element._2+"").getBytes())
          }
        },props,FlinkKafkaProducer.Semantic.EXACTLY_ONCE))
    
        env.execute()
      }
    }
    
    import java.lang
    import java.util.Properties
    
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer, KafkaSerializationSchema}
    import org.apache.kafka.clients.producer.ProducerRecord
    import org.apache.kafka.common.serialization.StringSerializer
    
    object KafkaSink {
      def main(args: Array[String]): Unit = {
    
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val stream = env.socketTextStream("node01",8888)
        val result = stream.flatMap(_.split(" "))
          .map((_, 1))
          .keyBy(0)
          .sum(1)
    
        val props = new Properties()
        props.setProperty("bootstrap.servers","node01:9092,node02:9092,node03:9092")
    //    props.setProperty("key.serializer",classOf[StringSerializer].getName)
    //    props.setProperty("value.serializer",classOf[StringSerializer].getName)
    
    
        /**
        public FlinkKafkaProducer(
         FlinkKafkaProducer(defaultTopic: String, serializationSchema: KafkaSerializationSchema[IN], producerConfig: Properties, semantic: FlinkKafkaProducer.Semantic)
          */
        result.addSink(new FlinkKafkaProducer[(String,Int)]("wc",new KafkaSerializationSchema[(String, Int)] {
          override def serialize(element: (String, Int), timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
            new ProducerRecord("wc",element._1.getBytes(),(element._2+"").getBytes())
          }
        },props,FlinkKafkaProducer.Semantic.EXACTLY_ONCE))
    
        env.execute()
      }
    }
    

    3.4.3 MySQL Sink(幂等性)

    Flink处理结果写入到MySQL中,这并不是Flink默认支持的,需要添加MySQL的驱动依赖

            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.44</version>
            </dependency>
    

    因为不是内嵌支持的,所以需要基于RichSinkFunction自定义sink

    消费kafka中数据,统计各个卡口的流量,并且存入到MySQL中

    注意:需要去重,操作MySQL需要幂等性

    import java.sql.{Connection, DriverManager, PreparedStatement}
    import java.util.Properties
    
    import org.apache.flink.api.common.functions.ReduceFunction
    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.common.serialization.StringSerializer
    
    object MySQLSink {
    
      case class CarInfo(monitorId: String, carId: String, eventTime: String, Speed: Long)
    
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        //设置连接kafka的配置信息
        val props = new Properties()
        props.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
        props.setProperty("group.id", "flink-kafka-001")
        props.setProperty("key.deserializer", classOf[StringSerializer].getName)
        props.setProperty("value.deserializer", classOf[StringSerializer].getName)
    
        //第一个参数 : 消费的topic名
        val stream = env.addSource(new FlinkKafkaConsumer[(String, String)]("flink-kafka", new KafkaDeserializationSchema[(String, String)] {
          //什么时候停止,停止条件是什么
          override def isEndOfStream(t: (String, String)): Boolean = false
    
          //要进行序列化的字节流
          override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String) = {
            val key = new String(consumerRecord.key(), "UTF-8")
            val value = new String(consumerRecord.value(), "UTF-8")
            (key, value)
          }
    
          //指定一下返回的数据类型  Flink提供的类型
          override def getProducedType: TypeInformation[(String, String)] = {
            createTuple2TypeInformation(createTypeInformation[String], createTypeInformation[String])
          }
        }, props))
    
        stream.map(data => {
          val value = data._2
          val splits = value.split("\t")
          val monitorId = splits(0)
          (monitorId, 1)
        }).keyBy(_._1)
          .reduce(new ReduceFunction[(String, Int)] {
            //t1:上次聚合完的结果  t2:当前的数据
            override def reduce(t1: (String, Int), t2: (String, Int)): (String, Int) = {
              (t1._1, t1._2 + t2._2)
            }
          }).addSink(new MySQLCustomSink)
    
        env.execute()
      }
    
      //幂等性写入外部数据库MySQL
      class MySQLCustomSink extends RichSinkFunction[(String, Int)] {
        var conn: Connection = _
        var insertPst: PreparedStatement = _
        var updatePst: PreparedStatement = _
    
        //每来一个元素都会调用一次
        override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {
          println(value)
          updatePst.setInt(1, value._2)
          updatePst.setString(2, value._1)
          updatePst.execute()
          println(updatePst.getUpdateCount)
          if(updatePst.getUpdateCount == 0){
            println("insert")
            insertPst.setString(1, value._1)
            insertPst.setInt(2, value._2)
            insertPst.execute()
          }
        }
    
        //thread初始化的时候执行一次
        override def open(parameters: Configuration): Unit = {
          conn = DriverManager.getConnection("jdbc:mysql://node01:3306/test", "root", "123123")
          insertPst = conn.prepareStatement("INSERT INTO car_flow(monitorId,count) VALUES(?,?)")
          updatePst = conn.prepareStatement("UPDATE car_flow SET count = ? WHERE monitorId = ?")
        }
    
        //thread关闭的时候 执行一次
        override def close(): Unit = {
          insertPst.close()
          updatePst.close()
          conn.close()
        }
      }
    
    }
    

    3.4.4 Socket Sink

    Flink处理结果发送到套接字(Socket)

    基于RichSinkFunction自定义sink

    import java.io.PrintStream
    import java.net.{InetAddress, Socket}
    import java.util.Properties
    
    import org.apache.flink.api.common.functions.ReduceFunction
    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
    import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTuple2TypeInformation, createTypeInformation}
    import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.common.serialization.StringSerializer
    
    //sink 到 套接字 socket
    object SocketSink {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        //设置连接kafka的配置信息
        val props = new Properties()
        //注意   sparkstreaming + kafka(0.10之前版本) receiver模式  zookeeper url(元数据)
        props.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
        props.setProperty("group.id", "flink-kafka-001")
        props.setProperty("key.deserializer", classOf[StringSerializer].getName)
        props.setProperty("value.deserializer", classOf[StringSerializer].getName)
    
        //第一个参数 : 消费的topic名
        val stream = env.addSource(new FlinkKafkaConsumer[(String, String)]("flink-kafka", new KafkaDeserializationSchema[(String, String)] {
          //什么时候停止,停止条件是什么
          override def isEndOfStream(t: (String, String)): Boolean = false
    
          //要进行序列化的字节流
          override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String) = {
            val key = new String(consumerRecord.key(), "UTF-8")
            val value = new String(consumerRecord.value(), "UTF-8")
            (key, value)
          }
    
          //指定一下返回的数据类型  Flink提供的类型
          override def getProducedType: TypeInformation[(String, String)] = {
            createTuple2TypeInformation(createTypeInformation[String], createTypeInformation[String])
          }
        }, props))
    
        stream.map(data => {
          val value = data._2
          val splits = value.split("\t")
          val monitorId = splits(0)
          (monitorId, 1)
        }).keyBy(_._1)
          .reduce(new ReduceFunction[(String, Int)] {
            //t1:上次聚合完的结果  t2:当前的数据
            override def reduce(t1: (String, Int), t2: (String, Int)): (String, Int) = {
              (t1._1, t1._2 + t2._2)
            }
          }).addSink(new SocketCustomSink("node01",8888))
    
        env.execute()
      }
    
      class SocketCustomSink(host:String,port:Int) extends RichSinkFunction[(String,Int)]{
        var socket: Socket  = _
        var writer:PrintStream = _
    
        override def open(parameters: Configuration): Unit = {
          socket = new Socket(InetAddress.getByName(host), port)
          writer = new PrintStream(socket.getOutputStream)
        }
    
        override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {
          writer.println(value._1 + "\t" +value._2)
          writer.flush()
        }
    
        override def close(): Unit = {
          writer.close()
          socket.close()
        }
      }
    }
    

    3.4.5 HBase Sink

    计算结果写入sink 两种实现方式:

    1. map算子写入 频繁创建hbase连接
    2. process写入 适合批量写入hbase

    导入HBase依赖包

            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-client</artifactId>
                <version>${hbase.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-common</artifactId>
                <version>${hbase.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-server</artifactId>
                <version>${hbase.version}</version>
            </dependency>
    

    读取kafka数据,统计卡口流量保存至HBase数据库中

    1. HBase中创建对应的表
    create 'car_flow',{NAME => 'count', VERSIONS => 1}
    
    1. 实现代码
    import java.util.{Date, Properties}
    
    import com.msb.stream.util.{DateUtils, HBaseUtil}
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.ProcessFunction
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
    import org.apache.flink.util.Collector
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.{HTable, Put}
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.kafka.common.serialization.StringSerializer
    
    
    object HBaseSinkTest {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        //设置连接kafka的配置信息
        val props = new Properties()
        //注意   sparkstreaming + kafka(0.10之前版本) receiver模式  zookeeper url(元数据)
        props.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
        props.setProperty("group.id", "flink-kafka-001")
        props.setProperty("key.deserializer", classOf[StringSerializer].getName)
        props.setProperty("value.deserializer", classOf[StringSerializer].getName)
    
        val stream = env.addSource(new FlinkKafkaConsumer[String]("flink-kafka", new SimpleStringSchema(), props))
    
    
        stream.map(row => {
          val arr = row.split("\t")
          (arr(0), 1)
        }).keyBy(_._1)
          .reduce((v1: (String, Int), v2: (String, Int)) => {
            (v1._1, v1._2 + v2._2)
          }).process(new ProcessFunction[(String, Int), (String, Int)] {
    
          var htab: HTable = _
    
          override def open(parameters: Configuration): Unit = {
            val conf = HBaseConfiguration.create()
            conf.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181")
            val hbaseName = "car_flow"
            htab = new HTable(conf, hbaseName)
          }
    
          override def close(): Unit = {
            htab.close()
          }
    
          override def processElement(value: (String, Int), ctx: ProcessFunction[(String, Int), (String, Int)]#Context, out: Collector[(String, Int)]): Unit = {
            // rowkey:monitorid   时间戳(分钟) value:车流量
            val min = DateUtils.getMin(new Date())
            val put = new Put(Bytes.toBytes(value._1))
            put.addColumn(Bytes.toBytes("count"), Bytes.toBytes(min), Bytes.toBytes(value._2))
            htab.put(put)
          }
        })
        env.execute()
      }
    }
    

    3.4.6 Elasticsearch Sink

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
        <version>${flink-version}</version>
    </dependency>
    

    在主函数中调用:

    val httpHosts = new util.ArrayList[HttpHost]()
    httpHosts.add(new HttpHost("localhost", 9200))
    
    val esSinkBuilder = new ElasticsearchSink.Builder[SensorReading]( httpHosts, new ElasticsearchSinkFunction[SensorReading] {
      override def process(t: SensorReading, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
        println("saving data: " + t)
        val json = new util.HashMap[String, String]()
        json.put("data", t.toString)
        val indexRequest = Requests.indexRequest().index("sensor").`type`("readingData").source(json)
        requestIndexer.add(indexRequest)
        println("saved successfully")
      }
    } )
    dataStream.addSink( esSinkBuilder.build() )
    

    3.5 支持的数据类型

    Flink流应用程序处理的是以数据对象表示的事件流。所以在Flink内部,我们需要能够处理这些对象。它们需要被序列化和反序列化,以便通过网络传送它们;或者从状态后端、检查点和保存点读取它们。为了有效地做到这一点,Flink需要明确知道应用程序所处理的数据类型。Flink使用类型信息的概念来表示数据类型,并为每个数据类型生成特定的序列化器、反序列化器和比较器。
    Flink还具有一个类型提取系统,该系统分析函数的输入和返回类型,以自动获取类型信息,从而获得序列化器和反序列化器。但是,在某些情况下,例如lambda函数或泛型类型,需要显式地提供类型信息,才能使应用程序正常工作或提高其性能。
    Flink支持Java和Scala中所有常见数据类型。使用最广泛的类型有以下几种。

    3.5.1 基础数据类型

    Flink支持所有的Java和Scala基础数据类型,Int, Double, Long, String, …​

    val numbers: DataStream[Long] = env.fromElements(1L, 2L, 3L, 4L)
    numbers.map( n => n + 1 )
    

    3.5.2 Java和Scala元组(Tuples)

    val persons: DataStream[(String, Integer)] = env.fromElements( 
    ("Adam", 17), 
    ("Sarah", 23) ) 
    persons.filter(p => p._2 > 18)
    

    3.5.3 Scala样例类(case classes)

    case class Person(name: String, age: Int) 
    val persons: DataStream[Person] = env.fromElements(
    Person("Adam", 17), 
    Person("Sarah", 23) )
    persons.filter(p => p.age > 18)
    

    3.5.4 Java简单对象(POJOs)

    public class Person {
    public String name;
    public int age;
      public Person() {}
      public Person(String name, int age) { 
    this.name = name;      
    this.age = age;  
    }
    }
    DataStream<Person> persons = env.fromElements(   
    new Person("Alex", 42),   
    new Person("Wendy", 23));
    

    3.5.5 其它(Arrays, Lists, Maps, Enums, 等等)

    Flink对Java和Scala中的一些特殊目的的类型也都是支持的,比如Java的ArrayList,HashMap,Enum等等。

    3.6 实现UDF函数——更细粒度的控制流

    3.6.1 函数类(Function Classes)

    Flink暴露了所有udf函数的接口(实现方式为接口或者抽象类)。例如MapFunction, FilterFunction, ProcessFunction等等。
    下面例子实现了FilterFunction接口:

    class FilterFilter extends FilterFunction[String] {
          override def filter(value: String): Boolean = {
            value.contains("flink")
          }
    }
    val flinkTweets = tweets.filter(new FlinkFilter)
    

    还可以将函数实现成匿名类

    val flinkTweets = tweets.filter(
    new RichFilterFunction[String] {
    override def filter(value: String): Boolean = {
    value.contains("flink")
    }
    }
    )
    

    我们filter的字符串"flink"还可以当作参数传进去。

    val tweets: DataStream[String] = ...
    val flinkTweets = tweets.filter(new KeywordFilter("flink"))
    
    class KeywordFilter(keyWord: String) extends FilterFunction[String] {
    override def filter(value: String): Boolean = {
    value.contains(keyWord)
    }
    }
    

    3.6.2 匿名函数(Lambda Functions)

    val tweets: DataStream[String] = ...
    val flinkTweets = tweets.filter(_.contains("flink"))
    

    3.6.3 富函数(Rich Functions)

    “富函数”是DataStream API提供的一个函数类的接口,所有Flink函数类都有其Rich版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。

    • RichMapFunction
    • RichFlatMapFunction
    • RichFilterFunction
    • …​
      Rich Function有一个生命周期的概念。典型的生命周期方法有:
    • open()方法是rich function的初始化方法,当一个算子例如map或者filter被调用之前open()会被调用。
    • close()方法是生命周期中的最后一个调用的方法,做一些清理工作。
    • getRuntimeContext()方法提供了函数的RuntimeContext的一些信息,例如函数执行的并行度,任务的名字,以及state状态
    class MyFlatMap extends RichFlatMapFunction[Int, (Int, Int)] {
    var subTaskIndex = 0
    
    override def open(configuration: Configuration): Unit = {
    subTaskIndex = getRuntimeContext.getIndexOfThisSubtask
    // 以下可以做一些初始化工作,例如建立一个和HDFS的连接
    }
    
    override def flatMap(in: Int, out: Collector[(Int, Int)]): Unit = {
    if (in % 2 == subTaskIndex) {
    out.collect((subTaskIndex, in))
    }
    }
    
    override def close(): Unit = {
    // 以下做一些清理工作,例如断开和HDFS的连接。
    }
    }
    

    参考flink 官网 Flink DataStream API
    参考flink 官网 Apache Kafka Connector 1.11
    参考flink 官网 Apache Kafka Connector1.10
    参考flink 官网 Operators
    参考flink 官网 Connectors

    相关文章

      网友评论

        本文标题:Flink API

        本文链接:https://www.haomeiwen.com/subject/tcfwsktx.html