很多同学在使用Calcite的过程中需要自定义函数, 现在讲讲如何定自义函数
1. Calcite 内置函数和对应的流程
Calcite中内置的函数主要在SqlStdOperatorTable
中, 包括常见的算术运算符、时间函数等。现在就以一个列子来说明在SqlStdOperatorTable
中添加函数以达到注册函数的功能
- 在
SqlStdOperatorTable.java
中添加对应函数
public static final SqlFunction TIMESTR2LONG = new SqlFunction(
new SqlIdentifier("TIMESTR2LONG", SqlParserPos.ZERO),
//返回值为Long, 可以为NULL
ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.BIGINT), SqlTypeTransforms.TO_NULLABLE),
//输入类型推测为Varchar, 即java中的String
InferTypes.VARCHAR_1024,
//类型检查,如果类型不是String, 报错
OperandTypes.family(SqlTypeFamily.STRING),
Lists.newArrayList(new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT).createSqlType(SqlTypeName.VARCHAR)),
SqlFunctionCategory.USER_DEFINED_FUNCTION);
- 在自已的代码中将Parser的OperatorTable 设置成上述 SqlStdOperatorTable
public class FunctionTestOne {
public static final SqlTypeFactoryImpl TYPE_FACTORY = new SqlTypeFactoryImpl(
RelDataTypeSystem.DEFAULT);
public static final RelDataTypeSystem TYPE_SYSTEM = RelDataTypeSystem.DEFAULT;
public static void main(String[] args) {
CalciteSchema rootSchema = CalciteSchema
.createRootSchema(false, false);
//添加表Test
rootSchema.add("test", new AbstractTable() {
@Override
public RelDataType getRowType(RelDataTypeFactory typeFactory) {
RelDataTypeFactory.Builder builder = new RelDataTypeFactory
.Builder(TYPE_FACTORY);
//列id, 类型int
builder.add("id", new BasicSqlType(TYPE_SYSTEM, SqlTypeName.INTEGER));
//列name, 类型为varchar
builder.add("name", new BasicSqlType(TYPE_SYSTEM, SqlTypeName.VARCHAR));
builder.add("time_str", new BasicSqlType(TYPE_SYSTEM, SqlTypeName.VARCHAR));
return builder.build();
}
});
SqlParser.ConfigBuilder builder = SqlParser.configBuilder();
//以下需要设置成大写并且忽略大小写
builder.setQuotedCasing(Casing.TO_UPPER);
builder.setUnquotedCasing(Casing.TO_UPPER);
builder.setCaseSensitive(false);
final FrameworkConfig config = Frameworks.newConfigBuilder()
.defaultSchema(rootSchema.plus())
.parserConfig(builder.build())
//注意用你自已的SqlStdOperatorTable, 此处之所以同名
//目的是覆盖calcite中SqlStdOperatorTable
.operatorTable(SqlStdOperatorTable.instance())
.build();
Planner planner = Frameworks.getPlanner(config);
//now start to parser
try {
SqlNode originSqlNode = planner.parse("select name, timestr2long(time_str) from test where id < 5");
SqlNode sqlNode = planner.validate(originSqlNode);
RelRoot root = planner.rel(sqlNode);
System.out.println(RelOptUtil.toString(root.rel, ALL_ATTRIBUTES));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
稍微解释一下上述代码中主要步骤
-
创建rootSchema, 你可以把其理解为Linux中root目录, 当往rootSchema添加table是相当于添加一个普通文件,当往rootSchema添加schema时相当于添加一个目录, 相应的rootSchema就是整个数据库的根,可以往根数据库中添加函数、数据库、表等,添加的子数据库又可以把函数、数据库、表放置其中,不断递归
-
然后往rootSchema添加表后,然后设置Parser和OperatorTable 后生成Planner, 至于中间是怎么用的,后面会详细说明
-
最后进行Parser/Validate并转化成RelNode
现在简要说明一下函数注册主要过程
Parser阶段,在Parser.jj文件中 有以下内容:
LOOKAHEAD( [<SPECIFIC>] FunctionName() <LPAREN>)
e = NamedFunctionCall()
// NamedFunctionCall的主要内容如下
...
SqlNode NamedFunctionCall() :
qualifiedName = FunctionName()
createCall(qualifiedName, s.end(this), funcType, quantifier, args);
...
FunctionName() 主要是获取函数名,如例中的函数名 timestr2long
或者是Calcite内置的一些固定的函数名,如ABS, AVG等
createCall()是生成SqlCall(函数的主要形式), 而createCall()最终会调用
protected SqlCall createCall(
SqlIdentifier funName,
SqlParserPos pos,
SqlFunctionCategory funcType,
SqlLiteral functionQualifier,
SqlNode[] operands) {
SqlOperator fun = null;
// First, try a half-hearted resolution as a builtin function.
// If we find one, use it; this will guarantee that we
// preserve the correct syntax (i.e. don't quote builtin function
/// name when regenerating SQL).
if (funName.isSimple()) {
final List<SqlOperator> list = new ArrayList<>();
//这里opTab的值为SqlStdOperatorTable.instance();
opTab.lookupOperatorOverloads(funName, funcType, SqlSyntax.FUNCTION, list);
if (list.size() == 1) {
fun = list.get(0);
}
}
// Otherwise, just create a placeholder function. Later, during
// validation, it will be resolved into a real function reference.
if (fun == null) {
fun = new SqlUnresolvedFunction(funName, null, null, null, null,
funcType);
}
return fun.createCall(functionQualifier, pos, operands);
}
经过以上步骤可以发现注册过程流程为:
- 在SqlStdOperatorTable按照格式注册自已的函数名
- 在Parser过程中,会根据函数名自动在SqlStdOperatorTable查找对应函数
- 如果没有找到,会自动将函数解析成
SqlUnresolvedFunction
那问题来了,SqlUnresolvedFunction
什么时候会解析成具体的函数?答案是: Validate阶段, 在Validate阶段,Calcite 会针对函数做以下工作
- 根据函数名在所有函数注册表中查找函数(之所以说是所有函数注册表是因为Calcite还可以在其它地方注册函数, 后面的
ChainedSqlOperatorTable
,ListSqlOperatorTable
等)查找函数 - 验证函数的入参:包话参数类型、个数、函数类型等
- 递归验证每个入参是否合法,具体过程后续会详细说明
其它方式注册函数
上述注册函数略微tricky, 需要要本地搭建一个org.apache.calcite.sql.fun
包并且从calcite-core中复制SqlStdOperatorTable.java
的内容以覆盖内置的SqlStdOperatorTable,最后添加自已的函数,通过这种方式实现并不那么优雅,其实还有其它相对优雅的方式进行函数注册,主要有以下两种方式
1. 修改Parser.jj 注册
假如我要实现以下两个方法
public static Integer func1(String s) {
return s == null ? null : Integer.valueOf(s);
}
public static String func2(Integer i) {
return i == null ? null : String.valueOf(i);
}
要在Calcite中注册以上两个函数,只需要在parser.jj中添加以下内容, 见代码
/*
* Create User-defined function
*/
SqlNode extraFunction() :
{
SqlNode node;
SqlNode e;
List<SqlNode> args = null;
final Span s;
}
{ (
(
<FUNC1> { s = span(); }
<LPAREN>
e = Expression(ExprContext.ACCEPT_NON_QUERY)
{
startList(e);
}
<LPAREN>
{
node = FunctionUtil.FUNC1.createCall(s.end(this), args);
}
)
|
(
<FUNC2> { s = span(); }
<LPAREN>
e = Expression(ExprContext.ACCEPT_NON_QUERY)
{
startList(e);
}
<LPAREN>
{
node = FunctionUtil.FUNC2.createCall(s.end(this), args);
}
)
)
{
return node;
}
}
<DEFAULT, DQID, BTID> TOKEN :
...
| < FUNCTION: "FUNCTION" >
| < FUNC1: "FUNC1"> //添加的内容
| < FUNC2: "FUNC2"> //添加的内容
...
FunctionUtil 内容如下, 见代码
public class FunctionUtil {
public static final SqlFunction FUNC1 = new SqlFunction(
new SqlIdentifier("FUNC1", SqlParserPos.ZERO),
ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.INTEGER), SqlTypeTransforms.TO_NULLABLE),
InferTypes.VARCHAR_1024,
OperandTypes.family(SqlTypeFamily.STRING),
Lists.newArrayList(TYPE_FACTORY.createSqlType(SqlTypeName.VARCHAR)),
SqlFunctionCategory.USER_DEFINED_FUNCTION);
public static final SqlFunction FUNC2 = new SqlFunction(
new SqlIdentifier("FUNC2", SqlParserPos.ZERO),
ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.VARCHAR), SqlTypeTransforms.TO_NULLABLE),
InferTypes.FIRST_KNOWN,
OperandTypes.family(SqlTypeFamily.INTEGER),
Lists.newArrayList(TYPE_FACTORY.createSqlType(SqlTypeName.INTEGER)),
SqlFunctionCategory.USER_DEFINED_FUNCTION);
}
最后的测试代码如下,见代码
//now test func
//将创建的函数放入SqlOperatorTable()中
ListSqlOperatorTable listSqlOperatorTable = new ListSqlOperatorTable();
listSqlOperatorTable.add(FUNC1);
listSqlOperatorTable.add(FUNC2);
final FrameworkConfig funcConfig = Frameworks.newConfigBuilder()
.defaultSchema(rootSchema.plus())
.parserConfig(builder.build())
//添加一个专们用于添加函数的 listSqlOperatorTable
.operatorTable(ChainedSqlOperatorTable.of(listSqlOperatorTable,
SqlStdOperatorTable.instance()))
.build();
Planner planner2 = Frameworks.getPlanner(funcConfig);
SqlNode func1SqlNodeOrg = planner2.parse("select func1(name) from test where id > 4");
SqlNode func1SqlNode = planner2.validate(func1SqlNodeOrg);
RelRoot func1Root = planner2.rel(func1SqlNode);
System.out.println("-------- func1 test -------");
System.out.println(RelOptUtil.toString(func1Root.rel, ALL_ATTRIBUTES));
Planner planner3 = Frameworks.getPlanner(funcConfig);
SqlNode func2SqlNodeOrg = planner3.parse("select func2(id) from test where id > 4");
SqlNode func2SqlNode = planner3.validate(func2SqlNodeOrg);
RelRoot func2Root = planner3.rel(func2SqlNode);
System.out.println("-------- func2 test -------");
System.out.println(RelOptUtil.toString(func2Root.rel, ALL_ATTRIBUTES));
2. Schema 注册
细心的读者可以发现,以上两种方式都要对calcite做侵入式修改。内置方法需要覆盖SqlStdOpeartorTable, 而修改Parser.jj则需要自已书写Parser逻辑,并在Validate阶段注册函数。考虑到一下场景,需要类似于Hive 注册UDF那样,动态的注册函数,上述两种方式是无法实现的,那如何实现动态注册函数呢? 用schema 注册, 下面以一个例子说明, 代码在这
//主要代码
public static RelRoot sqlToRelNode(String sql) {
try {
SchemaPlus plus = ROOT_SCHEMA.plus();
plus.add("FUNC1", ScalarFunctionImpl.create(
FunctionUtil.class, "func1"));
plus.add("FUNC2", ScalarFunctionImpl.create(
FunctionUtil.class, "func2"));
SqlParser parser = SqlParser.create(sql, config.getParserConfig());
SqlNode sqlNode = parser.parseStmt();
//这里需要注意大小写问题,否则表会无法找到
Properties properties = new Properties();
properties.setProperty(CalciteConnectionProperty.CASE_SENSITIVE.camelName(),
String.valueOf(config.getParserConfig().caseSensitive()));
CalciteCatalogReader calciteCatalogReader = new CalciteCatalogReader(
CalciteSchema.from(rootSchema(plus)),
CalciteSchema.from(config.getDefaultSchema()).path(null),
TYPE_FACTORY,
new CalciteConnectionConfigImpl(properties));
//to supported user' define function
SqlOperatorTable sqlOperatorTable = ChainedSqlOperatorTable
.of(config.getOperatorTable(), calciteCatalogReader);
TestSqlValidatorImpl validator = new TestSqlValidatorImpl(
sqlOperatorTable,
calciteCatalogReader,
TYPE_FACTORY,
SqlConformanceEnum.DEFAULT);
//try to union trait set
//addRelTraitDef for is HepPlanner has not effect in fact
VolcanoPlanner volcanoPlanner = new VolcanoPlanner();
SqlNode validateSqlNode = validator.validate(sqlNode);
final RexBuilder rexBuilder = new RexBuilder(TYPE_FACTORY);
RelOptCluster cluster = RelOptCluster.create(volcanoPlanner, rexBuilder);
final SqlToRelConverter.Config sqlToRelConverterConfig
= SqlToRelConverter.configBuilder()
.withConfig(config.getSqlToRelConverterConfig())
.withTrimUnusedFields(false)
.withConvertTableAccess(false)
.build();
final SqlToRelConverter sqlToRelConverter =
new SqlToRelConverter(null, validator,
calciteCatalogReader, cluster, config.getConvertletTable(),
sqlToRelConverterConfig);
RelRoot root =
sqlToRelConverter.convertQuery(validateSqlNode, false, true);
root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true));
final RelBuilder relBuilder = sqlToRelConverterConfig
.getRelBuilderFactory().create(cluster, null);
//change trait set of TableScan
return root.withRel(
RelDecorrelator.decorrelateQuery(root.rel, relBuilder));
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
Schema 注册相对于前两者来说过程复杂一点,相好处是可以定制化, 主要有以下特点
- 随时注册 随时使用
- 引入了schema空间,同一个function可以注册到不同的schema并可以隔离
采用Schema 注册的主要步骤在CalciteCatalogReader
读取schema, 限于篇幅,我会专门用一个篇文章详细说明CalciteCatalogReader
3. 总结
以上为三种方式在Calcite注册函数,那么这三种有什么区别
- 从灵活度来说,前两种不如通过schema注册,而且通过schema注册也不需要去修改calcite核心代码,适合于初学者使用。更重要的是通过schema注册可以实现函数隔离,可以实现不同数据库级别的函数之持。
- 从实现容易度来说, 第一种更为简单,只需要简单修改一个内置的表即可
那么对于一个项目来说 采用哪里方式更好?我的建议是
- 如果需要动态注册或者schema级别函数隔离,建议采用第三种
- 如果函数相对固定而且函数数量较多,建议采用第一种
- 如果需要更加细致定制函数且函数数量不多,可以采用第二种
网友评论