A Look at Flink's TableFactory


Author: go4it | Published: 2019-02-07 12:52

    This article takes a look at Flink's TableFactory.

    Example

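    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    import org.apache.flink.table.descriptors.ConnectorDescriptor;
    import org.apache.flink.table.factories.StreamTableSourceFactory;
    import org.apache.flink.table.sources.StreamTableSource;
    import org.apache.flink.types.Row;
    
    // Matched by TableFactoryService for connector.type=my-system in append mode
    class MySystemTableSourceFactory implements StreamTableSourceFactory<Row> {
    
      @Override
      public Map<String, String> requiredContext() {
        Map<String, String> context = new HashMap<>();
        context.put("update-mode", "append");
        context.put("connector.type", "my-system");
        // not compared during context matching, but listing it here exempts the
        // connector.property-version key (always written by ConnectorDescriptor)
        // from the supported-properties check
        context.put("connector.property-version", "1");
        return context;
      }
    
      @Override
      public List<String> supportedProperties() {
        // every non-context key this factory accepts
        List<String> list = new ArrayList<>();
        list.add("connector.debug");
        return list;
      }
    
      @Override
      public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
        // parseBoolean yields false for a missing or unrecognized value
        boolean isDebug = Boolean.parseBoolean(properties.get("connector.debug"));
    
        // additional validation of the passed properties can also happen here
    
        // MySystemAppendTableSource: the user's own StreamTableSource implementation (not shown)
        return new MySystemAppendTableSource(isDebug);
      }
    }
    
    // Descriptor producing connector.type=my-system, connector.property-version=1
    // and connector.debug; formatNeeded=false means no format descriptor is required
    public class MySystemConnector extends ConnectorDescriptor {
    
      public final boolean isDebug;
    
      public MySystemConnector(boolean isDebug) {
        super("my-system", 1, false);
        this.isDebug = isDebug;
      }
    
      @Override
      protected Map<String, String> toConnectorProperties() {
        Map<String, String> properties = new HashMap<>();
        properties.put("connector.debug", Boolean.toString(isDebug));
        return properties;
      }
    }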
    
    • This example defines MySystemTableSourceFactory. Its requiredContext matches on update-mode=append and connector.type=my-system, its supportedProperties is connector.debug, and its createStreamTableSource method creates a MySystemAppendTableSource. MySystemConnector extends ConnectorDescriptor: it sets connector.type to my-system, connector.property-version to 1 and formatNeeded to false, and its toConnectorProperties supplies the value of connector.debug.
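The factory's comment says additional validation of the passed properties can happen inside createStreamTableSource. One concrete reason is that `Boolean.parseBoolean` maps every value other than a case-insensitive "true" to false, so a typo silently turns debug mode off. The following is a minimal sketch of a stricter check. `DebugFlagValidation` and `parseDebugFlag` are hypothetical names, not Flink API, and treating a missing key as false is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not part of Flink): strict parsing of "connector.debug".
class DebugFlagValidation {

    static final String CONNECTOR_DEBUG = "connector.debug";

    /**
     * Returns the debug flag, treating a missing key as false (an assumption of this sketch)
     * and rejecting anything other than a case-insensitive "true" or "false".
     */
    static boolean parseDebugFlag(Map<String, String> properties) {
        String raw = properties.get(CONNECTOR_DEBUG);
        if (raw == null) {
            return false;
        }
        if (raw.equalsIgnoreCase("true")) {
            return true;
        }
        if (raw.equalsIgnoreCase("false")) {
            return false;
        }
        throw new IllegalArgumentException(
            "Invalid value for '" + CONNECTOR_DEBUG + "': '" + raw + "' (expected true or false)");
    }

    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        properties.put("connector.type", "my-system");
        properties.put(CONNECTOR_DEBUG, "ture"); // typo in the user's configuration

        // Boolean.parseBoolean silently maps the typo to false
        System.out.println(Boolean.parseBoolean(properties.get(CONNECTOR_DEBUG)));

        // the strict helper reports the bad value instead
        try {
            parseDebugFlag(properties);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Inside a real factory, the exception would be raised from createStreamTableSource before the table source is constructed.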

    TableFactory

    flink-table-common-1.7.1-sources.jar!/org/apache/flink/table/factories/TableFactory.java

    @PublicEvolving
    public interface TableFactory {
    
        Map<String, String> requiredContext();
    
        List<String> supportedProperties();
    }
    
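    • TableFactory defines two methods, requiredContext and supportedProperties. requiredContext is used to match a factory, while supportedProperties lists the properties the factory supports; if a property the factory does not support is passed in, an exception is thrown.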
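The requiredContext match described above can be sketched in plain Java, modeled on filterByContext and normalizeContext in TableFactoryService (1.7.1). The required keys are lower-cased and the property-version keys are dropped. Every remaining entry must then appear in the given properties with exactly the same value. The four version-key strings are assumed to be the values of Flink's CONNECTOR_/FORMAT_/METADATA_/STATISTICS_PROPERTY_VERSION constants. `ContextMatching` and `contextMatches` are illustrative names.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;

// Plain-Java sketch of the required-context check (not Flink code).
class ContextMatching {

    // Version keys removed before matching (assumed constant values).
    static final List<String> IGNORED_VERSION_KEYS = List.of(
        "connector.property-version",
        "format.property-version",
        "metadata.property-version",
        "statistics.property-version");

    /** True if every required entry (version keys excluded) occurs in the properties with the same value. */
    static boolean contextMatches(Map<String, String> requiredContext, Map<String, String> properties) {
        Map<String, String> plainContext = new HashMap<>();
        // like normalizeContext: only the keys are lower-cased, values are compared as-is
        for (Map.Entry<String, String> e : requiredContext.entrySet()) {
            plainContext.put(e.getKey().toLowerCase(Locale.ROOT), e.getValue());
        }
        IGNORED_VERSION_KEYS.forEach(plainContext::remove);
        for (Map.Entry<String, String> e : plainContext.entrySet()) {
            String key = e.getKey();
            if (!properties.containsKey(key) || !Objects.equals(properties.get(key), e.getValue())) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, String> required = Map.of(
            "update-mode", "append",
            "connector.type", "my-system",
            "connector.property-version", "1");
        Map<String, String> given = Map.of(
            "update-mode", "append",
            "connector.type", "my-system",
            "connector.property-version", "2",
            "connector.debug", "true");
        System.out.println(contextMatches(required, given)); // true: the version key is not compared
        System.out.println(contextMatches(required, Map.of("connector.type", "my-system"))); // false: update-mode is missing
    }
}
```

Extra keys such as connector.debug do not affect the context match. They are handled later by the supportedProperties check.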

    BatchTableSourceFactory

    flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/factories/BatchTableSourceFactory.scala

    trait BatchTableSourceFactory[T] extends TableFactory {
    
      def createBatchTableSource(properties: util.Map[String, String]): BatchTableSource[T]
    }
    
    • BatchTableSourceFactory extends TableFactory and defines the createBatchTableSource method.

    BatchTableSinkFactory

    flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/factories/BatchTableSinkFactory.scala

    trait BatchTableSinkFactory[T] extends TableFactory {
    
      def createBatchTableSink(properties: util.Map[String, String]): BatchTableSink[T]
    }
    
    • BatchTableSinkFactory extends TableFactory and defines the createBatchTableSink method.

    StreamTableSourceFactory

    flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/factories/StreamTableSourceFactory.scala

    trait StreamTableSourceFactory[T] extends TableFactory {
    
      def createStreamTableSource(properties: util.Map[String, String]): StreamTableSource[T]
    }
    
    • StreamTableSourceFactory extends TableFactory and defines the createStreamTableSource method.

    StreamTableSinkFactory

    flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/factories/StreamTableSinkFactory.scala

    trait StreamTableSinkFactory[T] extends TableFactory {
    
      def createStreamTableSink(properties: util.Map[String, String]): StreamTableSink[T]
    }
    
    • StreamTableSinkFactory extends TableFactory and defines the createStreamTableSink method.
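The four sub-traits act as "kinds" of TableFactory. The lookup keeps only the discovered factories that implement the requested kind, via `factoryClass.isAssignableFrom(f.getClass())` in filterByFactoryClass. The following sketch uses simplified stand-in interfaces rather than Flink's real traits. The factory classes are hypothetical, and IllegalStateException stands in for Flink's NoMatchingTableFactoryException.

```java
import java.util.List;
import java.util.stream.Collectors;

class FactoryKindFiltering {

    // Simplified stand-ins for Flink's TableFactory and its four sub-traits (no methods here).
    interface TableFactory {}
    interface BatchTableSourceFactory extends TableFactory {}
    interface BatchTableSinkFactory extends TableFactory {}
    interface StreamTableSourceFactory extends TableFactory {}
    interface StreamTableSinkFactory extends TableFactory {}

    // Hypothetical factories; one class may implement several kinds.
    static class MySystemTableSourceFactory implements StreamTableSourceFactory {}
    static class DualSourceFactory implements BatchTableSourceFactory, StreamTableSourceFactory {}
    static class SinkOnlyFactory implements StreamTableSinkFactory {}

    /** Keeps the factories that implement factoryClass, like filterByFactoryClass. */
    static List<TableFactory> filterByFactoryClass(Class<?> factoryClass, List<TableFactory> found) {
        List<TableFactory> classFactories = found.stream()
            .filter(f -> factoryClass.isAssignableFrom(f.getClass()))
            .collect(Collectors.toList());
        if (classFactories.isEmpty()) {
            // Flink throws NoMatchingTableFactoryException here
            throw new IllegalStateException(
                "No factory implements '" + factoryClass.getCanonicalName() + "'.");
        }
        return classFactories;
    }

    public static void main(String[] args) {
        List<TableFactory> found = List.of(
            new MySystemTableSourceFactory(), new DualSourceFactory(), new SinkOnlyFactory());
        // prints MySystemTableSourceFactory and DualSourceFactory
        for (TableFactory f : filterByFactoryClass(StreamTableSourceFactory.class, found)) {
            System.out.println(f.getClass().getSimpleName());
        }
    }
}
```

Because the kind is an interface check, a single class can serve both batch and streaming lookups by implementing both traits.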

    ConnectorDescriptor

    flink-table-common-1.7.1-sources.jar!/org/apache/flink/table/descriptors/ConnectorDescriptor.java

    @PublicEvolving
    public abstract class ConnectorDescriptor extends DescriptorBase implements Descriptor {
    
        private String type;
    
        private int version;
    
        private boolean formatNeeded;
    
        /**
         * Constructs a {@link ConnectorDescriptor}.
         *
         * @param type string that identifies this connector
         * @param version property version for backwards compatibility
         * @param formatNeeded flag for basic validation of a needed format descriptor
         */
        public ConnectorDescriptor(String type, int version, boolean formatNeeded) {
            this.type = type;
            this.version = version;
            this.formatNeeded = formatNeeded;
        }
    
        @Override
        public final Map<String, String> toProperties() {
            final DescriptorProperties properties = new DescriptorProperties();
            properties.putString(CONNECTOR_TYPE, type);
            properties.putLong(CONNECTOR_PROPERTY_VERSION, version);
            properties.putProperties(toConnectorProperties());
            return properties.asMap();
        }
    
        /**
         * Returns if this connector requires a format descriptor.
         */
        protected final boolean isFormatNeeded() {
            return formatNeeded;
        }
    
        /**
         * Converts this descriptor into a set of connector properties. Usually prefixed with
         * {@link FormatDescriptorValidator#FORMAT}.
         */
        protected abstract Map<String, String> toConnectorProperties();
    }
    
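    • ConnectorDescriptor extends DescriptorBase and implements the Descriptor interface. Its final toProperties method, which implements Descriptor#toProperties, writes the built-in properties CONNECTOR_TYPE and CONNECTOR_PROPERTY_VERSION and then merges in the properties that a subclass returns from the abstract method toConnectorProperties.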
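ConnectorDescriptor uses a template-method pattern: the final `toProperties` owns the built-in keys, and subclasses only contribute their own keys. Below is a plain-Java sketch of that pattern applied to the MySystemConnector example. It is a simplification under stated assumptions. `SimpleConnectorDescriptor` is a hypothetical stand-in that uses a HashMap instead of Flink's DescriptorProperties and omits the formatNeeded flag.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

class ConnectorDescriptorSketch {

    // Hypothetical stand-in for ConnectorDescriptor (not Flink's class).
    abstract static class SimpleConnectorDescriptor {
        private final String type;
        private final int version;

        SimpleConnectorDescriptor(String type, int version) {
            this.type = type;
            this.version = version;
        }

        // final: subclasses cannot drop or rename the built-in keys
        final Map<String, String> toProperties() {
            Map<String, String> properties = new HashMap<>();
            properties.put("connector.type", type);
            properties.put("connector.property-version", Long.toString(version));
            // putAll overwrites on key clashes; a simplification of DescriptorProperties.putProperties
            properties.putAll(toConnectorProperties());
            return properties;
        }

        protected abstract Map<String, String> toConnectorProperties();
    }

    static class MySystemConnector extends SimpleConnectorDescriptor {
        final boolean isDebug;

        MySystemConnector(boolean isDebug) {
            super("my-system", 1);
            this.isDebug = isDebug;
        }

        @Override
        protected Map<String, String> toConnectorProperties() {
            Map<String, String> properties = new HashMap<>();
            properties.put("connector.debug", Boolean.toString(isDebug));
            return properties;
        }
    }

    public static void main(String[] args) {
        // TreeMap only sorts the keys for stable printing:
        // {connector.debug=true, connector.property-version=1, connector.type=my-system}
        System.out.println(new TreeMap<>(new MySystemConnector(true).toProperties()));
    }
}
```

These flat key/value pairs are what TableFactoryService later compares against each factory's requiredContext and supportedProperties.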

    TableFactoryUtil

    flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/factories/TableFactoryUtil.scala

    object TableFactoryUtil {
    
      /**
        * Returns a table source for a table environment.
        */
      def findAndCreateTableSource[T](
          tableEnvironment: TableEnvironment,
          descriptor: Descriptor)
        : TableSource[T] = {
    
        val javaMap = descriptor.toProperties
    
        tableEnvironment match {
          case _: BatchTableEnvironment =>
            TableFactoryService
              .find(classOf[BatchTableSourceFactory[T]], javaMap)
              .createBatchTableSource(javaMap)
    
          case _: StreamTableEnvironment =>
            TableFactoryService
              .find(classOf[StreamTableSourceFactory[T]], javaMap)
              .createStreamTableSource(javaMap)
    
          case e@_ =>
            throw new TableException(s"Unsupported table environment: ${e.getClass.getName}")
        }
      }
    
      /**
        * Returns a table sink for a table environment.
        */
      def findAndCreateTableSink[T](
          tableEnvironment: TableEnvironment,
          descriptor: Descriptor)
        : TableSink[T] = {
    
        val javaMap = descriptor.toProperties
    
        tableEnvironment match {
          case _: BatchTableEnvironment =>
            TableFactoryService
              .find(classOf[BatchTableSinkFactory[T]], javaMap)
              .createBatchTableSink(javaMap)
    
          case _: StreamTableEnvironment =>
            TableFactoryService
              .find(classOf[StreamTableSinkFactory[T]], javaMap)
              .createStreamTableSink(javaMap)
    
          case e@_ =>
            throw new TableException(s"Unsupported table environment: ${e.getClass.getName}")
        }
      }
    }
    
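    • TableFactoryUtil is a utility object that creates a TableSource or TableSink for a given TableEnvironment and Descriptor. Internally it passes descriptor.toProperties to TableFactoryService to find the matching TableFactory. A BatchTableEnvironment looks up a BatchTableSourceFactory/BatchTableSinkFactory, a StreamTableEnvironment looks up a StreamTableSourceFactory/StreamTableSinkFactory, and any other environment causes a TableException.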
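The dispatch in findAndCreateTableSource can be mirrored with plain `instanceof` checks. The environment type decides which factory kind is looked up, and therefore which create method is called. The following sketch makes several simplifying assumptions. All types are hypothetical stand-ins, table sources are plain Strings, and `find` keeps only the factory-class filter of TableFactoryService.

```java
import java.util.List;
import java.util.Map;

class TableFactoryUtilSketch {

    // Simplified stand-ins: sources are plain Strings and only two factory kinds exist.
    interface TableFactory {}
    interface BatchSourceFactory extends TableFactory {
        String createBatchTableSource(Map<String, String> properties);
    }
    interface StreamSourceFactory extends TableFactory {
        String createStreamTableSource(Map<String, String> properties);
    }

    interface TableEnvironment {}
    static final class BatchEnv implements TableEnvironment {}
    static final class StreamEnv implements TableEnvironment {}

    /** Stand-in for TableFactoryService.find: context and property filtering are omitted. */
    static <T> T find(Class<T> factoryClass, List<TableFactory> factories) {
        for (TableFactory f : factories) {
            if (factoryClass.isInstance(f)) {
                return factoryClass.cast(f);
            }
        }
        throw new IllegalStateException("No factory implements '" + factoryClass.getSimpleName() + "'.");
    }

    /** Mirrors the pattern match in findAndCreateTableSource. */
    static String findAndCreateTableSource(
            TableEnvironment env, Map<String, String> properties, List<TableFactory> factories) {
        if (env instanceof BatchEnv) {
            return find(BatchSourceFactory.class, factories).createBatchTableSource(properties);
        } else if (env instanceof StreamEnv) {
            return find(StreamSourceFactory.class, factories).createStreamTableSource(properties);
        }
        // Flink throws a TableException here
        throw new IllegalArgumentException("Unsupported table environment: " + env.getClass().getName());
    }

    public static void main(String[] args) {
        StreamSourceFactory mySystem = props -> "stream source, debug=" + props.get("connector.debug");
        List<TableFactory> factories = List.of(mySystem);
        Map<String, String> properties = Map.of("connector.type", "my-system", "connector.debug", "true");

        System.out.println(findAndCreateTableSource(new StreamEnv(), properties, factories));
        try {
            // only a stream factory is registered, so the batch lookup fails
            findAndCreateTableSource(new BatchEnv(), properties, factories);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The same property map is both the lookup key and the argument to the create method, which is why TableFactoryUtil computes `descriptor.toProperties` only once.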

    TableFactoryService

    flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/factories/TableFactoryService.scala

    object TableFactoryService extends Logging {
    
      private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory])
    
      /**
        * Finds a table factory of the given class and descriptor.
        *
        * @param factoryClass desired factory class
        * @param descriptor descriptor describing the factory configuration
        * @tparam T factory class type
        * @return the matching factory
        */
      def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
        Preconditions.checkNotNull(descriptor)
    
        findInternal(factoryClass, descriptor.toProperties, None)
      }
    
      /**
        * Finds a table factory of the given class, descriptor, and classloader.
        *
        * @param factoryClass desired factory class
        * @param descriptor descriptor describing the factory configuration
        * @param classLoader classloader for service loading
        * @tparam T factory class type
        * @return the matching factory
        */
      def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = {
        Preconditions.checkNotNull(descriptor)
        Preconditions.checkNotNull(classLoader)
    
        findInternal(factoryClass, descriptor.toProperties, Some(classLoader))
      }
    
      /**
        * Finds a table factory of the given class and property map.
        *
        * @param factoryClass desired factory class
        * @param propertyMap properties that describe the factory configuration
        * @tparam T factory class type
        * @return the matching factory
        */
      def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = {
        findInternal(factoryClass, propertyMap, None)
      }
    
      /**
        * Finds a table factory of the given class, property map, and classloader.
        *
        * @param factoryClass desired factory class
        * @param propertyMap properties that describe the factory configuration
        * @param classLoader classloader for service loading
        * @tparam T factory class type
        * @return the matching factory
        */
      def find[T](
          factoryClass: Class[T],
          propertyMap: JMap[String, String],
          classLoader: ClassLoader)
        : T = {
        Preconditions.checkNotNull(classLoader)
    
        findInternal(factoryClass, propertyMap, Some(classLoader))
      }
    
      /**
        * Finds a table factory of the given class, property map, and classloader.
        *
        * @param factoryClass desired factory class
        * @param propertyMap properties that describe the factory configuration
        * @param classLoader classloader for service loading
        * @tparam T factory class type
        * @return the matching factory
        */
      private def findInternal[T](
          factoryClass: Class[T],
          propertyMap: JMap[String, String],
          classLoader: Option[ClassLoader])
        : T = {
    
        Preconditions.checkNotNull(factoryClass)
        Preconditions.checkNotNull(propertyMap)
    
        val properties = propertyMap.asScala.toMap
    
        val foundFactories = discoverFactories(classLoader)
    
        val classFactories = filterByFactoryClass(
          factoryClass,
          properties,
          foundFactories)
    
        val contextFactories = filterByContext(
          factoryClass,
          properties,
          foundFactories,
          classFactories)
    
        filterBySupportedProperties(
          factoryClass,
          properties,
          foundFactories,
          contextFactories)
      }
    
      /**
        * Searches for factories using Java service providers.
        *
        * @return all factories in the classpath
        */
      private def discoverFactories[T](classLoader: Option[ClassLoader]): Seq[TableFactory] = {
        try {
          val iterator = classLoader match {
            case Some(customClassLoader) =>
              val customLoader = ServiceLoader.load(classOf[TableFactory], customClassLoader)
              customLoader.iterator()
            case None =>
              defaultLoader.iterator()
          }
          iterator.asScala.toSeq
        } catch {
          case e: ServiceConfigurationError =>
            LOG.error("Could not load service provider for table factories.", e)
            throw new TableException("Could not load service provider for table factories.", e)
        }
      }
    
      /**
        * Filters factories with matching context by factory class.
        */
      private def filterByFactoryClass[T](
          factoryClass: Class[T],
          properties: Map[String, String],
          foundFactories: Seq[TableFactory])
        : Seq[TableFactory] = {
    
        val classFactories = foundFactories.filter(f => factoryClass.isAssignableFrom(f.getClass))
        if (classFactories.isEmpty) {
          throw new NoMatchingTableFactoryException(
            s"No factory implements '${factoryClass.getCanonicalName}'.",
            factoryClass,
            foundFactories,
            properties)
        }
        classFactories
      }
    
      /**
        * Filters for factories with matching context.
        *
        * @return all matching factories
        */
      private def filterByContext[T](
          factoryClass: Class[T],
          properties: Map[String, String],
          foundFactories: Seq[TableFactory],
          classFactories: Seq[TableFactory])
        : Seq[TableFactory] = {
    
        val matchingFactories = classFactories.filter { factory =>
          val requestedContext = normalizeContext(factory)
    
          val plainContext = mutable.Map[String, String]()
          plainContext ++= requestedContext
          // we remove the version for now until we have the first backwards compatibility case
          // with the version we can provide mappings in case the format changes
          plainContext.remove(CONNECTOR_PROPERTY_VERSION)
          plainContext.remove(FORMAT_PROPERTY_VERSION)
          plainContext.remove(METADATA_PROPERTY_VERSION)
          plainContext.remove(STATISTICS_PROPERTY_VERSION)
    
          // check if required context is met
          plainContext.forall(e => properties.contains(e._1) && properties(e._1) == e._2)
        }
    
        if (matchingFactories.isEmpty) {
          throw new NoMatchingTableFactoryException(
            "No context matches.",
            factoryClass,
            foundFactories,
            properties)
        }
    
        matchingFactories
      }
    
      /**
        * Prepares the properties of a context to be used for match operations.
        */
      private def normalizeContext(factory: TableFactory): Map[String, String] = {
        val requiredContextJava = factory.requiredContext()
        if (requiredContextJava == null) {
          throw new TableException(
            s"Required context of factory '${factory.getClass.getName}' must not be null.")
        }
        requiredContextJava.asScala.map(e => (e._1.toLowerCase, e._2)).toMap
      }
    
      /**
        * Filters the matching class factories by supported properties.
        */
      private def filterBySupportedProperties[T](
          factoryClass: Class[T],
          properties: Map[String, String],
          foundFactories: Seq[TableFactory],
          classFactories: Seq[TableFactory])
        : T = {
    
        val plainGivenKeys = mutable.ArrayBuffer[String]()
        properties.keys.foreach { k =>
          // replace arrays with wildcard
          val key = k.replaceAll(".\\d+", ".#")
          // ignore duplicates
          if (!plainGivenKeys.contains(key)) {
            plainGivenKeys += key
          }
        }
        var lastKey: Option[String] = None
        val supportedFactories = classFactories.filter { factory =>
          val requiredContextKeys = normalizeContext(factory).keySet
          val (supportedKeys, wildcards) = normalizeSupportedProperties(factory)
          // ignore context keys
          val givenContextFreeKeys = plainGivenKeys.filter(!requiredContextKeys.contains(_))
          // perform factory specific filtering of keys
          val givenFilteredKeys = filterSupportedPropertiesFactorySpecific(
            factory,
            givenContextFreeKeys)
    
          givenFilteredKeys.forall { k =>
            lastKey = Option(k)
            supportedKeys.contains(k) || wildcards.exists(k.startsWith)
          }
        }
    
        if (supportedFactories.isEmpty && classFactories.length == 1 && lastKey.isDefined) {
          // special case: when there is only one matching factory but the last property key
          // was incorrect
          val factory = classFactories.head
          val (supportedKeys, _) = normalizeSupportedProperties(factory)
          throw new NoMatchingTableFactoryException(
            s"""
              |The matching factory '${factory.getClass.getName}' doesn't support '${lastKey.get}'.
              |
              |Supported properties of this factory are:
              |${supportedKeys.sorted.mkString("\n")}""".stripMargin,
            factoryClass,
            foundFactories,
            properties)
        } else if (supportedFactories.isEmpty) {
          throw new NoMatchingTableFactoryException(
            s"No factory supports all properties.",
            factoryClass,
            foundFactories,
            properties)
        } else if (supportedFactories.length > 1) {
          throw new AmbiguousTableFactoryException(
            supportedFactories,
            factoryClass,
            foundFactories,
            properties)
        }
    
        supportedFactories.head.asInstanceOf[T]
      }
    
      /**
        * Prepares the supported properties of a factory to be used for match operations.
        */
      private def normalizeSupportedProperties(factory: TableFactory): (Seq[String], Seq[String]) = {
        val supportedPropertiesJava = factory.supportedProperties()
        if (supportedPropertiesJava == null) {
          throw new TableException(
            s"Supported properties of factory '${factory.getClass.getName}' must not be null.")
        }
        val supportedKeys = supportedPropertiesJava.asScala.map(_.toLowerCase)
    
        // extract wildcard prefixes
        val wildcards = extractWildcardPrefixes(supportedKeys)
    
        (supportedKeys, wildcards)
      }
    
      /**
        * Converts the prefix of properties with wildcards (e.g., "format.*").
        */
      private def extractWildcardPrefixes(propertyKeys: Seq[String]): Seq[String] = {
        propertyKeys
          .filter(_.endsWith("*"))
          .map(s => s.substring(0, s.length - 1))
      }
    
      /**
        * Performs filtering for special cases (i.e. table format factories with schema derivation).
        */
      private def filterSupportedPropertiesFactorySpecific(
          factory: TableFactory,
          keys: Seq[String])
        : Seq[String] = factory match {
    
        case formatFactory: TableFormatFactory[_] =>
          val includeSchema = formatFactory.supportsSchemaDerivation()
          // ignore non-format (or schema) keys
          keys.filter { k =>
            if (includeSchema) {
              k.startsWith(SchemaValidator.SCHEMA + ".") ||
                k.startsWith(FormatDescriptorValidator.FORMAT + ".")
            } else {
              k.startsWith(FormatDescriptorValidator.FORMAT + ".")
            }
          }
    
        case _ =>
          keys
      }
    }
    
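    • TableFactoryService finds the TableFactory that matches a factoryClass and a Descriptor (or the property map from descriptor.toProperties). The matching logic lives mainly in three methods: filterByFactoryClass, filterByContext and filterBySupportedProperties. First, filterByFactoryClass keeps the discovered factories that implement factoryClass (classFactories). Next, filterByContext narrows classFactories down to contextFactories, those whose requiredContext entries (version keys excluded) all appear in the properties with the same values. Finally, filterBySupportedProperties keeps the contextFactories that support every remaining property key. It returns the single survivor, or throws NoMatchingTableFactoryException if none is left and AmbiguousTableFactoryException if more than one is left.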
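The last stage, filterBySupportedProperties, does a bit more than a plain set lookup, so it is worth isolating. It runs together with normalizeSupportedProperties and extractWildcardPrefixes. Numeric segments in the given keys are first replaced with `#` (schema.0.name becomes schema.#.name). Context keys are then skipped. Every other key must either equal a lower-cased supported key or start with the prefix of a supported key ending in `*`. The following sketch checks a single factory and returns the first unsupported key, which is what Flink reports when only one candidate exists. It omits the format-factory special case. The names are illustrative. One deliberate difference from the 1.7.1 source: that source uses the regex `".\\d+"`, whose unescaped dot matches any character, whereas this sketch escapes it so only numeric path segments are rewritten.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Plain-Java sketch of the supported-properties check for one factory (not Flink code).
class SupportedPropertiesCheck {

    /** Returns the first given key the factory does not support, or empty if all are supported. */
    static Optional<String> firstUnsupportedKey(
            Map<String, String> properties, Set<String> requiredContextKeys, List<String> supportedProperties) {
        // 1. numeric path segments become '#' (schema.0.name -> schema.#.name); the set drops duplicates
        Set<String> plainGivenKeys = new LinkedHashSet<>();
        for (String k : properties.keySet()) {
            plainGivenKeys.add(k.replaceAll("\\.\\d+", ".#"));
        }
        // 2. supported keys are lower-cased; entries ending in '*' also yield a prefix
        List<String> supportedKeys = new ArrayList<>();
        List<String> wildcards = new ArrayList<>();
        for (String s : supportedProperties) {
            String key = s.toLowerCase(Locale.ROOT);
            supportedKeys.add(key);
            if (key.endsWith("*")) {
                wildcards.add(key.substring(0, key.length() - 1));
            }
        }
        // 3. context keys are ignored; every other key must match exactly or by wildcard prefix
        for (String k : plainGivenKeys) {
            if (requiredContextKeys.contains(k)) {
                continue;
            }
            boolean supported = supportedKeys.contains(k) || wildcards.stream().anyMatch(k::startsWith);
            if (!supported) {
                return Optional.of(k);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Map<String, String> props = Map.of(
            "update-mode", "append",
            "connector.type", "my-system",
            "connector.property-version", "1",
            "connector.debug", "true",
            "schema.0.name", "id");
        Set<String> context = Set.of("update-mode", "connector.type", "connector.property-version");
        System.out.println(firstUnsupportedKey(props, context, List.of("connector.debug", "schema.#.name"))); // Optional.empty
        System.out.println(firstUnsupportedKey(props, context, List.of("connector.debug"))); // Optional[schema.#.name]
    }
}
```

All of these filters only ever see the factories that discoverFactories returns. Because that method relies on java.util.ServiceLoader, a custom factory such as MySystemTableSourceFactory is found only if its fully qualified class name is listed in a META-INF/services/org.apache.flink.table.factories.TableFactory file on the classpath.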

    Summary

    • TableFactory defines two methods, requiredContext and supportedProperties: requiredContext is used to match a factory, and supportedProperties lists the properties the factory supports; passing a property the factory does not support causes an exception.
    • BatchTableSourceFactory, BatchTableSinkFactory, StreamTableSourceFactory and StreamTableSinkFactory all extend TableFactory and define createBatchTableSource, createBatchTableSink, createStreamTableSource and createStreamTableSink respectively.
    • ConnectorDescriptor extends DescriptorBase and implements the Descriptor interface; its toProperties writes the built-in properties CONNECTOR_TYPE and CONNECTOR_PROPERTY_VERSION and then merges in the subclass properties returned by the abstract method toConnectorProperties.
    • TableFactoryUtil is a utility object that creates a TableSource or TableSink from a given TableEnvironment and Descriptor; internally it passes descriptor.toProperties to TableFactoryService to find the matching TableFactory.
    • TableFactoryService finds the TableFactory matching a factoryClass and a Descriptor (or descriptor.toProperties). The matching happens in three steps:
      - filterByFactoryClass selects classFactories by factoryClass.
      - filterByContext narrows classFactories to contextFactories using the properties.
      - filterBySupportedProperties filters contextFactories by supportedProperties.

          Article link: https://www.haomeiwen.com/subject/ozgwsqtx.html