美文网首页Flink实战
Flink实战之自定义flink sql connector

Flink实战之自定义flink sql connector

作者: 〇白衣卿相〇 | 来源:发表于2020-07-12 00:23 被阅读0次

    背景

    最近工作中需要自定义开发一些flink sql的connector,因为官方提供的connector毕竟有限,在我们工作中可能会用到各种各样的中间件。所以官方没有提供的就需要我们自定义开发。
    就是如:
    CREATE TABLE XXX(
    A STRING,
    B BIGINT)
    WITH(
    'connect.type' = 'kafka',
    ...
    )
    所以开发一个自己的connector需要做哪些,本文就来总结一下开发的主要步骤,以及我遇到的问题怎么解决的。

    开发

    1. 自定义Factory,根据需要实现StreamTableSourceFactory和StreamTableSinkFactory

    2. 根据需要继承ConnectorDescriptorValidator,定义自己的connector参数(with 后面跟的那些)

    3. Factory中的requiredContext、supportedProperties都比较重要,框架中对Factory的过滤和检查需要他们

    4. 需要自定义个TableSink,根据你需要连接的中间件选择是AppendStreamTableSink、Upsert、Retract

    重写consumeDataStream方法

    1. 自定义一个SinkFunction,在invoke方法中实现将数据写入到外部中间件。

    以上5步基本上可以写一个简单的sql-connector了

    问题

    1. 异常信息:

      org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSinkFactory' in
      the classpath.
      
      Reason: Required context properties mismatch.
      
      The following properties are requested:
      connector.address=localhost:9091
      connector.job=testJob
      connector.metrics=testMetrics
      connector.type=xxxx
      schema.0.data-type=ROW<`val` DOUBLE>
      schema.0.name=value
      

      有两个问题都导致上面的报错

      1. discoverFactory时找不到我自定义的Factory

      解决方法:

      添加如下目录及文件

    在这里插入图片描述
    1. 根据properties过滤时过滤掉了我的Factory,代码在TableFactoryService#filterBySupportedProperties

    ​ 解决方法:在自定义Factory#supportedProperties方法中添加schema的配置

         // schema
         properties.add(SCHEMA + ".#." + SCHEMA_DATA_TYPE);
         properties.add(SCHEMA + ".#." + SCHEMA_NAME);
    

    2.序列化问题

    异常信息:

    xxx类不能被序列化

    原因:开始在我的sinkFunction中有个TableSchema属性,该属性不能被序列化,TableSchema我是用来获取字段信息的,后来直接改成了fieldName数组,从TableSchema.getFieldNames()获取。

    改完后又报了我使用的Util类不能序列化,我把util实现了Serializable接口解决

    3.sink类型不匹配问题

    异常如下:

    org.apache.flink.table.api.TableException: The StreamTableSink#consumeDataStream(DataStream) must be implemented and return the sink transformation DataStreamSink. However, org.apache.flink.connector.prometheus.xxxTableSink doesn't implement this method.
    
     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:142)
     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
     at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
     at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
     at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
     at scala.collection.Iterator$class.foreach(Iterator.scala:891)
     at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
     at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
     at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
     at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
     at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
     at PrometheusConnectorTest.test(PrometheusConnectorTest.java:13)
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
     at java.lang.reflect.Method.invoke(Method.java:498)
     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
     at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
     at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
     at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
     at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
     at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
     at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
     at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
     at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
    

    但是我xxxTableSink实现的是AppendStreamTableSink,AppendStreamTableSink继承了StreamTableSink

    解决方法:

    实现一个emitDataStream空方法,重写consumeDataStream即可

    @Override
     public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
         return dataStream
             .addSink(new xxxSinkFunction())
             .setParallelism(dataStream.getParallelism())
             .name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames()));
     }
    

    暂时还没涉及到TableFormat的自定义,如果后面涉及到这方面的开发再来分享

    详细代码可以参考我的git项目
    https://github.com/zhuxiaoshang/flink-be-god/tree/branch_1.10/flink-connector/flink-sql-connector-customized/src/main/java/sql/connector/customized

    如果对你帮助,帮忙点个star。

    相关文章

      网友评论

        本文标题:Flink实战之自定义flink sql connector

        本文链接:https://www.haomeiwen.com/subject/mzgwcktx.html