美文网首页
flink table factory基础知识

flink table factory基础知识

作者: loukey_j | 来源:发表于2019-10-27 17:47 被阅读0次

    一、概述

    在flink中很多组件都是TableFactory的子类。比如序列化,反序列化,tableSinkFactory,tableSourceFactory. TableFactory是用来创建序列化,反序列器,tableSource和 tableSink的工厂。

    二、TableFactory源码

    在flink框架中,TableFactory 的子类并不是程序员自己随心 new 出来的。flink的提供给程序员初始TableFactory的途径是通过属性去初始化。可以理解为首先flink会加载出所有的TableFactory子类实现,用户需要传递属性给flink,flink自动去查找出具有你对应属性的TableFactory,然后进行初始化,返回给程序员。所以在TableFactory 的源码中只有这2个方法,如下。

    public interface TableFactory {

        Map requiredContext();  //定义这个TableFactory  必须要有哪些属性。

        List supportedProperties(); //定义这个TableFactory 支持哪些属性

    }

    有了这2个方法。用户想要找到某个特定的或者自定义的TableFactory 实现,那就在传给flink的属性中必须要含有,必须属性,必须属性一个都不能少,不然找不到。除此之外你还可能会传很多其他属性。这些非必须属性,也不能瞎传,你不需要把supportedProperties的属性都传入,但是你传入的非必须属性必须是supportedProperties内。凡是报错说找不到TableFactory 的一定是某个环节被不满足,或者压根就没加载。如何加载请往下看。

    三、TableFactory实现

    trait BatchTableSourceFactory[T]    extends TableFactory  //用于创建批的TableSource

    trait BatchTableSinkFactory[T]        extends TableFactory //用于创建批的TableSink

    trait StreamTableSourceFactory[T] extends TableFactory //用于创建流的TableSource

    trait StreamTableSinkFactory[T]     extends TableFactory //用于创建流的TableSink

    TableSource、TableSink 是什么请查看 

    flink tableSource tableSink 基础知识 https://www.jianshu.com/p/036ca8d40cb9

    public interface TableFormatFactory extends TableFactory //用于创建序列化和反序列化器

    public interface SerializationSchemaFactory    extends TableFormatFactory //用于创建序列化器

    public interface DeserializationSchemaFactory extends TableFormatFactory //用于创建反序列化器

    什么时候需要序列化,什么时候需要反序列化呢?从kafka中收到一条消息需要转换成table中的一行这需要将字节数组反序列成一个row。反过来,table中的一行需要sink到kafka需要将row序列化成一个字节数组。

    这里以kafka为例看下StreamTableSourceFactory 和 StreamTableSinkFactory

    public abstract class KafkaTableSourceSinkFactoryBase     implements StreamTableSourceFactory,StreamTableSinkFactory{

        public MaprequiredContext() //Override TableFactory 的方法

        public ListsupportedProperties()  //Override TableFactory 的方法

        //Override StreamTableSinkFactory中方法 创建TableSink

        public StreamTableSink createStreamTableSink(Map properties) 

        //Override StreamTableSourceFactory中方法 创建TableSource

        public StreamTableSource createStreamTableSource(Map properties)

    }

    在kafka中KafkaTableSourceSinkFactoryBase 把 StreamTableSourceFactory,StreamTableSinkFactory一起实现了。我个人觉得还是分开了好。

    四、如何加载TableFactory

    我第一次跟踪源码是怎么创建TableFactory的时候非常困惑,在他选择TableFactory的时候感觉有一些莫名其妙。特别是自定的一个TableFactory压根就找不到。

    1、首先对于自定义的TableFactory一定要在META-INF 下建 services 目录,在services 下创建的包路径org.apache.flink.table.factories.TableFactory的文件,文件写入自定义TableFactory的实现类的吧。比如在Kafka010TableSourceSinkFactory 对应的jar flink-connector-kafka-0.10_2.11-1.7.0.jar中可以看到如下图

    Kafka010TableSourceSinkFactory

    2、寻找table创建ableSoure注册table

    val tableSource:TableSoure = TableFactoryUtil.findAndCreateTableSource(tableEnv, this)

    tableEnv.registerTableSource(name, tableSource)

    3、寻找table创建tableSoure

    TableFactoryUtil.findAndCreateTableSource

    4、查找tableFactory

    def find[T](factoryClass:Class[T], propertyMap: JMap[String, String]):T = {

    findInternal(factoryClass, propertyMap, None)

    }

    TableFactoryService.findInternal

    相关文章

      网友评论

          本文标题:flink table factory基础知识

          本文链接:https://www.haomeiwen.com/subject/avapvctx.html