A Look at Flink's CsvTableSource

Author: go4it | Published: 2019-02-05 09:37

    This article takes a look at Flink's CsvTableSource.

    TableSource

    flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sources/TableSource.scala

    trait TableSource[T] {
    
      /** Returns the [[TypeInformation]] for the return type of the [[TableSource]].
        * The fields of the return type are mapped to the table schema based on their name.
        *
        * @return The type of the returned [[DataSet]] or [[DataStream]].
        */
      def getReturnType: TypeInformation[T]
    
      /**
        * Returns the schema of the produced table.
        *
        * @return The [[TableSchema]] of the produced table.
        */
      def getTableSchema: TableSchema
    
      /**
        * Describes the table source.
        *
        * @return A String explaining the [[TableSource]].
        */
      def explainSource(): String =
        TableConnectorUtil.generateRuntimeName(getClass, getTableSchema.getFieldNames)
    }
    
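    • TableSource defines three methods: getReturnType, getTableSchema, and explainSource. The first two are abstract; explainSource has a default implementation that builds a runtime name from the class and the schema's field names.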

    BatchTableSource

    flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sources/BatchTableSource.scala

    trait BatchTableSource[T] extends TableSource[T] {
    
      /**
        * Returns the data of the table as a [[DataSet]].
        *
        * NOTE: This method is for internal use only for defining a [[TableSource]].
        *       Do not use it in Table API programs.
        */
      def getDataSet(execEnv: ExecutionEnvironment): DataSet[T]
    }
    
    • BatchTableSource extends TableSource and adds the getDataSet method, which returns the table's data as a DataSet.
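
    To make the contract concrete, here is a minimal sketch of a custom batch source, assuming the Flink 1.7.x API quoted above. The class name InMemoryPeopleSource and its two sample rows are hypothetical and exist only for illustration; they are not part of Flink. The sketch also assumes that the ExecutionEnvironment and DataSet in the trait are the Java DataSet API types, which the excerpt does not show explicitly.

```scala
import java.util.Arrays

import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.table.api.TableSchema
import org.apache.flink.table.sources.BatchTableSource
import org.apache.flink.types.Row

// Hypothetical source for illustration only; not part of Flink.
class InMemoryPeopleSource extends BatchTableSource[Row] {

  private val fieldNames = Array("name", "age")
  private val fieldTypes = Array[TypeInformation[_]](Types.STRING, Types.INT)
  private val returnType = new RowTypeInfo(fieldTypes, fieldNames)

  // Type of the records produced by getDataSet.
  override def getReturnType: TypeInformation[Row] = returnType

  // Schema of the table as seen by Table API / SQL queries.
  override def getTableSchema: TableSchema = new TableSchema(fieldNames, fieldTypes)

  // Called by the planner, not by user programs (see the NOTE in the trait).
  override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
    val rows = Arrays.asList(
      Row.of("alice", Int.box(30)),
      Row.of("bob", Int.box(25)))
    execEnv.fromCollection(rows, returnType)
  }

  // explainSource() is inherited from TableSource.
}
```

    A Table API program would register such a source with a table environment rather than call getDataSet itself.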

    StreamTableSource

    flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sources/StreamTableSource.scala

    trait StreamTableSource[T] extends TableSource[T] {
    
      /**
        * Returns the data of the table as a [[DataStream]].
        *
        * NOTE: This method is for internal use only for defining a [[TableSource]].
        *       Do not use it in Table API programs.
        */
      def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T]
    }
    
    • StreamTableSource extends TableSource and adds the getDataStream method, which returns the table's data as a DataStream.

    CsvTableSource

    flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sources/CsvTableSource.scala

    class CsvTableSource private (
        private val path: String,
        private val fieldNames: Array[String],
        private val fieldTypes: Array[TypeInformation[_]],
        private val selectedFields: Array[Int],
        private val fieldDelim: String,
        private val rowDelim: String,
        private val quoteCharacter: Character,
        private val ignoreFirstLine: Boolean,
        private val ignoreComments: String,
        private val lenient: Boolean)
      extends BatchTableSource[Row]
      with StreamTableSource[Row]
      with ProjectableTableSource[Row] {
    
      def this(
        path: String,
        fieldNames: Array[String],
        fieldTypes: Array[TypeInformation[_]],
        fieldDelim: String = CsvInputFormat.DEFAULT_FIELD_DELIMITER,
        rowDelim: String = CsvInputFormat.DEFAULT_LINE_DELIMITER,
        quoteCharacter: Character = null,
        ignoreFirstLine: Boolean = false,
        ignoreComments: String = null,
        lenient: Boolean = false) = {
    
        this(
          path,
          fieldNames,
          fieldTypes,
          fieldTypes.indices.toArray, // initially, all fields are returned
          fieldDelim,
          rowDelim,
          quoteCharacter,
          ignoreFirstLine,
          ignoreComments,
          lenient)
    
      }
    
      def this(path: String, fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]) = {
        this(path, fieldNames, fieldTypes, CsvInputFormat.DEFAULT_FIELD_DELIMITER,
          CsvInputFormat.DEFAULT_LINE_DELIMITER, null, false, null, false)
      }
    
      if (fieldNames.length != fieldTypes.length) {
        throw new TableException("Number of field names and field types must be equal.")
      }
    
      private val selectedFieldTypes = selectedFields.map(fieldTypes(_))
      private val selectedFieldNames = selectedFields.map(fieldNames(_))
    
      private val returnType: RowTypeInfo = new RowTypeInfo(selectedFieldTypes, selectedFieldNames)
    
      override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
        execEnv.createInput(createCsvInput(), returnType).name(explainSource())
      }
    
      /** Returns the [[RowTypeInfo]] for the return type of the [[CsvTableSource]]. */
      override def getReturnType: RowTypeInfo = returnType
    
      override def getDataStream(streamExecEnv: StreamExecutionEnvironment): DataStream[Row] = {
        streamExecEnv.createInput(createCsvInput(), returnType).name(explainSource())
      }
    
      /** Returns the schema of the produced table. */
      override def getTableSchema = new TableSchema(fieldNames, fieldTypes)
    
      /** Returns a copy of [[TableSource]] with ability to project fields */
      override def projectFields(fields: Array[Int]): CsvTableSource = {
    
        val selectedFields = if (fields.isEmpty) Array(0) else fields
    
        new CsvTableSource(
          path,
          fieldNames,
          fieldTypes,
          selectedFields,
          fieldDelim,
          rowDelim,
          quoteCharacter,
          ignoreFirstLine,
          ignoreComments,
          lenient)
      }
    
      private def createCsvInput(): RowCsvInputFormat = {
        val inputFormat = new RowCsvInputFormat(
          new Path(path),
          selectedFieldTypes,
          rowDelim,
          fieldDelim,
          selectedFields)
    
        inputFormat.setSkipFirstLineAsHeader(ignoreFirstLine)
        inputFormat.setLenient(lenient)
        if (quoteCharacter != null) {
          inputFormat.enableQuotedStringParsing(quoteCharacter)
        }
        if (ignoreComments != null) {
          inputFormat.setCommentPrefix(ignoreComments)
        }
    
        inputFormat
      }
    
      override def equals(other: Any): Boolean = other match {
        case that: CsvTableSource => returnType == that.returnType &&
            path == that.path &&
            fieldDelim == that.fieldDelim &&
            rowDelim == that.rowDelim &&
            quoteCharacter == that.quoteCharacter &&
            ignoreFirstLine == that.ignoreFirstLine &&
            ignoreComments == that.ignoreComments &&
            lenient == that.lenient
        case _ => false
      }
    
      override def hashCode(): Int = {
        returnType.hashCode()
      }
    
      override def explainSource(): String = {
        s"CsvTableSource(" +
          s"read fields: ${getReturnType.getFieldNames.mkString(", ")})"
      }
    }
    
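    • CsvTableSource implements both BatchTableSource and StreamTableSource (and also ProjectableTableSource); getDataSet creates a DataSet via ExecutionEnvironment.createInput, and getDataStream creates a DataStream via StreamExecutionEnvironment.createInput.
    • The InputFormat passed to both ExecutionEnvironment.createInput and StreamExecutionEnvironment.createInput is a RowCsvInputFormat, built by createCsvInput.
    • getTableSchema returns a TableSchema built from fieldNames and fieldTypes, that is, all declared fields; getReturnType returns a RowTypeInfo built from selectedFieldTypes and selectedFieldNames, that is, only the projected fields; explainSource is overridden to return a string that starts with CsvTableSource.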
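
    The difference between getTableSchema and getReturnType only becomes visible after a projection. The sketch below uses the public three-argument constructor and the methods quoted above; the path and column names are made-up placeholders, and the file is never opened, because createCsvInput only runs inside getDataSet and getDataStream.

```scala
import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.table.sources.CsvTableSource

object CsvProjectionSketch {

  // Hypothetical path and columns; nothing is read from disk in this sketch.
  val source = new CsvTableSource(
    "/tmp/people.csv",
    Array("name", "age", "city"),
    Array[TypeInformation[_]](Types.STRING, Types.INT, Types.STRING))

  // Keep only "city" and "name", in that order.
  val projected: CsvTableSource = source.projectFields(Array(2, 0))

  // An empty projection falls back to the first field (Array(0) in projectFields).
  val fallback: CsvTableSource = source.projectFields(Array.empty[Int])

  def main(args: Array[String]): Unit = {
    // The table schema still lists every declared field.
    println(projected.getTableSchema.getFieldNames.mkString(", ")) // name, age, city
    // The return type lists only the selected fields.
    println(projected.getReturnType.getFieldNames.mkString(", "))  // city, name
    println(projected.explainSource()) // CsvTableSource(read fields: city, name)
    println(fallback.getReturnType.getFieldNames.mkString(", "))   // name
  }
}
```

    Note that projectFields returns a new CsvTableSource and leaves the original untouched, so source itself still reads all three fields.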

    Summary

    • TableSource defines three methods: getReturnType, getTableSchema, and explainSource; BatchTableSource extends TableSource and adds getDataSet; StreamTableSource extends TableSource and adds getDataStream.
    • CsvTableSource implements both BatchTableSource and StreamTableSource; getDataSet creates a DataSet via ExecutionEnvironment.createInput, and getDataStream creates a DataStream via StreamExecutionEnvironment.createInput.
    • The InputFormat passed to both createInput calls is a RowCsvInputFormat, built by createCsvInput; getTableSchema returns a TableSchema built from fieldNames and fieldTypes; getReturnType returns a RowTypeInfo built from selectedFieldTypes and selectedFieldNames; explainSource is overridden to return a string that starts with CsvTableSource.
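
    Since getDataSet and getDataStream are marked as internal, a program would normally hand the source to a table environment instead. The following is a rough batch sketch against the Flink 1.7.x Scala Table API (later releases changed the entry points, so treat the calls as version-specific). The object name, the table name "people", and the sample data are assumptions made for illustration.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files

import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sources.CsvTableSource
import org.apache.flink.types.Row

object CsvTableSourceBatchSketch {

  // Registers the CSV file as table "people" and returns the name column.
  def readNames(csvPath: String): Seq[String] = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val source = new CsvTableSource(
      csvPath,
      Array("name", "age"),
      Array[TypeInformation[_]](Types.STRING, Types.INT))

    tableEnv.registerTableSource("people", source)

    // Only "name" is selected, so the planner may call projectFields on the source.
    val names = tableEnv.scan("people").select('name)
    names.toDataSet[Row].collect().map(_.getField(0).toString)
  }

  def main(args: Array[String]): Unit = {
    // Write a small, header-less sample file (hypothetical data).
    val csv = Files.createTempFile("people", ".csv")
    Files.write(csv, "alice,30\nbob,25\n".getBytes(StandardCharsets.UTF_8))

    readNames(csv.toString).foreach(println)
  }
}
```

    The order of the collected rows is not guaranteed when the job runs with a parallelism greater than one.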
