Flink 12: Sinks in the Flink Stream Processing API

Author: 勇于自信 | Published 2020-05-05 23:11

    Basic steps of a stream processing job: set up the environment, read from a source, apply transformations, and write the result out through a sink.


    sink

    Flink has no equivalent of Spark's foreach method that lets users run arbitrary output logic over the data directly. All output to external systems has to go through a sink, and the final output step of a job typically looks like this:
    stream.addSink(new MySink(xxxx))
    Flink ships with sinks for a number of frameworks out of the box; beyond those, users need to implement a custom sink themselves.
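
    For cases the built-in connectors do not cover, a custom sink is written by extending SinkFunction or RichSinkFunction and overriding invoke(). A minimal sketch (MySink and its prefix parameter are hypothetical, mirroring the stream.addSink(new MySink(xxxx)) call above; SensorReading is the sensor-reading case class defined later in this article):

    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

    // A hypothetical sink that simply prints each record; a real sink would open its
    // connection in open() and release it in close().
    class MySink(prefix: String) extends RichSinkFunction[SensorReading] {
      override def invoke(value: SensorReading): Unit = {
        println(prefix + " > " + value)
      }
    }

    // usage: dataStream.addSink(new MySink("my-sink"))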



    1. Kafka as a sink

    1.1. Add the dependency to pom.xml

    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.11 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
        <version>1.7.2</version>
    </dependency>
    

    1.2. Sink code in the main function

    package com.atguigu.apitest.sinktest
    
    import java.util.Properties
    
    import com.atguigu.apitest.SensorReading
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.Semantic
    import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}
    
    object KafkaSinkTest {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
    
        // source
    //    val inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt")
        val properties = new Properties()
        properties.setProperty("bootstrap.servers", "localhost:9092")
        properties.setProperty("group.id", "consumer-group")
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        properties.setProperty("auto.offset.reset", "latest")
    
        val inputStream = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))
    
    // transform
    
        val dataStream = inputStream
          .map(
            data => {
              val dataArray = data.split(",")
              SensorReading( dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble ).toString  // convert to String for easier serialization
            }
          )
    
        // sink
        dataStream.addSink( new FlinkKafkaProducer011[String]( "sinkTest", new SimpleStringSchema(), properties) )
        dataStream.print()
    
        env.execute("kafka sink test")
      }
    }
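
    The Semantic import in the code above is not actually used; it belongs to the exactly-once variant of the producer. A sketch of how that variant might be wired up (assuming the KeyedSerializationSchemaWrapper shipped with the Flink 1.7 streaming API, and with checkpointing enabled so that exactly-once can take effect):

    import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper

    // Exactly-once producer: wrap SimpleStringSchema in a KeyedSerializationSchema
    // and pass Semantic.EXACTLY_ONCE explicitly (uses the Semantic import above).
    dataStream.addSink(new FlinkKafkaProducer011[String](
      "sinkTest",
      new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),
      properties,
      Semantic.EXACTLY_ONCE))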
    
    

    1.3. Start the Kafka server:
    ]# ./bin/kafka-server-start.sh config/server.properties
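
    If the sensor and sinkTest topics do not exist yet, they can be created first (a sketch, assuming a local ZooKeeper on port 2181, as used by Kafka 0.11):
    ]# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sensor
    ]# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sinkTest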



    1.4. Start a Kafka console producer to send data to the sensor source topic:
    ]# ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sensor


    1.5 Start a Kafka console consumer to read the data written by the sink (the sinkTest topic):
    ]# ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic sinkTest --from-beginning



    1.6 Test: run the code and type the following lines into the producer:
    1,2,3
    4,5,6
    Observe the run log (launch command and classpath omitted):

    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    SensorReading(1,2,3.0)
    SensorReading(4,5,6.0)
    
    

    Observe the Kafka consumer output:



    This completes the example of Kafka used as both source and sink.

    2. Redis as a sink

    2.1. Add the dependency to pom.xml

    <!-- https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis -->
    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_2.11</artifactId>
        <version>1.0</version>
    </dependency>
    

    2.2. Main function

    package com.sink
    
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.redis.RedisSink
    import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
    import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
    
    // Case class for a sensor reading: sensor id, timestamp, temperature
    case class SensorReading(id: String, timestamp: Long, temperature: Double)
    
    class MyRedisMapper() extends RedisMapper[SensorReading] {
      // Define the command used to write the data to Redis
      override def getCommandDescription: RedisCommandDescription = {
        // store sensor id and temperature in a hash table: HSET key field value
        new RedisCommandDescription(RedisCommand.HSET, "sensor_temperature")
      }
      // Define the key written to Redis
      override def getKeyFromData(t: SensorReading): String = t.id
      // Define the value written to Redis
      override def getValueFromData(t: SensorReading): String = t.temperature.toString
    }
    object RedisSinkTest {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        val inputStream = env.readTextFile("data/sensor.txt")
        val dataStream = inputStream.map(x => {
          val dataArray = x.split(",")
          SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble)
        })
    
        val conf = new FlinkJedisPoolConfig.Builder()
          .setHost("localhost")
          .setPort(6379)
          .build()
    
        //sink
        dataStream.addSink(new RedisSink(conf,new MyRedisMapper()))
        env.execute()
      }
    
    }
    
    

    The SensorReading case class was already defined earlier, so its definition is not repeated here; if it is missing from your project, add the following code:

    // Case class for a sensor reading: sensor id, timestamp, temperature
    case class SensorReading(id: String, timestamp: Long, temperature: Double)
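
    The file-based examples in this and the following sections read data/sensor.txt. A sample file might look like this (hypothetical values; one comma-separated reading of id, timestamp, temperature per line):

    sensor_1,1547718199,35.8
    sensor_6,1547718201,15.4
    sensor_7,1547718202,6.7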
    

    2.3 Start the Redis server and a Redis client
    2.4 Run the code; if it finishes without errors, check the result in Redis:
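    The contents of the hash can be inspected from redis-cli, for example (a sketch, assuming the sensor_temperature hash name configured in MyRedisMapper):
    127.0.0.1:6379> HGETALL sensor_temperature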



    The result shows that the data was inserted successfully. This completes the Redis sink example.

    3. Elasticsearch as a sink

    3.1. Add the dependency to pom.xml

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
        <version>1.7.2</version>
    </dependency>
    

    3.2. Main function

    package com.sink
    
    import java.util
    
    import org.apache.flink.api.common.functions.RuntimeContext
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
    import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
    import org.apache.http.HttpHost
    import org.elasticsearch.client.Requests
    
    object EsSinkTest {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
    
        // source
        val inputStream = env.readTextFile("data/sensor.txt")
    
        // transform
        val dataStream = inputStream
          .map(
            data => {
              val dataArray = data.split(",")
              SensorReading( dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble )
            }
          )
    
        val httpHosts = new util.ArrayList[HttpHost]()
        httpHosts.add(new HttpHost("localhost", 9200))
    
        // create a builder for the Elasticsearch sink
        val esSinkBuilder = new ElasticsearchSink.Builder[SensorReading](
          httpHosts,
          new ElasticsearchSinkFunction[SensorReading] {
            override def process(element: SensorReading, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
              println("saving data: " + element)
              // wrap the fields in a Map (a JsonObject would also work)
              val json = new util.HashMap[String, String]()
              json.put("sensor_id", element.id)
              json.put("temperature", element.temperature.toString)
              json.put("ts", element.timestamp.toString)
    
              // create the index request that carries the data
              val indexRequest = Requests.indexRequest()
                .index("sensor")
                .`type`("readingdata")
                .source(json)
    
              // hand the request to the indexer to write the data
              indexer.add(indexRequest)
              println("data saved.")
            }
          }
        )
    
        // sink
        dataStream.addSink( esSinkBuilder.build() )
    
        env.execute("es sink test")
      }
    }
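
    After the job has run, the indexed documents can be queried back from Elasticsearch, for example (a sketch, assuming Elasticsearch is listening on localhost:9200):
    ]# curl "http://localhost:9200/sensor/_search?pretty"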
    
    
    4. A custom JDBC sink

    4.1. Add the dependency to pom.xml

    <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.44</version>
    </dependency>
    

    4.2. Main function

    package com.sink
    import java.sql.{Connection, DriverManager, PreparedStatement}
    
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    
    class MyJdbcSink() extends RichSinkFunction[SensorReading] {
      // SQL connection and prepared statements
      var conn: Connection = _
      var insertStmt: PreparedStatement = _
      var updateStmt: PreparedStatement = _
    
      // initialization: create the connection and the prepared statements
      override def open(parameters: Configuration): Unit = {
        super.open(parameters)
        conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test","root","111111")
        insertStmt = conn.prepareStatement("INSERT INTO temperatures (sensor, temperature) VALUES (?,?)")
        updateStmt = conn.prepareStatement("UPDATE temperatures SET temperature = ? WHERE sensor = ?")
      }
      // called for each record: execute the SQL
      override def invoke(value: SensorReading): Unit = {
        // try the update first
        updateStmt.setDouble(1,value.temperature)
        updateStmt.setString(2,value.id)
        updateStmt.execute()
        // if the update did not match any row, execute the insert instead
        if(updateStmt.getUpdateCount==0){
          insertStmt.setString(1,value.id)
          insertStmt.setDouble(2,value.temperature)
          insertStmt.execute()
        }
      }
      // clean up when the sink is closed
      override def close(): Unit = {
        insertStmt.close()
        updateStmt.close()
        conn.close()
      }
    }
    
    object JdbcSinkTest {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        // source
        val inputStream = env.readTextFile("data/sensor.txt")
        // transform
        val dataStream = inputStream.map(x => {
          val dataArray = x.split(",")
          SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble)
        })
        // sink
        dataStream.addSink(new MyJdbcSink())
        dataStream.print("jdbc test")
        env.execute()
      }
    
    }
    
    

    4.3 Open a MySQL terminal and create the table:
    mysql> create database test;
    Query OK, 1 row affected (0.04 sec)

    mysql> use test;
    Database changed
    mysql> create table temperatures(
    -> sensor varchar(20),
    -> temperature double);
    Query OK, 0 rows affected (0.10 sec)


    4.4 Run the code
    The console prints the following:



    Check the MySQL table:
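    For example (a sketch; the exact rows depend on what was read from sensor.txt):
    mysql> select * from temperatures;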



    The data was inserted into the MySQL table successfully. This completes the MySQL (JDBC) sink example.
