序
本文主要研究一下flink的TableFunction
实例
/**
 * Table function that splits a string into tokens and emits one row per token.
 *
 * <p>The generic type {@code Tuple2<String, Integer>} determines the schema of the
 * returned table as (String, Integer): the token and its length.
 */
public class Split extends TableFunction<Tuple2<String, Integer>> {

    /**
     * Separator handed to {@link String#split(String)}; note that it is therefore
     * interpreted as a regular expression, not as a literal string.
     */
    private final String separator;

    /**
     * @param separator the separator (regular expression) used to split the input
     */
    public Split(String separator) {
        this.separator = separator;
    }

    /**
     * Splits {@code str} by the configured separator and emits a (token, length) row
     * for every token.
     *
     * @param str the string to split; a {@code null} value (e.g. a SQL NULL field)
     *            emits no rows instead of failing with a NullPointerException
     */
    public void eval(String str) {
        if (str == null) {
            // Nothing to split: emit no rows for a NULL input.
            return;
        }
        for (String s : str.split(separator)) {
            // use collect(...) to emit a row
            collect(new Tuple2<>(s, s.length()));
        }
    }
}
// Usage example for the Split table function, first through the Table API and then
// through SQL. NOTE: `env` is not defined in this excerpt; it has to be created beforehand.
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
Table myTable = ... // table schema: [a: String]
// Register the function under the name "split", using "#" as the separator.
tableEnv.registerFunction("split", new Split("#"));
// Use the table function in the Java Table API. "as" specifies the field names of the table.
// NOTE(review): per the Flink Table API docs, "join" drops input rows for which the table
// function emits nothing, while "leftOuterJoin" keeps them padded with nulls - confirm for
// the Flink version in use.
myTable.join(new Table(tableEnv, "split(a) as (word, length)"))
.select("a, word, length");
myTable.leftOuterJoin(new Table(tableEnv, "split(a) as (word, length)"))
.select("a, word, length");
// Use the table function in SQL with LATERAL and TABLE keywords.
// NOTE: the queries below reference a table named "MyTable"; its registration with
// tableEnv is not shown in this excerpt.
// CROSS JOIN a table function (equivalent to "join" in Table API).
tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
// LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API).
tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE");
- 本实例自定义了Split这个TableFunction,并通过TableEnvironment.registerFunction注册,最后在Table的api或者TableEnvironment.sqlQuery中使用;这里的Split定义了public的eval方法,在该方法中通过collect方法发射数据
UserDefinedFunction
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/functions/UserDefinedFunction.scala
/**
  * Serializable base class for user-defined functions. It provides no-op lifecycle
  * hooks, a determinism flag and an identifier derived from the class name and the
  * serialized instance.
  */
abstract class UserDefinedFunction extends Serializable {

  /**
    * Initialization hook of the user-defined function.
    *
    * The default implementation does nothing.
    *
    * @param context the function context passed to the function
    */
  @throws(classOf[Exception])
  def open(context: FunctionContext): Unit = ()

  /**
    * Clean-up hook of the user-defined function.
    *
    * The default implementation does nothing.
    */
  @throws(classOf[Exception])
  def close(): Unit = ()

  /**
    * Tells whether the function is deterministic.
    *
    * Functions that are not purely functional, such as random(), date() or now(),
    * must override this method and return false.
    *
    * @return true if and only if a call to this function is guaranteed to always return
    *         the same result given the same parameters; true is the default
    */
  def isDeterministic: Boolean = true

  /**
    * Builds an identifier for this function instance: the canonical class name with
    * every '.' replaced by '$', followed by "$" and the hex-encoded MD5 digest of the
    * instance encoded as a string.
    */
  final def functionIdentifier: String = {
    // The digest is computed first, exactly as before, then the class name is mangled.
    val encodedInstance = EncodingUtils.encodeObjectToString(this)
    val digest = EncodingUtils.hex(EncodingUtils.md5(encodedInstance))
    val mangledClassName = getClass.getCanonicalName.replace('.', '$')
    mangledClassName.concat("$").concat(digest)
  }

  /**
    * Returns the name of the UDF that is used for plan explain and logging.
    */
  override def toString: String = getClass.getSimpleName
}
- UserDefinedFunction定义了open、close、isDeterministic、functionIdentifier方法,并覆盖了toString方法
TableFunction
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/functions/TableFunction.scala
/**
  * Base class for user-defined table functions that emit rows of type `T`.
  *
  * Rows are emitted through [[collect]], which forwards them to the collector that was
  * installed via `setCollector`.
  *
  * @tparam T the type of the emitted rows
  */
abstract class TableFunction[T] extends UserDefinedFunction {

  // ----------------------------------------------------------------------------------------------

  /**
    * Emit an output row.
    *
    * @param row the output row
    */
  protected def collect(row: T): Unit = {
    collector.collect(row)
  }

  // ----------------------------------------------------------------------------------------------

  /**
    * The code generated collector used to emit row.
    * It is unset (null) until `setCollector` has been called.
    */
  private var collector: Collector[T] = _

  /**
    * Internal use. Sets the current collector.
    *
    * @param collector the collector that subsequent [[collect]] calls forward to
    */
  private[flink] final def setCollector(collector: Collector[T]): Unit = {
    this.collector = collector
  }

  // ----------------------------------------------------------------------------------------------

  /**
    * Returns the result type of the evaluation method.
    *
    * This method needs to be overridden in case Flink's type extraction facilities are not
    * sufficient to extract the [[TypeInformation]] based on the return type of the evaluation
    * method. Flink's type extraction facilities can handle basic types or
    * simple POJOs but might be wrong for more complex, custom, or composite types.
    *
    * @return [[TypeInformation]] of result type or null if Flink should determine the type
    */
  def getResultType: TypeInformation[T] = null

  /**
    * Returns [[TypeInformation]] about the operands of the evaluation method with a given
    * signature.
    *
    * In order to perform operand type inference in SQL (especially when NULL is used) it might be
    * necessary to determine the parameter [[TypeInformation]] of an evaluation method.
    * By default Flink's type extraction facilities are used for this but might be wrong for
    * more complex, custom, or composite types.
    *
    * @param signature signature of the method the operand types need to be determined
    * @return [[TypeInformation]] of operand types
    * @throws ValidationException if the type of a parameter cannot be extracted automatically;
    *                             the underlying [[InvalidTypesException]] is attached as cause
    */
  def getParameterTypes(signature: Array[Class[_]]): Array[TypeInformation[_]] = {
    signature.map { c =>
      try {
        TypeExtractor.getForClass(c)
      } catch {
        case ite: InvalidTypesException =>
          // Keep the extractor's exception as the cause so the offending type stays visible.
          throw new ValidationException(
            s"Parameter types of table function '${this.getClass.getCanonicalName}' cannot be " +
            s"automatically determined. Please provide type information manually.",
            ite)
      }
    }
  }
}
- TableFunction继承了UserDefinedFunction,定义了collect、setCollector、getResultType、getParameterTypes方法
ProcessOperator
flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/operators/ProcessOperator.java
/**
 * One-input stream operator that wraps a {@link ProcessFunction} and invokes it for
 * every incoming element.
 *
 * <p>NOTE: this is an excerpt; the rest of the class (including the {@code ContextImpl}
 * type used below) is elided at the end of the body.
 *
 * @param <IN> type of the input elements
 * @param <OUT> type of the emitted elements
 */
@Internal
public class ProcessOperator<IN, OUT>
extends AbstractUdfStreamOperator<OUT, ProcessFunction<IN, OUT>>
implements OneInputStreamOperator<IN, OUT> {
private static final long serialVersionUID = 1L;
// Collector wrapping the operator output; created in open().
private transient TimestampedCollector<OUT> collector;
// Context object handed to the user function; created in open().
private transient ContextImpl context;
/** We listen to this ourselves because we don't have an {@link InternalTimerService}. */
private long currentWatermark = Long.MIN_VALUE;
/**
 * Creates the operator for the given function and sets the chaining strategy to ALWAYS.
 *
 * @param function the user function invoked for every element
 */
public ProcessOperator(ProcessFunction<IN, OUT> function) {
super(function);
chainingStrategy = ChainingStrategy.ALWAYS;
}
/**
 * Opens the operator and creates the collector and the context used by processElement.
 */
@Override
public void open() throws Exception {
super.open();
collector = new TimestampedCollector<>(output);
context = new ContextImpl(userFunction, getProcessingTimeService());
}
/**
 * Passes the element's value, the context and the collector to the user function.
 *
 * @param element the incoming stream record
 */
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
// Take the timestamp for emitted records from the incoming element.
collector.setTimestamp(element);
// Expose the current element to the context only for the duration of the call.
context.element = element;
userFunction.processElement(element.getValue(), context, collector);
// Not reached if the user function throws; the reference is then left in place.
context.element = null;
}
/**
 * Forwards the watermark to the superclass and records its timestamp.
 *
 * @param mark the incoming watermark
 */
@Override
public void processWatermark(Watermark mark) throws Exception {
super.processWatermark(mark);
this.currentWatermark = mark.getTimestamp();
}
//......
}
- ProcessOperator的processElement方法调用了userFunction.processElement,这里userFunction为CRowCorrelateProcessRunner
CRowCorrelateProcessRunner
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/runtime/CRowCorrelateProcessRunner.scala
/**
  * [[ProcessFunction]] over [[CRow]]s that compiles a generated [[TableFunctionCollector]]
  * and a generated row-level [[ProcessFunction]] when it is opened, and delegates every
  * element to the generated function.
  *
  * @param processName   class name of the generated process function
  * @param processCode   source code of the generated process function
  * @param collectorName class name of the generated collector
  * @param collectorCode source code of the generated collector
  * @param returnType    type information reported by [[getProducedType]]
  */
class CRowCorrelateProcessRunner(
    processName: String,
    processCode: String,
    collectorName: String,
    collectorCode: String,
    @transient var returnType: TypeInformation[CRow])
  extends ProcessFunction[CRow, CRow]
  with ResultTypeQueryable[CRow]
  with Compiler[Any]
  with Logging {

  // All three are created in open().
  private var function: ProcessFunction[Row, Row] = _
  private var collector: TableFunctionCollector[_] = _
  private var cRowWrapper: CRowWrappingCollector = _

  /**
    * Compiles and instantiates the generated collector and process function, hands both
    * the runtime context and then opens them (collector first in each step).
    */
  override def open(parameters: Configuration): Unit = {
    LOG.debug(s"Compiling TableFunctionCollector: $collectorName \n\n Code:\n$collectorCode")
    val runtimeContext = getRuntimeContext
    val userClassLoader = runtimeContext.getUserCodeClassLoader

    // The collector class is instantiated through its no-argument constructor.
    val collectorClass = compile(userClassLoader, collectorName, collectorCode)
    LOG.debug("Instantiating TableFunctionCollector.")
    collector = collectorClass.newInstance().asInstanceOf[TableFunctionCollector[_]]
    this.cRowWrapper = new CRowWrappingCollector()

    // The process function class receives the collector as its constructor argument.
    LOG.debug(s"Compiling ProcessFunction: $processName \n\n Code:\n$processCode")
    val processClass = compile(userClassLoader, processName, processCode)
    val processCtor = processClass.getConstructor(classOf[TableFunctionCollector[_]])
    LOG.debug("Instantiating ProcessFunction.")
    function = processCtor.newInstance(collector).asInstanceOf[ProcessFunction[Row, Row]]

    FunctionUtils.setFunctionRuntimeContext(collector, runtimeContext)
    FunctionUtils.setFunctionRuntimeContext(function, runtimeContext)
    FunctionUtils.openFunction(collector, parameters)
    FunctionUtils.openFunction(function, parameters)
  }

  /**
    * Prepares the wrapping collector and the generated collector for the incoming record
    * and then delegates the wrapped row to the generated process function.
    *
    * @param in  the incoming record; its row and change flag are used
    * @param ctx the context, passed on to the generated function
    * @param out the downstream collector that the wrapper writes to
    */
  override def processElement(
      in: CRow,
      ctx: ProcessFunction[CRow, CRow]#Context,
      out: Collector[CRow])
    : Unit = {

    // Point the wrapper at the downstream collector and carry over the change flag.
    cRowWrapper.out = out
    cRowWrapper.setChange(in.change)

    // Prepare the generated collector for this input row.
    collector.setCollector(cRowWrapper)
    collector.setInput(in.row)
    collector.reset()

    val rowContext = ctx.asInstanceOf[ProcessFunction[Row, Row]#Context]
    function.processElement(in.row, rowContext, cRowWrapper)
  }

  /** Returns the type information passed in at construction time. */
  override def getProducedType: TypeInformation[CRow] = returnType

  /** Closes the generated collector first and then the generated process function. */
  override def close(): Unit = {
    FunctionUtils.closeFunction(collector)
    FunctionUtils.closeFunction(function)
  }
}
- CRowCorrelateProcessRunner的processElement方法调用了function.processElement,这里function是在open方法中由processCode动态编译并实例化的ProcessFunction,它会去调用Split的eval方法
小结
- TableFunction继承了UserDefinedFunction,定义了collect、setCollector、getResultType、getParameterTypes方法;UserDefinedFunction定义了open、close、isDeterministic、functionIdentifier方法,并覆盖了toString方法
- 自定义TableFunction的话,除了继承TableFunction类外,还需要定义一个public的eval方法,该方法的参数类型需要依据使用场景来定义,比如本实例中调用split的时候传入的是table的a字段,该字段为String类型,因而eval方法的入参就定义为String类型
- ProcessOperator的processElement方法调用了userFunction.processElement,这里userFunction为CRowCorrelateProcessRunner;CRowCorrelateProcessRunner的processElement方法调用了function.processElement,这里function是在open方法中由processCode动态编译并实例化的ProcessFunction,它会去调用Split的eval方法
网友评论