
A Look at Flink Table's ScalarFunction

Author: go4it | Published 2019-02-10 08:22

    This article takes a look at Flink Table's ScalarFunction.

    Example

    public class HashCode extends ScalarFunction {
    
        private int factor = 0;
    
        @Override
        public void open(FunctionContext context) throws Exception {
            // access "hashcode_factor" parameter
            // "12" would be the default value if parameter does not exist
            factor = Integer.valueOf(context.getJobParameter("hashcode_factor", "12")); 
        }
    
        public int eval(String s) {
            return s.hashCode() * factor;
        }
    }
    
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
    
    // set job parameter
    Configuration conf = new Configuration();
    conf.setString("hashcode_factor", "31");
    env.getConfig().setGlobalJobParameters(conf);
    
    // register the function
    tableEnv.registerFunction("hashCode", new HashCode());
    
    // use the function in Java Table API
    myTable.select("string, string.hashCode(), hashCode(string)");
    
    // use the function in SQL
    tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable");
    
    • HashCode extends ScalarFunction and defines a public eval method.
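
    As an aside, the eval method can also be overloaded so that one function handles several parameter types; Flink selects the matching signature when the call is resolved. A minimal hand-written sketch (the HashCodeAny class and its bodies are illustrative, not part of the original example):

    import org.apache.flink.table.functions.ScalarFunction;
    
    // Illustrative sketch: eval may be overloaded for different parameter types.
    public class HashCodeAny extends ScalarFunction {
    
        // handles string arguments
        public int eval(String s) {
            return s == null ? 0 : s.hashCode();
        }
    
        // handles long arguments
        public int eval(Long l) {
            return l == null ? 0 : l.hashCode();
        }
    }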

    ScalarFunction

    flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/functions/ScalarFunction.scala

    abstract class ScalarFunction extends UserDefinedFunction {
    
      /**
        * Creates a call to a [[ScalarFunction]] in Scala Table API.
        *
        * @param params actual parameters of function
        * @return [[Expression]] in form of a [[ScalarFunctionCall]]
        */
      final def apply(params: Expression*): Expression = {
        ScalarFunctionCall(this, params)
      }
    
      // ----------------------------------------------------------------------------------------------
    
      /**
        * Returns the result type of the evaluation method with a given signature.
        *
        * This method needs to be overridden in case Flink's type extraction facilities are not
        * sufficient to extract the [[TypeInformation]] based on the return type of the evaluation
        * method. Flink's type extraction facilities can handle basic types or
        * simple POJOs but might be wrong for more complex, custom, or composite types.
        *
        * @param signature signature of the method the return type needs to be determined
        * @return [[TypeInformation]] of result type or null if Flink should determine the type
        */
      def getResultType(signature: Array[Class[_]]): TypeInformation[_] = null
    
      /**
        * Returns [[TypeInformation]] about the operands of the evaluation method with a given
        * signature.
        *
        * In order to perform operand type inference in SQL (especially when NULL is used) it might be
        * necessary to determine the parameter [[TypeInformation]] of an evaluation method.
        * By default Flink's type extraction facilities are used for this but might be wrong for
        * more complex, custom, or composite types.
        *
        * @param signature signature of the method the operand types need to be determined
        * @return [[TypeInformation]] of operand types
        */
      def getParameterTypes(signature: Array[Class[_]]): Array[TypeInformation[_]] = {
        signature.map { c =>
          try {
            TypeExtractor.getForClass(c)
          } catch {
            case ite: InvalidTypesException =>
              throw new ValidationException(
                s"Parameter types of scalar function '${this.getClass.getCanonicalName}' cannot be " +
                s"automatically determined. Please provide type information manually.")
          }
        }
      }
    }
    
    • ScalarFunction extends UserDefinedFunction and defines the apply, getResultType, and getParameterTypes methods. A ScalarFunction maps zero, one, or more scalar values to a new scalar value; this mapping is implemented in a user-defined public eval method. It is generally recommended to declare parameters and result types using primitive types where possible, for example int instead of DATE/TIME and long instead of TIMESTAMP.
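
    When Flink's type extraction is not sufficient, getResultType can be overridden, as the comments above describe. A minimal sketch in the spirit of the official documentation (the TimestampModifier name is illustrative): the long returned by eval is declared to be a SQL TIMESTAMP rather than a plain BIGINT.

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.table.functions.ScalarFunction;
    
    // Sketch: override getResultType so that the long returned by eval is
    // interpreted as a SQL TIMESTAMP instead of BIGINT.
    public class TimestampModifier extends ScalarFunction {
    
        public long eval(long t) {
            return t % 1000;
        }
    
        @Override
        public TypeInformation<?> getResultType(Class<?>[] signature) {
            return Types.SQL_TIMESTAMP;
        }
    }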

    CRowProcessRunner

    flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/runtime/CRowProcessRunner.scala

    class CRowProcessRunner(
        name: String,
        code: String,
        @transient var returnType: TypeInformation[CRow])
      extends ProcessFunction[CRow, CRow]
      with ResultTypeQueryable[CRow]
      with Compiler[ProcessFunction[Row, Row]]
      with Logging {
    
      private var function: ProcessFunction[Row, Row] = _
      private var cRowWrapper: CRowWrappingCollector = _
    
      override def open(parameters: Configuration): Unit = {
        LOG.debug(s"Compiling ProcessFunction: $name \n\n Code:\n$code")
        val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
        LOG.debug("Instantiating ProcessFunction.")
        function = clazz.newInstance()
        FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
        FunctionUtils.openFunction(function, parameters)
    
        this.cRowWrapper = new CRowWrappingCollector()
      }
    
      override def processElement(
          in: CRow,
          ctx: ProcessFunction[CRow, CRow]#Context,
          out: Collector[CRow])
        : Unit = {
    
        cRowWrapper.out = out
        cRowWrapper.setChange(in.change)
        function.processElement(
          in.row,
          ctx.asInstanceOf[ProcessFunction[Row, Row]#Context],
          cRowWrapper)
      }
    
      override def getProducedType: TypeInformation[CRow] = returnType
    
      override def close(): Unit = {
        FunctionUtils.closeFunction(function)
      }
    }
    
    • CRowProcessRunner's processElement method calls function.processElement, which in turn invokes the eval method of the user-defined ScalarFunction. Here function is an instance of a generated ProcessFunction subclass; its source is the code constructor parameter of CRowProcessRunner, generated when DataStreamCalc creates the CRowProcessRunner in its translateToPlan method.
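
    To make the call chain concrete, the generated code is essentially a ProcessFunction<Row, Row> that embeds the user's ScalarFunction and invokes its eval inside processElement. Below is a heavily simplified hand-written approximation for the hashCode(string) projection from the first example; the class name, field name, and field indices are illustrative, and the real output of FunctionCodeGenerator looks quite different (it also opens the UDF so that its open() can read job parameters, which is omitted here):

    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.Collector;
    
    // Hand-written approximation of a generated calc function: the user-defined
    // ScalarFunction is embedded as a field and eval is called for every row.
    public class GeneratedCalcSketch extends ProcessFunction<Row, Row> {
    
        // the user-defined UDF instance compiled into the generated code
        private final HashCode hashCode = new HashCode();
    
        @Override
        public void processElement(Row in, Context ctx, Collector<Row> out) throws Exception {
            String s = (String) in.getField(0);
            Row result = new Row(2);
            result.setField(0, s);
            // the projection hashCode(string) becomes a direct call to eval
            result.setField(1, hashCode.eval(s));
            out.collect(result);
        }
    }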

    ProcessFunction

    flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/functions/ProcessFunction.java

    @PublicEvolving
    public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
    
        private static final long serialVersionUID = 1L;
    
        /**
         * Process one element from the input stream.
         *
         * <p>This function can output zero or more elements using the {@link Collector} parameter
         * and also update internal state or set timers using the {@link Context} parameter.
         *
         * @param value The input value.
         * @param ctx A {@link Context} that allows querying the timestamp of the element and getting
         *            a {@link TimerService} for registering timers and querying the time. The
         *            context is only valid during the invocation of this method, do not store it.
         * @param out The collector for returning result values.
         *
         * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
         *                   to fail and may trigger recovery.
         */
        public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
    
        /**
         * Called when a timer set using {@link TimerService} fires.
         *
         * @param timestamp The timestamp of the firing timer.
         * @param ctx An {@link OnTimerContext} that allows querying the timestamp of the firing timer,
         *            querying the {@link TimeDomain} of the firing timer and getting a
         *            {@link TimerService} for registering timers and querying the time.
         *            The context is only valid during the invocation of this method, do not store it.
         * @param out The collector for returning result values.
         *
         * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
         *                   to fail and may trigger recovery.
         */
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
    
        /**
         * Information available in an invocation of {@link #processElement(Object, Context, Collector)}
         * or {@link #onTimer(long, OnTimerContext, Collector)}.
         */
        public abstract class Context {
    
            /**
             * Timestamp of the element currently being processed or timestamp of a firing timer.
             *
             * <p>This might be {@code null}, for example if the time characteristic of your program
             * is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
             */
            public abstract Long timestamp();
    
            /**
             * A {@link TimerService} for querying time and registering timers.
             */
            public abstract TimerService timerService();
    
            /**
             * Emits a record to the side output identified by the {@link OutputTag}.
             *
             * @param outputTag the {@code OutputTag} that identifies the side output to emit to.
             * @param value The record to emit.
             */
            public abstract <X> void output(OutputTag<X> outputTag, X value);
        }
    
        /**
         * Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.
         */
        public abstract class OnTimerContext extends Context {
            /**
             * The {@link TimeDomain} of the firing timer.
             */
            public abstract TimeDomain timeDomain();
        }
    
    }
    
    • ProcessFunction extends AbstractRichFunction and defines the abstract method processElement.
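
    For reference, a hand-written ProcessFunction only needs to implement processElement; a minimal sketch (names are illustrative):

    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;
    
    // Minimal sketch: emit the length of every incoming string.
    public class StringLengthFunction extends ProcessFunction<String, Integer> {
    
        @Override
        public void processElement(String value, Context ctx, Collector<Integer> out) {
            out.collect(value == null ? 0 : value.length());
        }
    }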

    DataStreamCalc

    flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala

    class DataStreamCalc(
        cluster: RelOptCluster,
        traitSet: RelTraitSet,
        input: RelNode,
        inputSchema: RowSchema,
        schema: RowSchema,
        calcProgram: RexProgram,
        ruleDescription: String)
      extends Calc(cluster, traitSet, input, calcProgram)
      with CommonCalc
      with DataStreamRel {
    
      //......
    
      override def translateToPlan(
          tableEnv: StreamTableEnvironment,
          queryConfig: StreamQueryConfig): DataStream[CRow] = {
    
        val config = tableEnv.getConfig
    
        val inputDataStream =
          getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)
    
        // materialize time attributes in condition
        val condition = if (calcProgram.getCondition != null) {
          val materializedCondition = RelTimeIndicatorConverter.convertExpression(
            calcProgram.expandLocalRef(calcProgram.getCondition),
            inputSchema.relDataType,
            cluster.getRexBuilder)
          Some(materializedCondition)
        } else {
          None
        }
    
        // filter out time attributes
        val projection = calcProgram.getProjectList.asScala
          .map(calcProgram.expandLocalRef)
    
        val generator = new FunctionCodeGenerator(config, false, inputSchema.typeInfo)
    
        val genFunction = generateFunction(
          generator,
          ruleDescription,
          inputSchema,
          schema,
          projection,
          condition,
          config,
          classOf[ProcessFunction[CRow, CRow]])
    
        val inputParallelism = inputDataStream.getParallelism
    
        val processFunc = new CRowProcessRunner(
          genFunction.name,
          genFunction.code,
          CRowTypeInfo(schema.typeInfo))
    
        inputDataStream
          .process(processFunc)
          .name(calcOpName(calcProgram, getExpressionString))
          // keep parallelism to ensure order of accumulate and retract messages
          .setParallelism(inputParallelism)
      }
    }
    
    • DataStreamCalc's translateToPlan calls CommonCalc's generateFunction method to produce genFunction, then creates a CRowProcessRunner from genFunction.name, genFunction.code, and a CRowTypeInfo. The generated code extends ProcessFunction and implements processElement, which in turn calls the eval method of the user-defined ScalarFunction.
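
    As a side note, this streaming translation path is exercised when the query result is turned into a DataStream. A minimal sketch, assuming a MyTable with a string column has already been registered, analogous to the first example (the variable names are illustrative):

    StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment stEnv = TableEnvironment.getTableEnvironment(senv);
    
    // register the scalar function as before
    stEnv.registerFunction("hashCode", new HashCode());
    
    // translateToPlan of the underlying DataStreamCalc node runs when the
    // Table is converted into a DataStream (or when the job is executed)
    Table result = stEnv.sqlQuery("SELECT string, hashCode(string) FROM MyTable");
    DataStream<Row> stream = stEnv.toAppendStream(result, Row.class);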

    Summary

    • ScalarFunction extends UserDefinedFunction and defines the apply, getResultType, and getParameterTypes methods. A ScalarFunction maps zero, one, or more scalar values to a new scalar value; this mapping is implemented in a user-defined public eval method. It is generally recommended to declare parameters and result types using primitive types where possible, for example int instead of DATE/TIME and long instead of TIMESTAMP.
    • CRowProcessRunner's processElement method calls function.processElement, which in turn invokes the eval method of the user-defined ScalarFunction. Here function is an instance of a generated ProcessFunction subclass; its source is the code constructor parameter of CRowProcessRunner, generated when DataStreamCalc creates the CRowProcessRunner in its translateToPlan method. ProcessFunction extends AbstractRichFunction and defines the abstract method processElement.
    • DataStreamCalc's translateToPlan calls CommonCalc's generateFunction method to produce genFunction, then creates a CRowProcessRunner from genFunction.name, genFunction.code, and a CRowTypeInfo. The generated code extends ProcessFunction and implements processElement, which in turn calls the eval method of the user-defined ScalarFunction.
