Flink Hudi Source Code: Data Write Flow for COW Tables


Author: AlienPaul | Published 2021-10-27 17:02

    For the other articles in this series, see: Flink 源码分析系列文档目录 (Flink Source Code Analysis Series Index).

    Data Write Flow

    This continues from the previous article, Flink Hudi 源码之HoodieTableSink.

    We start from the flushBucket method in StreamWriteFunction, the operator that handles the streaming write logic. flushBucket writes all of the data buffered in a bucket to the underlying storage.

    StreamWriteFunction

    The walkthrough in this article begins with the flushBucket method.

    private boolean flushBucket(DataBucket bucket) {
        // Get the most recent uncommitted instant time
        String instant = instantToWrite(true);
    
        // If no instant is available, there is no input data; return
        if (instant == null) {
            // in case there are empty checkpoints that has no input data
            LOG.info("No inflight instant when flushing data, skip.");
            return false;
        }
    
        // Get the HoodieRecords buffered in the bucket
        List<HoodieRecord> records = bucket.writeBuffer();
        // The buffer must contain data
        ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has no buffering records");
        // The write.insert.drop.duplicates option decides whether inserts are deduplicated
        if (config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) {
            records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1);
        }
        // Patch the location info (instant time and fileID) of the first record in the buffer
        bucket.preWrite(records);
        // Invoke writeFunction to write the data
        // writeFunction is analyzed below
        final List<WriteStatus> writeStatus = new ArrayList<>(writeFunction.apply(records, instant));
        // Clear the records collection
        records.clear();
        // Build an event indicating that the data has been written
        final WriteMetadataEvent event = WriteMetadataEvent.builder()
            .taskID(taskID)
            .instantTime(instant) // the write instant may shift but the event still use the currentInstant.
            .writeStatus(writeStatus)
            .lastBatch(false)
            .endInput(false)
            .build();
    
        // Send the event to the coordinator
        this.eventGateway.sendEventToCoordinator(event);
        writeStatuses.addAll(writeStatus);
        return true;
    }
    

    The actual write logic is inside writeFunction, whose implementation we cannot see here. Let's look at how writeFunction is created, in the initWriteFunction method:

    private void initWriteFunction() {
        final String writeOperation = this.config.get(FlinkOptions.OPERATION);
        switch (WriteOperationType.fromValue(writeOperation)) {
            case INSERT:
                this.writeFunction = (records, instantTime) -> this.writeClient.insert(records, instantTime);
                break;
            case UPSERT:
                this.writeFunction = (records, instantTime) -> this.writeClient.upsert(records, instantTime);
                break;
            case INSERT_OVERWRITE:
                this.writeFunction = (records, instantTime) -> this.writeClient.insertOverwrite(records, instantTime);
                break;
            case INSERT_OVERWRITE_TABLE:
                this.writeFunction = (records, instantTime) -> this.writeClient.insertOverwriteTable(records, instantTime);
                break;
            default:
                throw new RuntimeException("Unsupported write operation : " + writeOperation);
        }
    }
    

    The writeClient in this method is a HoodieFlinkWriteClient. The switch inspects writeOperation and calls the corresponding method on writeClient.
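
    For context, the operation comes from the table options: FlinkOptions.OPERATION corresponds to the write.operation option (and FlinkOptions.TABLE_TYPE to table.type). Below is a minimal, hypothetical sketch of a Flink SQL job (table name, schema, path, and source table are made up for illustration) showing how an insert into a COW table would typically be configured:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class HudiCowInsertJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

            // 'write.operation' maps to FlinkOptions.OPERATION and selects the branch
            // taken in initWriteFunction(); 'table.type' selects COW vs MOR.
            tEnv.executeSql(
                "CREATE TABLE hudi_cow_sink (\n"
                + "  uuid STRING,\n"
                + "  name STRING,\n"
                + "  ts TIMESTAMP(3),\n"
                + "  `partition` STRING,\n"
                + "  PRIMARY KEY (uuid) NOT ENFORCED\n"
                + ") PARTITIONED BY (`partition`) WITH (\n"
                + "  'connector' = 'hudi',\n"
                + "  'path' = 'file:///tmp/hudi/hudi_cow_sink',\n"   // hypothetical path
                + "  'table.type' = 'COPY_ON_WRITE',\n"
                + "  'write.operation' = 'insert'\n"
                + ")");

            // source_table is assumed to be defined elsewhere
            tEnv.executeSql("INSERT INTO hudi_cow_sink SELECT uuid, name, ts, `partition` FROM source_table").await();
        }
    }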

    HoodieFlinkWriteClient

    Since there are quite a few operation types, this article starts from writeClient's insert method.

    @Override
    public List<WriteStatus> insert(List<HoodieRecord<T>> records, String instantTime) {
        // Create the HoodieTable
        // Depending on the table type (MOR or COW), this is a HoodieFlinkMergeOnReadTable or a HoodieFlinkCopyOnWriteTable
        HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
            getTableAndInitCtx(WriteOperationType.INSERT, instantTime);
        // Check that the schema of the incoming data matches the table schema
        table.validateUpsertSchema();
        // Pre-write step: set the operationType on the writeClient
        preWrite(instantTime, WriteOperationType.INSERT, table.getMetaClient());
        // create the write handle if not exists
        // Create a HoodieWriteHandle if one does not exist yet
        final HoodieWriteHandle<?, ?, ?, ?> writeHandle = getOrCreateWriteHandle(records.get(0), getConfig(), instantTime, table, records.listIterator());
        // Call the table's insert method to insert the data
        HoodieWriteMetadata<List<WriteStatus>> result = ((HoodieFlinkTable<T>) table).insert(context, writeHandle, instantTime, records);
        // If the index lookup duration was recorded, update the metrics
        if (result.getIndexLookupDuration().isPresent()) {
            metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
        }
        // Post-write step: update metrics and return the write statuses
        return postWrite(result, instantTime, table);
    }
    

    The table's insert logic depends on the type of writeHandle, so we first analyze the getOrCreateWriteHandle method to see which kinds of writeHandle Hudi creates based on the table type and the operation type.

    private HoodieWriteHandle<?, ?, ?, ?> getOrCreateWriteHandle(
        HoodieRecord<T> record,
        HoodieWriteConfig config,
        String instantTime,
        HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
        Iterator<HoodieRecord<T>> recordItr) {
        final HoodieRecordLocation loc = record.getCurrentLocation();
        final String fileID = loc.getFileId();
        final String partitionPath = record.getPartitionPath();
        // Always use FlinkCreateHandle when insert duplication turns on
        // If duplicate inserts are allowed, return a FlinkCreateHandle
        if (config.allowDuplicateInserts()) {
            return new FlinkCreateHandle<>(config, instantTime, table, partitionPath,
                                           fileID, table.getTaskContextSupplier());
        }
    
        // bucketToHandles stores the mapping between files and handles
        if (bucketToHandles.containsKey(fileID)) {
            // If a handle already exists for this file, reuse it
            MiniBatchHandle lastHandle = (MiniBatchHandle) bucketToHandles.get(fileID);
            // Whether the log needs to be merged into the base file; defaults to true
            // FlinkAppendHandle only appends logs, so it overrides this method and returns false
            if (lastHandle.shouldReplace()) {
                // If a merge is needed, create a FlinkMergeAndReplaceHandle
                HoodieWriteHandle<?, ?, ?, ?> writeHandle = new FlinkMergeAndReplaceHandle<>(
                    config, instantTime, table, recordItr, partitionPath, fileID, table.getTaskContextSupplier(),
                    lastHandle.getWritePath());
                // Associate this handle with the fileID
                this.bucketToHandles.put(fileID, writeHandle); // override with new replace handle
                return writeHandle;
            }
        }
        
        final boolean isDelta = table.getMetaClient().getTableType().equals(HoodieTableType.MERGE_ON_READ);
        final HoodieWriteHandle<?, ?, ?, ?> writeHandle;
        if (isDelta) {
            // If the table is MERGE_ON_READ,
            // use a FlinkAppendHandle
            writeHandle = new FlinkAppendHandle<>(config, instantTime, table, partitionPath, fileID, recordItr,
                                                  table.getTaskContextSupplier());
        } else if (loc.getInstantTime().equals("I")) {
            // For newly inserted data, use a FlinkCreateHandle
            writeHandle = new FlinkCreateHandle<>(config, instantTime, table, partitionPath,
                                                  fileID, table.getTaskContextSupplier());
        } else {
            // Otherwise use a FlinkMergeHandle
            writeHandle = new FlinkMergeHandle<>(config, instantTime, table, recordItr, partitionPath,
                                                 fileID, table.getTaskContextSupplier());
        }
        // Record the mapping
        this.bucketToHandles.put(fileID, writeHandle);
        return writeHandle;
    }
    

    The four kinds of writeHandle Hudi may create here are:

    • FlinkCreateHandle: incremental write
    • FlinkMergeAndReplaceHandle: incremental merge write (overwrites the old file)
    • FlinkAppendHandle: append write (MOR tables)
    • FlinkMergeHandle: incremental merge write (rolls over to a new file)

    Back in HoodieFlinkWriteClient's insert method, note the line HoodieWriteMetadata<List<WriteStatus>> result = ((HoodieFlinkTable<T>) table).insert(context, writeHandle, instantTime, records);. As mentioned earlier, table can be one of two types: HoodieFlinkMergeOnReadTable or HoodieFlinkCopyOnWriteTable. We continue with the simpler HoodieFlinkCopyOnWriteTable.

    public HoodieWriteMetadata<List<WriteStatus>> insert(
        HoodieEngineContext context,
        HoodieWriteHandle<?, ?, ?, ?> writeHandle,
        String instantTime,
        List<HoodieRecord<T>> records) {
        return new FlinkInsertCommitActionExecutor<>(context, writeHandle, config, this, instantTime, records).execute();
    }
    

    This step creates the commit action executor for the insert. Its execute method performs the write through FlinkWriteHelper.

    @Override
    public HoodieWriteMetadata<List<WriteStatus>> execute() {
        return FlinkWriteHelper.newInstance().write(instantTime, inputRecords, context, table,
                                                    config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(), this, false);
    }
    

    Let's follow FlinkWriteHelper's write method, which in turn calls the execute method of BaseFlinkCommitActionExecutor.

    @Override
    public HoodieWriteMetadata<List<WriteStatus>> write(String instantTime, List<HoodieRecord<T>> inputRecords, HoodieEngineContext context,
                                                        HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, boolean shouldCombine, int shuffleParallelism,
                                                        BaseCommitActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>, R> executor, boolean performTagging) {
        try {
            Instant lookupBegin = Instant.now();
            Duration indexLookupDuration = Duration.between(lookupBegin, Instant.now());
    
            // Call the executor to perform the write
            HoodieWriteMetadata<List<WriteStatus>> result = executor.execute(inputRecords);
            // Set the elapsed duration
            result.setIndexLookupDuration(indexLookupDuration);
            return result;
        } catch (Throwable e) {
            if (e instanceof HoodieUpsertException) {
                throw (HoodieUpsertException) e;
            }
            throw new HoodieUpsertException("Failed to upsert for commit time " + instantTime, e);
        }
    }
    

    BaseFlinkCommitActionExecutor

    Let's continue with its execute method. The code is as follows:

    @Override
    public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords) {
        HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();
    
        List<WriteStatus> writeStatuses = new LinkedList<>();
        // All records in a bucket share the same partition path and fileId, so reading the first record is enough
        final HoodieRecord<?> record = inputRecords.get(0);
        final String partitionPath = record.getPartitionPath();
        final String fileId = record.getCurrentLocation().getFileId();
        // Determine the bucket type
        final BucketType bucketType = record.getCurrentLocation().getInstantTime().equals("I")
            ? BucketType.INSERT
            : BucketType.UPDATE;
        // Handle the per-partition upsert logic
        handleUpsertPartition(
            instantTime,
            partitionPath,
            fileId,
            bucketType,
            inputRecords.iterator())
            .forEachRemaining(writeStatuses::addAll);
        // Set up the write metadata from the statuses
        setUpWriteMetadata(writeStatuses, result);
        return result;
    }
    

    Let's look at the per-partition upsert logic in the handleUpsertPartition method:

    protected Iterator<List<WriteStatus>> handleUpsertPartition(
        String instantTime,
        String partitionPath,
        String fileIdHint,
        BucketType bucketType,
        Iterator recordItr) {
        try {
            // Dispatch according to the kind of handle
            if (this.writeHandle instanceof HoodieCreateHandle) {
                // During one checkpoint interval, an insert record could also be updated,
                // for example, for an operation sequence of a record:
                //    I, U,   | U, U
                // - batch1 - | - batch2 -
                // the first batch(batch1) operation triggers an INSERT bucket,
                // the second batch batch2 tries to reuse the same bucket
                // and append instead of UPDATE.
                return handleInsert(fileIdHint, recordItr);
            } else if (this.writeHandle instanceof HoodieMergeHandle) {
                return handleUpdate(partitionPath, fileIdHint, recordItr);
            } else {
                switch (bucketType) {
                    case INSERT:
                        return handleInsert(fileIdHint, recordItr);
                    case UPDATE:
                        return handleUpdate(partitionPath, fileIdHint, recordItr);
                    default:
                        throw new AssertionError();
                }
            }
        } catch (Throwable t) {
            String msg = "Error upsetting bucketType " + bucketType + " for partition :" + partitionPath;
            LOG.error(msg, t);
            throw new HoodieUpsertException(msg, t);
        }
    }
    

    We keep focusing on insert here. The handleInsert code is as follows:

    @Override
    public Iterator<List<WriteStatus>> handleInsert(String idPfx, Iterator<HoodieRecord<T>> recordItr)
        throws Exception {
        // This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records
        // If there are no records, return an iterator over an empty WriteStatus list
        if (!recordItr.hasNext()) {
            LOG.info("Empty partition");
            return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
        }
        // Otherwise return a FlinkLazyInsertIterable
        return new FlinkLazyInsertIterable<>(recordItr, true, config, instantTime, table, idPfx,
                                             taskContextSupplier, new ExplicitWriteHandleFactory<>(writeHandle));
    }
    

    FlinkLazyInsertIterable (Part 1)

    FlinkLazyInsertIterable is a lazy iterator: the insert is only executed while the WriteStatus results are being iterated. Let's see how it is implemented. Its next method lives in the parent class LazyIterableIterator:

    @Override
    public O next() {
        try {
            return computeNext();
        } catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }
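
    The pattern itself is plain lazy evaluation. As a minimal, self-contained sketch (toy code, not part of Hudi), an iterator that defers the actual work until next() is called could look like this:

    import java.util.Iterator;
    import java.util.function.Function;

    // Toy lazy iterator: each next() call pulls one input element and only
    // then applies the (possibly expensive) write function to it.
    public class LazyWriteIterator<I, O> implements Iterator<O> {
        private final Iterator<I> input;
        private final Function<I, O> writeFn;

        public LazyWriteIterator(Iterator<I> input, Function<I, O> writeFn) {
            this.input = input;
            this.writeFn = writeFn;
        }

        @Override
        public boolean hasNext() {
            return input.hasNext();
        }

        @Override
        public O next() {
            // Work happens here, on demand, not when the iterator is built.
            return writeFn.apply(input.next());
        }
    }

    Nothing is written until someone drains the iterator, which is exactly what BaseFlinkCommitActionExecutor.execute does via forEachRemaining on the result of handleUpsertPartition.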
    

    Iteration calls the computeNext method, implemented in the subclass FlinkLazyInsertIterable. Let's keep following it.

    @Override
    protected List<WriteStatus> computeNext() {
        // Executor service used for launching writer thread.
        BoundedInMemoryExecutor<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> bufferedIteratorExecutor =
            null;
        try {
            // Get the schema of the data
            final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema());
            // Create the BoundedInMemoryExecutor; this is the core part, analyzed below
            bufferedIteratorExecutor =
                new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(inputItr), Option.of(getInsertHandler()), getTransformFunction(schema));
            // Run the executor. Its producers and consumer run concurrently; execute blocks until they finish and then returns
            final List<WriteStatus> result = bufferedIteratorExecutor.execute();
            // Check that the result is not empty and that no data remains backlogged in the executor
            assert result != null && !result.isEmpty() && !bufferedIteratorExecutor.isRemaining();
            return result;
        } catch (Exception e) {
            throw new HoodieException(e);
        } finally {
            // Make sure the executor is shut down after use
            if (null != bufferedIteratorExecutor) {
                bufferedIteratorExecutor.shutdownNow();
            }
        }
    }
    

    Before following the write logic further, we need to understand what BoundedInMemoryExecutor does.

    BoundedInMemoryExecutor

    BoundedInMemoryExecutor runs a producer-consumer style job; the producers' output is buffered in a bounded in-memory queue, BoundedInMemoryQueue. We start from its execute method.

    public E execute() {
        try {
            ExecutorCompletionService<Boolean> producerService = startProducers();
            Future<E> future = startConsumer();
            // Wait for consumer to be done
            return future.get();
        } catch (InterruptedException ie) {
            shutdownNow();
            Thread.currentThread().interrupt();
            throw new HoodieException(ie);
        } catch (Exception e) {
            throw new HoodieException(e);
        }
    }
    

    execute uses a thread pool to start the producer tasks and the consumer task, then waits for the consumer to finish and returns its result.

    Next, the startProducers method:

    public ExecutorCompletionService<Boolean> startProducers() {
        // Latch to control when and which producer thread will close the queue
        // Create a CountDownLatch whose count is the number of producers
        // Each producer counts down when it finishes
        // When the count reaches 0, all producers are done and the in-memory queue is closed (marking the end of writes)
        final CountDownLatch latch = new CountDownLatch(producers.size());
        // The thread pool has producers.size() + 1 threads: one per producer plus one for the single consumer
        final ExecutorCompletionService<Boolean> completionService =
            new ExecutorCompletionService<Boolean>(executorService);
        producers.stream().map(producer -> {
            // Run the producer logic on a new thread
            return completionService.submit(() -> {
                try {
                    // This method is a no-op
                    preExecute();
                    // The producer starts producing data into the queue
                    producer.produce(queue);
                } catch (Exception e) {
                    LOG.error("error producing records", e);
                    // On an exception, mark the queue as failed
                    queue.markAsFailed(e);
                    throw e;
                } finally {
                    synchronized (latch) {
                        // Count down when the producer finishes or fails
                        latch.countDown();
                        // If the count is 0, all producers are done; close the queue
                        if (latch.getCount() == 0) {
                            // Mark production as done so that consumer will be able to exit
                            queue.close();
                        }
                    }
                }
                return true;
            });
        }).collect(Collectors.toList());
        return completionService;
    }
    

    The startConsumer method is very similar: it calls consume on a new thread to run the consumer logic.

    private Future<E> startConsumer() {
        return consumer.map(consumer -> {
            return executorService.submit(() -> {
                LOG.info("starting consumer thread");
                preExecute();
                try {
                    E result = consumer.consume(queue);
                    LOG.info("Queue Consumption is done; notifying producer threads");
                    return result;
                } catch (Exception e) {
                    LOG.error("error consuming records", e);
                    queue.markAsFailed(e);
                    throw e;
                }
            });
        }).orElse(CompletableFuture.completedFuture(null));
    }
    

    This covers the main logic of BoundedInMemoryExecutor.
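
    To make the handoff concrete, here is a minimal, self-contained sketch of the same pattern using only the JDK (not Hudi code): a bounded blocking queue, several producers, one consumer, and a latch so that the last producer to finish "closes" the queue via a sentinel value.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class BoundedProducerConsumerSketch {
        private static final Object CLOSED = new Object(); // sentinel playing the role of queue.close()

        public static void main(String[] args) throws Exception {
            BlockingQueue<Object> queue = new ArrayBlockingQueue<>(16); // bounded buffer
            int producerCount = 2;
            ExecutorService pool = Executors.newFixedThreadPool(producerCount + 1);
            CountDownLatch latch = new CountDownLatch(producerCount);

            // Producers: push records into the queue; the last one to finish closes it.
            for (int p = 0; p < producerCount; p++) {
                final int id = p;
                pool.submit(() -> {
                    try {
                        for (int i = 0; i < 5; i++) {
                            queue.put("record-" + id + "-" + i); // blocks while the queue is full
                        }
                    } finally {
                        synchronized (latch) {
                            latch.countDown();
                            if (latch.getCount() == 0) {
                                queue.put(CLOSED);
                            }
                        }
                    }
                    return null;
                });
            }

            // Single consumer: drain until the queue is closed, then return a result.
            Future<List<Object>> result = pool.submit(() -> {
                List<Object> consumed = new ArrayList<>();
                Object item;
                while ((item = queue.take()) != CLOSED) {
                    consumed.add(item);
                }
                return consumed;
            });

            // Like BoundedInMemoryExecutor.execute(), wait for the consumer's result.
            System.out.println("consumed " + result.get().size() + " records");
            pool.shutdown();
        }
    }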

    FlinkLazyInsertIterable (Part 2)

    Back to the write path: FlinkLazyInsertIterable performs the write asynchronously through BoundedInMemoryExecutor. Building on the executor analysis above, we now need to work out its producer and consumer logic.

    According to the code, the producer is new IteratorBasedQueueProducer<>(inputItr), where inputItr is an iterator over the HoodieRecord collection. Let's look at its produce method:

    @Override
    public void produce(BoundedInMemoryQueue<I, ?> queue) throws Exception {
        LOG.info("starting to buffer records");
        while (inputIterator.hasNext()) {
            queue.insertRecord(inputIterator.next());
        }
        LOG.info("finished buffering records");
    }
    

    The logic is simple: the elements of the HoodieRecord collection are inserted into the queue one by one.

    Next, the consumer. It is passed in as Option.of(getInsertHandler()). The getInsertHandler method creates and returns a CopyOnWriteInsertHandler, which is dedicated to handling inserts for copy-on-write tables.

    protected CopyOnWriteInsertHandler getInsertHandler() {
        return new CopyOnWriteInsertHandler(hoodieConfig, instantTime, areRecordsSorted, hoodieTable, idPrefix,
                                            taskContextSupplier, writeHandleFactory);
    }
    

    CopyOnWriteInsertHandler implements the BoundedInMemoryQueueConsumer interface: its consume loop takes elements off the queue one by one and hands each of them to the subclass's consumeOneRecord method, shown after the sketch below.
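
    For orientation, the base-class consume loop is roughly the following (a paraphrase of its shape, not a verbatim copy of the Hudi source):

    // Roughly what BoundedInMemoryQueueConsumer.consume does (paraphrased):
    // drain the queue, hand each element to consumeOneRecord, then finish up
    // and return the accumulated result.
    public O consume(BoundedInMemoryQueue<?, I> queue) throws Exception {
        Iterator<I> iterator = queue.iterator();
        while (iterator.hasNext()) {
            consumeOneRecord(iterator.next());
        }
        finish();
        return getResult();
    }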

    @Override
    public void consumeOneRecord(HoodieInsertValueGenResult<HoodieRecord> payload) {
        // Data stored in the queue is wrapped as HoodieInsertValueGenResult
        // Extract the original HoodieRecord from the wrapper
        final HoodieRecord insertPayload = payload.record;
        String partitionPath = insertPayload.getPartitionPath();
        // Get the cached writeHandle
        HoodieWriteHandle<?,?,?,?> handle = handles.get(partitionPath);
        if (handle == null) {
            // If the records are sorted, this means that we encounter a new partition path
            // and the records for the previous partition path are all written,
            // so we can safely closely existing open handle to reduce memory footprint.
            // handleInsert above created the FlinkLazyInsertIterable with areRecordsSorted = true,
            // so the previous partitions are considered fully written and the cached handles are closed
            if (areRecordsSorted) {
                closeOpenHandles();
            }
            // Lazily initialize the handle, for the first time
            // Create a new writeHandle
            handle = writeHandleFactory.create(config, instantTime, hoodieTable,
                                               insertPayload.getPartitionPath(), idPrefix, taskContextSupplier);
            // Put it into the handle cache
            handles.put(partitionPath, handle);
        }
    
        // If the handle is already full
        if (!handle.canWrite(payload.record)) {
            // Handle is full. Close the handle and add the WriteStatus
            // Close the handle and collect its WriteStatus
            statuses.addAll(handle.close());
            // Open new handle
            // Then create a new handle
            handle = writeHandleFactory.create(config, instantTime, hoodieTable,
                                               insertPayload.getPartitionPath(), idPrefix, taskContextSupplier);
            // Put it into the handle cache
            handles.put(partitionPath, handle);
        }
        // Write the data through the handle
        // payload.insertValue is the data in Avro format
        handle.write(insertPayload, payload.insertValue, payload.exception);
    }
    

    The analysis has now reached the writeHandle's write method. In the next section we take FlinkCreateHandle, which is responsible for inserting data, as the example to analyze the write process.

    FlinkCreateHandle: Writing Data

    FlinkCreateHandle's write method is located in the parent class HoodieWriteHandle.

    public void write(HoodieRecord record, Option<IndexedRecord> avroRecord, Option<Exception> exception) {
        Option recordMetadata = record.getData().getMetadata();
        if (exception.isPresent() && exception.get() instanceof Throwable) {
            // Not throwing exception from here, since we don't want to fail the entire job for a single record
            writeStatus.markFailure(record, exception.get(), recordMetadata);
            LOG.error("Error writing record " + record, exception.get());
        } else {
            write(record, avroRecord);
        }
    }
    

    If the record carries an error, this write method records the failure in writeStatus. If there is no error, it calls write(HoodieRecord record, Option<IndexedRecord> insertValue), which is located in HoodieCreateHandle.

    @Override
    public void write(HoodieRecord record, Option<IndexedRecord> avroRecord) {
        Option recordMetadata = record.getData().getMetadata();
        // Get the operation type of the record; for a delete, avroRecord is empty
        if (HoodieOperation.isDelete(record.getOperation())) {
            avroRecord = Option.empty();
        }
        try {
            if (avroRecord.isPresent()) {
                // If it is an IgnoreRecord, do nothing
                if (avroRecord.get().equals(IGNORE_RECORD)) {
                    return;
                }
                // Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
                // Add the hoodie meta fields to the record so metadata can be stored with it
                IndexedRecord recordWithMetadataInSchema = rewriteRecord((GenericRecord) avroRecord.get());
                // The practical difference here is not entirely obvious;
                // literally, it controls whether the record keeps Hoodie's metadata fields
                if (preserveHoodieMetadata) {
                    // Depending on the underlying storage format, fileWriter is one of:
                    // HoodieParquetWriter
                    // HoodieOrcWriter
                    // HoodieHFileWriter
                    // Write the data to the underlying storage
                    fileWriter.writeAvro(record.getRecordKey(), recordWithMetadataInSchema);
                } else {
                    fileWriter.writeAvroWithMetadata(recordWithMetadataInSchema, record);
                }
                // update the new location of record, so we know where to find it next
                // Unseal the record so it can be modified
                record.unseal();
                // Set the record's actual write location
                record.setNewLocation(new HoodieRecordLocation(instantTime, writeStatus.getFileId()));
                // Seal the record again so it cannot be modified
                record.seal();
                // Increment the written-records counter
                recordsWritten++;
                // Increment the inserted-records counter
                insertRecordsWritten++;
            } else {
                // An empty avroRecord means the record is a delete; increment the deleted-records counter
                recordsDeleted++;
            }
            // Mark the write as successful
            writeStatus.markSuccess(record, recordMetadata);
            // deflate record payload after recording success. This will help users access payload as a
            // part of marking
            // record successful.
            // Deflate the record payload now that it has been written successfully
            record.deflate();
        } catch (Throwable t) {
            // Not throwing exception from here, since we don't want to fail the entire job
            // for a single record
            writeStatus.markFailure(record, t, recordMetadata);
            LOG.error("Error writing record " + record, t);
        }
    }
    

    This concludes the analysis of the insert flow for Flink Hudi Copy on Write tables.

    This post is the author's original work. Discussion and corrections are welcome. Please credit the source when reposting.
