Flink源码分析系列文档目录
请点击:Flink 源码分析系列文档目录
数据写入流程
接上一篇Flink Hudi 源码之HoodieTableSink。
我们从StreamWriteFunction
数据流写入逻辑的flushBucket
方法开始分析。flushBucket
将bucket中所有数据写入底层存储。
SreamWriteFunction
本篇的分析从flushBucket
方法开始。
private boolean flushBucket(DataBucket bucket) {
// 获取最近一次未提交的instant time
String instant = instantToWrite(true);
// 如果获取不到instant,说明没有输入数据,方法返回
if (instant == null) {
// in case there are empty checkpoints that has no input data
LOG.info("No inflight instant when flushing data, skip.");
return false;
}
// 获取bucket缓存中的HoodieRecord
List<HoodieRecord> records = bucket.writeBuffer();
// 检查buffer中必须要有数据
ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has no buffering records");
// 根据write.insert.drop.duplicates配置项,决定insert是否去重
if (config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) {
records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1);
}
// 修改buffer中第一条数据的location信息(instant time和fileID)
bucket.preWrite(records);
// 执行writeFunction,写入数据
// writeFunction后面分析
final List<WriteStatus> writeStatus = new ArrayList<>(writeFunction.apply(records, instant));
// 清空records集合
records.clear();
// 构建消息,表示数据已写入
final WriteMetadataEvent event = WriteMetadataEvent.builder()
.taskID(taskID)
.instantTime(instant) // the write instant may shift but the event still use the currentInstant.
.writeStatus(writeStatus)
.lastBatch(false)
.endInput(false)
.build();
// 发送消息给coordinator
this.eventGateway.sendEventToCoordinator(event);
writeStatuses.addAll(writeStatus);
return true;
}
写入逻辑位于writeFunction
中,但是他们的逻辑看不到。我们查看writeFunction
的创建过程,在initWriteFunction
方法中:
private void initWriteFunction() {
final String writeOperation = this.config.get(FlinkOptions.OPERATION);
switch (WriteOperationType.fromValue(writeOperation)) {
case INSERT:
this.writeFunction = (records, instantTime) -> this.writeClient.insert(records, instantTime);
break;
case UPSERT:
this.writeFunction = (records, instantTime) -> this.writeClient.upsert(records, instantTime);
break;
case INSERT_OVERWRITE:
this.writeFunction = (records, instantTime) -> this.writeClient.insertOverwrite(records, instantTime);
break;
case INSERT_OVERWRITE_TABLE:
this.writeFunction = (records, instantTime) -> this.writeClient.insertOverwriteTable(records, instantTime);
break;
default:
throw new RuntimeException("Unsupported write operation : " + writeOperation);
}
}
该方法中的writeClient
为HoodieFlinkWriteClient
。判断writeOperation
的值,调用writeClient
对应的处理方法。
HoodieFlinkWriteClient
由于涉及的操作种类比较多,本篇我们从writeClient
的insert
方法入手分析。
@Override
public List<WriteStatus> insert(List<HoodieRecord<T>> records, String instantTime) {
// 创建HoodieTable
// 根据table类型(MOR或COW),创建HoodieFlinkMergeOnReadTable或HoodieFlinkCopyOnWriteTable
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.INSERT, instantTime);
// 检查要写入数据的schema和table的schema是否匹配
table.validateUpsertSchema();
// 执行预写入操作,给writeClient设置operationType
preWrite(instantTime, WriteOperationType.INSERT, table.getMetaClient());
// create the write handle if not exists
// 如果HoodieWriteHandle不存在,创建一个
final HoodieWriteHandle<?, ?, ?, ?> writeHandle = getOrCreateWriteHandle(records.get(0), getConfig(), instantTime, table, records.listIterator());
// 调用table的insert方法,将数据插入
HoodieWriteMetadata<List<WriteStatus>> result = ((HoodieFlinkTable<T>) table).insert(context, writeHandle, instantTime, records);
// 如果记录了索引查找耗时,更新监控仪表盘
if (result.getIndexLookupDuration().isPresent()) {
metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
}
// 执行写入后操作,更新监控仪表数据并返回写入状态
return postWrite(result, instantTime, table);
}
table的insert操作逻辑和writeHandle
的类型有关,所以我们先分析getOrCreateWriteHandle
方法,了解下Hudi会根据表类型和操作类型创建出什么种类的writeHandle
。
private HoodieWriteHandle<?, ?, ?, ?> getOrCreateWriteHandle(
HoodieRecord<T> record,
HoodieWriteConfig config,
String instantTime,
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
Iterator<HoodieRecord<T>> recordItr) {
final HoodieRecordLocation loc = record.getCurrentLocation();
final String fileID = loc.getFileId();
final String partitionPath = record.getPartitionPath();
// Always use FlinkCreateHandle when insert duplication turns on
// 如果允许插入重复数据,返回FlinkCreateHandle
if (config.allowDuplicateInserts()) {
return new FlinkCreateHandle<>(config, instantTime, table, partitionPath,
fileID, table.getTaskContextSupplier());
}
// bucketToHandles保存了file和handle的对应关系
if (bucketToHandles.containsKey(fileID)) {
// 如果找到file对应的handle,获取它
MiniBatchHandle lastHandle = (MiniBatchHandle) bucketToHandles.get(fileID);
// 是否需要合并log到file中,默认为true
// 但对于FlinkAppendHandle只需要追加log,所以它重写了该方法,返回false
if (lastHandle.shouldReplace()) {
// 如果需要合并,创建一个FlinkMergeAndReplaceHandle
HoodieWriteHandle<?, ?, ?, ?> writeHandle = new FlinkMergeAndReplaceHandle<>(
config, instantTime, table, recordItr, partitionPath, fileID, table.getTaskContextSupplier(),
lastHandle.getWritePath());
// 将这个handle和fileID对应起来
this.bucketToHandles.put(fileID, writeHandle); // override with new replace handle
return writeHandle;
}
}
final boolean isDelta = table.getMetaClient().getTableType().equals(HoodieTableType.MERGE_ON_READ);
final HoodieWriteHandle<?, ?, ?, ?> writeHandle;
if (isDelta) {
// 如果表为MERGE_ON_READ类型
// 使用FlinkAppendHandle
writeHandle = new FlinkAppendHandle<>(config, instantTime, table, partitionPath, fileID, recordItr,
table.getTaskContextSupplier());
} else if (loc.getInstantTime().equals("I")) {
// 如果是新插入的数据,使用FlinkCreateHandle
writeHandle = new FlinkCreateHandle<>(config, instantTime, table, partitionPath,
fileID, table.getTaskContextSupplier());
} else {
// 否则使用FlinkMergeHandle
writeHandle = new FlinkMergeHandle<>(config, instantTime, table, recordItr, partitionPath,
fileID, table.getTaskContextSupplier());
}
// 加入对应关系中
this.bucketToHandles.put(fileID, writeHandle);
return writeHandle;
}
这里Hudi可能创建出的writeHandle
有如下四种:
- FlinkCreateHandle:增量写入
- FlinkMergeAndReplaceHandle:增量合并写入(复写旧文件)
- FlinkAppendHandle:追加写入(MOR表)
- FlinkMergeHandle:增量合并写入(滚动写入新文件)
我们回到HoodieFlinkWriteClient
的insert
方法,关注这一行HoodieWriteMetadata<List<WriteStatus>> result = ((HoodieFlinkTable<T>) table).insert(context, writeHandle, instantTime, records);
。前面说过,table有两种类型HoodieFlinkMergeOnReadTable
和HoodieFlinkCopyOnWriteTable
。我们从较简单的HoodieFlinkCopyOnWriteTable
展开分析。
public HoodieWriteMetadata<List<WriteStatus>> insert(
HoodieEngineContext context,
HoodieWriteHandle<?, ?, ?, ?> writeHandle,
String instantTime,
List<HoodieRecord<T>> records) {
return new FlinkInsertCommitActionExecutor<>(context, writeHandle, config, this, instantTime, records).execute();
}
这一步创建出数据插入提交执行器。在它的execute
方法中通过FlinkWriteHelper
执行数据写入操作。
@Override
public HoodieWriteMetadata<List<WriteStatus>> execute() {
return FlinkWriteHelper.newInstance().write(instantTime, inputRecords, context, table,
config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(), this, false);
}
继续跟踪FlinkWriteHelper
的write
方法。它调用的是BaseFlinkCommitActionExecutor
的execute
方法。
@Override
public HoodieWriteMetadata<List<WriteStatus>> write(String instantTime, List<HoodieRecord<T>> inputRecords, HoodieEngineContext context,
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, boolean shouldCombine, int shuffleParallelism,
BaseCommitActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>, R> executor, boolean performTagging) {
try {
Instant lookupBegin = Instant.now();
Duration indexLookupDuration = Duration.between(lookupBegin, Instant.now());
// 调用executor执行数据写入操作
HoodieWriteMetadata<List<WriteStatus>> result = executor.execute(inputRecords);
// 设置执行耗时
result.setIndexLookupDuration(indexLookupDuration);
return result;
} catch (Throwable e) {
if (e instanceof HoodieUpsertException) {
throw (HoodieUpsertException) e;
}
throw new HoodieUpsertException("Failed to upsert for commit time " + instantTime, e);
}
}
BaseFlinkCommitActionExecutor
继续分析execute
方法。代码如下:
@Override
public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords) {
HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();
List<WriteStatus> writeStatuses = new LinkedList<>();
// 同一个bucket内数据的partition path和fileId是相同的,这里获取第一条record的信息就可以
final HoodieRecord<?> record = inputRecords.get(0);
final String partitionPath = record.getPartitionPath();
final String fileId = record.getCurrentLocation().getFileId();
// 指定bucket类型
final BucketType bucketType = record.getCurrentLocation().getInstantTime().equals("I")
? BucketType.INSERT
: BucketType.UPDATE;
// 处理upsert分区逻辑
handleUpsertPartition(
instantTime,
partitionPath,
fileId,
bucketType,
inputRecords.iterator())
.forEachRemaining(writeStatuses::addAll);
// 设置写入状态元数据
setUpWriteMetadata(writeStatuses, result);
return result;
}
我们查看分区数据upsert操作逻辑,位于handleUpsertPartition
方法:
protected Iterator<List<WriteStatus>> handleUpsertPartition(
String instantTime,
String partitionPath,
String fileIdHint,
BucketType bucketType,
Iterator recordItr) {
try {
// 根据handle种类的不同,执行不同的处理操作
if (this.writeHandle instanceof HoodieCreateHandle) {
// During one checkpoint interval, an insert record could also be updated,
// for example, for an operation sequence of a record:
// I, U, | U, U
// - batch1 - | - batch2 -
// the first batch(batch1) operation triggers an INSERT bucket,
// the second batch batch2 tries to reuse the same bucket
// and append instead of UPDATE.
return handleInsert(fileIdHint, recordItr);
} else if (this.writeHandle instanceof HoodieMergeHandle) {
return handleUpdate(partitionPath, fileIdHint, recordItr);
} else {
switch (bucketType) {
case INSERT:
return handleInsert(fileIdHint, recordItr);
case UPDATE:
return handleUpdate(partitionPath, fileIdHint, recordItr);
default:
throw new AssertionError();
}
}
} catch (Throwable t) {
String msg = "Error upsetting bucketType " + bucketType + " for partition :" + partitionPath;
LOG.error(msg, t);
throw new HoodieUpsertException(msg, t);
}
}
这里我们还以insert为主,分析数据插入操作。handleInsert
操作代码如下:
@Override
public Iterator<List<WriteStatus>> handleInsert(String idPfx, Iterator<HoodieRecord<T>> recordItr)
throws Exception {
// This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records
// 如果record无数据,返回WriteStatus空集合对应的迭代器
if (!recordItr.hasNext()) {
LOG.info("Empty partition");
return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
}
// 否则返回一个FlinkLazyInsertIterable
return new FlinkLazyInsertIterable<>(recordItr, true, config, instantTime, table, idPfx,
taskContextSupplier, new ExplicitWriteHandleFactory<>(writeHandle));
}
FlinkLazyInsertIterable(一)
FlinkLazyInsertIterable
是一个延时的迭代器,也就是说,只有在遍历writeStatus
的时候,才会执行数据的插入操作。我们看下它是怎么实现的。它的next
方法位于父类LazyIterableIterator
中:
@Override
public O next() {
try {
return computeNext();
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
遍历数据的时候调用了computeNext
方法,它位于子类FlinkLazyInsertIterable
。我们继续追踪。
@Override
protected List<WriteStatus> computeNext() {
// Executor service used for launching writer thread.
BoundedInMemoryExecutor<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> bufferedIteratorExecutor =
null;
try {
// 获取数据的schema
final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema());
// 创建BoundedInMemoryExecutor,是核心内容,后面分析
bufferedIteratorExecutor =
new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(inputItr), Option.of(getInsertHandler()), getTransformFunction(schema));
// 执行executor。executor的生产者和消费者并发执行,execute方法会等待执行结束后返回
final List<WriteStatus> result = bufferedIteratorExecutor.execute();
// 检查结果不为空,并且executor中积压的数据全部处理完毕
assert result != null && !result.isEmpty() && !bufferedIteratorExecutor.isRemaining();
return result;
} catch (Exception e) {
throw new HoodieException(e);
} finally {
// 确保executor使用后关闭
if (null != bufferedIteratorExecutor) {
bufferedIteratorExecutor.shutdownNow();
}
}
}
在继续跟进写入逻辑之前,我们必须先搞清楚BoundedInMemoryExecutor
的用途。
BoundedInMemoryExecutor
BoundedInMemoryExecutor
是一个生产者消费者模型作业的执行器,生产者结果的缓存使用有界内存队列BoundedInMemoryQueue
。我们从它的execute
方法开始分析。
public E execute() {
try {
ExecutorCompletionService<Boolean> producerService = startProducers();
Future<E> future = startConsumer();
// Wait for consumer to be done
return future.get();
} catch (InterruptedException ie) {
shutdownNow();
Thread.currentThread().interrupt();
throw new HoodieException(ie);
} catch (Exception e) {
throw new HoodieException(e);
}
}
execute
使用线程池,分别启动生产者和消费者作业。然后等待消费者运行完毕,返回结果。
我们继续分析startProducers
方法:
public ExecutorCompletionService<Boolean> startProducers() {
// Latch to control when and which producer thread will close the queue
// 创建CountDownLatch,传入参数为producer个数
// 每个producer运行结束都会countDown
// 当count数为0的时候表示所有的生产者运行结束,需要关闭内存队列(标记写入结束)
final CountDownLatch latch = new CountDownLatch(producers.size());
// 线程池共producers.size() + 1个线程,满足每个producer和唯一的consumer各一个线程
final ExecutorCompletionService<Boolean> completionService =
new ExecutorCompletionService<Boolean>(executorService);
producers.stream().map(producer -> {
// 在新线程中执行生产者逻辑
return completionService.submit(() -> {
try {
// 此方法为空实现
preExecute();
// 生产者开始向队列生产数据
producer.produce(queue);
} catch (Exception e) {
LOG.error("error producing records", e);
// 出现异常,在队列中标记失败
queue.markAsFailed(e);
throw e;
} finally {
synchronized (latch) {
// 生产者完成任务或者出现异常的时候,countDown
latch.countDown();
// 如果count为0,说明所有生产者任务完成,关闭队列
if (latch.getCount() == 0) {
// Mark production as done so that consumer will be able to exit
queue.close();
}
}
}
return true;
});
}).collect(Collectors.toList());
return completionService;
}
startConsumer
方法和上面的非常类似,在新线程调用consume
执行消费者逻辑。
private Future<E> startConsumer() {
return consumer.map(consumer -> {
return executorService.submit(() -> {
LOG.info("starting consumer thread");
preExecute();
try {
E result = consumer.consume(queue);
LOG.info("Queue Consumption is done; notifying producer threads");
return result;
} catch (Exception e) {
LOG.error("error consuming records", e);
queue.markAsFailed(e);
throw e;
}
});
}).orElse(CompletableFuture.completedFuture(null));
}
到这里为止BoundedInMemoryExecutor
主要逻辑已分析完毕。
FlinkLazyInsertIterable(二)
接着上面写入逻辑的分析,FlinkLazyInsertIterable
通过BoundedInMemoryExecutor
异步执行写入。基于上面executor分析结论,我们接下来需要弄清楚他的producer和consumer任务逻辑。
根据代码,生产者为new IteratorBasedQueueProducer<>(inputItr)
,参数inputItr
为HoodieRecord
集合的iterator,我们查看下它的produce
方法:
@Override
public void produce(BoundedInMemoryQueue<I, ?> queue) throws Exception {
LOG.info("starting to buffer records");
while (inputIterator.hasNext()) {
queue.insertRecord(inputIterator.next());
}
LOG.info("finished buffering records");
}
逻辑并不复杂,将HoodieRecord
集合的元素逐个插入到队列中。
然后,我们分析消费者,参数中传入的消费者为Option.of(getInsertHandler())
。我们查看getInsertHandler
方法,它创建并返回了一个CopyOnWriteInsertHandler
对象,专用于处理copy on write类型表的Insert操作。
protected CopyOnWriteInsertHandler getInsertHandler() {
return new CopyOnWriteInsertHandler(hoodieConfig, instantTime, areRecordsSorted, hoodieTable, idPrefix,
taskContextSupplier, writeHandleFactory);
}
CopyOnWriteInsertHandler
实现了BoundedInMemoryQueueConsumer
消费者接口,consume
方法将队列中的元素逐个取出,传递给子类的consumeOneRecord
方法。内容如下:
@Override
public void consumeOneRecord(HoodieInsertValueGenResult<HoodieRecord> payload) {
// 插入队列保存的数据会被封装为HoodieInsertValueGenResult
// 从封装类型取出HoodieRecord原始数据
final HoodieRecord insertPayload = payload.record;
String partitionPath = insertPayload.getPartitionPath();
// 获取缓存的writeHandle
HoodieWriteHandle<?,?,?,?> handle = handles.get(partitionPath);
if (handle == null) {
// If the records are sorted, this means that we encounter a new partition path
// and the records for the previous partition path are all written,
// so we can safely closely existing open handle to reduce memory footprint.
// 前面handleInsert创建FlinkLazyInsertIterable传入的areRecordsSorted参数为true
// 会被视为之前的partition都写入完毕,关闭缓存的handle
if (areRecordsSorted) {
closeOpenHandles();
}
// Lazily initialize the handle, for the first time
// 创建出新的writeHandle
handle = writeHandleFactory.create(config, instantTime, hoodieTable,
insertPayload.getPartitionPath(), idPrefix, taskContextSupplier);
// 放入handle缓存中
handles.put(partitionPath, handle);
}
// 如果handle已经写满了
if (!handle.canWrite(payload.record)) {
// Handle is full. Close the handle and add the WriteStatus
// 写入handle已关闭状态
statuses.addAll(handle.close());
// Open new handle
// 再创建一个新的handle
handle = writeHandleFactory.create(config, instantTime, hoodieTable,
insertPayload.getPartitionPath(), idPrefix, taskContextSupplier);
// 放入handle缓存中
handles.put(partitionPath, handle);
}
// 通过handle写入数据
// payload.insertValue为avro格式的数据
handle.write(insertPayload, payload.insertValue, payload.exception);
}
逻辑分析到了writeHandle
的write
方法。下一节我们以负责插入数据的FlinkCreateHandle
为例,分析数据写入过程。
FlinkCreateHandle写入数据
FlinkCreateHandle
的write
方法位于父类HoodieWriteHandle
中。
public void write(HoodieRecord record, Option<IndexedRecord> avroRecord, Option<Exception> exception) {
Option recordMetadata = record.getData().getMetadata();
if (exception.isPresent() && exception.get() instanceof Throwable) {
// Not throwing exception from here, since we don't want to fail the entire job for a single record
writeStatus.markFailure(record, exception.get(), recordMetadata);
LOG.error("Error writing record " + record, exception.get());
} else {
write(record, avroRecord);
}
}
这个write
方法如果需要数据包含错误信息的,会将错误信息写入writeStatus
中。如果没有错误,调用write(HoodieRecord record, Option<IndexedRecord> insertValue)
方法。此方法位于HoodieCreateHandle
中。
@Override
public void write(HoodieRecord record, Option<IndexedRecord> avroRecord) {
Option recordMetadata = record.getData().getMetadata();
// 获取数据操作类型,如果是删除类型,avroRecord为空
if (HoodieOperation.isDelete(record.getOperation())) {
avroRecord = Option.empty();
}
try {
if (avroRecord.isPresent()) {
// 如果是IgnoreRecord类型,不处理
if (avroRecord.get().equals(IGNORE_RECORD)) {
return;
}
// Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
// 为record加入hoodie的meta字段,方便存储元数据信息
IndexedRecord recordWithMetadataInSchema = rewriteRecord((GenericRecord) avroRecord.get());
// 这个地方没看明白明显区别
// 字面意思是是否随数据保存Hoodie的元数据信息
if (preserveHoodieMetadata) {
// fileWriter根据底层存储类型不同有如下类型:
// HoodieParquetWriter
// HoodieOrcWriter
// HoodieHFileWriter
// 将数据写入底层存储中
fileWriter.writeAvro(record.getRecordKey(), recordWithMetadataInSchema);
} else {
fileWriter.writeAvroWithMetadata(recordWithMetadataInSchema, record);
}
// update the new location of record, so we know where to find it next
// 解除密封状态,record可以被修改
record.unseal();
// 设置record真实写入location
record.setNewLocation(new HoodieRecordLocation(instantTime, writeStatus.getFileId()));
// 密封record,不可再修改
record.seal();
// 数据已写入计数器加1
recordsWritten++;
// 插入数据数量加1
insertRecordsWritten++;
} else {
// 如果avroRecord为空,代表有数据需要删除,删除数据计数器加1
recordsDeleted++;
}
// 标记写入成功
writeStatus.markSuccess(record, recordMetadata);
// deflate record payload after recording success. This will help users access payload as a
// part of marking
// record successful.
// 清除record对象携带的数据,视为数据已插入成功
record.deflate();
} catch (Throwable t) {
// Not throwing exception from here, since we don't want to fail the entire job
// for a single record
writeStatus.markFailure(record, t, recordMetadata);
LOG.error("Error writing record " + record, t);
}
}
到此为止Flink Hudi Copy on Write表的数据insert流程已分析完毕。
本博客为作者原创,欢迎大家参与讨论和批评指正。如需转载请注明出处。
网友评论