Apache Iceberg v1 Write Notes

Author: 神奇的考拉 | Published 2021-10-06 19:13

1. Overview
A walkthrough of how data is written using the Iceberg Flink v1 sink APIs.


(Figure: the overall write process)

2. Implementation

Key source: FlinkSink.chainIcebergOperators

private <T> DataStreamSink<T> chainIcebergOperators() {
  // ... (some code omitted)
  // Load the Iceberg table
  if (table == null) {
    tableLoader.open();  // open TableLoader
    try (TableLoader loader = tableLoader) {
      this.table = loader.loadTable();
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to load iceberg table from table loader: " + tableLoader, e);
    }
  }

  // The Iceberg table schema must be converted into a Flink row type.
  // Convert the requested flink table schema to flink row type.
  RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);

  // With HASH distribution, the input is distributed by keyBy on the partition key built from the partition fields.
  // With NONE, the input stream is returned unchanged.
  // RANGE is not supported in this version yet.
  // Distribute the records from input data stream based on the write.distribution-mode.
  DataStream<RowData> distributeStream = distributeDataStream(
      rowDataInput, table.properties(), table.spec(), table.schema(), flinkRowType);

  // Data files are written here by parallel writers.
  // Note whether upsert is enabled for this write: once enabled, data files cannot be overwritten, only appended.
  // Add parallel writers that append rows to files
  SingleOutputStreamOperator<WriteResult> writerStream = appendWriter(distributeStream, flinkRowType);

  // When a checkpoint completes or the input ends, data files are committed by a single-parallelism committer.
  // Add single-parallelism committer that commits files
  // after successful checkpoint or end of input
  SingleOutputStreamOperator<Void> committerStream = appendCommitter(writerStream);
  // If a checkpoint succeeds but the file commit fails, the commit is retried after the next successful checkpoint;
  // a dummy discard sink is appended to terminate the pipeline.
  // Add dummy discard sink
  return appendDummySink(committerStream);
}

Breaking down the operators

- distributeDataStream: distributing the records of the input stream

private DataStream<RowData> distributeDataStream(DataStream<RowData> input,
                                                   Map<String, String> properties,
                                                   PartitionSpec partitionSpec,
                                                   Schema iSchema,
                                                   RowType flinkRowType) {
    // First, determine the requested write.distribution-mode
    DistributionMode writeMode;
    if (distributionMode == null) { // default is NONE
      // Fallback to use distribution mode parsed from table properties if don't specify in job level.
      String modeName = PropertyUtil.propertyAsString(properties,
          WRITE_DISTRIBUTION_MODE,
          WRITE_DISTRIBUTION_MODE_DEFAULT);

      writeMode = DistributionMode.fromName(modeName);
    } else {   // HASH or RANGE was set at job level
      writeMode = distributionMode;
    }

    switch (writeMode) {
      case NONE:  // no processing needed; return the input as-is
        return input;

      case HASH:  // currently records are grouped only by the partition fields
        if (partitionSpec.isUnpartitioned()) { // unpartitioned table
          return input;
        } else { // partitioned: keyBy on the partition key
          return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
        }

      case RANGE: // not supported yet; fall back to returning the input
        LOG.warn("Fallback to use 'none' distribution mode, because {}={} is not supported in flink now",
            WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName());
        return input;

      default:
        throw new RuntimeException("Unrecognized write.distribution-mode: " + writeMode);
    }
  }
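As a usage note (not from the original source), the distribution mode can be chosen per job on the FlinkSink builder or, as the fallback shown above, through the write.distribution-mode table property. A minimal sketch, assuming rowDataStream, tableLoader, and table already exist:

// Job level: request HASH distribution on the sink builder.
FlinkSink.forRowData(rowDataStream)
    .tableLoader(tableLoader)
    .distributionMode(DistributionMode.HASH) // ends up as keyBy on the partition key in distributeDataStream
    .append();

// Table level fallback: set the write.distribution-mode property on the table itself.
table.updateProperties()
    .set("write.distribution-mode", "hash")
    .commit();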

- appendWriter: writing data files with parallel writers

private SingleOutputStreamOperator<WriteResult> appendWriter(DataStream<RowData> input, RowType flinkRowType) {
  // 1. Resolve the specified equality fields
  // Find out the equality field id list based on the user-provided equality field column names.
  List<Integer> equalityFieldIds = Lists.newArrayList();
  if (equalityFieldColumns != null && equalityFieldColumns.size() > 0) {
    for (String column : equalityFieldColumns) {
      org.apache.iceberg.types.Types.NestedField field = table.schema().findField(column);
      Preconditions.checkNotNull(field, "Missing required equality field column '%s' in table schema %s",
          column, table.schema());
      equalityFieldIds.add(field.fieldId());
    }
  }
  // 2. Determine whether this write is an upsert (can be set at job level or table level)
  // Fallback to use upsert mode parsed from table properties if don't specify in job level.
  boolean upsertMode = upsert || PropertyUtil.propertyAsBoolean(table.properties(),
      UPSERT_MODE_ENABLE, UPSERT_MODE_ENABLE_DEFAULT);

  // Validate the equality fields and partition fields if we enable the upsert mode.
  // If upsert is enabled, the equality fields must be validated (for a partitioned table, each partition field must be among them),
  // and the data files cannot be overwritten.
  if (upsertMode) {
    // overwrite is not allowed in upsert mode
    Preconditions.checkState(!overwrite,
        "OVERWRITE mode shouldn't be enable when configuring to use UPSERT data stream.");
    // the equality fields must not be empty in upsert mode
    Preconditions.checkState(!equalityFieldIds.isEmpty(),
        "Equality field columns shouldn't be empty when configuring to use UPSERT data stream.");
    // for a partitioned table:
    if (!table.spec().isUnpartitioned()) {
      for (PartitionField partitionField : table.spec().fields()) { // every partition field must be present in the equality field set
        Preconditions.checkState(equalityFieldIds.contains(partitionField.sourceId()),
            "In UPSERT mode, partition field '%s' should be included in equality fields: '%s'",
            partitionField, equalityFieldColumns);
      }
    }
  }

  // 3. Build the IcebergStreamWriter
  IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, flinkRowType, equalityFieldIds, upsertMode);
  // 3.1 Writing the input stream to data files can be done with multiple parallel subtasks
  int parallelism = writeParallelism == null ? input.getParallelism() : writeParallelism;
  SingleOutputStreamOperator<WriteResult> writerStream = input
      .transform(operatorName(ICEBERG_STREAM_WRITER_NAME), TypeInformation.of(WriteResult.class), streamWriter)
      .setParallelism(parallelism);
  if (uidPrefix != null) {
    writerStream = writerStream.uid(uidPrefix + "-writer");
  }
  return writerStream;
}
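As a usage sketch (not part of the original post), the equality field ids and the upsert flag validated above are fed in through the sink builder; the column name "id" below is only illustrative:

// Sketch: enable upsert writes keyed on "id" (illustrative column name).
// With upsert enabled, overwrite() must not be used, matching the checks above.
FlinkSink.forRowData(rowDataStream)                  // rowDataStream is an existing DataStream<RowData>
    .tableLoader(tableLoader)
    .equalityFieldColumns(Arrays.asList("id"))       // resolved into equalityFieldIds above
    .upsert(true)                                    // upsertMode becomes true in appendWriter
    .writeParallelism(2)
    .append();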

// Supporting code: creating the stream writer
static IcebergStreamWriter<RowData> createStreamWriter(Table table,
                                                       RowType flinkRowType,
                                                       List<Integer> equalityFieldIds,
                                                       boolean upsert) {
  // 1. Read the relevant table configuration
  Preconditions.checkArgument(table != null, "Iceberg table should't be null");
  Map<String, String> props = table.properties();
  // 1.1 target file size
  long targetFileSize = getTargetFileSizeBytes(props);
  // 1.2 file format (orc/parquet/avro)
  FileFormat fileFormat = getFileFormat(props);
  // 2. Build a read-only, serializable copy of the table
  Table serializableTable = SerializableTable.copyOf(table);
  // 3. Build the TaskWriterFactory
  TaskWriterFactory<RowData> taskWriterFactory = new RowDataTaskWriterFactory(
      serializableTable, flinkRowType, targetFileSize,
      fileFormat, equalityFieldIds, upsert);
  // 4. Create the IcebergStreamWriter
  return new IcebergStreamWriter<>(table.name(), taskWriterFactory);
}
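The writer picks up its target file size and default file format from table properties. A minimal sketch of setting them (the keys are standard Iceberg table properties; the values are only examples):

// Sketch: configure the properties consulted by getTargetFileSizeBytes / getFileFormat.
table.updateProperties()
    .set("write.format.default", "parquet")                                   // parquet / orc / avro
    .set("write.target-file-size-bytes", String.valueOf(128L * 1024 * 1024))  // ~128 MB target files
    .commit();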

- appendCommitter: when a checkpoint succeeds or the input ends, the files committer commits the written files

private SingleOutputStreamOperator<Void> appendCommitter(SingleOutputStreamOperator<WriteResult> writerStream) {
  // 1. Build the IcebergFilesCommitter
  IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(tableLoader, overwrite);
  // 2. The committer runs with a single parallelism (and max parallelism of 1)
  SingleOutputStreamOperator<Void> committerStream = writerStream
      .transform(operatorName(ICEBERG_FILES_COMMITTER_NAME), Types.VOID, filesCommitter)
      .setParallelism(1)
      .setMaxParallelism(1);
  if (uidPrefix != null) {
    committerStream = committerStream.uid(uidPrefix + "-committer");
  }
  return committerStream;
}
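Because files are only committed on successful checkpoints (or at end of input), the job must have checkpointing enabled; a minimal sketch using the standard Flink API:

// Sketch: commits are driven by checkpoints, so enable them on the execution environment.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000L);   // checkpoint every 60s; each successful checkpoint triggers a commit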

3. Example: writing Iceberg data with a HadoopCatalog

// ... (some code omitted)
// Flink DataStream API, with checkpointing enabled
final Configuration config = new Configuration();
Map<String, String> properties =
        new ImmutableMap.Builder<String, String>()
          .put(CatalogProperties.WAREHOUSE_LOCATION, "file:///Users/XXX/tests/iceberg_namespace")
          .put(TableProperties.DEFAULT_FILE_FORMAT, "parquet")
          .build();

final Catalog catalog = CatalogUtil.loadCatalog("org.apache.iceberg.hadoop.HadoopCatalog", "hadoop", properties, config);
// final HadoopCatalog catalog = new HadoopCatalog();
// catalog.setConf(config);
// catalog.initialize("hadoop", properties);
//  schema
final Schema schema = new Schema(
        required(1, "data", Types.StringType.get()),
        required(2, "nested", Types.StructType.of(
                Types.NestedField.required(3, "f1", Types.StringType.get()),
                Types.NestedField.required(4, "f2", Types.StringType.get()),
                Types.NestedField.required(5, "f3", Types.LongType.get()))),
        required(6, "id", Types.LongType.get()));
// TableIdentifier
final TableIdentifier tableIdentifier = TableIdentifier.of("iceberg_db", "iceberg_table");
Table table = null;
// table
if (catalog.tableExists(tableIdentifier)) {
  table = catalog.loadTable(tableIdentifier);
} else {
  table = catalog.createTable(tableIdentifier, schema);
}
// TableLoader
final TableLoader tableLoader = TableLoader.fromCatalog(CatalogLoader.hadoop("hadoop", config, properties), tableIdentifier);
// ... (omitted: converting the source stream into DataStream<RowData>; see the sketch after this block)
// FlinkSink
FlinkSink.forRowData(rowDataStream)   // rowDataStream is the DataStream<RowData> built above
        .table(table)
        .tableLoader(tableLoader)
        .writeParallelism(parallelism)
        .append();
// =========  (remaining code omitted)  =========== //
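For completeness, a sketch (not from the original post) of the omitted conversion step: building a DataStream<RowData> that matches the schema defined above. The values and variable names are made up; nested struct fields map to a nested GenericRowData, and env is the StreamExecutionEnvironment from the omitted setup.

// Sketch: hand-built RowData records for the (data, nested(f1, f2, f3), id) schema.
List<RowData> rows = Arrays.asList(
    GenericRowData.of(
        StringData.fromString("a"),
        GenericRowData.of(StringData.fromString("f1-a"), StringData.fromString("f2-a"), 1L),
        100L));
DataStream<RowData> rowDataStream =
    env.fromCollection(rows, TypeInformation.of(RowData.class));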
