A Look at Flink's TableFunction

Author: go4it | Published 2019-02-08 11:15

    This article takes a look at Flink's TableFunction.

    Example

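    import java.util.regex.Pattern;
    
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.java.BatchTableEnvironment;
    import org.apache.flink.table.functions.TableFunction;
    
    // The generic type "Tuple2<String, Integer>" determines the schema of the returned table as (String, Integer).
    public class Split extends TableFunction<Tuple2<String, Integer>> {
        private String separator = " ";
        
        public Split(String separator) {
            this.separator = separator;
        }
        
        // eval must be public; it is called once per input row and may emit any number of rows.
        public void eval(String str) {
            // String.split interprets its argument as a regex, so quote the separator to match it literally.
            for (String s : str.split(Pattern.quote(separator))) {
                // use collect(...) to emit a row
                collect(new Tuple2<String, Integer>(s, s.length()));
            }
        }
    }
    
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
    Table myTable = ...         // table schema: [a: String]
    // The SQL queries below refer to the table by name, so register it as "MyTable".
    tableEnv.registerTable("MyTable", myTable);
    
    // Register the function.
    tableEnv.registerFunction("split", new Split("#"));
    
    // Use the table function in the Java Table API. "as" specifies the field names of the table.
    Table joined = myTable.join(new Table(tableEnv, "split(a) as (word, length)"))
        .select("a, word, length");
    Table leftJoined = myTable.leftOuterJoin(new Table(tableEnv, "split(a) as (word, length)"))
        .select("a, word, length");
    
    // Use the table function in SQL with LATERAL and TABLE keywords.
    // CROSS JOIN a table function (equivalent to "join" in Table API).
    Table sqlJoined = tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
    // LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API).
    Table sqlLeftJoined = tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE");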
    
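    • This example defines a custom Split function, registers it with TableEnvironment.registerFunction, and then uses it from the Table API or from TableEnvironment.sqlQuery. Split defines a public eval method, which emits output rows by calling collect.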
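
    To make the emit-through-a-collector pattern concrete without a Flink dependency, here is a minimal plain-Java model. MiniTableFunction, MiniSplit and the use of java.util.function.Consumer as the collector are hypothetical stand-ins, not Flink classes; the point is only that eval returns nothing and instead pushes zero or more rows into whatever collector the runtime injected beforehand.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.regex.Pattern;

// Hypothetical stand-in for Flink's TableFunction: the runtime injects a collector,
// and subclasses emit rows through collect(...) instead of returning them.
abstract class MiniTableFunction<T> {
    private Consumer<T> collector;

    final void setCollector(Consumer<T> collector) {
        this.collector = collector;
    }

    protected void collect(T row) {
        collector.accept(row);
    }
}

// Hypothetical counterpart of the article's Split: one input string, one output row per token.
class MiniSplit extends MiniTableFunction<Map.Entry<String, Integer>> {
    private final String separator;

    MiniSplit(String separator) {
        this.separator = separator;
    }

    public void eval(String str) {
        // Quote the separator so regex metacharacters such as "|" are matched literally.
        for (String s : str.split(Pattern.quote(separator))) {
            collect(new SimpleEntry<>(s, s.length()));
        }
    }
}

class MiniSplitDemo {
    // Runs eval for a single input value and gathers everything it emitted.
    static List<Map.Entry<String, Integer>> run(String separator, String input) {
        MiniSplit split = new MiniSplit(separator);
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        split.setCollector(out::add);
        split.eval(input);
        return out;
    }

    public static void main(String[] args) {
        // Prints [hello=5, flink=5]
        System.out.println(run("#", "hello#flink"));
    }
}
```

    A single call to eval produced two rows here, which is exactly what lets a table function fan one input row out into several joined rows.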

    UserDefinedFunction

    flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/functions/UserDefinedFunction.scala

    abstract class UserDefinedFunction extends Serializable {
      /**
        * Setup method for user-defined function. It can be used for initialization work.
        *
        * By default, this method does nothing.
        */
      @throws(classOf[Exception])
      def open(context: FunctionContext): Unit = {}
    
      /**
        * Tear-down method for user-defined function. It can be used for clean up work.
        *
        * By default, this method does nothing.
        */
      @throws(classOf[Exception])
      def close(): Unit = {}
    
      /**
        * @return true if and only if a call to this function is guaranteed to always return
        *         the same result given the same parameters; true is assumed by default
        *         if user's function is not pure functional, like random(), date(), now()...
        *         isDeterministic must return false
        */
      def isDeterministic: Boolean = true
    
      final def functionIdentifier: String = {
        val md5 = EncodingUtils.hex(EncodingUtils.md5(EncodingUtils.encodeObjectToString(this)))
        getClass.getCanonicalName.replace('.', '$').concat("$").concat(md5)
      }
    
      /**
        * Returns the name of the UDF that is used for plan explain and logging.
        */
      override def toString: String = getClass.getSimpleName
    
    }
    
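    • UserDefinedFunction defines the open and close lifecycle hooks, isDeterministic (true by default), and functionIdentifier, which builds a unique name from the class name plus an MD5 digest of the serialized function instance.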
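
    functionIdentifier is the least obvious of these methods: because the digest covers the serialized instance, two instances of the same class configured differently (for example new Split("#") and new Split(",")) get different identifiers. The plain-Java sketch below approximates that scheme; SplitFn and FunctionIds are hypothetical names, and since Flink digests its own string encoding of the serialized object, the hex values will not match Flink's.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical serializable function whose configuration (the separator) is part of its state.
class SplitFn implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String separator;

    SplitFn(String separator) {
        this.separator = separator;
    }
}

final class FunctionIds {
    // Approximates UserDefinedFunction.functionIdentifier: canonical class name with '.' -> '$',
    // then "$", then the MD5 hex digest of the Java-serialized instance (not Flink's exact encoding).
    static String identifier(Serializable fn) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(fn);
            }
            byte[] digest = MessageDigest.getInstance("MD5").digest(bytes.toByteArray());
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return fn.getClass().getCanonicalName().replace('.', '$') + "$" + hex;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(identifier(new SplitFn("#")));
        System.out.println(identifier(new SplitFn(",")));
    }
}
```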

    TableFunction

    flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/functions/TableFunction.scala

    abstract class TableFunction[T] extends UserDefinedFunction {
    
      // ----------------------------------------------------------------------------------------------
    
      /**
        * Emit an output row.
        *
        * @param row the output row
        */
      protected def collect(row: T): Unit = {
        collector.collect(row)
      }
    
      // ----------------------------------------------------------------------------------------------
    
      /**
        * The code generated collector used to emit row.
        */
      private var collector: Collector[T] = _
    
      /**
        * Internal use. Sets the current collector.
        */
      private[flink] final def setCollector(collector: Collector[T]): Unit = {
        this.collector = collector
      }
    
      // ----------------------------------------------------------------------------------------------
    
      /**
        * Returns the result type of the evaluation method with a given signature.
        *
        * This method needs to be overridden in case Flink's type extraction facilities are not
        * sufficient to extract the [[TypeInformation]] based on the return type of the evaluation
        * method. Flink's type extraction facilities can handle basic types or
        * simple POJOs but might be wrong for more complex, custom, or composite types.
        *
        * @return [[TypeInformation]] of result type or null if Flink should determine the type
        */
      def getResultType: TypeInformation[T] = null
    
      /**
        * Returns [[TypeInformation]] about the operands of the evaluation method with a given
        * signature.
        *
        * In order to perform operand type inference in SQL (especially when NULL is used) it might be
        * necessary to determine the parameter [[TypeInformation]] of an evaluation method.
        * By default Flink's type extraction facilities are used for this but might be wrong for
        * more complex, custom, or composite types.
        *
        * @param signature signature of the method the operand types need to be determined
        * @return [[TypeInformation]] of operand types
        */
      def getParameterTypes(signature: Array[Class[_]]): Array[TypeInformation[_]] = {
        signature.map { c =>
          try {
            TypeExtractor.getForClass(c)
          } catch {
            case ite: InvalidTypesException =>
              throw new ValidationException(
                s"Parameter types of table function '${this.getClass.getCanonicalName}' cannot be " +
                s"automatically determined. Please provide type information manually.")
          }
        }
      }
    
    }
    
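    • TableFunction extends UserDefinedFunction and adds collect and setCollector (rows are emitted through a collector that the generated code injects), plus getResultType and getParameterTypes, which can be overridden when Flink's type extraction cannot infer the result or parameter types.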
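
    In other words, an explicit answer from the function wins, and reflection-based extraction is only the fallback: getResultType returns null by default so that Flink extracts the type itself, and the default getParameterTypes turns an extraction failure into a ValidationException. The plain-Java model below mimics that decision order; TypeResolution and its string "types" are hypothetical stand-ins for TypeInformation and TypeExtractor.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of how TableFunction's type hooks are consulted; types are plain strings here.
final class TypeResolution {
    // Stand-in for TypeExtractor.getForClass: only a handful of basic classes are "extractable".
    private static final Map<Class<?>, String> EXTRACTABLE = new HashMap<>();

    static {
        EXTRACTABLE.put(String.class, "STRING");
        EXTRACTABLE.put(Integer.class, "INT");
        EXTRACTABLE.put(int.class, "INT");
        EXTRACTABLE.put(Long.class, "BIGINT");
        EXTRACTABLE.put(long.class, "BIGINT");
    }

    static String extract(Class<?> c) {
        String type = EXTRACTABLE.get(c);
        if (type == null) {
            throw new IllegalArgumentException("Type of " + c.getName()
                    + " cannot be automatically determined. Please provide type information manually.");
        }
        return type;
    }

    // Mirrors the default getParameterTypes: map every class in the eval signature, fail on the first unknown one.
    static String[] parameterTypes(Class<?>[] signature) {
        return Arrays.stream(signature).map(TypeResolution::extract).toArray(String[]::new);
    }

    // Mirrors getResultType returning null by default: a declared type wins, otherwise fall back to extraction.
    static String resultType(String declared, Class<?> extractedFrom) {
        return declared != null ? declared : extract(extractedFrom);
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(parameterTypes(new Class<?>[] {String.class})));
        System.out.println(resultType(null, Long.class));
    }
}
```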

    ProcessOperator

    flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/operators/ProcessOperator.java

    @Internal
    public class ProcessOperator<IN, OUT>
            extends AbstractUdfStreamOperator<OUT, ProcessFunction<IN, OUT>>
            implements OneInputStreamOperator<IN, OUT> {
    
        private static final long serialVersionUID = 1L;
    
        private transient TimestampedCollector<OUT> collector;
    
        private transient ContextImpl context;
    
        /** We listen to this ourselves because we don't have an {@link InternalTimerService}. */
        private long currentWatermark = Long.MIN_VALUE;
    
        public ProcessOperator(ProcessFunction<IN, OUT> function) {
            super(function);
    
            chainingStrategy = ChainingStrategy.ALWAYS;
        }
    
        @Override
        public void open() throws Exception {
            super.open();
            collector = new TimestampedCollector<>(output);
    
            context = new ContextImpl(userFunction, getProcessingTimeService());
        }
    
        @Override
        public void processElement(StreamRecord<IN> element) throws Exception {
            collector.setTimestamp(element);
            context.element = element;
            userFunction.processElement(element.getValue(), context, collector);
            context.element = null;
        }
    
        @Override
        public void processWatermark(Watermark mark) throws Exception {
            super.processWatermark(mark);
            this.currentWatermark = mark.getTimestamp();
        }
    
        //......
    }
    
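    • ProcessOperator.processElement stamps its collector with the incoming record's timestamp and then calls userFunction.processElement; when a table function is joined in a streaming (DataStream-based) query, userFunction is a CRowCorrelateProcessRunner.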
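
    The operator's work in processElement is bookkeeping around the user function: every row the function emits goes through the shared TimestampedCollector, which was just stamped with the input's timestamp. The sketch below models only that timestamp propagation in plain Java (the context handling is left out); Rec, StampingCollector and MiniProcessOperator are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical record type: a value plus an optional timestamp (null means "no timestamp").
final class Rec<T> {
    final T value;
    final Long timestamp;

    Rec(T value, Long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return value + "@" + timestamp;
    }
}

// Models TimestampedCollector: every emitted value reuses the timestamp of the current input.
final class StampingCollector<OUT> implements Consumer<OUT> {
    private final List<Rec<OUT>> output;
    private Long timestamp;

    StampingCollector(List<Rec<OUT>> output) {
        this.output = output;
    }

    void setTimestamp(Rec<?> input) {
        this.timestamp = input.timestamp;
    }

    @Override
    public void accept(OUT value) {
        output.add(new Rec<>(value, timestamp));
    }
}

// Models ProcessOperator.processElement: stamp the collector, then hand the value to the user function.
final class MiniProcessOperator<IN, OUT> {
    private final BiConsumer<IN, Consumer<OUT>> userFunction;
    private final StampingCollector<OUT> collector;

    MiniProcessOperator(BiConsumer<IN, Consumer<OUT>> userFunction, List<Rec<OUT>> output) {
        this.userFunction = userFunction;
        this.collector = new StampingCollector<>(output);
    }

    void processElement(Rec<IN> element) {
        collector.setTimestamp(element);
        userFunction.accept(element.value, collector);
    }

    public static void main(String[] args) {
        List<Rec<String>> out = new ArrayList<>();
        MiniProcessOperator<String, String> op = new MiniProcessOperator<String, String>(
                (line, c) -> {
                    for (String word : line.split(" ")) {
                        c.accept(word);
                    }
                },
                out);
        op.processElement(new Rec<>("hello flink", 1000L));
        // Prints [hello@1000, flink@1000]
        System.out.println(out);
    }
}
```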

    CRowCorrelateProcessRunner

    flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/runtime/CRowCorrelateProcessRunner.scala

    class CRowCorrelateProcessRunner(
        processName: String,
        processCode: String,
        collectorName: String,
        collectorCode: String,
        @transient var returnType: TypeInformation[CRow])
      extends ProcessFunction[CRow, CRow]
      with ResultTypeQueryable[CRow]
      with Compiler[Any]
      with Logging {
    
      private var function: ProcessFunction[Row, Row] = _
      private var collector: TableFunctionCollector[_] = _
      private var cRowWrapper: CRowWrappingCollector = _
    
      override def open(parameters: Configuration): Unit = {
        LOG.debug(s"Compiling TableFunctionCollector: $collectorName \n\n Code:\n$collectorCode")
        val clazz = compile(getRuntimeContext.getUserCodeClassLoader, collectorName, collectorCode)
        LOG.debug("Instantiating TableFunctionCollector.")
        collector = clazz.newInstance().asInstanceOf[TableFunctionCollector[_]]
        this.cRowWrapper = new CRowWrappingCollector()
    
        LOG.debug(s"Compiling ProcessFunction: $processName \n\n Code:\n$processCode")
        val processClazz = compile(getRuntimeContext.getUserCodeClassLoader, processName, processCode)
        val constructor = processClazz.getConstructor(classOf[TableFunctionCollector[_]])
        LOG.debug("Instantiating ProcessFunction.")
        function = constructor.newInstance(collector).asInstanceOf[ProcessFunction[Row, Row]]
        FunctionUtils.setFunctionRuntimeContext(collector, getRuntimeContext)
        FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
        FunctionUtils.openFunction(collector, parameters)
        FunctionUtils.openFunction(function, parameters)
      }
    
      override def processElement(
          in: CRow,
          ctx: ProcessFunction[CRow, CRow]#Context,
          out: Collector[CRow])
        : Unit = {
    
        cRowWrapper.out = out
        cRowWrapper.setChange(in.change)
    
        collector.setCollector(cRowWrapper)
        collector.setInput(in.row)
        collector.reset()
    
        function.processElement(
          in.row,
          ctx.asInstanceOf[ProcessFunction[Row, Row]#Context],
          cRowWrapper)
      }
    
      override def getProducedType: TypeInformation[CRow] = returnType
    
      override def close(): Unit = {
        FunctionUtils.closeFunction(collector)
        FunctionUtils.closeFunction(function)
      }
    }
    
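    • In open, CRowCorrelateProcessRunner compiles and instantiates the generated TableFunctionCollector and ProcessFunction. In processElement it wraps the output collector so the input's change flag is preserved, hands the input row to the TableFunctionCollector, resets it, and then calls function.processElement; this generated function is what ultimately invokes Split's eval method.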
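
    Put together, the runner performs a correlate (lateral join): each row eval emits is concatenated with the current input row, and the wrapping collector re-attaches the input's change flag. The plain-Java sketch below models that flow. It assumes that reset() clears a "collected" flag which the generated code of a LEFT JOIN checks in order to emit a null-padded row when eval emitted nothing; ChangeRow, CorrelateCollector and CorrelateRunner are hypothetical names, and the inlined eval is a variant of Split that skips empty tokens so that it can emit nothing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of CRow: a row plus a change flag (true = insert, false = retraction).
final class ChangeRow {
    final Object[] row;
    final boolean change;

    ChangeRow(Object[] row, boolean change) {
        this.row = row;
        this.change = change;
    }

    @Override
    public String toString() {
        return (change ? "+" : "-") + Arrays.toString(row);
    }
}

// Models the generated TableFunctionCollector: joins the current input row with each emitted row.
final class CorrelateCollector {
    private final List<ChangeRow> out;
    private Object[] input;
    private boolean change;
    private boolean collected;

    CorrelateCollector(List<ChangeRow> out) {
        this.out = out;
    }

    void setInput(Object[] input, boolean change) {
        this.input = input;
        this.change = change;
    }

    void reset() {
        collected = false;
    }

    boolean isCollected() {
        return collected;
    }

    // Output row = input columns followed by the function's columns, with the input's change flag.
    void collect(Object... functionRow) {
        collected = true;
        Object[] joined = Arrays.copyOf(input, input.length + functionRow.length);
        System.arraycopy(functionRow, 0, joined, input.length, functionRow.length);
        out.add(new ChangeRow(joined, change));
    }
}

final class CorrelateRunner {
    // Models processElement for "LATERAL TABLE(split(a))" with separator "#".
    static List<ChangeRow> run(List<ChangeRow> inputs, boolean leftJoin) {
        List<ChangeRow> out = new ArrayList<>();
        CorrelateCollector collector = new CorrelateCollector(out);
        for (ChangeRow in : inputs) {
            collector.setInput(in.row, in.change);
            collector.reset();
            String a = (String) in.row[0];
            for (String s : a.split("#")) {
                if (!s.isEmpty()) { // variant of Split.eval: skip empty tokens
                    collector.collect(s, s.length());
                }
            }
            if (leftJoin && !collector.isCollected()) {
                collector.collect(null, null); // assumed LEFT JOIN padding
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<ChangeRow> in = Arrays.asList(new ChangeRow(new Object[] {"x#yz"}, true));
        // Prints [+[x#yz, x, 1], +[x#yz, yz, 2]]
        System.out.println(run(in, false));
    }
}
```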

    Summary

    • TableFunction extends UserDefinedFunction and defines collect, setCollector, getResultType and getParameterTypes; UserDefinedFunction defines open, close and functionIdentifier.
    • To implement a custom TableFunction, extend TableFunction and define a public eval method. Its parameter types must match how the function is called: in this example split is invoked with column a of the table, which is a String, so eval takes a String parameter.
    • ProcessOperator.processElement calls userFunction.processElement, where userFunction is a CRowCorrelateProcessRunner; CRowCorrelateProcessRunner.processElement in turn calls function.processElement, and that generated function calls Split's eval method.
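
    Because eval is not declared on TableFunction, Flink has to locate it by reflection and match it against the argument types used at the call site, which is also why eval can be overloaded. The sketch below shows the general idea with plain java.lang.reflect; EvalDispatch and MultiSplit are hypothetical, and Flink's own lookup logic handles more cases than this simple assignability check.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

// Hypothetical function with two overloaded public eval methods.
class MultiSplit {
    final List<String> emitted = new ArrayList<>();

    public void eval(String str) {
        for (String s : str.split("#")) {
            emitted.add(s);
        }
    }

    public void eval(String str, Integer maxParts) {
        for (String s : str.split("#", maxParts)) {
            emitted.add(s);
        }
    }
}

final class EvalDispatch {
    // Finds a public, non-static method named "eval" whose parameters accept the given argument classes.
    static Method findEval(Class<?> fnClass, Class<?>... argTypes) {
        for (Method m : fnClass.getMethods()) {
            if (!m.getName().equals("eval") || Modifier.isStatic(m.getModifiers())) {
                continue;
            }
            Class<?>[] params = m.getParameterTypes();
            if (params.length != argTypes.length) {
                continue;
            }
            boolean matches = true;
            for (int i = 0; i < params.length; i++) {
                if (!params[i].isAssignableFrom(argTypes[i])) {
                    matches = false;
                    break;
                }
            }
            if (matches) {
                return m;
            }
        }
        throw new IllegalArgumentException("No matching public eval method in " + fnClass.getName());
    }

    // Resolves eval from the runtime classes of the arguments and invokes it.
    static void call(Object fn, Object... args) {
        Class<?>[] types = new Class<?>[args.length];
        for (int i = 0; i < args.length; i++) {
            types[i] = args[i].getClass();
        }
        try {
            findEval(fn.getClass(), types).invoke(fn, args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        MultiSplit fn = new MultiSplit();
        call(fn, "a#b#c", 2);
        // Prints [a, b#c]
        System.out.println(fn.emitted);
    }
}
```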
