Fixing a Phoenix User-Defined Function (UDF) Exception

Author: 大猪大猪 | Published: 2018-08-21 23:27

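    A custom UDF works correctly from the Phoenix client, but a local test fails with an exception saying the function class cannot be found.

    Reproducing the exception

    Example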

    @Test
    public void testSql2() throws SQLException {
        String sql = "select CRC32(\"userId\") from TEST_LOG";
        Configuration conf = new Configuration();
        conf.addResource("hbase-site.xml"); // the fix is applied in this file
        conf.addResource("hdfs-site.xml");
        PhoenixDriver phoenixDriver = PhoenixDriver.getInstaceWithProperties(conf, PhoenixDriver.loadProperties());
        ResultSet rs = phoenixDriver.query(sql);
        int columns = rs.getMetaData().getColumnCount();
        while (rs.next()) {
            for (int i = 1; i <= columns; i++) {
                System.out.print(rs.getString(i));
                System.out.print("\t\t");
            }
            System.out.println();
        }
    }
    

    CRC32Function.java

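    package com.dounine.phoenixudfs;

    import java.sql.SQLException;
    import java.util.List;
    import java.util.zip.CRC32;

    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.expression.Expression;
    import org.apache.phoenix.expression.function.ScalarFunction;
    import org.apache.phoenix.parse.FunctionParseNode.Argument;
    import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction;
    import org.apache.phoenix.schema.tuple.Tuple;
    import org.apache.phoenix.schema.types.PDataType;
    import org.apache.phoenix.schema.types.PVarchar;

    @BuiltInFunction(name = CRC32Function.NAME, args = {@Argument()})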
    public class CRC32Function extends ScalarFunction {
        public static final String NAME = "CRC32";
        public static final Integer LENGTH = 19;
    
    
        public CRC32Function() throws SQLException {
        }
    
        public CRC32Function(List<Expression> children) throws SQLException {
            super(children);
        }
    
        public static void main(String[] args) {
            CRC32 crc32 = new CRC32();
            crc32.update("lake".getBytes());
            System.out.println(crc32.getValue());
        }
    
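        @Override
        public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
            // Evaluate the argument first; on success ptr points at its bytes.
            if (!getChildExpression().evaluate(tuple, ptr)) {
                return false;
            }
            // An empty value is passed through unchanged.
            if (ptr.getLength() == 0) {
                return true;
            }
            // Checksum only the slice that ptr refers to, not the whole backing array.
            CRC32 crc32 = new CRC32();
            crc32.update(ptr.get(), ptr.getOffset(), ptr.getLength());
            // Return the checksum as decimal text, matching the VARCHAR return type.
            ptr.set(Bytes.toBytes(String.valueOf(crc32.getValue())));
            return true;
        }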
    
        @Override
        public PDataType getDataType() {
            return PVarchar.INSTANCE;
        }
    
        @Override
        public Integer getMaxLength() {
            return LENGTH;
        }
    
        @Override
        public boolean isNullable() {
            return getChildExpression().isNullable();
        }
    
        @Override
        public String getName() {
            return NAME;
        }
    
        private Expression getChildExpression() {
            return children.get(0);
        }
    }
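
The checksum step inside evaluate can be tried without a Phoenix cluster. The sketch below is not part of the original UDF; the class name Crc32Demo and the helper crc32Decimal are made up for illustration. It repeats the same java.util.zip.CRC32 call over a slice of a byte array and returns the value as decimal text, which is what the UDF writes back into ptr.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Hypothetical helper class, used only to illustrate the checksum step of the UDF.
public class Crc32Demo {

    /** Checksums `length` bytes of `buf` starting at `offset` and returns the value as decimal text. */
    static String crc32Decimal(byte[] buf, int offset, int length) {
        CRC32 crc32 = new CRC32();
        crc32.update(buf, offset, length);
        return String.valueOf(crc32.getValue());
    }

    public static void main(String[] args) {
        // The value of interest sits inside a larger buffer, as it can with ImmutableBytesWritable.
        byte[] row = "xx123456789yy".getBytes(StandardCharsets.US_ASCII);
        // "123456789" is the standard CRC-32 check input (0xCBF43926).
        System.out.println(crc32Decimal(row, 2, 9)); // prints 3421780262
    }
}
```

Passing the offset and length matters: checksumming the whole backing array would include the surrounding bytes and give a different result.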
    

    Add phoenix-core as a dependency, build the jar, and upload it to HDFS

    hadoop fs -put phoenix-udfs-1.0-SNAPSHOT.jar /hbase/lib/
    

    Create the function from the Phoenix command line

    create function CRC32(varchar) returns varchar as 'com.dounine.phoenixudfs.CRC32Function' using jar 'hdfs:///hbase/lib/phoenix-udfs-1.0-SNAPSHOT.jar'
    

    Querying the function table from the Phoenix client shows that the function is registered

    jdbc:phoenix:host1.demo.com:2181> select * from SYSTEM."FUNCTION";
    +------------+----------------+-----------+------------------------------------------+--------------------------------------------------+--------+
    | TENANT_ID  | FUNCTION_NAME  | NUM_ARGS  |                CLASS_NAME                |                     JAR_PATH                     | RETURN |
    +------------+----------------+-----------+------------------------------------------+--------------------------------------------------+--------+
    |            | CRC32          | 1         | com.dounine.phoenixudfs.CRC32Function  | hdfs:///hbase/lib/phoenix-udfs-1.0-SNAPSHOT.jar  | varcha |
    |            | CRC32          | null      |                                          |                                                  |        |
    +------------+----------------+-----------+------------------------------------------+--------------------------------------------------+--------+
    4 rows selected (0.068 seconds)
    

    Listing the jars

    jdbc:phoenix:storm2.starsriver.cn:2181> list jars;
    +---------------------------------------------------------------------------+
    |                               jar_location                                |
    +---------------------------------------------------------------------------+
    | hdfs://host5.demo.com:8020/hbase/lib/phoenix-udfs-1.0-SNAPSHOT.jar  |
    +---------------------------------------------------------------------------+
    1 row selected (0.645 seconds)
    

    The program fails with the following exception

    java.sql.SQLException: java.lang.reflect.InvocationTargetException
    
        at org.apache.phoenix.parse.FunctionParseNode.create(FunctionParseNode.java:280)
        at org.apache.phoenix.compile.ExpressionCompiler.visitLeave(ExpressionCompiler.java:336)
        at org.apache.phoenix.compile.ProjectionCompiler$SelectClauseVisitor.visitLeave(ProjectionCompiler.java:700)
        at org.apache.phoenix.compile.ProjectionCompiler$SelectClauseVisitor.visitLeave(ProjectionCompiler.java:585)
        at org.apache.phoenix.parse.FunctionParseNode.accept(FunctionParseNode.java:86)
        at org.apache.phoenix.compile.ProjectionCompiler.compile(ProjectionCompiler.java:412)
        at org.apache.phoenix.compile.QueryCompiler.compileSingleFlatQuery(QueryCompiler.java:561)
        at org.apache.phoenix.compile.QueryCompiler.compileSingleQuery(QueryCompiler.java:507)
        at org.apache.phoenix.compile.QueryCompiler.compileSelect(QueryCompiler.java:193)
        at org.apache.phoenix.compile.QueryCompiler.compile(QueryCompiler.java:153)
        at org.apache.phoenix.jdbc.PhoenixStatement$ExecutableSelectStatement.compilePlan(PhoenixStatement.java:490)
        at org.apache.phoenix.jdbc.PhoenixStatement$ExecutableSelectStatement.compilePlan(PhoenixStatement.java:456)
        at org.apache.phoenix.jdbc.PhoenixStatement$1.call(PhoenixStatement.java:302)
        at org.apache.phoenix.jdbc.PhoenixStatement$1.call(PhoenixStatement.java:291)
        at org.apache.phoenix.call.CallRunner.run(CallRunner.java:53)
        at org.apache.phoenix.jdbc.PhoenixStatement.executeQuery(PhoenixStatement.java:290)
        at org.apache.phoenix.jdbc.PhoenixStatement.executeQuery(PhoenixStatement.java:283)
        at org.apache.phoenix.jdbc.PhoenixStatement.executeQuery(PhoenixStatement.java:1793)
        at cn.starsriver.flink.phoenix.PhoenixDriver.query(PhoenixDriver.java:96)
        at com.dounine.test.PhoenixTest1.testSql2(PhoenixTest1.java:70)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
        at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
        at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
        at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
        at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
        at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
    Caused by: java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at org.apache.phoenix.parse.FunctionParseNode.create(FunctionParseNode.java:268)
        ... 41 more
    Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: com.dounine.phoenixudfs.CRC32Function
        at org.apache.phoenix.expression.function.UDFExpression.constructUDFFunction(UDFExpression.java:170)
        at org.apache.phoenix.expression.function.UDFExpression.<init>(UDFExpression.java:72)
        ... 46 more
    Caused by: java.lang.ClassNotFoundException: cn.starsriver.phoenixudfs.CRC32Function
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at org.apache.hadoop.hbase.util.DynamicClassLoader.tryRefreshClass(DynamicClassLoader.java:173)
        at org.apache.hadoop.hbase.util.DynamicClassLoader.loadClass(DynamicClassLoader.java:140)
        at org.apache.phoenix.expression.function.UDFExpression.constructUDFFunction(UDFExpression.java:164)
        ... 47 more
    

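    Everything looks correct, so why does this error occur?
    The cause is a configuration problem. By default, a dynamically loaded jar is copied into the directory given by hbase.local.dir, and dynamic class loading also reads jars from that configured directory. Pointing this setting at the right directory resolves the error.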
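
To see which directory a local test would use when hbase.local.dir is not set, the default can be worked out by hand. The sketch below assumes the stock hbase-default.xml values (hbase.tmp.dir = ${java.io.tmpdir}/hbase-${user.name} and hbase.local.dir = ${hbase.tmp.dir}/local/); the class and method names are hypothetical and the code does not read any HBase configuration. The post does not say which user ran the test, so treat this as one plausible reason the directory differs rather than a confirmed diagnosis.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper: mirrors the assumed hbase-default.xml defaults with plain JDK calls.
public class DefaultLocalDir {

    /** Assumed default of hbase.tmp.dir: ${java.io.tmpdir}/hbase-${user.name}. */
    static Path defaultTmpDir(String javaIoTmpdir, String userName) {
        return Paths.get(javaIoTmpdir, "hbase-" + userName);
    }

    /** Assumed default of hbase.local.dir: ${hbase.tmp.dir}/local/. */
    static Path defaultLocalDir(String javaIoTmpdir, String userName) {
        return defaultTmpDir(javaIoTmpdir, userName).resolve("local");
    }

    public static void main(String[] args) {
        // Directory the current JVM user would get by default.
        System.out.println(defaultLocalDir(
                System.getProperty("java.io.tmpdir"), System.getProperty("user.name")));
        // Directory the "hbase" user would get with /tmp as the temp dir.
        System.out.println(defaultLocalDir("/tmp", "hbase"));
    }
}
```

Under these assumptions, a test started from an IDE under a developer account resolves to a different path than a process running as the hbase user, which is consistent with having to set the property explicitly.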

    Solution

    In the hbase-site.xml used by the program, set hbase.local.dir to the local directory whose jars subdirectory holds the UDF jar

    <property>
        <name>hbase.local.dir</name>
        <value>/tmp/hbase-hbase/local</value>
    </property>
    

    The compiled UDF jar is in the jars subdirectory of that directory

    [root@dounine bin]# ls /tmp/hbase-hbase/local/jars/
    phoenix-udfs-1.0-SNAPSHOT.jar
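
The same check can be made from Java before the query runs, so that a missing jar shows up as a clear message instead of a ClassNotFoundException. This is a minimal sketch using only the JDK; the class LocalUdfJarCheck and its methods are hypothetical, and the jars subdirectory name is taken from the listing above.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical pre-flight check: is the UDF jar present under <hbase.local.dir>/jars?
public class LocalUdfJarCheck {

    /** Directory that is expected to hold the locally copied jars. */
    static Path jarsDir(String hbaseLocalDir) {
        return Paths.get(hbaseLocalDir, "jars");
    }

    /** True if the named jar exists as a regular file in the jars directory. */
    static boolean hasJar(String hbaseLocalDir, String jarName) {
        return Files.isRegularFile(jarsDir(hbaseLocalDir).resolve(jarName));
    }

    public static void main(String[] args) {
        // Defaults are the values used in this article; override them on the command line.
        String localDir = args.length > 0 ? args[0] : "/tmp/hbase-hbase/local";
        String jarName = args.length > 1 ? args[1] : "phoenix-udfs-1.0-SNAPSHOT.jar";
        System.out.println(jarsDir(localDir).resolve(jarName)
                + " present: " + hasJar(localDir, jarName));
    }
}
```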
    
