美文网首页
聊聊flink的Table Formats

聊聊flink的Table Formats

作者: go4it | 来源:发表于2019-02-04 11:15 被阅读44次

    本文主要研究一下flink的Table Formats

    实例

    CSV Format

    // Example: declaring a CSV format on a table descriptor. Fields are listed in column order.
    .withFormat(
      new Csv()
        .field("field1", Types.STRING)    // required: ordered format fields
        .field("field2", Types.TIMESTAMP)
        .fieldDelimiter(",")              // optional: string delimiter "," by default
        .lineDelimiter("\n")              // optional: string delimiter "\n" by default
        .quoteCharacter('"')              // optional: single character for string values, empty by default
        .commentPrefix("#")               // optional: string to indicate comments, empty by default (takes a String, not a char)
        .ignoreFirstLine()                // optional: ignore the first line, by default it is not skipped
        .ignoreParseErrors()              // optional: skip records with parse errors instead of failing; not enabled by default
    )
    
    • flink内置支持csv format,无需添加额外依赖

    JSON Format

    // Example: declaring a JSON format on a table descriptor.
    .withFormat(
      new Json()
        .failOnMissingField(true)   // optional: flag whether to fail if a field is missing or not, false by default
    
        // required: define the schema in exactly ONE of the three ways below. They are chained here
        // only for illustration: each of schema/jsonSchema/deriveSchema resets the other two,
        // so only the last call would take effect.
    
        // either by using type information which parses numbers to corresponding types
        .schema(Types.ROW(...))
    
        // or by using a JSON schema which parses to DECIMAL and TIMESTAMP
        .jsonSchema(
          "{" +
          "  type: 'object'," +
          "  properties: {" +
          "    lon: {" +
          "      type: 'number'" +
          "    }," +
          "    rideTime: {" +
          "      type: 'string'," +
          "      format: 'date-time'" +
          "    }" +
          "  }" +
          "}"
        )
    
        // or use the table's schema
        .deriveSchema()
    )
    
    • 可以使用schema或者jsonSchema或者deriveSchema来定义json format,需要额外添加flink-json依赖

    Apache Avro Format

    // Example: declaring an Avro format on a table descriptor.
    // recordClass and avroSchema are two alternative ways to supply the schema; they are chained
    // here only for illustration. The Avro descriptor does not reset one when the other is set,
    // so pick a single one in real code.
    .withFormat(
      new Avro()
    
        // required: define the schema either by using an Avro specific record class
        .recordClass(User.class)
    
        // or by using an Avro schema
        .avroSchema(
          "{" +
          "  \"type\": \"record\"," +
          "  \"name\": \"test\"," +
          "  \"fields\" : [" +
          "    {\"name\": \"a\", \"type\": \"long\"}," +
          "    {\"name\": \"b\", \"type\": \"string\"}" +
          "  ]" +
          "}"
        )
    )
    
    • 可以使用recordClass或者avroSchema来定义Avro schema,需要添加flink-avro依赖

    ConnectTableDescriptor

    flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/descriptors/ConnectTableDescriptor.scala

    /**
      * Abstract table descriptor built from a table environment and a connector descriptor
      * (excerpt; elided parts are marked with `//......`).
      *
      * The self-type `this: D =>` lets builder methods return `this` typed as the concrete
      * descriptor `D`, so calls can be chained.
      *
      * @param tableEnv            table environment passed in at construction
      * @param connectorDescriptor connector descriptor passed in at construction
      * @tparam D concrete descriptor type returned by the builder methods
      */
    abstract class ConnectTableDescriptor[D <: ConnectTableDescriptor[D]](
        private val tableEnv: TableEnvironment,
        private val connectorDescriptor: ConnectorDescriptor)
      extends TableDescriptor
      with SchematicDescriptor[D]
      with RegistrableDescriptor { this: D =>
    
      // Both descriptors are optional and start out unset.
      private var formatDescriptor: Option[FormatDescriptor] = None
      private var schemaDescriptor: Option[Schema] = None
    
      //......
    
      /**
        * Stores the given format descriptor, replacing any previously stored one.
        *
        * @param format format descriptor to remember
        * @return this descriptor (typed as `D`), not the format descriptor
        */
      override def withFormat(format: FormatDescriptor): D = {
        formatDescriptor = Some(format)
        this
      }
    
      //......
    }
    
    • StreamTableEnvironment的connect方法创建StreamTableDescriptor;StreamTableDescriptor继承了ConnectTableDescriptor;ConnectTableDescriptor提供了withFormat方法,该方法接收一个FormatDescriptor参数,保存后返回descriptor自身(类型D),以便链式调用

    FormatDescriptor

    flink-table-common-1.7.1-sources.jar!/org/apache/flink/table/descriptors/FormatDescriptor.java

    /**
     * Base class for format descriptors. It holds a format type identifier and a property
     * version, and converts them, together with the subclass-specific format properties,
     * into a flat string map.
     */
    @PublicEvolving
    public abstract class FormatDescriptor extends DescriptorBase implements Descriptor {
    
        // Identifier of the format, written under FormatDescriptorValidator.FORMAT_TYPE.
        private String type;
    
        // Property version, written under FormatDescriptorValidator.FORMAT_PROPERTY_VERSION.
        private int version;
    
        /**
         * Constructs a {@link FormatDescriptor}.
         *
         * @param type string that identifies this format
         * @param version property version for backwards compatibility
         */
        public FormatDescriptor(String type, int version) {
            this.type = type;
            this.version = version;
        }
    
        /**
         * Builds the property map for this format: the type, the property version, and then
         * everything returned by {@link #toFormatProperties()}. Declared final, so subclasses
         * contribute only through {@link #toFormatProperties()}.
         *
         * @return the collected properties as a map
         */
        @Override
        public final Map<String, String> toProperties() {
            final DescriptorProperties properties = new DescriptorProperties();
            properties.putString(FormatDescriptorValidator.FORMAT_TYPE, type);
            properties.putInt(FormatDescriptorValidator.FORMAT_PROPERTY_VERSION, version);
            properties.putProperties(toFormatProperties());
            return properties.asMap();
        }
    
        /**
         * Converts this descriptor into a set of format properties. Usually prefixed with
         * {@link FormatDescriptorValidator#FORMAT}.
         *
         * @return the format-specific properties supplied by the concrete subclass
         */
        protected abstract Map<String, String> toFormatProperties();
    }
    
    • FormatDescriptor是个抽象类,Csv、Json、Avro都是它的子类

    Csv

    flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/descriptors/Csv.scala

    /**
      * Format descriptor for CSV, registered with FORMAT_TYPE_VALUE and property version 1.
      *
      * Every option starts as `None` and is written to the properties only once it has been
      * set. All setters return this instance so calls can be chained.
      */
    class Csv extends FormatDescriptor(FORMAT_TYPE_VALUE, 1) {
    
      private var fieldDelim: Option[String] = None
      private var lineDelim: Option[String] = None
      // Field name -> type string; a LinkedHashMap so that fields keep their declaration order.
      private val schema: mutable.LinkedHashMap[String, String] =
        mutable.LinkedHashMap[String, String]()
      private var quoteCharacter: Option[Character] = None
      private var commentPrefix: Option[String] = None
      private var isIgnoreFirstLine: Option[Boolean] = None
      private var lenient: Option[Boolean] = None
    
      /** Sets the field delimiter. */
      def fieldDelimiter(delim: String): Csv = {
        this.fieldDelim = Some(delim)
        this
      }
    
      /** Sets the line delimiter. */
      def lineDelimiter(delim: String): Csv = {
        this.lineDelim = Some(delim)
        this
      }
    
      /**
        * Replaces the format fields with the names and types of the given table schema.
        * Any fields defined earlier are discarded first.
        */
      def schema(schema: TableSchema): Csv = {
        this.schema.clear()
        schema.getFieldNames.zip(schema.getFieldTypes).foreach { case (n, t) =>
          field(n, t)
        }
        this
      }
    
      /**
        * Appends a field whose type is given as type information; the type is converted to
        * its string form via TypeStringUtils.writeTypeInfo and passed to the string overload.
        */
      def field(fieldName: String, fieldType: TypeInformation[_]): Csv = {
        field(fieldName, TypeStringUtils.writeTypeInfo(fieldType))
        this
      }
    
      /**
        * Appends a field whose type is given as a type string.
        *
        * Throws a ValidationException if a field with the same name was already added.
        */
      def field(fieldName: String, fieldType: String): Csv = {
        if (schema.contains(fieldName)) {
          throw new ValidationException(s"Duplicate field name $fieldName.")
        }
        schema += (fieldName -> fieldType)
        this
      }
    
      /** Sets the quote character; `Option(...)` turns a null argument into `None` (unset). */
      def quoteCharacter(quote: Character): Csv = {
        this.quoteCharacter = Option(quote)
        this
      }
    
      /** Sets the comment prefix (a String); `Option(...)` turns a null argument into `None`. */
      def commentPrefix(prefix: String): Csv = {
        this.commentPrefix = Option(prefix)
        this
      }
    
      /** Enables ignoring the first line. */
      def ignoreFirstLine(): Csv = {
        this.isIgnoreFirstLine = Some(true)
        this
      }
    
      /** Enables lenient mode, written out as FORMAT_IGNORE_PARSE_ERRORS. */
      def ignoreParseErrors(): Csv = {
        this.lenient = Some(true)
        this
      }
    
      /**
        * Writes the options that have been set, plus the declared fields, into a property map.
        */
      override protected def toFormatProperties: util.Map[String, String] = {
        val properties = new DescriptorProperties()
    
        fieldDelim.foreach(properties.putString(FORMAT_FIELD_DELIMITER, _))
        lineDelim.foreach(properties.putString(FORMAT_LINE_DELIMITER, _))
    
        // Each field is written as an indexed (name, type) pair under FORMAT_FIELDS.
        val subKeys = util.Arrays.asList(
          DescriptorProperties.TABLE_SCHEMA_NAME,
          DescriptorProperties.TABLE_SCHEMA_TYPE)
    
        val subValues = schema.map(e => util.Arrays.asList(e._1, e._2)).toList.asJava
    
        properties.putIndexedFixedProperties(
          FORMAT_FIELDS,
          subKeys,
          subValues)
        quoteCharacter.foreach(properties.putCharacter(FORMAT_QUOTE_CHARACTER, _))
        commentPrefix.foreach(properties.putString(FORMAT_COMMENT_PREFIX, _))
        isIgnoreFirstLine.foreach(properties.putBoolean(FORMAT_IGNORE_FIRST_LINE, _))
        lenient.foreach(properties.putBoolean(FORMAT_IGNORE_PARSE_ERRORS, _))
    
        properties.asMap()
      }
    }
    
    • Csv提供了field、fieldDelimiter、lineDelimiter、quoteCharacter、commentPrefix、ignoreFirstLine、ignoreParseErrors等方法

    Json

    flink-json-1.7.1-sources.jar!/org/apache/flink/table/descriptors/Json.java

    /**
     * Format descriptor for JSON, registered with FORMAT_TYPE_VALUE and property version 1.
     *
     * <p>The schema can be supplied in three ways (schema, jsonSchema, deriveSchema). Each of
     * these setters resets the other two, so only the most recent call takes effect.
     */
    public class Json extends FormatDescriptor {
    
        // All options are nullable; null means "not set" and the property is not written.
        private Boolean failOnMissingField;
        private Boolean deriveSchema;
        private String jsonSchema;
        private String schema;
    
        public Json() {
            super(FORMAT_TYPE_VALUE, 1);
        }
    
        /**
         * Sets the flag that is written out as FORMAT_FAIL_ON_MISSING_FIELD.
         *
         * @param failOnMissingField flag value to store
         */
        public Json failOnMissingField(boolean failOnMissingField) {
            this.failOnMissingField = failOnMissingField;
            return this;
        }
    
        /**
         * Defines the schema as a JSON schema string and clears any schema set through
         * {@code schema(...)} or {@code deriveSchema()}.
         *
         * @param jsonSchema JSON schema string, must not be null
         */
        public Json jsonSchema(String jsonSchema) {
            Preconditions.checkNotNull(jsonSchema);
            this.jsonSchema = jsonSchema;
            this.schema = null;
            this.deriveSchema = null;
            return this;
        }
    
        /**
         * Defines the schema from row type information, stored in its string form, and clears
         * any schema set through {@code jsonSchema(...)} or {@code deriveSchema()}.
         *
         * @param schemaType row type information, must not be null
         */
        public Json schema(TypeInformation<Row> schemaType) {
            Preconditions.checkNotNull(schemaType);
            this.schema = TypeStringUtils.writeTypeInfo(schemaType);
            this.jsonSchema = null;
            this.deriveSchema = null;
            return this;
        }
    
        /**
         * Marks the schema as derived (FORMAT_DERIVE_SCHEMA = true) and clears any schema set
         * through {@code schema(...)} or {@code jsonSchema(...)}.
         */
        public Json deriveSchema() {
            this.deriveSchema = true;
            this.schema = null;
            this.jsonSchema = null;
            return this;
        }
    
        /**
         * Writes every option that is non-null into a property map.
         */
        @Override
        protected Map<String, String> toFormatProperties() {
            final DescriptorProperties properties = new DescriptorProperties();
    
            if (deriveSchema != null) {
                properties.putBoolean(FORMAT_DERIVE_SCHEMA, deriveSchema);
            }
    
            if (jsonSchema != null) {
                properties.putString(FORMAT_JSON_SCHEMA, jsonSchema);
            }
    
            if (schema != null) {
                properties.putString(FORMAT_SCHEMA, schema);
            }
    
            if (failOnMissingField != null) {
                properties.putBoolean(FORMAT_FAIL_ON_MISSING_FIELD, failOnMissingField);
            }
    
            return properties.asMap();
        }
    }
    
    • Json提供了schema、jsonSchema、deriveSchema三种方式来定义json format;这三种方式互斥,每个方法都会清空另外两种设置,因此以最后一次调用为准

    Avro

    flink-avro-1.7.1-sources.jar!/org/apache/flink/table/descriptors/Avro.java

    /**
     * Format descriptor for Avro, registered with AvroValidator.FORMAT_TYPE_VALUE and
     * property version 1.
     *
     * <p>The schema can be given as a specific record class or as an Avro schema string.
     * The two setters do not reset each other: if both are called, both properties are written.
     */
    public class Avro extends FormatDescriptor {
    
        // Both are nullable; null means "not set" and the property is not written.
        private Class<? extends SpecificRecord> recordClass;
        private String avroSchema;
    
        public Avro() {
            super(AvroValidator.FORMAT_TYPE_VALUE, 1);
        }
    
        /**
         * Sets the Avro specific record class.
         *
         * @param recordClass record class, must not be null
         */
        public Avro recordClass(Class<? extends SpecificRecord> recordClass) {
            Preconditions.checkNotNull(recordClass);
            this.recordClass = recordClass;
            return this;
        }
    
        /**
         * Sets the Avro schema string.
         *
         * @param avroSchema Avro schema string, must not be null
         */
        public Avro avroSchema(String avroSchema) {
            Preconditions.checkNotNull(avroSchema);
            this.avroSchema = avroSchema;
            return this;
        }
    
        /**
         * Writes the record class and/or the Avro schema, whichever are non-null, into a
         * property map.
         */
        @Override
        protected Map<String, String> toFormatProperties() {
            final DescriptorProperties properties = new DescriptorProperties();
    
            if (null != recordClass) {
                properties.putClass(AvroValidator.FORMAT_RECORD_CLASS, recordClass);
            }
            if (null != avroSchema) {
                properties.putString(AvroValidator.FORMAT_AVRO_SCHEMA, avroSchema);
            }
    
            return properties.asMap();
        }
    }
    
    • Avro提供了recordClass、avroSchema两种方式来定义avro format

    小结

    • StreamTableEnvironment的connect方法创建StreamTableDescriptor;StreamTableDescriptor继承了ConnectTableDescriptor
    • ConnectTableDescriptor提供了withFormat方法,该方法接收一个FormatDescriptor参数,保存后返回descriptor自身(类型D),以便链式调用;FormatDescriptor是个抽象类,Csv、Json、Avro都是它的子类
    • Csv提供了field、fieldDelimiter、lineDelimiter、quoteCharacter、commentPrefix、ignoreFirstLine、ignoreParseErrors等方法;Json提供了schema、jsonSchema、deriveSchema三种方式来定义json format;Avro提供了recordClass、avroSchema两种方式来定义avro format

    doc

    相关文章

      网友评论

          本文标题:聊聊flink的Table Formats

          本文链接:https://www.haomeiwen.com/subject/dtspsqtx.html