美文网首页
Flink DataSouce

Flink DataSouce

作者: 知识海洋中的淡水鱼 | 来源:发表于2020-01-05 22:04 被阅读0次

    flink流处理和批处理都内置了很多数据源,可以满足我们大部分使用场景,当然也可以通过实现flink提供的接口来实现其他数据源的接入。接下来我们就分别来了解下flink批处理和流处理的数据源吧。

    1 批处理

    批处理的数据源主要大致分为两类:集合数据源、文件数据源。

    sequence

    集合数据源之迭代类。平时我们所使用到的scala中Seq(),Array(),List(),Map(),Set()等都是迭代类。

        env.fromCollection(data: Iterable[T])
        // 示例
        env.fromCollection(Seq(1, 2, 3))
        env.fromCollection(Array(1, 2, 3, 4, 5))
        env.fromCollection(List(1, 2, 3, 4, 5))
        env.fromCollection(Map("a" -> 1, "b" -> 2, "c" -> 3))
        env.fromCollection(Set(3, 5, 6))
    
        // 并行版本
        env.fromParallelCollection(iterator: SplittableIterator[T])
        // 示例
        env.fromParallelCollection(new NumberSequenceIterator(3, 19))
    
    01_parallel_collection.png

    集合类型源之元素数组。类似于java中的可变参数列表。

        env.fromElements(data: T*)
        // 示例
        env.fromElements(1, 2, 3)
        env.fromElements("a", "b", "c")
    

    file

    一般来说,我们的批处理程序的数据源主要都是文件数据源,包括本地文件系统中的和分布式文件系统中的文件。使用频次比较高的是readTextFile()和readCsvFile(),当然也可以通过重写readFlie(new FileInputStream[T], filePath: String)中的FileInputFormat来读取不同格式文件。

        env.readFile(new FileInputStream[T], filePath: String)
        env.readTextFile(filePath: String, charsetName: String = "UTF-8")
        env.readTextFileWithValue(filePath: String, charsetName: String = "UTF-8")
        env.readFileOfPrimitives(filePath: String, delimiter: String = "\n")
        env.readCsvFile(
          filePath: String,                 // csv文件地址
          lineDelimiter: String = "\n",     // 行划分符号
          fieldDelimiter: String = ",",     // 字段划分符号
          quoteCharacter: Character = null,
          ignoreFirstLine: Boolean = false, // 忽略首行
          ignoreComments: String = null,
          lenient: Boolean = false,         // 是否对数据严格判断。false表示严格判断,缺失的数据则会被忽略
          includedFields: Array[Int] = null,
          pojoFields: Array[String] = null
        )
    

    readFile,自定义读取文件内容的逻辑

        env.readFile(new MyFileInputFormat[String](), getPath("words.txt"))
    
    // 重写读取文件的逻辑
    class MyFileInputFormat extends FileInputFormat[String]() {
      private var end = false
      private var input: BufferedReader = _
    
      // 只执行一次,此处创建输入流对象
      override def open(fileSplit: FileInputSplit): Unit = {
        val inputStream = new FileInputStream(this.getFilePath.getPath)
        input = new BufferedReader(new InputStreamReader(inputStream))
      }
    
      // 判断是否读取到了文件尾部
      override def reachedEnd(): Boolean = this.end
    
      // 读取下一行的操作逻辑
      override def nextRecord(ot: String): String = {
        val str: String = input.readLine()
        if (str == null) {
          this.end = true
          ""
        } else str
      }
    }
    

    2 流处理

    flink流处理的数据源大致分为四种:集合数据源、文件数据源、套接字数据源、自定义数据源。flink还原生给我们写好了kafka数据源,这是实时流处理中使用的最频繁没有之一的数据源(下边会单独简单展示一下)。

    sequence

    不用多说了吧,完全参考批处理中的sequence。

        env.fromCollection(data: Iterable[T])
        env.fromElements(data: T*)
        env.fromParallelCollection(iterator: SplittableIterator[T])
    

    file

    一样参考批处理中的flie。只是有一点儿不同的是流处理中readFile()还能监视文件的变更状况来进行更多的处理方式,如文件新增了记录,可以重新处理文件或者直接退出。

        env.readTextFile(filePath: String, charsetName: String)
        env.readFile(
          inputFormat: FileInputFormat[T],
          filePath: String,
          watchType: FileProcessingMode,    // 监视路径并响应新数据,或处理一次并退出
          interval: Long)                   // 间隔时间millis
    

    socket

    直接监视指定机子socket端口的记录。

        env.socketTextStream(
          hostname: String, 
          port: Int, 
          delimiter: Char = '\n', 
          maxRetry: Long = 0)               // 如果端口监听中断,最大重试间隔时间
    

    自定义

    当上边所有的数据源都满足不了我们的场景需求时,我们可以通过继承flink暴露的SourceFunction来实现自己的数据源(下文会展示自定义MySQL数据源)。

        env.addSource(function: SourceContext[T] => Unit)
        env.addSource(function: SourceFunction[T])
    

    3 kafka connector

    kafka connector是flink提供给我们的自定义连接器,可以直接实例化FlinkKafkaConsumer对象来消费kafka中记录。

    object FlinkStreamDataSource {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
    
        // kafka properties参数
        val props = new Properties()
        props.setProperty("bootstrap.servers", "172.16.57.101:9092,172.16.57.102:9092,172.16.57.103:9092")
        props.setProperty("zookeeper.connect", "172.16.57.101:2181,172.16.57.102:2181,172.16.57.103:2181")
        props.setProperty("group.id", "leslie")
        props.setProperty("auto.offset.reset", "latest")
    
        val original: DataStream[String] = env
          .addSource(new FlinkKafkaConsumer010[String]("test", new SimpleStringSchema(), props))
    
        original
          .flatMap(_.split(","))
          .map(_ + "_test")
          .print()
    
        env.execute("flink_streaming_data_source")
      }
    }
    
    01_datasource_kafka.png

    4 自定义DataSource

    想要实现自定义的数据源十分简单,只需继承flink的SourceFunction接口并重写其中的run(),cancel()方法。

    object FlinkStreamCustomerDataSource {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
    
        // 自定义dataSource
        env.addSource(new MysqlDataSource).print()
    
        env.execute("customer_date_source")
      }
    }
    

    下文代码MysqlDataSource类是继承了RichSourceFunction类,RichSourceFunction实现SourceFunction接口并同时继承AbstractRichFunction抽象类,AbstractRichFunction抽象类又实现RichFunction接口。为什么我们的自定义数据源要多继承RichSourceFunction类呢?原因就在这个Rich上!我们将这样带有"Rich"前缀的函数类称为富函数,既然是富函数了那么一定是比普通的函数多给我们带来一些功能。RichSourceFunction中的open(),close()就是“富”出来的方法,open()方法仅在函数类实例化的时候调用一次(通常用来建立连接),close()则是在实例对象销毁前调用一次(通常用来关闭连接),可以避免重复进行创建连接销毁连接操作。(当然富函数不仅仅只“富”了这么一点点,还“富”出来运行时上下文,这可是个好东西。此处不扩展哦,以后用到再来讨论)

    02_rich_function.png

    我们需要进行获取外部存储组件数据的操作就在SourceFunction的run(),cancel()方法中实现。

    class MysqlDataSource extends RichSourceFunction[String] {
      private var pStmt: PreparedStatement = _
      private var conn: Connection = _
    
      // 开始方法,只执行一次,建立和mysql的连接
      override def open(parameters: Configuration): Unit = {
        Class.forName("com.mysql.jdbc.Driver")
        val url = "jdbc:mysql://localhost:3306/test_for_mysql?useSSL=false"
        val username = "root"
        val password = "123456"
        conn = DriverManager.getConnection(url, username, password);
        val sql =
          """
            |select id, name, sex, age from user;
            |""".stripMargin
        pStmt = conn.prepareStatement(sql)
      }
    
      // 结束方法,只执行一次,关闭连接
      override def close(): Unit = {
        // 关闭连接
        if (pStmt != null) pStmt.close()
        if (conn != null) conn.close()
      }
    
      // 主体执行方法
      override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
        val rs: ResultSet = pStmt.executeQuery()
        while (rs.next()) {
          val id: Int = rs.getInt("id")
          val name: String = rs.getString("name")
          val sex: String = rs.getString("sex")
          val age: Int = rs.getInt("age")
    
          ctx.collect(s"id: $id, name: $name, sex: $sex, age:$age") // 收集记录到上下文中
        }
      }
      override def cancel(): Unit = {}
    }
    
    02_customer_mysql.png

    相关文章

      网友评论

          本文标题:Flink DataSouce

          本文链接:https://www.haomeiwen.com/subject/jsncactx.html