美文网首页
Flink DataSink

Flink DataSink

作者: 知识海洋中的淡水鱼 | 来源:发表于2020-01-06 13:22 被阅读0次

    同DataSource一样,flink流处理和批处理也都内置了很多DataSink,可以满足部分应用场景。但平时使用的应用场景比较多,光是靠内置的DataSink完全不满足日常使用。flink也考虑到了这个问题,允许我们实现自定义的DataSink。

    1 批处理

    最简单的DataSink就是print(),平时我们在编写flink程序时进行简单测试的时候通常都会使用print()在控制台上打印处理后的结果数据。
    真正业务应用比较多的还是writeAsCsv(),writeAsText()。还可以通过继承write()中FileOutputFormat类来实现将结果数据输出到其他格式文件中。

        targetDataSet.print()
        targetDataSet.writeAsCsv("file:///xxx/Documents/batch_csv")
        targetDataSet.writeAsText("file:///xxx/Documents/batch_txt")
        targetDataSet.write(
          outputFormat: FileOutputFormat[T],
          filePath: String,
          writeMode: FileSystem.WriteMode = null)
    
    01_batch_sink.png

    2 流处理

    2.1 kafka connector

    kafka connector是flink提供给我们的自定义连接器,可以直接实例化FlinkKafkaProducer对象来将记录存放到kafka中。

    object FlinkStreamDataSink {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val original: DataStream[String] = ...
    
        // kafka sink properties参数
        val producerConf = new Properties()
        producerConf.setProperty("bootstrap.servers", "172.16.57.101:9092,172.16.57.102:9092,172.16.57.103:9092")
        producerConf.setProperty("zookeeper.connect", "172.16.57.101:2181,172.16.57.102:2181,172.16.57.103:2181")
        producerConf.setProperty("group.id", "leslie")
    
        // sink结果数据到kafka中
        original.addSink(new FlinkKafkaProducer010[String]("test_1", new SimpleStringSchema(), producerConf))
    
        env.execute("flink_data_sink")
      }
    }
    
    01_stream_sink_kafka.png

    2.2 自定义DataSink

    自定义DataSink和自定义DataSource一样简单,只需要继承SinkFunction接口并重写其中的invoke()方法。

    object FlinkStreamCustomerDataSink {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        // kafka source properties参数
        val props = new Properties()
        props.setProperty("bootstrap.servers", "172.16.57.101:9092,172.16.57.102:9092,172.16.57.103:9092")
        props.setProperty("zookeeper.connect", "172.16.57.101:2181,172.16.57.102:2181,172.16.57.103:2181")
        props.setProperty("group.id", "leslie")
        props.setProperty("auto.offset.reset", "latest")
    
        val originalStream: DataStream[String] = env
          .addSource(new FlinkKafkaConsumer010[String](
            "test", // 被消费的kafka topic
            new SimpleStringSchema(), // 序列化
            props)) // kafka source properties参数
    
        val targetStream: DataStream[(String, String, Int)] = originalStream
          .flatMap(_.split(","))
          .map { name =>
            val sex = if (name.contains("e")) "男" else "女"
            val age = if (name.contains("e")) 23 else 18
            (name, sex, age) // 根据名字来构造人物的基本信息
          }
    
        // 自定义DataSink
        targetStream.addSink(new MysqlDataSink())
    
        env.execute("fink_consumer_data_sink")
      }
    }
    

    下文代码MysqlDataSink类继承RichSinkFunction类,实现将记录存放到mysql的目的。为了避免重复创建和销毁mysql连接,我们和自定义DataSouce一样继承"富函数",在open(),close()方法中实现连接的创建和销毁(前一篇博客Flink DataSouce中有提到open()方法仅在函数类实例化的时候调用一次,close()则是在实例对象销毁前调用一次)。

    class MysqlDataSink extends RichSinkFunction[(String, String, Int)] {
      private var pStmt: PreparedStatement = _
      private var conn: Connection = _
    
      override def open(parameters: Configuration): Unit = {
        Class.forName("com.mysql.jdbc.Driver")
        val url = "jdbc:mysql://localhost:3306/test_for_mysql?useSSL=false"
        val username = "root"
        val password = "123456"
        conn = DriverManager.getConnection(url, username, password);
        val sql =
          """
            |insert into user (name, sex, age) values (?, ?, ?);
            |""".stripMargin
        pStmt = conn.prepareStatement(sql)
      }
    
      override def close(): Unit = {
        if (conn != null) conn.close()
        if (pStmt != null) pStmt.close()
      }
    
      // 主体方法,插入数据到mysql中
      override def invoke(value: (String, String, Int)): Unit = {
        pStmt.setString(1, value._1)
        pStmt.setNString(2, value._2)
        pStmt.setInt(3, value._3)
        pStmt.execute()
      }
    }
    
    02_stream_sink_customer.png

    最后:
    上边就是Flink DataSink的介绍部分。当然DataSink相关的知识并不只有这么一点点,此文只是些基础知识,各位老哥们以后遇到具体的场景具体处理。
    下一篇博文将会把这两次提到的富函数给大家说说,也会通过富函数的讲解把flink的有状态编程也说说。

    相关文章

      网友评论

          本文标题:Flink DataSink

          本文链接:https://www.haomeiwen.com/subject/ywfkactx.html