Flink DataSet API

作者: Alex90 | 来源:发表于2019-01-23 20:09 被阅读0次

    Data Source

    Data Source 创建初始数据集。Flink 附带了几种内置输入格式,可以从通用文件格式创建数据集。ExecutionEnvironment 上有创建的方法。

    基于文件的:

    • readTextFile(path) / TextInputFormat,按行读取文件并将其作为字符串返回。

    • readTextFileWithValue(path) / TextValueInputFormat,按行读取文件并将其作为 StringValues 返回。StringValues是可变字符串。

    • readCsvFile(path) / CsvInputFormat,按行读取文件,解析逗号(或其他字符)分隔,并返回元组或对象的数据集。

    • readFileOfPrimitives(path, delimiter) / PrimitiveInputFormat,解析基本数据类型(字符串或整数)的文件,以换行符(或其他字符)分隔。

    • readSequenceFile(Key, Value, path) / SequenceFileInputFormat,从指定路径读取解析 SequenceFile,并返回 <key, value> 元组。

    基于集合:

    • fromCollection(Seq),用对象序列创建数据集。集合中的所有元素必须属于同一类型。

    • fromCollection(Iterator),用迭代器创建数据集。指定迭代器返回的元素的数据类型。

    • fromElements(elements: _*),根据给定的对象序列创建数据集。所有对象必须属于同一类型。

    • fromParallelCollection(SplittableIterator),并行地从迭代器创建数据集。指定迭代器返回的元素的数据类型。

    • generateSequence(from, to),并行生成给定间隔的数字序列。

    通用:

    • readFile(inputFormat, path) / FileInputFormat,接受文件输入格式。

    • createInput(inputFormat) / InputFormat,接受通用输入格式。

    示例代码:

    val env  = ExecutionEnvironment.getExecutionEnvironment
    
    // read text file from local files system
    val localLines = env.readTextFile("file:///path/to/my/textfile")
    
    // read text file from a HDFS running at nnHost:nnPort
    val hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile")
    
    // read a CSV file with three fields
    val csvInput = env.readCsvFile[(Int, String, Double)]("hdfs:///the/CSV/file")
    
    // read a CSV file with five fields, taking only two of them
    val csvInput = env.readCsvFile[(String, Double)](
      "hdfs:///the/CSV/file",
      includedFields = Array(0, 3)) // take the first and the fourth field
    
    // CSV input can also be used with Case Classes
    case class MyCaseClass(str: String, dbl: Double)
    val csvInput = env.readCsvFile[MyCaseClass](
      "hdfs:///the/CSV/file",
      includedFields = Array(0, 3)) // take the first and the fourth field
    
    // read a CSV file with three fields into a POJO (Person) with corresponding fields
    val csvInput = env.readCsvFile[Person](
      "hdfs:///the/CSV/file",
      pojoFields = Array("name", "age", "zipcode"))
    
    // create a set from some given elements
    val values = env.fromElements("Foo", "bar", "foobar", "fubar")
    
    // generate a number sequence
    val numbers = env.generateSequence(1, 10000000)
    
    // read a file from the specified path of type SequenceFileInputFormat
    val tuples = env.readSequenceFile(classOf[IntWritable], classOf[Text],
     "hdfs://nnHost:nnPort/path/to/file")
    

    读压缩文件

    Flink 目前支持输入文件的透明解压缩,如果文件标有适当的文件扩展名。这意味着不需要进一步配置输入格式,并且任何 FileInputFormat 支持压缩,包括自定义输入格式。压缩文件可能无法并行读取,从而影响作业可伸缩性。当前支持的压缩方法:

    压缩方法 文件扩展名 可并行
    DEFLATE .deflate no / not
    GZip .gz,.gzip no / not
    Bzip2 .bz2 no / not
    XZ .xz no / not

    Transformation

    DataSet 的基础算子与 DataStream 算子大致相同,可以互作参考:

    DataSet

    Map
    一个数据元生成一个新的数据元

    data.map { x => x.toInt }
    

    FlatMap
    一个数据元生成多个数据元(可以为0)

    data.flatMap { str => str.split(" ") }
    

    MapPartition
    函数处理包含一个分区所有数据的“迭代器”,可以生成任意数量的结果值。每个分区中的元素数量取决于并行度和先前的算子操作。

    data.mapPartition { in => in map { (_, 1) } }
    

    Filter
    每个数据元执行布尔函数,只保存函数返回 true 的数据元。

    data.filter { _ > 1000 }
    

    Distinct
    对数据集中的元素除重并返回新的数据集。

    val input: DataSet[(Int, String, Double)] = // [...]
    val output = input.distinct()
    
    // Distinct with field position keys
    val input: DataSet[(Int, Double, String)] = // [...]
    val output = input.distinct(0,2)
    
    // Distinct with KeySelector function
    val input: DataSet[Int] = // [...]
    val output = input.distinct {x => Math.abs(x)}
    
    // Distinct with key expression
    case class CustomType(aName : String, aNumber : Int) { }
    
    val input: DataSet[CustomType] = // [...]
    val output = input.distinct("aName", "aNumber")
    

    Reduce
    作用于整个 DataSet,合并该数据集的元素。

    val intNumbers = env.fromElements(1,2,3)
    // 输出 6
    val sum = intNumbers.reduce (_ + _)
    

    Aggregate
    对一组数据求聚合值,聚合可以应用于完整数据集或分组数据集。聚合转换只能应用于元组(Tuple)数据集,并且仅支持字段位置键进行分组。有一些常用的聚合算子,提供以下内置聚合函数(Aggregations):

    • Sum
    • Min
    • Max
    val input: DataSet[(Int, String, Double)] = env.fromElements(
      (1, "a", 10d), (1, "b", 20d), (2, "a", 30d), (3, "c", 50d)
    )
    
    val output: DataSet[(Int, String, Double)] = input.aggregate(Aggregations.SUM, 0).aggregate(Aggregations.MIN, 2)
    
    // 输出 (7,c,50.0)
    
    
    // 简化语法
    val output: DataSet[(Int, String, Double)] = input.sum(0).min(2)
    

    MinBy / MaxBy
    取元组数据集中指定一个或多个字段的值最小(最大)的元组,可以应用于完整数据集或分组数据集。用于比较的字段必须可比较的。如果多个元组具有最小(最大)字段值,则返回这些元组的任意元组。

    val input: DataSet[(Int, String, Double)] = env.fromElements(
      (1, "b", 20d), (1, "a", 10d), (2, "a", 30d)
    )
    
    // 比较元组的第一个字段
    val output: DataSet[(Int, String, Double)] = input.minBy(0)
    // 输出 (1,b,20.0)
    
    
    // 比较元组的第一、三个字段
    val output: DataSet[(Int, String, Double)] = input.minBy(0,2)
    // 输出 (1,a,10d)
    

    Grouped DataSet

    Grouped DataSet 方法 groupBy() 用来将数据分组,有多种数据分组方法:

    • 对于 Pojo 类型,可以根据 KeyExpression 或 KeySelector 分区
    class WC(val word: String, val count: Int) {
      def this() {
        this(null, -1)
      }
      // [...]
    }
    
    val words: DataSet[WC] = // [...]
    
    // Grouped by Key Expression
    val wordCounts1 = words.groupBy("word")
    
    // Grouped by KeySelector Function
    val wordCounts2 = words.groupBy { _.word } 
    
    • 对于元组(Tuple)类型,可以根据字段位置分组
    val tuples = DataSet[(String, Int, Double)] = // [...]
    
    // 根据元组的第一和第二个字段分组
    val reducedTuples = tuples.groupBy(0, 1)
    
    • 分组并排序:
    val input: DataSet[(Int, String)] = // [...]
    val output = input.groupBy(0).sortGroup(1, Order.ASCENDING)
    

    Reduce
    作用于整个分组元素,合并该组的所有元素。

    val words: DataSet[WC] = // [...]
    val wordCounts = words.groupBy("word").reduce {
      (w1, w2) => new WC(w1.word, w1.count + w2.count)
    }
    

    ReduceGroup
    通过将此数据集中的所有元素传递给函数,创建一个新的数据集。该函数可以使用收集器输出零个或多个元素。也可以作用与完整数据集,迭代器会返回完整数据集的元素。

    val input: DataSet[(Int, String)] = env.fromElements((1, "a"), (1, "b"), (2, "a"), (3, "c"))
    val output = input.groupBy(0).reduceGroup {
        (in, out: Collector[(Int, String)]) =>
            out.collect(in.reduce((x, y) => (x._1, x._2 + y._2)))
    }
    
    // 输出 (3,c),(1,ab),(2,a)
    

    Aggregate
    聚合可以应用于分组数据集。

    val input: DataSet[(Int, String, Double)] = env.fromElements(
          (1, "a", 10d), (1, "b", 20d), (2, "a", 30d), (3, "c", 50d)
        )
    
    val output: DataSet[(Int, String, Double)] = input.groupBy(1).sum(0).max(2)
    
    // 输出 (3,a,50.0)
    

    MinBy / MaxBy
    可以应用于分组数据集

     val env = ExecutionEnvironment.getExecutionEnvironment
    val input: DataSet[(Int, String, Double)] = env.fromElements(
      (1, "b", 20d), (1, "a", 10d), (2, "a", 30d)
    )
    
    // 按第二个字段分组,取第一、三个字段最小的元组
    val output: DataSet[(Int, String, Double)] = input.groupBy(1)
      .minBy(0, 2)
    
    // 输出 (1,a,10.0),(1,b,20.0)
    

    Join

    将两个 DataSet 连接生成一个新的 DataSet。

    Join
    两个数据集指定的要连接的 key,进行 join,默认是一个 inner join。可以使用 JoinFunction 将该组连接元素转化为单个元素,也可以使用 FlatJoinFunction 将该组元素转化为任意多个元素(包括 none)。

    // where("0") 表示使用input1的第一个字段连接
    // equalTo("1") 表示使用input2的第二个字段,判断等于input1的第一个字段的值
    val result = input1.join(input2).where(0).equalTo(1)
    

    可以通过 JoinHint 参数来指定运行时执行连接的方式。参数描述了 join 是通过分区(partitioning)还是广播(broadcasting)发生的,以及使用算法是基于排序(sort-based)的还是基于哈希(hash-based)的。如果没有指定 JoinHint,系统将尝试对输入大小进行评估,并根据这些评估选择最佳策略。

    类似 Spark SQL 的 join 逻辑,会根据要连接的两个数据的大小,进行优化。如果不是非常了解那种连接方式在什么场景下更优,建议由系统选择,不要指定。

    // 广播input1,并使用 hash table 的方式
    val result = input1.join(input2, JoinHint.BROADCAST_HASH_FIRST)
      .where(0).equalTo(1)
      
    // JoinHint 可选项:
    // OPTIMIZER_CHOOSES,由系统判断选择
    // BROADCAST_HASH_FIRST,第一个数据集构建哈希表并广播,由第二个表扫描。适用于第一个数据集较小的情况
    // BROADCAST_HASH_SECOND,适用于第二个数据集较小的情况
    // REPARTITION_HASH_FIRST,对两个数据同时进行分区,并从第一个输入构建哈希表。如果第一个输入小于第二个输入,则此策略很好。 
    // REPARTITION_HASH_SECOND,适用于第二个输入小于第一个输入。
    // REPARTITION_SORT_MERGE,对两个数据同时进行分区,并对每个输入进行排序(除非数据已经分区或排序)。输入通过已排序输入的流合并来连接。如果已经对一个或两个输入进行过分区排序的情况,则此策略很好。
    
    // REPARTITION_HASH_FIRST 是系统使用的默认回退策略,如果不能进行大小估计,并且不能重新使用预先存在的分区和排序顺序。
    

    为了引导优化器选择正确的执行策略,可以提示要关联的 DataSet 的大小:

    val input1: DataSet[(Int, String)] = // [...]
    val input2: DataSet[(Int, String)] = // [...]
    
    // 表示第二个数据集 input2 特别小
    val result1 = input1.joinWithTiny(input2).where(0).equalTo(0)
    
    // 表示第二个数据集 input2 特别大
    val result1 = input1.joinWithHuge(input2).where(0).equalTo(0)
    

    对连接成功的数据有两种处理方式(类似 map 和 flatMap),以下面两个数据集连接为例:

    case class Rating(name: String, category: String, points: Int)
        
    val ratings: DataSet[Ratings] = // [...]
    val weights: DataSet[(String, Double)] = // [...]
    
    • Join with Join Function,接收第一个数据集的一个数据元和第二个数据集的一个数据元,并返回一个数据元
      val weightedRatings = ratings
          .join(weights)
          .where("category")
          .equalTo(0) {
              (rating, weight) => (rating.name, rating.points * weight._2)
          }
      
    • Join with Flat-Join Function,返回零个、一个或多个数据元
      val weightedRatings = ratings
          .join(weights)
          .where("category")
          .equalTo(0) {
              (rating, weight, out: Collector[(String, Double)]) =>
                  if (weight._2 > 0.1) out.collect(rating.name, rating.points * weight._2)
          }
      

    ⚠️ Join 仅适用于等于连接的情况,其他连接类型需要使用 OuterJoin 或 CoGroup。

    OuterJoin
    多两个数据集执行左连接(leftOuterJoin)、右连接(rightOuterJoin)或全外连接(fullOuterJoin)。与 Join(inner join)的区别在于,如果在另一侧没有找到匹配的数据,则保存左侧(或右侧、两侧)的记录。

    input1.leftOuterJoin(input2)
          .where(0)              
          .equalTo(1)            
          .with(new JoinFunction<String, String, String>() {
              public String join(String v1, String v2) {
                 // NOTE:
                 // - v2 might be null for leftOuterJoin
                 // - v1 might be null for rightOuterJoin
                 // - v1 OR v2 might be null for fullOuterJoin
              }
          });
    

    CoGroup
    Reduce 操作的二维变体。对一个或多个字段中的每个输入进行分组,然后加入组。每对组调用转换函数。

    val iVals: DataSet[(String, Int)] = env.fromElements(("a", 10), ("b", 20), ("a", 30))
    val dVals: DataSet[(String, Double)] = env.fromElements(("a", 1.0), ("b", 2.0), ("c", 3.0))
    
    // iVals 第一个字段与 dVals 第一个字段连接
    val output: DataSet[Double] = iVals.coGroup(dVals).where(0).equalTo(0) {
      (iVals, dVals, out: Collector[Double]) =>
        // iVals [("a",10),("a",30)]
        val ints = iVals map {
          _._2
        } toSet
    
        // dVals [("a", 1.0)]
        for (dVal <- dVals) {
          for (i <- ints) {
            out.collect(dVal._2 * i)
          }
        }
    }
    
    // 输出 10.0,30.0,40.0
    

    Union
    构建两个数据集的并集。

    data.union(data2)
    

    Cross
    构建两个输入数据集的笛卡尔积。可选择使用 CrossFunction 将元素对转换为单个元素。

    val data1: DataSet[Int] = // [...]
    val data2: DataSet[String] = // [...]
    val result: DataSet[(Int, String)] = data1.cross(data2)
    

    Cross 是一个计算密集型操作,对大型数据集会带来挑战。建议使用 crossWithTiny()crossWithHuge() 优化。

    Repartition

    Rebalance
    均匀地重新负载数据集的并行分区以消除数据偏差。后面只可以接类似 map 的算子操作。

    val in: DataSet[String] = // [...]
    val out = in.rebalance().map { ... }
    

    Hash-Partition
    根据给定的 key 对数据集做 hash 分区。可以是 position keys,expression keys 或者 key selector functions。

    val in: DataSet[(String, Int)] = // [...]
    val out = in.partitionByHash(0).mapPartition { ... }
    

    Range-Partition
    根据给定的 key 对一个数据集进行 Range 分区。可以是 position keys,expression keys 或者 key selector functions。

    val in: DataSet[(Int, String)] = // [...]
    val result = in.partitionByRange(0).mapPartition { ... }
    

    Custom Partitioning
    手动指定数据分区。此方法仅适用于单个字段的 key。

    val in: DataSet[(Int, String)] = // [...]
    val result = in.partitionCustom(partitioner: Partitioner[K], key)
    

    其他

    Sort Partition
    本地以指定的顺序在指定的字段上对数据集的所有分区进行排序。可以指定 field position 或 filed expression。

    val in: DataSet[(Int, String)] = // [...]
    val result = in.sortPartition(1, Order.ASCENDING).mapPartition { ... }
    

    First-n
    返回数据集的前n个元素。可以应用于任意数据集。

    val in: DataSet[(Int, String)] = // [...]
    // DataSet
    val result1 = in.first(3)
    // Grouped DataSet
    val result2 = in.groupBy(0).first(3)
    // Grouped-sorted DataSet
    val result3 = in.groupBy(0).sortGroup(1, Order.ASCENDING).first(3)
    

    Project
    Java API 支持,Scala API 不支持,作用于元组的转换,从元组中选择字段的子集。

    DataSet<Tuple3<Integer, Double, String>> in = // [...]
    // converts Tuple3<Integer, Double, String> into Tuple2<String, Integer>
    DataSet<Tuple2<String, Integer>> out = in.project(2,0);
    

    Data Sink

    Data Sink 从 DataSet 中取出数据保存或者返回。Flink 各种内置的输出格式,在 DataSet 上的算子操作后面调用:

    • writeAsText() / TextOutputFormat,将元素以字符串形式写入文件。字符串通过调用每个元素的 toString() 方法获得。

    • writeAsCsv(...) / CsvOutputFormat,将元组字段以逗号分隔写入文件。行和字段分隔符是可配置的。每个字段的值来自对象的 toString() 方法。

    • print() / printToErr(),在标准输出/标准错误输出中打印每个元素的 toString() 返回值。

    • write() / FileOutputFormat,自定义文件输出的方法和基类。支持自定义对象到字节转换。

    • output() / OutputFormat,通用输出方法,用于非基于文件的数据接收器。

    示例代码:

    // text data
    val textData: DataSet[String] = // [...]
    
    // write DataSet to a file on the local file system
    textData.writeAsText("file:///my/result/on/localFS")
    
    // write DataSet to a file on a HDFS with a namenode running at nnHost:nnPort
    textData.writeAsText("hdfs://nnHost:nnPort/my/result/on/localFS")
    
    // write DataSet to a file and overwrite the file if it exists
    textData.writeAsText("file:///my/result/on/localFS", WriteMode.OVERWRITE)
    
    // tuples as lines with pipe as the separator "a|b|c"
    val values: DataSet[(String, Int, Double)] = // [...]
    values.writeAsCsv("file:///path/to/the/result/file", "\n", "|")
    
    // this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
    values.writeAsText("file:///path/to/the/result/file")
    
    // this writes values as strings using a user-defined formatting
    values map { tuple => tuple._1 + " - " + tuple._2 }
      .writeAsText("file:///path/to/the/result/file")
    

    本地排序输出

    可以使用元组字段位置(field position)或字段表达式(field expression)在指定字段上对数据接收器的输出进行本地排序。这适用于每种输出格式。

    尚不支持全局排序的输出。

    val tData: DataSet[(Int, String, Double)] = // [...]
    val pData: DataSet[(BookPojo, Double)] = // [...]
    val sData: DataSet[String] = // [...]
    
    // sort output on String field in ascending order
    tData.sortPartition(1, Order.ASCENDING).print()
    
    // sort output on Double field in descending and Int field in ascending order
    tData.sortPartition(2, Order.DESCENDING).sortPartition(0, Order.ASCENDING).print()
    
    // sort output on the "author" field of nested BookPojo in descending order
    pData.sortPartition("_1.author", Order.DESCENDING).writeAsText(...)
    
    // sort output on the full tuple in ascending order
    tData.sortPartition("_", Order.ASCENDING).writeAsCsv(...)
    
    // sort atomic type (String) output in descending order
    sData.sortPartition("_", Order.DESCENDING).writeAsText(...)
    

    Iteration

    迭代在 Flink 程序中实现循环。迭代运算符封装了程序的一部分并重复执行,将一次迭代的结果(部分结果)反馈到下一次迭代中。Flink 两种迭代类型:==BulkIteration== 和 ==DeltaIteration==。

    批量迭代(Bulk Iteration)

    调用 DataSet 的 iterate(int) 方法创建一个 BulkIteration,迭代以此为起点,返回一个 IterativeDataSet,可以用常规运算符进行转换。迭代调用的参数 int 指定最大迭代次数。

    IterativeDataSet 调用 closeWith(DataSet) 方法来指定哪个转换应该反馈到下一个迭代,可以选择使用 closeWith(DataSet,DataSet) 指定终止条件。如果该 DataSet 为空,则它将评估第二个 DataSet 并终止迭代。如果没有指定终止条件,则迭代在给定的最大次数迭代后终止。

    以下示例迭代地估计数量Pi。目标是计算落入单位圆的随机点数。在每次迭代中,挑选一个随机点。如果此点位于单位圆内,增加计数。然后估计 Pi 作为结果计数除以迭代次数乘以4。

    val env = ExecutionEnvironment.getExecutionEnvironment()
    
    val initial = env.fromElements(0)
    
    val count = initial.iterate(10000) { iterationInput: DataSet[Int] =>
      val result = iterationInput.map { i =>
        val x = Math.random()
        val y = Math.random()
        i + (if (x * x + y * y < 1) 1 else 0)
      }
      result
    }
    
    val result = count map { c => c / 10000.0 * 4 }
    
    result.print()
    
    env.execute("Iterative Pi Example")
    

    可以查看 K-Means示例,该示例使用 BulkIteration 来聚类一组未标记的点。

    增量迭代(Delta Iteration)

    DeltaIteration 利用了某些算法在每次迭代中不会更改解的每个数据点的特点。

    除了在每次迭代中反馈的部分解决方案之外,还在迭代中维护状态,可以通过增量更新。迭代计算的结果是最后一次迭代之后的状态。参考 迭代的基本原理

    定义 DeltaIteration 类似于定义 BulkIteration。两个数据集构成每次迭代的输入(工作集和解集),并且在每次迭代中生成两个数据集作为结果(新工作集,增量解集)。

    调用初始解决方案集的 iterateDelta(initialWorkset, maxIterations, key) 方法创建一个 DeltaIteration:

    val initialSolutionSet: DataSet[(Long, Double)] = // [...]
    
    val initialWorkset: DataSet[(Long, Double)] = // [...]
    
    val maxIterations = 100
    val keyPosition = 0
    
    val result = initialSolutionSet.iterateDelta(initialWorkset, maxIterations, Array(keyPosition)) {
      (solution, workset) =>
        val candidateUpdates = workset.groupBy(1).reduceGroup(new ComputeCandidateChanges())
        val deltas = candidateUpdates.join(solution).where(0).equalTo(0)(new CompareChangesToCurrent())
    
        val nextWorkset = deltas.filter(new FilterByThreshold())
    
        (deltas, nextWorkset)
    }
    
    result.writeAsCsv(outputPath)
    
    env.execute()
    

    Reference:
    https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/batch/

    相关文章

      网友评论

        本文标题:Flink DataSet API

        本文链接:https://www.haomeiwen.com/subject/txisjqtx.html