Talking About Flink Table Joins

Author: go4it | Published 2019-01-29 10:30

    This article takes a look at joins in Flink's Table API.

    Examples

    Inner Join

    Table left = tableEnv.fromDataSet(ds1, "a, b, c");
    Table right = tableEnv.fromDataSet(ds2, "d, e, f");
    Table result = left.join(right).where("a = d").select("a, b, e");
    
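    • The join method performs an inner join. The condition can be added afterwards with where, as here, or passed directly as the second argument, e.g. join(right, "a = d").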
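To make the semantics concrete, here is a plain-Java sketch of what left.join(right).where("a = d").select("a, b, e") computes on small in-memory inputs. It is an illustration only, assuming rows are modeled as lists [a, b, c] and [d, e, f]; the class and helper names are hypothetical, and Flink does not execute joins as a nested loop like this.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of inner-join semantics; not Flink's execution strategy.
class InnerJoinSketch {

    static List<Object> row(Object... values) {
        return Arrays.asList(values);
    }

    // left rows are [a, b, c], right rows are [d, e, f]; the result is select("a, b, e").
    static List<List<Object>> innerJoin(List<List<Object>> left, List<List<Object>> right) {
        List<List<Object>> result = new ArrayList<>();
        for (List<Object> l : left) {
            for (List<Object> r : right) {
                Object a = l.get(0);
                // where("a = d"): SQL equality, so a null key never matches
                if (a != null && a.equals(r.get(0))) {
                    result.add(row(a, l.get(1), r.get(1)));
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<Object>> left = List.of(row(1, "x", 10), row(2, "y", 20));
        List<List<Object>> right = List.of(row(1, "p", 100), row(3, "q", 300));
        // Only key 1 appears on both sides, so only one row survives.
        System.out.println(innerJoin(left, right)); // [[1, x, p]]
    }
}
```

Unmatched rows from either side simply disappear, which is the behavior the outer joins below relax.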

    Outer Join

    Table left = tableEnv.fromDataSet(ds1, "a, b, c");
    Table right = tableEnv.fromDataSet(ds2, "d, e, f");
    
    Table leftOuterResult = left.leftOuterJoin(right, "a = d").select("a, b, e");
    Table rightOuterResult = left.rightOuterJoin(right, "a = d").select("a, b, e");
    Table fullOuterResult = left.fullOuterJoin(right, "a = d").select("a, b, e");
    
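    • Outer joins come in three variants: leftOuterJoin, rightOuterJoin, and fullOuterJoin. Each takes the join predicate as its second argument; rows of the preserved side(s) that find no match are kept, with the other side's columns set to null.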
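The three outer variants differ only in which unmatched rows survive. The Java sketch below (hypothetical names, rows modeled as lists [a, b, c] and [d, e, f], same a = d predicate and select("a, b, e") projection) uses two flags to show all three behaviors side by side; it illustrates the semantics, not Flink's runtime.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of outer-join semantics on in-memory rows.
class OuterJoinSketch {

    static List<Object> row(Object... values) {
        return Arrays.asList(values);
    }

    // Joins left [a, b, c] with right [d, e, f] on a = d and projects (a, b, e).
    // keepLeft/keepRight choose which unmatched rows survive, padded with nulls:
    // leftOuterJoin = (true, false), rightOuterJoin = (false, true), fullOuterJoin = (true, true).
    static List<List<Object>> outerJoin(List<List<Object>> left, List<List<Object>> right,
                                        boolean keepLeft, boolean keepRight) {
        List<List<Object>> result = new ArrayList<>();
        boolean[] rightMatched = new boolean[right.size()];
        for (List<Object> l : left) {
            boolean matched = false;
            for (int i = 0; i < right.size(); i++) {
                List<Object> r = right.get(i);
                if (l.get(0) != null && l.get(0).equals(r.get(0))) {
                    matched = true;
                    rightMatched[i] = true;
                    result.add(row(l.get(0), l.get(1), r.get(1)));
                }
            }
            if (!matched && keepLeft) {
                result.add(row(l.get(0), l.get(1), null)); // no right match: e is null
            }
        }
        if (keepRight) {
            for (int i = 0; i < right.size(); i++) {
                if (!rightMatched[i]) {
                    result.add(row(null, null, right.get(i).get(1))); // no left match: a, b are null
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<Object>> left = List.of(row(1, "x", 10), row(2, "y", 20));
        List<List<Object>> right = List.of(row(1, "p", 100), row(3, "q", 300));
        System.out.println(outerJoin(left, right, true, false)); // [[1, x, p], [2, y, null]]
        System.out.println(outerJoin(left, right, false, true)); // [[1, x, p], [null, null, q]]
        System.out.println(outerJoin(left, right, true, true));  // [[1, x, p], [2, y, null], [null, null, q]]
    }
}
```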

    Time-windowed Join

    Table left = tableEnv.fromDataSet(ds1, "a, b, c, ltime.rowtime");
    Table right = tableEnv.fromDataSet(ds2, "d, e, f, rtime.rowtime");
    
    Table result = left.join(right)
      .where("a = d && ltime >= rtime - 5.minutes && ltime < rtime + 10.minutes")
      .select("a, b, e, ltime");
    
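    • A time-windowed join requires at least one equi-join predicate (here a = d) plus a condition that bounds the time attributes of the two sides against each other, written with <, <=, >=, or >.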
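In the example, the two range predicates mean a left row can only match right rows for which rtime - 5 minutes <= ltime < rtime + 10 minutes. The Java sketch below (hypothetical names) evaluates that predicate for a single pair of rows so the inclusive lower bound and exclusive upper bound are easy to see. Bounding both sides is what allows a streaming job to discard buffered rows once they can no longer match; that state handling is not modeled here.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical restatement of the example's join condition for one pair of rows.
class TimeWindowedJoinSketch {

    static final Duration LOWER = Duration.ofMinutes(5);   // rtime - 5.minutes
    static final Duration UPPER = Duration.ofMinutes(10);  // rtime + 10.minutes

    // Mirrors: a = d && ltime >= rtime - 5.minutes && ltime < rtime + 10.minutes
    static boolean joins(Object a, Instant ltime, Object d, Instant rtime) {
        boolean equiKey = a != null && a.equals(d);
        boolean lowerOk = !ltime.isBefore(rtime.minus(LOWER)); // ltime >= rtime - 5 min
        boolean upperOk = ltime.isBefore(rtime.plus(UPPER));   // ltime < rtime + 10 min
        return equiKey && lowerOk && upperOk;
    }

    public static void main(String[] args) {
        Instant rtime = Instant.parse("2019-01-29T10:30:00Z");
        System.out.println(joins(1, rtime.minus(Duration.ofMinutes(5)), 1, rtime)); // true: lower bound is inclusive
        System.out.println(joins(1, rtime.plus(Duration.ofMinutes(10)), 1, rtime)); // false: upper bound is exclusive
        System.out.println(joins(1, rtime, 2, rtime));                              // false: keys differ
    }
}
```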

    Inner Join with Table Function

    // register User-Defined Table Function
    TableFunction<String> split = new MySplitUDTF();
    tableEnv.registerFunction("split", split);
    
    // join
    Table orders = tableEnv.scan("Orders");
    Table result = orders
        .join(new Table(tableEnv, "split(c)").as("s", "t", "v"))
        .select("a, b, s, t, v");
    
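    • A Table can also be inner-joined with a table function: the function is evaluated for each row, and the row is joined with every row the function returns. If the table function returns an empty result for a row, that row is dropped.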
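Semantically this behaves like a lateral (correlated) join: the function is called once per input row with that row's c. The Java sketch below illustrates it, assuming, hypothetically, that MySplitUDTF splits c on commas and emits one string per non-empty token; the article does not show the real function, and for simplicity the sketch produces a single function column called s.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical illustration of an inner join with a table function.
class TableFunctionInnerJoinSketch {

    static List<Object> row(Object... values) {
        return Arrays.asList(values);
    }

    // Assumed stand-in for MySplitUDTF: one output value per non-empty comma-separated token.
    static List<String> split(String c) {
        List<String> out = new ArrayList<>();
        if (c == null) {
            return out;
        }
        for (String token : c.split(",")) {
            if (!token.isEmpty()) {
                out.add(token);
            }
        }
        return out;
    }

    // For each order [a, b, c], emit (a, b, s) for every s the function produces for c.
    // If the function produces nothing, the order contributes no row at all.
    static List<List<Object>> join(List<List<Object>> orders, Function<String, List<String>> udtf) {
        List<List<Object>> result = new ArrayList<>();
        for (List<Object> order : orders) {
            for (String s : udtf.apply((String) order.get(2))) {
                result.add(row(order.get(0), order.get(1), s));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<Object>> orders = List.of(row(1, "x", "a,b"), row(2, "y", ""));
        System.out.println(join(orders, TableFunctionInnerJoinSketch::split)); // [[1, x, a], [1, x, b]]
    }
}
```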

    Left Outer Join with Table Function

    // register User-Defined Table Function
    TableFunction<String> split = new MySplitUDTF();
    tableEnv.registerFunction("split", split);
    
    // join
    Table orders = tableEnv.scan("Orders");
    Table result = orders
        .leftOuterJoin(new Table(tableEnv, "split(c)").as("s", "t", "v"))
        .select("a, b, s, t, v");
    
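    • A Table can also be left-outer-joined with a table function. If the table function returns an empty result for a row, the row is kept and the columns produced by the function are filled with null.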
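The only difference from the inner variant is what happens to a row for which the function emits nothing. In the Java sketch below (hypothetical names), such a row is kept once with a null in the function column; the splitting lambda in main is an assumed stand-in for MySplitUDTF, not its actual behavior.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical illustration of a left outer join with a table function.
class TableFunctionLeftOuterJoinSketch {

    static List<Object> row(Object... values) {
        return Arrays.asList(values);
    }

    // Each order [a, b, c] is combined with every value the function produces for c;
    // an order for which it produces nothing is kept once, with null in the function column.
    static List<List<Object>> leftOuterJoin(List<List<Object>> orders, Function<String, List<String>> udtf) {
        List<List<Object>> result = new ArrayList<>();
        for (List<Object> order : orders) {
            List<String> produced = udtf.apply((String) order.get(2));
            if (produced.isEmpty()) {
                result.add(row(order.get(0), order.get(1), null));
            }
            for (String s : produced) {
                result.add(row(order.get(0), order.get(1), s));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed splitting rule for illustration: comma-separated tokens, empty input yields nothing.
        Function<String, List<String>> split =
            c -> (c == null || c.isEmpty()) ? List.<String>of() : List.of(c.split(","));
        List<List<Object>> orders = List.of(row(1, "x", "a,b"), row(2, "y", ""));
        System.out.println(leftOuterJoin(orders, split)); // [[1, x, a], [1, x, b], [2, y, null]]
    }
}
```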

    Join with Temporal Table

    Table ratesHistory = tableEnv.scan("RatesHistory");
    
    // register temporal table function with a time attribute and primary key
    TemporalTableFunction rates = ratesHistory.createTemporalTableFunction(
        "r_proctime",
        "r_currency");
    tableEnv.registerFunction("rates", rates);
    
    // join with "Orders" based on the time attribute and key
    Table orders = tableEnv.scan("Orders");
    Table result = orders
        .join(new Table(tableEnv, "rates(o_proctime)"), "o_currency = r_currency");
    
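    • A Table can also be joined with a temporal table. The temporal table function is obtained from a Table via createTemporalTableFunction (here with the time attribute r_proctime and the primary key r_currency) and registered before use. Currently only inner joins are supported.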
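Conceptually, rates(o_proctime) gives each order the version of RatesHistory that is valid at the order's time, and the join then matches on the currency key. The Java sketch below models that versioned lookup with one TreeMap per key (all names hypothetical). It uses explicit timestamps so the version selection is visible; with the processing-time attribute used in the example, the lookup effectively sees the latest version when the order is processed. Because only inner joins are supported, an order with no valid version produces no result row.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of a temporal table keyed by currency (the role of r_currency).
class TemporalTableSketch {

    // currency -> (version time -> rate)
    private final Map<String, TreeMap<Long, Integer>> history = new HashMap<>();

    void addVersion(String currency, long time, int rate) {
        history.computeIfAbsent(currency, k -> new TreeMap<>()).put(time, rate);
    }

    // Rate valid at `time` for `currency`: the latest version not after `time`,
    // or null when no version exists yet (an inner join then drops the order).
    Integer rateAt(String currency, long time) {
        TreeMap<Long, Integer> versions = history.get(currency);
        if (versions == null) {
            return null;
        }
        Map.Entry<Long, Integer> version = versions.floorEntry(time);
        return version == null ? null : version.getValue();
    }

    public static void main(String[] args) {
        TemporalTableSketch rates = new TemporalTableSketch();
        rates.addVersion("Euro", 1L, 114);
        rates.addVersion("Euro", 10L, 116);
        rates.addVersion("Yen", 5L, 1);
        System.out.println(rates.rateAt("Euro", 9L));  // 114
        System.out.println(rates.rateAt("Euro", 10L)); // 116
        System.out.println(rates.rateAt("Yen", 2L));   // null, so the order would be dropped
    }
}
```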

    Table

    flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala

    class Table(
        private[flink] val tableEnv: TableEnvironment,
        private[flink] val logicalPlan: LogicalNode) {
      //......
    
      def join(right: Table): Table = {
        join(right, None, JoinType.INNER)
      }
    
      def join(right: Table, joinPredicate: String): Table = {
        join(right, joinPredicate, JoinType.INNER)
      }
    
      def join(right: Table, joinPredicate: Expression): Table = {
        join(right, Some(joinPredicate), JoinType.INNER)
      }
    
      def leftOuterJoin(right: Table): Table = {
        join(right, None, JoinType.LEFT_OUTER)
      }
    
      def leftOuterJoin(right: Table, joinPredicate: String): Table = {
        join(right, joinPredicate, JoinType.LEFT_OUTER)
      }
    
      def leftOuterJoin(right: Table, joinPredicate: Expression): Table = {
        join(right, Some(joinPredicate), JoinType.LEFT_OUTER)
      }
    
      def rightOuterJoin(right: Table, joinPredicate: String): Table = {
        join(right, joinPredicate, JoinType.RIGHT_OUTER)
      }
    
      def rightOuterJoin(right: Table, joinPredicate: Expression): Table = {
        join(right, Some(joinPredicate), JoinType.RIGHT_OUTER)
      }
    
      def fullOuterJoin(right: Table, joinPredicate: String): Table = {
        join(right, joinPredicate, JoinType.FULL_OUTER)
      }
    
      def fullOuterJoin(right: Table, joinPredicate: Expression): Table = {
        join(right, Some(joinPredicate), JoinType.FULL_OUTER)
      }
    
      private def join(right: Table, joinPredicate: String, joinType: JoinType): Table = {
        val joinPredicateExpr = ExpressionParser.parseExpression(joinPredicate)
        join(right, Some(joinPredicateExpr), joinType)
      }
    
      private def join(right: Table, joinPredicate: Option[Expression], joinType: JoinType): Table = {
    
        // check if we join with a table or a table function
        if (!containsUnboundedUDTFCall(right.logicalPlan)) {
          // regular table-table join
    
          // check that the TableEnvironment of right table is not null
          // and right table belongs to the same TableEnvironment
          if (right.tableEnv != this.tableEnv) {
            throw new ValidationException("Only tables from the same TableEnvironment can be joined.")
          }
    
          new Table(
            tableEnv,
            Join(this.logicalPlan, right.logicalPlan, joinType, joinPredicate, correlated = false)
              .validate(tableEnv))
    
        } else {
          // join with a table function
    
          // check join type
          if (joinType != JoinType.INNER && joinType != JoinType.LEFT_OUTER) {
            throw new ValidationException(
              "TableFunctions are currently supported for join and leftOuterJoin.")
          }
    
          val udtf = right.logicalPlan.asInstanceOf[LogicalTableFunctionCall]
          val udtfCall = LogicalTableFunctionCall(
            udtf.functionName,
            udtf.tableFunction,
            udtf.parameters,
            udtf.resultType,
            udtf.fieldNames,
            this.logicalPlan
          ).validate(tableEnv)
    
          new Table(
            tableEnv,
            Join(this.logicalPlan, udtfCall, joinType, joinPredicate, correlated = true)
              .validate(tableEnv))
        }
      }
    
      //......
    }
    
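    • Table defines join, leftOuterJoin, rightOuterJoin, and fullOuterJoin, all of which delegate to the private join method. JoinType expresses the kind of join: INNER, LEFT_OUTER, RIGHT_OUTER, or FULL_OUTER. Note that rightOuterJoin and fullOuterJoin have no overload without a predicate.
    • The join condition may be passed as a String or as an Expression; a String is parsed into an Expression by ExpressionParser.parseExpression.
    • The private join checks whether the right side is a table function call (containsUnboundedUDTFCall). For a regular table, both tables must belong to the same TableEnvironment and the Join node is created with correlated = false; for a table function, only INNER and LEFT_OUTER are accepted and the Join node is created with correlated = true.
    • In both cases, join wraps the validated Join logical node in a new Table.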
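The branching inside the private join boils down to a small decision function. The Java sketch below is a hypothetical restatement of those checks: a local JoinType enum mirrors the four values, two booleans stand in for containsUnboundedUDTFCall and the TableEnvironment comparison, and IllegalArgumentException stands in for Flink's ValidationException. It returns the correlated flag the resulting Join node would receive.

```java
// Hypothetical restatement of the checks in Table's private join(...).
class JoinDispatchSketch {

    enum JoinType { INNER, LEFT_OUTER, RIGHT_OUTER, FULL_OUTER }

    // rightIsTableFunction stands in for containsUnboundedUDTFCall(right.logicalPlan);
    // sameTableEnvironment stands in for right.tableEnv == this.tableEnv.
    static boolean correlatedFlag(boolean rightIsTableFunction, boolean sameTableEnvironment, JoinType joinType) {
        if (!rightIsTableFunction) {
            // Regular table-table join: any JoinType, but both sides need the same environment.
            if (!sameTableEnvironment) {
                throw new IllegalArgumentException("Only tables from the same TableEnvironment can be joined.");
            }
            return false;
        }
        // Join with a table function: only inner and left outer joins are accepted.
        if (joinType != JoinType.INNER && joinType != JoinType.LEFT_OUTER) {
            throw new IllegalArgumentException("TableFunctions are currently supported for join and leftOuterJoin.");
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(correlatedFlag(false, true, JoinType.FULL_OUTER)); // false
        System.out.println(correlatedFlag(true, true, JoinType.LEFT_OUTER));  // true
        try {
            correlatedFlag(true, true, JoinType.RIGHT_OUTER);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```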

    Join

    flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/plan/logical/operators.scala

    case class Join(
        left: LogicalNode,
        right: LogicalNode,
        joinType: JoinType,
        condition: Option[Expression],
        correlated: Boolean) extends BinaryNode {
    
      override def output: Seq[Attribute] = {
        left.output ++ right.output
      }
    
      private case class JoinFieldReference(
        name: String,
        resultType: TypeInformation[_],
        left: LogicalNode,
        right: LogicalNode) extends Attribute {
    
        val isFromLeftInput: Boolean = left.output.map(_.name).contains(name)
    
        val (indexInInput, indexInJoin) = if (isFromLeftInput) {
          val indexInLeft = left.output.map(_.name).indexOf(name)
          (indexInLeft, indexInLeft)
        } else {
          val indexInRight = right.output.map(_.name).indexOf(name)
          (indexInRight, indexInRight + left.output.length)
        }
    
        override def toString = s"'$name"
    
        override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
          // look up type of field
          val fieldType = relBuilder.field(2, if (isFromLeftInput) 0 else 1, name).getType
          // create a new RexInputRef with index offset
          new RexInputRef(indexInJoin, fieldType)
        }
    
        override def withName(newName: String): Attribute = {
          if (newName == name) {
            this
          } else {
            JoinFieldReference(newName, resultType, left, right)
          }
        }
      }
    
      override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = {
        val node = super.resolveExpressions(tableEnv).asInstanceOf[Join]
        val partialFunction: PartialFunction[Expression, Expression] = {
          case field: ResolvedFieldReference => JoinFieldReference(
            field.name,
            field.resultType,
            left,
            right)
        }
        val resolvedCondition = node.condition.map(_.postOrderTransform(partialFunction))
        Join(node.left, node.right, node.joinType, resolvedCondition, correlated)
      }
    
      override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
        left.construct(relBuilder)
        right.construct(relBuilder)
    
        val corSet = mutable.Set[CorrelationId]()
        if (correlated) {
          corSet += relBuilder.peek().getCluster.createCorrel()
        }
    
        relBuilder.join(
          convertJoinType(joinType),
          condition.map(_.toRexNode(relBuilder)).getOrElse(relBuilder.literal(true)),
          corSet.asJava)
      }
    
      private def convertJoinType(joinType: JoinType) = joinType match {
        case JoinType.INNER => JoinRelType.INNER
        case JoinType.LEFT_OUTER => JoinRelType.LEFT
        case JoinType.RIGHT_OUTER => JoinRelType.RIGHT
        case JoinType.FULL_OUTER => JoinRelType.FULL
      }
    
      private def ambiguousName: Set[String] =
        left.output.map(_.name).toSet.intersect(right.output.map(_.name).toSet)
    
      override def validate(tableEnv: TableEnvironment): LogicalNode = {
        val resolvedJoin = super.validate(tableEnv).asInstanceOf[Join]
        if (!resolvedJoin.condition.forall(_.resultType == BOOLEAN_TYPE_INFO)) {
          failValidation(s"Filter operator requires a boolean expression as input, " +
            s"but ${resolvedJoin.condition} is of type ${resolvedJoin.joinType}")
        } else if (ambiguousName.nonEmpty) {
          failValidation(s"join relations with ambiguous names: ${ambiguousName.mkString(", ")}")
        }
    
        resolvedJoin.condition.foreach(testJoinCondition)
        resolvedJoin
      }
    
      private def testJoinCondition(expression: Expression): Unit = {
    
        def checkIfJoinCondition(exp: BinaryComparison) = exp.children match {
          case (x: JoinFieldReference) :: (y: JoinFieldReference) :: Nil
            if x.isFromLeftInput != y.isFromLeftInput => true
          case _ => false
        }
    
        def checkIfFilterCondition(exp: BinaryComparison) = exp.children match {
          case (x: JoinFieldReference) :: (y: JoinFieldReference) :: Nil => false
          case (x: JoinFieldReference) :: (_) :: Nil => true
          case (_) :: (y: JoinFieldReference) :: Nil => true
          case _ => false
        }
    
        var equiJoinPredicateFound = false
        // Whether the predicate is literal true.
        val alwaysTrue = expression match {
          case x: Literal if x.value.equals(true) => true
          case _ => false
        }
    
        def validateConditions(exp: Expression, isAndBranch: Boolean): Unit = exp match {
          case x: And => x.children.foreach(validateConditions(_, isAndBranch))
          case x: Or => x.children.foreach(validateConditions(_, isAndBranch = false))
          case x: EqualTo =>
            if (isAndBranch && checkIfJoinCondition(x)) {
              equiJoinPredicateFound = true
            }
          case x: BinaryComparison =>
          // The boolean literal should be a valid condition type.
          case x: Literal if x.resultType == Types.BOOLEAN =>
          case x => failValidation(
            s"Unsupported condition type: ${x.getClass.getSimpleName}. Condition: $x")
        }
    
        validateConditions(expression, isAndBranch = true)
    
        // Due to a bug in Apache Calcite (see CALCITE-2004 and FLINK-7865) we cannot accept join
        // predicates except literal true for TableFunction left outer join.
        if (correlated && right.isInstanceOf[LogicalTableFunctionCall] && joinType != JoinType.INNER ) {
          if (!alwaysTrue) failValidation("TableFunction left outer join predicate can only be " +
            "empty or literal true.")
        } else {
          if (!equiJoinPredicateFound) {
            failValidation(
              s"Invalid join condition: $expression. At least one equi-join predicate is " +
                s"required.")
          }
        }
      }
    }
    
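    • Join extends BinaryNode; its output is the left input's fields followed by the right input's fields.
    • resolveExpressions rewrites resolved field references in the condition into JoinFieldReference, which records whether a field comes from the left input; a right-side field's index in the joined row is offset by the number of left fields (indexInRight + left.output.length).
    • construct builds both inputs and then calls relBuilder.join, converting Flink's JoinType into Calcite's JoinRelType (INNER, LEFT, RIGHT, FULL) and using literal true when there is no condition; for a correlated join it also creates a CorrelationId.
    • validate requires the condition to be boolean and rejects field names present on both sides. If a condition is given, testJoinCondition then requires at least one equi-join predicate comparing a left field with a right field in an AND branch; the exception is a table-function left outer join, whose predicate must be empty or literal true because of a Calcite bug (CALCITE-2004, FLINK-7865).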
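The core rule in testJoinCondition (setting aside the table-function left outer join exception) is that some equality between a left-side field and a right-side field must be reachable through AND branches only. The Java sketch below restates that rule over a tiny hypothetical expression model; the classes are illustrative, not Flink's expression classes, and unlike the original it does not reject unsupported condition types.

```java
import java.util.List;

// Hypothetical restatement of the equi-join rule checked by testJoinCondition.
class EquiJoinCheckSketch {

    interface Expr {}

    static final class Field implements Expr {
        final String name;
        final boolean fromLeft; // plays the role of JoinFieldReference.isFromLeftInput
        Field(String name, boolean fromLeft) { this.name = name; this.fromLeft = fromLeft; }
    }

    static final class Comparison implements Expr {
        final String op;
        final Expr left;
        final Expr right;
        Comparison(String op, Expr left, Expr right) { this.op = op; this.left = left; this.right = right; }
    }

    static final class And implements Expr {
        final List<Expr> children;
        And(Expr... children) { this.children = List.of(children); }
    }

    static final class Or implements Expr {
        final List<Expr> children;
        Or(Expr... children) { this.children = List.of(children); }
    }

    // True if an equality between fields of different inputs is reachable through ANDs alone.
    static boolean hasEquiJoinPredicate(Expr e) {
        if (e instanceof And) {
            for (Expr child : ((And) e).children) {
                if (hasEquiJoinPredicate(child)) {
                    return true;
                }
            }
            return false;
        }
        if (e instanceof Comparison) {
            Comparison c = (Comparison) e;
            return c.op.equals("=")
                && c.left instanceof Field && c.right instanceof Field
                && ((Field) c.left).fromLeft != ((Field) c.right).fromLeft;
        }
        // Everything under an OR is visited with isAndBranch = false, so it never counts.
        return false;
    }

    public static void main(String[] args) {
        Field a = new Field("a", true), b = new Field("b", true);
        Field d = new Field("d", false), e = new Field("e", false);
        Field ltime = new Field("ltime", true), rtime = new Field("rtime", false);

        Expr windowed = new And(new Comparison("=", a, d), new Comparison(">=", ltime, rtime));
        Expr disjunction = new Or(new Comparison("=", a, d), new Comparison("=", b, e));
        Expr sameSide = new Comparison("=", a, b);

        System.out.println(hasEquiJoinPredicate(windowed));    // true
        System.out.println(hasEquiJoinPredicate(disjunction)); // false
        System.out.println(hasEquiJoinPredicate(sameSide));    // false
    }
}
```

This is why a condition such as "a = d || b = e" fails validation with "At least one equi-join predicate is required", even though each disjunct is an equality.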

    Summary

    • Table supports several kinds of joins: inner join, outer join, time-windowed join, inner join with a table function, left outer join with a table function, and join with a temporal table.
    • Table defines join, leftOuterJoin, rightOuterJoin, and fullOuterJoin, all of which end up in the private join method. JoinType expresses the join type (INNER, LEFT_OUTER, RIGHT_OUTER, FULL_OUTER), and the condition can be a String or an Expression, with Strings parsed into Expressions. The method returns a new Table wrapping a Join node.
    • Join extends BinaryNode. It converts Flink's JoinType into Calcite's JoinRelType, and its construct method builds the join relation via relBuilder.join.

    Link to this article: https://www.haomeiwen.com/subject/gjntsqtx.html