美文网首页
聊聊flink Table的Set Operations

聊聊flink Table的Set Operations

作者: go4it | 来源:发表于2019-01-30 12:45 被阅读12次

    本文主要研究一下flink Table的Set Operations

    实例

    Union

    // union: combines both inputs with SQL UNION semantics (Union node with all = false)
    String fields = "a, b, c";
    Table left = tableEnv.fromDataSet(ds1, fields);
    Table right = tableEnv.fromDataSet(ds2, fields);
    Table result = left.union(right);
    
    • union方法类似SQL的UNION:合并两表的记录并去除重复行(对应Union节点的all = false)

    UnionAll

    // unionAll: combines both inputs with SQL UNION ALL semantics (Union node with all = true)
    String fields = "a, b, c";
    Table left = tableEnv.fromDataSet(ds1, fields);
    Table right = tableEnv.fromDataSet(ds2, fields);
    Table result = left.unionAll(right);
    
    • unionAll方法类似SQL的UNION ALL:合并两表的记录并保留重复行(all = true);它也是stream表上唯一支持的union形式

    Intersect

    // intersect: SQL INTERSECT semantics (Intersect node with all = false); the inputs may use
    // different column names, only the column count and column types are compared
    String leftFields = "a, b, c";
    String rightFields = "d, e, f";
    Table left = tableEnv.fromDataSet(ds1, leftFields);
    Table right = tableEnv.fromDataSet(ds2, rightFields);
    Table result = left.intersect(right);
    
    • intersect方法类似SQL的INTERSECT:只返回两表中都存在的记录;示例中两表列名不同(a, b, c与d, e, f),这是允许的,只要求列数和对应列的类型一致

    IntersectAll

    // intersectAll: SQL INTERSECT ALL semantics (Intersect node with all = true); column names
    // may differ between the inputs, only column count and column types are compared
    String leftFields = "a, b, c";
    String rightFields = "d, e, f";
    Table left = tableEnv.fromDataSet(ds1, leftFields);
    Table right = tableEnv.fromDataSet(ds2, rightFields);
    Table result = left.intersectAll(right);
    
    • intersectAll方法类似SQL的INTERSECT ALL:与intersect相同,但保留重复记录(all = true)

    Minus

    // minus: SQL EXCEPT semantics (Minus node with all = false)
    String fields = "a, b, c";
    Table left = tableEnv.fromDataSet(ds1, fields);
    Table right = tableEnv.fromDataSet(ds2, fields);
    Table result = left.minus(right);
    
    • minus方法类似SQL的EXCEPT:返回左表中存在、右表中不存在的记录(all = false)

    MinusAll

    // minusAll: SQL EXCEPT ALL semantics (Minus node with all = true)
    String fields = "a, b, c";
    Table left = tableEnv.fromDataSet(ds1, fields);
    Table right = tableEnv.fromDataSet(ds2, fields);
    Table result = left.minusAll(right);
    
    • minusAll方法类似SQL的EXCEPT ALL:与minus相同,但保留重复记录(all = true)

    In

    // Java DataSets have no toTable method (that is Scala implicit-conversion syntax),
    // so convert them through the TableEnvironment like the other examples do.
    Table left = tableEnv.fromDataSet(ds1, "a, b, c");
    Table right = tableEnv.fromDataSet(ds2, "a");
    
    // using implicit registration: `right` is concatenated into the expression string,
    // which relies on right.toString() yielding a name the expression parser can resolve
    Table resultImplicit = left.select("a, b, c").where("a.in(" + right + ")");
    
    // using explicit registration: register `right` under a fixed name and reference it by name
    // (a distinct variable name is required; redeclaring `result` would not compile in Java)
    tableEnv.registerTable("RightTable", right);
    Table resultExplicit = left.select("a, b, c").where("a.in(RightTable)");
    
    • in方法类似SQL的IN子查询,用在where条件中;右表既可以直接拼接进表达式字符串(隐式注册),也可以先通过registerTable显式注册后按名称引用

    Table

    flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala

    /**
     * The Table API entry point (excerpt: only the set operations are shown).
     *
     * Every set operation first checks that `right` was created by the same
     * TableEnvironment as this table, then wraps both logical plans in a Union,
     * Intersect or Minus node and validates it against `tableEnv`. The `*All`
     * variants differ from their counterparts only by passing `all = true`.
     */
    class Table(
        private[flink] val tableEnv: TableEnvironment,
        private[flink] val logicalPlan: LogicalNode) {
    
      //......
    
      /**
       * Throws a ValidationException unless `right` belongs to this table's TableEnvironment.
       *
       * Shared by all set operations so the check and its message live in one place.
       *
       * @param right the other operand of the set operation
       * @param verb  past participle completing the error message, e.g. "unioned"
       */
      private def checkSameTableEnvironment(right: Table, verb: String): Unit = {
        if (right.tableEnv != this.tableEnv) {
          throw new ValidationException(
            s"Only tables from the same TableEnvironment can be $verb.")
        }
      }
    
      /**
       * Unions this table with `right` (SQL UNION, `all = false`).
       *
       * @throws ValidationException if `right` belongs to a different TableEnvironment;
       *                             validation of the resulting Union node may also fail
       */
      def union(right: Table): Table = {
        checkSameTableEnvironment(right, "unioned")
        new Table(tableEnv, Union(logicalPlan, right.logicalPlan, all = false).validate(tableEnv))
      }
    
      /**
       * Unions this table with `right` (SQL UNION ALL, `all = true`).
       *
       * @throws ValidationException if `right` belongs to a different TableEnvironment;
       *                             validation of the resulting Union node may also fail
       */
      def unionAll(right: Table): Table = {
        checkSameTableEnvironment(right, "unioned")
        new Table(tableEnv, Union(logicalPlan, right.logicalPlan, all = true).validate(tableEnv))
      }
    
      /**
       * Intersects this table with `right` (SQL INTERSECT, `all = false`).
       *
       * @throws ValidationException if `right` belongs to a different TableEnvironment;
       *                             validation of the resulting Intersect node may also fail
       */
      def intersect(right: Table): Table = {
        checkSameTableEnvironment(right, "intersected")
        new Table(tableEnv, Intersect(logicalPlan, right.logicalPlan, all = false).validate(tableEnv))
      }
    
      /**
       * Intersects this table with `right` (SQL INTERSECT ALL, `all = true`).
       *
       * @throws ValidationException if `right` belongs to a different TableEnvironment;
       *                             validation of the resulting Intersect node may also fail
       */
      def intersectAll(right: Table): Table = {
        checkSameTableEnvironment(right, "intersected")
        new Table(tableEnv, Intersect(logicalPlan, right.logicalPlan, all = true).validate(tableEnv))
      }
    
      /**
       * Subtracts `right` from this table (SQL EXCEPT, `all = false`).
       *
       * @throws ValidationException if `right` belongs to a different TableEnvironment;
       *                             validation of the resulting Minus node may also fail
       */
      def minus(right: Table): Table = {
        checkSameTableEnvironment(right, "subtracted")
        new Table(tableEnv, Minus(logicalPlan, right.logicalPlan, all = false)
          .validate(tableEnv))
      }
    
      /**
       * Subtracts `right` from this table (SQL EXCEPT ALL, `all = true`).
       *
       * @throws ValidationException if `right` belongs to a different TableEnvironment;
       *                             validation of the resulting Minus node may also fail
       */
      def minusAll(right: Table): Table = {
        checkSameTableEnvironment(right, "subtracted")
        new Table(tableEnv, Minus(logicalPlan, right.logicalPlan, all = true)
          .validate(tableEnv))
      }
    
      //......
    }
    
    • union及unionAll使用的是Union,intersect及intersectAll使用的是Intersect,minus及minusAll使用的是Minus,带All的版本只是传入all = true;每个方法都会先检查right与当前表属于同一个TableEnvironment,否则抛出ValidationException

    Union

    flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/plan/logical/operators.scala

    /**
     * Logical node for UNION / UNION ALL of two inputs.
     *
     * @param left  first input; its output attributes are the output of this node
     * @param right second input; must match `left` in column count and column types
     * @param all   true for UNION ALL, false for UNION
     */
    case class Union(left: LogicalNode, right: LogicalNode, all: Boolean) extends BinaryNode {
      override def output: Seq[Attribute] = left.output
    
      /** Constructs the left input, then the right input, then applies `relBuilder.union(all)`. */
      override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
        Seq(left, right).foreach(input => input.construct(relBuilder))
        relBuilder.union(all)
      }
    
      /**
       * Validates the union.
       *
       * Calls failValidation when a non-ALL union is requested in a StreamTableEnvironment,
       * when the inputs differ in column count, or when columns at the same position differ
       * in result type. Column names are not compared.
       *
       * @return the node returned by `super.validate`, cast to Union
       */
      override def validate(tableEnv: TableEnvironment): LogicalNode = {
        val distinctUnionOnStream = !all && tableEnv.isInstanceOf[StreamTableEnvironment]
        if (distinctUnionOnStream) {
          failValidation("Union on stream tables is currently not supported.")
        }
    
        val validated = super.validate(tableEnv).asInstanceOf[Union]
    
        // renders attributes as "(name,type), (name,type), ..." for error messages
        def describe(attrs: Seq[Attribute]): String =
          attrs.map(attr => (attr.name, attr.resultType)).mkString(", ")
    
        if (left.output.length != right.output.length) {
          failValidation(s"Union two tables of different column sizes:" +
            s" ${left.output.size} and ${right.output.size}")
        }
        val typeMismatch = left.output.zip(right.output).exists { case (l, r) =>
          l.resultType != r.resultType
        }
        if (typeMismatch) {
          failValidation("Union two tables of different schema:" +
            s" [${describe(left.output)}] and" +
            s" [${describe(right.output)}]")
        }
        validated
      }
    }
    
    • Union继承了BinaryNode,其construct方法通过relBuilder.union来构建union操作;validate方法要求两表列数相同且对应列类型一致(不比较列名),并且在StreamTableEnvironment中只支持all = true(即unionAll)

    Intersect

    flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/plan/logical/operators.scala

    /**
     * Logical node for INTERSECT / INTERSECT ALL of two inputs.
     *
     * @param left  first input; its output attributes are the output of this node
     * @param right second input; must match `left` in column count and column types
     * @param all   true for INTERSECT ALL, false for INTERSECT
     */
    case class Intersect(left: LogicalNode, right: LogicalNode, all: Boolean) extends BinaryNode {
      override def output: Seq[Attribute] = left.output
    
      /** Constructs the left input, then the right input, then applies `relBuilder.intersect(all)`. */
      override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
        Seq(left, right).foreach(input => input.construct(relBuilder))
        relBuilder.intersect(all)
      }
    
      /**
       * Validates the intersection.
       *
       * Calls failValidation for any StreamTableEnvironment, when the inputs differ in
       * column count, or when columns at the same position differ in result type.
       *
       * @return the node returned by `super.validate`, cast to Intersect
       */
      override def validate(tableEnv: TableEnvironment): LogicalNode = {
        val onStream = tableEnv.isInstanceOf[StreamTableEnvironment]
        if (onStream) {
          failValidation("Intersect on stream tables is currently not supported.")
        }
    
        val validated = super.validate(tableEnv).asInstanceOf[Intersect]
    
        // renders attributes as "(name,type), (name,type), ..." for error messages
        def describe(attrs: Seq[Attribute]): String =
          attrs.map(attr => (attr.name, attr.resultType)).mkString(", ")
    
        if (left.output.length != right.output.length) {
          failValidation(s"Intersect two tables of different column sizes:" +
            s" ${left.output.size} and ${right.output.size}")
        }
        // only result types are compared, so the two tables may use different column names
        val typeMismatch = left.output.zip(right.output).exists { case (l, r) =>
          l.resultType != r.resultType
        }
        if (typeMismatch) {
          failValidation("Intersect two tables of different schema:" +
            s" [${describe(left.output)}] and" +
            s" [${describe(right.output)}]")
        }
        validated
      }
    }
    
    • Intersect继承了BinaryNode,其construct方法通过relBuilder.intersect来构建intersect操作;validate方法同样要求两表列数及对应列类型一致(允许列名不同),且目前不支持stream表

    Minus

    flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/plan/logical/operators.scala

    /**
     * Logical node for EXCEPT / EXCEPT ALL (Table API `minus` / `minusAll`) of two inputs.
     *
     * @param left  first input; its output attributes are the output of this node
     * @param right second input; must match `left` in column count and column types
     * @param all   true for EXCEPT ALL, false for EXCEPT
     */
    case class Minus(left: LogicalNode, right: LogicalNode, all: Boolean) extends BinaryNode {
      override def output: Seq[Attribute] = left.output
    
      /** Constructs the left input, then the right input, then applies `relBuilder.minus(all)`. */
      override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
        left.construct(relBuilder)
        right.construct(relBuilder)
        relBuilder.minus(all)
      }
    
      /**
       * Validates the subtraction.
       *
       * Calls failValidation for any StreamTableEnvironment, when the inputs differ in
       * column count, or when columns at the same position differ in result type.
       * Column names are not compared.
       *
       * @return the node returned by `super.validate`, cast to Minus
       */
      override def validate(tableEnv: TableEnvironment): LogicalNode = {
        if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
          failValidation("Minus on stream tables is currently not supported.")
        }
    
        val resolvedMinus = super.validate(tableEnv).asInstanceOf[Minus]
    
        // renders attributes as "(name,type), (name,type), ..." for error messages
        def describe(attrs: Seq[Attribute]): String =
          attrs.map(attr => (attr.name, attr.resultType)).mkString(", ")
    
        // messages say "two tables", consistent with Union and Intersect
        if (left.output.length != right.output.length) {
          failValidation(s"Minus two tables of different column sizes:" +
            s" ${left.output.size} and ${right.output.size}")
        }
        val sameSchema = left.output.zip(right.output).forall { case (l, r) =>
          l.resultType == r.resultType
        }
        if (!sameSchema) {
          failValidation("Minus two tables of different schema:" +
            s" [${describe(left.output)}] and" +
            s" [${describe(right.output)}]")
        }
        resolvedMinus
      }
    }
    
    • Minus继承了BinaryNode,其construct方法通过relBuilder.minus来构建minus操作;validate方法同样要求两表列数及对应列类型一致,且目前不支持stream表

    小结

    • Table对Set提供了union、unionAll、intersect、intersectAll、minus、minusAll、in(in在where子句中)操作
    • union及unionAll使用的是Union,intersect及intersectAll使用的是Intersect,minus及minusAll使用的是Minus
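    • Union继承了BinaryNode,其construct方法通过relBuilder.union来构建union操作;Intersect继承了BinaryNode,其construct方法通过relBuilder.intersect来构建intersect操作;Minus继承了BinaryNode,其construct方法通过relBuilder.minus来构建minus操作
    • 三个节点的validate都要求两表列数及对应列类型一致(不要求列名相同);stream表目前只支持unionAll,union、intersect、intersectAll、minus、minusAll均不支持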

    doc

    相关文章

      网友评论

          本文标题:聊聊flink Table的Set Operations

          本文链接:https://www.haomeiwen.com/subject/gboqsqtx.html