简介
Compaction适用于MOR表。它将file slice中记录增量数据变更的log文件里的record与base file(parquet文件)合并,最终生成新的base file。
Compaction的目的是减少文件数目,清理无用的历史数据,提高查询性能。
Compaction分为两步:schedule和执行。其中schedule步骤判断是否需要执行compaction,并生成compaction计划,在timeline中写入一个requested状态的compaction instant。执行步骤读取之前生成的compaction计划并执行。执行完毕后compaction instant状态转换为completed。
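对应到timeline(.hoodie目录)上,一次compaction大致会经历如下文件状态变化(示意,instant time为虚构,文件命名以实际版本为准):
20231201103000123.compaction.requested   <- schedule步骤生成,内含序列化后的compaction计划
20231201103000123.compaction.inflight    <- 开始执行,requested转换为inflight
20231201103000123.commit                 <- 执行完毕,compaction以commit的形式完成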
Compaction的触发策略
触发策略对应配置项hoodie.compact.inline.trigger.strategy,它有如下触发策略:
- NUM_COMMITS: trigger compaction when reach N delta commits。从上次compaction执行完之后,达到N个delta commit触发compaction。
- NUM_COMMITS_AFTER_LAST_REQUEST: trigger compaction when reach N delta commits since last compaction request。和前面不同的是从上次schedule的compaction开始算N个,如果没有scheduled的compaction,则从上次执行完的compaction开始算。
- TIME_ELAPSED: trigger compaction when time elapsed > N seconds since last compaction。上次compaction执行完N秒后触发compaction。
- NUM_AND_TIME: trigger compaction when both NUM_COMMITS and TIME_ELAPSED are satisfied。delta commit个数和间隔时间两个条件都要满足。
- NUM_OR_TIME: trigger compaction when NUM_COMMITS or TIME_ELAPSED is satisfied。delta commit个数和间隔时间两个条件满足其一即可。
Delta commit个数和间隔时间的配置项为:
- hoodie.compact.inline.max.delta.commits:在上次compaction发生后生成N个delta commit之后,可尝试schedule下一次compaction。默认值为5。
- hoodie.compact.inline.max.delta.seconds:距离上次compaction N秒钟之后可尝试schedule下一次compaction。默认值为3600秒。
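下面给出一个组织触发策略配置的最小示例(示意代码,仅演示上述配置项key与取值的组织方式,写入端如何接收这些配置以实际使用的引擎和版本为准):
import java.util.HashMap;
import java.util.Map;

Map<String, String> hudiOptions = new HashMap<>();
// 触发策略:delta commit个数或间隔时间满足其一即触发
hudiOptions.put("hoodie.compact.inline.trigger.strategy", "NUM_OR_TIME");
// 距上次compaction达到5个delta commit
hudiOptions.put("hoodie.compact.inline.max.delta.commits", "5");
// 或距上次compaction达到3600秒
hudiOptions.put("hoodie.compact.inline.max.delta.seconds", "3600");
// 之后将hudiOptions作为写入参数传给Spark/Flink的Hudi写入端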
同步和异步
Hudi可以在每次写入完毕的时候schedule或者执行compaction,这称为同步执行。也可以使用外部服务来schedule或者执行compaction,称之为异步执行。
同步schedule同步执行
| 配置项 | 设定值 |
| --- | --- |
| hoodie.compact.inline | true |
| hoodie.compact.schedule.inline | false |
注意:这里无需开启hoodie.compact.schedule.inline,因为hoodie.compact.inline已经包含了schedule过程,schedule和执行compaction会连续运行。
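下面是一个同步schedule同步执行的写入示例(示意代码,假设使用Spark DataFrame API写入Hudi,表名和路径均为虚构,df为已准备好的Dataset<Row>):
df.write().format("hudi")
    .option("hoodie.table.name", "demo_mor_table")
    .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
    // 每次写入完毕后同步schedule并执行compaction
    .option("hoodie.compact.inline", "true")
    .option("hoodie.compact.schedule.inline", "false")
    .option("hoodie.compact.inline.max.delta.commits", "5")
    .mode("append")
    .save("hdfs://.../demo_mor_table");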
异步schedule异步执行
| 配置项 | 设定值 |
| --- | --- |
| hoodie.compact.inline | false |
| hoodie.compact.schedule.inline | false |
通过Spark发起排期(schedule)作业:
sudo -u hadoop spark-submit \
--jars '/usr/lib/hudi/hudi-spark-bundle.jar' \
--class 'org.apache.hudi.utilities.HoodieCompactor' \
/usr/lib/hudi/hudi-utilities-bundle.jar \
--spark-memory '4g' \
--mode 'schedule' \
--base-path "hdfs://..." \
--table-name "$TABLE_NAME" \
--hoodie-conf "hoodie.compact.inline.max.delta.commits=3"
通过Spark发起执行(execute)作业:
sudo -u hadoop spark-submit \
--jars '/usr/lib/hudi/hudi-spark-bundle.jar' \
--class "org.apache.hudi.utilities.HoodieCompactor" \
/usr/lib/hudi/hudi-utilities-bundle.jar \
--spark-memory '4g' \
--mode 'execute' \
--base-path "hdfs://..." \
--table-name "$TABLE_NAME"
同步schedule异步执行
| 配置项 | 设定值 |
| --- | --- |
| hoodie.compact.inline | false |
| hoodie.compact.schedule.inline | true |
异步执行的方式和上面相同。
使用Flink异步schedule或者执行compaction:
./bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor lib/hudi-flink-bundle_2.11-0.9.0.jar --path hdfs://xxx:9000/table
参数和使用方式参考官网:Compaction | Apache Hudi
配置项
inline compaction配置:
- hoodie.compact.inline:每次写入之后是否触发schedule和执行compaction。默认为false,启用会增大写入延迟。
- hoodie.compact.schedule.inline:每次写入之后是否会schedule compaction。只是尝试schedule。compaction触发条件仍然是有效的。默认为false。
- hoodie.log.compaction.inline:每次写入之后是否会触发log compaction。默认为false,启用会增大写入延迟。Flink目前无法使用。
触发策略配置:
- hoodie.compact.inline.trigger.strategy:compaction触发策略
- hoodie.compact.inline.max.delta.commits:在上次compaction发生后生成N个delta commit之后,可尝试schedule下一次compaction。默认值为5。
- hoodie.compact.inline.max.delta.seconds:距离上次compaction N秒钟之后可尝试schedule下一次compaction。默认值为3600秒。
其他配置:
- hoodie.memory.compaction.max.size:compaction操作可使用的最大内存量
- hoodie.compaction.lazy.block.read:是否启用log block lazy read,默认启用,可减少内存消耗
- hoodie.compaction.reverse.log.read:正向读取log file还是反向,默认为正向
- hoodie.memory.dfs.buffer.max.size:读取文件系统的buffer size,默认值为16 * 1024 * 1024
- hoodie.memory.spillable.map.path:compaction缓存log内数据内存不足时溢写磁盘路径。默认为/tmp
- hoodie.common.spillable.diskmap.type:溢写磁盘的数据组织形式。默认为BITCASK
- hoodie.optimized.log.blocks.scan.enable:是否启用优化log block扫描。默认为false
Schedule代码分析
这一章我们分析compaction计划的生成步骤。
BaseHoodieWriteClient
我们从BaseHoodieWriteClient::scheduleCompaction开始分析。调用链涉及到的方法如下:
public Option<String> scheduleCompaction(Option<Map<String, String>> extraMetadata) throws HoodieIOException {
String instantTime = HoodieActiveTimeline.createNewInstantTime();
return scheduleCompactionAtInstant(instantTime, extraMetadata) ? Option.of(instantTime) : Option.empty();
}
public boolean scheduleCompactionAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws HoodieIOException {
return scheduleTableService(instantTime, extraMetadata, TableServiceType.COMPACT).isPresent();
}
public Option<String> scheduleTableService(String instantTime, Option<Map<String, String>> extraMetadata, TableServiceType tableServiceType) {
return tableServiceClient.scheduleTableService(instantTime, extraMetadata, tableServiceType);
}
这个调用链并不复杂。对于Flink而言,我们一路跟踪调用,最终到达HoodieFlinkMergeOnReadTable的scheduleCompaction方法。代码如下:
@Override
public Option<HoodieCompactionPlan> scheduleCompaction(
HoodieEngineContext context,
String instantTime,
Option<Map<String, String>> extraMetadata) {
ScheduleCompactionActionExecutor scheduleCompactionExecutor = new ScheduleCompactionActionExecutor(
context, config, this, instantTime, extraMetadata, WriteOperationType.COMPACT);
return scheduleCompactionExecutor.execute();
}
这里使用了ScheduleCompactionActionExecutor。这个执行器专用于生成compaction计划,接下来我们展开分析。
ScheduleCompactionActionExecutor
该执行器负责生成compaction plan。在生成compaction plan之前,会检查当前时间点之前是否存在未完成的写入,以及是否有pending的compaction位于当前时间点之后;如果有,本次compaction会产生冲突。成功生成compaction plan之后,在timeline上创建一个requested状态、类型为compaction的instant。
@Override
public Option<HoodieCompactionPlan> execute() {
// 只有MOR表支持压缩
ValidationUtils.checkArgument(this.table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ,
"Can only compact table of type " + HoodieTableType.MERGE_ON_READ + " and not "
+ this.table.getMetaClient().getTableType().name());
if (!config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()
&& !config.getFailedWritesCleanPolicy().isLazy()) {
// 如果没有使用多端写入并发控制,并且没有开启失败写入懒清理策略(仅在clean阶段清理)
// TODO(yihua): this validation is removed for Java client used by kafka-connect. Need to revisit this.
if (config.getEngineType() == EngineType.SPARK) {
// 如果使用的是SPARK引擎
// 检查最早的inflight状态的instant时间,必须在当前schedule的compaction之后
// if there are inflight writes, their instantTime must not be less than that of compaction instant time
table.getActiveTimeline().getCommitsTimeline().filterPendingExcludingMajorAndMinorCompaction().firstInstant()
.ifPresent(earliestInflight -> ValidationUtils.checkArgument(
HoodieTimeline.compareTimestamps(earliestInflight.getTimestamp(), HoodieTimeline.GREATER_THAN, instantTime),
"Earliest write inflight instant time must be later than compaction time. Earliest :" + earliestInflight
+ ", Compaction scheduled at " + instantTime));
}
// Committed and pending compaction instants should have strictly lower timestamps
// 未完成的compaction和commit instant必须在本次schedule时间之前
// 考虑到连续多次schedule compaction但是没有执行压缩的情况,新schedule的compaction时间必须在原来schedule的之后,否则冲突
List<HoodieInstant> conflictingInstants = table.getActiveTimeline()
.getWriteTimeline().filterCompletedAndCompactionInstants().getInstantsAsStream()
.filter(instant -> HoodieTimeline.compareTimestamps(
instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, instantTime))
.collect(Collectors.toList());
ValidationUtils.checkArgument(conflictingInstants.isEmpty(),
"Following instants have timestamps >= compactionInstant (" + instantTime + ") Instants :"
+ conflictingInstants);
}
// 生成compaction计划,后面分析
HoodieCompactionPlan plan = scheduleCompaction();
Option<HoodieCompactionPlan> option = Option.empty();
if (plan != null && nonEmpty(plan.getOperations())) {
// 如果需要压缩,compaction plan不会为空
// 保存额外的metadata
extraMetadata.ifPresent(plan::setExtraMetadata);
try {
if (operationType.equals(WriteOperationType.COMPACT)) {
// 如果是COMPACT类型,压缩log文件和base file
// 上面调用链中传入的operationType是COMPACT
// 写入一个状态为requested,类型为compaction的instant
HoodieInstant compactionInstant = new HoodieInstant(HoodieInstant.State.REQUESTED,
HoodieTimeline.COMPACTION_ACTION, instantTime);
table.getActiveTimeline().saveToCompactionRequested(compactionInstant,
TimelineMetadataUtils.serializeCompactionPlan(plan));
} else {
// 如果是LOG_COMPACTION类型(仅压缩log文件)
// 同样写入一个状态为requested,类型为LOG_COMPACTION的instant
HoodieInstant logCompactionInstant = new HoodieInstant(HoodieInstant.State.REQUESTED,
HoodieTimeline.LOG_COMPACTION_ACTION, instantTime);
table.getActiveTimeline().saveToLogCompactionRequested(logCompactionInstant,
TimelineMetadataUtils.serializeCompactionPlan(plan));
}
} catch (IOException ioe) {
throw new HoodieIOException("Exception scheduling compaction", ioe);
}
option = Option.of(plan);
}
return option;
}
接下来重点是scheduleCompaction方法,用来生成compaction计划。代码如下:
@Nullable
private HoodieCompactionPlan scheduleCompaction() {
LOG.info("Checking if compaction needs to be run on " + config.getBasePath());
// judge if we need to compact according to num delta commits and time elapsed
// 检查是否需要压缩,根据配置的检查策略(N个delta commit后压缩,还是间隔多长时间之后压缩。也可以是这两个条件的与或关系)来决定
boolean compactable = needCompact(config.getInlineCompactTriggerStrategy());
if (compactable) {
// 如果可以压缩,生成compaction计划
LOG.info("Generating compaction plan for merge on read table " + config.getBasePath());
try {
context.setJobStatus(this.getClass().getSimpleName(), "Compaction: generating compaction plan");
return planGenerator.generateCompactionPlan();
} catch (IOException e) {
throw new HoodieCompactionException("Could not schedule compaction " + config.getBasePath(), e);
}
}
return new HoodieCompactionPlan();
}
在生成compaction plan之前,需要判断是否需要压缩。判断逻辑位于needCompact方法。Hudi定义了一些需要执行compaction的判断条件:
- 从上次执行compaction(已完成的compaction)间隔n个delta commit之后可以尝试schedule新的compaction
- 从上次request compaction(scheduled的compaction,如果没有scheduled的compaction,和上面的相同) n个delta commit后尝试schedule新的compaction
- 上次执行compaction n秒之后尝试schedule新的compaction
- delta commit个数或间隔时间,两个条件满足其一即可schedule
- delta commit个数与间隔时间,两个条件需同时满足才能schedule
接下来分析代码:
private boolean needCompact(CompactionTriggerStrategy compactionTriggerStrategy) {
boolean compactable;
// get deltaCommitsSinceLastCompaction and lastCompactionTs
// pair第一个参数是最近一次compaction之后的delta commit个数
// 第二个参数是最近一次compaction的instant time
Option<Pair<Integer, String>> latestDeltaCommitInfoOption = getLatestDeltaCommitInfo();
if (!latestDeltaCommitInfoOption.isPresent()) {
return false;
}
Pair<Integer, String> latestDeltaCommitInfo = latestDeltaCommitInfoOption.get();
// 如果仅压缩log,返回true
if (WriteOperationType.LOG_COMPACT.equals(operationType)) {
return true;
}
// 获取从上次compaction间隔n个delta commit之后可以尝试schedule新的compaction
// 对应配置项hoodie.compact.inline.max.delta.commits
int inlineCompactDeltaCommitMax = config.getInlineCompactDeltaCommitMax();
// 获取从上次compaction间隔n秒后可以尝试schedule新的compaction
// 对应配置项hoodie.compact.inline.max.delta.seconds
int inlineCompactDeltaSecondsMax = config.getInlineCompactDeltaSecondsMax();
// 检查提交策略
switch (compactionTriggerStrategy) {
case NUM_COMMITS:
// 如果使用从上次执行compaction,n个delta commit后尝试schedule新的compaction的策略
// 判断delta commit数量是否大于等于inlineCompactDeltaCommitMax
compactable = inlineCompactDeltaCommitMax <= latestDeltaCommitInfo.getLeft();
if (compactable) {
LOG.info(String.format("The delta commits >= %s, trigger compaction scheduler.", inlineCompactDeltaCommitMax));
}
break;
case NUM_COMMITS_AFTER_LAST_REQUEST:
// 该策略是在从上次request compaction n个delta commit后尝试schedule新的compaction,而不是从上次执行compaction n个delta commit后尝试schedule新的compaction
latestDeltaCommitInfoOption = getLatestDeltaCommitInfoSinceLastCompactionRequest();
if (!latestDeltaCommitInfoOption.isPresent()) {
return false;
}
latestDeltaCommitInfo = latestDeltaCommitInfoOption.get();
compactable = inlineCompactDeltaCommitMax <= latestDeltaCommitInfo.getLeft();
if (compactable) {
LOG.info(String.format("The delta commits >= %s since the last compaction request, trigger compaction scheduler.", inlineCompactDeltaCommitMax));
}
break;
case TIME_ELAPSED:
// 上次执行compaction n秒之后尝试schedule新的compaction
// 比较时间
compactable = inlineCompactDeltaSecondsMax <= parsedToSeconds(instantTime) - parsedToSeconds(latestDeltaCommitInfo.getRight());
if (compactable) {
LOG.info(String.format("The elapsed time >=%ss, trigger compaction scheduler.", inlineCompactDeltaSecondsMax));
}
break;
case NUM_OR_TIME:
// 前面的两个条件,或关系
compactable = inlineCompactDeltaCommitMax <= latestDeltaCommitInfo.getLeft()
|| inlineCompactDeltaSecondsMax <= parsedToSeconds(instantTime) - parsedToSeconds(latestDeltaCommitInfo.getRight());
if (compactable) {
LOG.info(String.format("The delta commits >= %s or elapsed_time >=%ss, trigger compaction scheduler.", inlineCompactDeltaCommitMax,
inlineCompactDeltaSecondsMax));
}
break;
case NUM_AND_TIME:
// 前面的两个条件,与关系
compactable = inlineCompactDeltaCommitMax <= latestDeltaCommitInfo.getLeft()
&& inlineCompactDeltaSecondsMax <= parsedToSeconds(instantTime) - parsedToSeconds(latestDeltaCommitInfo.getRight());
if (compactable) {
LOG.info(String.format("The delta commits >= %s and elapsed_time >=%ss, trigger compaction scheduler.", inlineCompactDeltaCommitMax,
inlineCompactDeltaSecondsMax));
}
break;
default:
throw new HoodieCompactionException("Unsupported compaction trigger strategy: " + config.getInlineCompactTriggerStrategy());
}
return compactable;
}
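为了直观理解上述判断逻辑,下面给出一个极简的示意(假设性代码,非Hudi源码,方法名为虚构;deltaCommits和elapsedSeconds分别对应上次compaction之后的delta commit个数与间隔秒数):
// 以NUM_OR_TIME策略为例:两个阈值满足其一即触发schedule
static boolean shouldTriggerCompaction(int deltaCommits, long elapsedSeconds,
                                       int maxDeltaCommits,    // hoodie.compact.inline.max.delta.commits
                                       long maxDeltaSeconds) { // hoodie.compact.inline.max.delta.seconds
    return deltaCommits >= maxDeltaCommits || elapsedSeconds >= maxDeltaSeconds;
}

// 例如阈值为5个delta commit或3600秒:
// shouldTriggerCompaction(3, 4000, 5, 3600) == true  (时间条件满足)
// shouldTriggerCompaction(3, 1200, 5, 3600) == false (两个条件都不满足)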
BaseHoodieCompactionPlanGenerator
BaseHoodieCompactionPlanGenerator负责生成执行计划。generateCompactionPlan方法遍历所有partition path中的file slice,剔除掉不符合条件的file slice,将剩余的加入压缩计划。详细分析如下:
@Nullable
public HoodieCompactionPlan generateCompactionPlan() throws IOException {
// Accumulator to keep track of total log files for a table
HoodieAccumulator totalLogFiles = engineContext.newAccumulator();
// Accumulator to keep track of total log file slices for a table
HoodieAccumulator totalFileSlices = engineContext.newAccumulator();
// TODO : check if maxMemory is not greater than JVM or executor memory
// TODO - rollback any compactions in flight
// 获取metaClient
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
// 获取所有的分区
List<String> partitionPaths = FSUtils.getAllPartitionPaths(engineContext, writeConfig.getMetadataConfig(), metaClient.getBasePath());
// filter the partition paths if needed to reduce list status
// 不处理,返回partition paths
partitionPaths = filterPartitionPathsByStrategy(writeConfig, partitionPaths);
// 如果没有分区,不用压缩
if (partitionPaths.isEmpty()) {
// In case no partitions could be picked, return no compaction plan
return null;
}
LOG.info("Looking for files to compact in " + partitionPaths + " partitions");
engineContext.setJobStatus(this.getClass().getSimpleName(), "Looking for files to compact: " + writeConfig.getTableName());
SyncableFileSystemView fileSystemView = (SyncableFileSystemView) this.hoodieTable.getSliceView();
// 获取文件系统中即将压缩的file group,这些文件后面需要排除
Set<HoodieFileGroupId> fgIdsInPendingCompactionAndClustering = fileSystemView.getPendingCompactionOperations()
.map(instantTimeOpPair -> instantTimeOpPair.getValue().getFileGroupId())
.collect(Collectors.toSet());
// Exclude files in pending clustering from compaction.
// 需要排除的还有即将clustering操作涉及的文件
fgIdsInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getLeft).collect(Collectors.toSet()));
// 如果操作类型是log compact
if (filterLogCompactionOperations()) {
// 需要排除pending log compaction涉及到的文件
fgIdsInPendingCompactionAndClustering.addAll(fileSystemView.getPendingLogCompactionOperations()
.map(instantTimeOpPair -> instantTimeOpPair.getValue().getFileGroupId())
.collect(Collectors.toList()));
}
// 获取最后一个completed commit/rollback的instant time
String lastCompletedInstantTime = hoodieTable.getMetaClient()
.getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION,
HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION))
.filterCompletedInstants().lastInstant().get().getTimestamp();
// 遍历所有的partition path
List<HoodieCompactionOperation> operations = engineContext.flatMap(partitionPaths, partitionPath -> fileSystemView
// 获取分区下的fileSlice,以instant time倒序排列
// 去掉replaced掉的file slice (replace commit)
// 过滤掉inflight compaction正在操作的base file,异步compaction可能存在这种情况
.getLatestFileSlices(partitionPath)
// 排除掉fgIdsInPendingCompactionAndClustering包含的文件
.filter(slice -> filterFileSlice(slice, lastCompletedInstantTime, fgIdsInPendingCompactionAndClustering))
.map(s -> {
// 获取并排序log文件
List<HoodieLogFile> logFiles =
s.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(toList());
totalLogFiles.add(logFiles.size());
totalFileSlices.add(1L);
// Avro generated classes are not inheriting Serializable. Using CompactionOperation POJO
// for Map operations and collecting them finally in Avro generated classes for storing
// into meta files.
// 获取base file
Option<HoodieBaseFile> dataFile = s.getBaseFile();
// 生成CompactionOperation
return new CompactionOperation(dataFile, partitionPath, logFiles,
writeConfig.getCompactionStrategy().captureMetrics(writeConfig, s));
}), partitionPaths.size()).stream()
.map(CompactionUtils::buildHoodieCompactionOperation).collect(toList());
LOG.info("Total of " + operations.size() + " compaction operations are retrieved");
LOG.info("Total number of latest files slices " + totalFileSlices.value());
LOG.info("Total number of log files " + totalLogFiles.value());
LOG.info("Total number of file slices " + totalFileSlices.value());
if (operations.isEmpty()) {
LOG.warn("No operations are retrieved for " + metaClient.getBasePath());
return null;
}
// Filter the compactions with the passed in filter. This lets us choose most effective compactions only
// 生成compaction计划
HoodieCompactionPlan compactionPlan = getCompactionPlan(metaClient, operations);
ValidationUtils.checkArgument(
compactionPlan.getOperations().stream().noneMatch(
op -> fgIdsInPendingCompactionAndClustering.contains(new HoodieFileGroupId(op.getPartitionPath(), op.getFileId()))),
"Bad Compaction Plan. FileId MUST NOT have multiple pending compactions. "
+ "Please fix your strategy implementation. FileIdsWithPendingCompactions :" + fgIdsInPendingCompactionAndClustering
+ ", Selected workload :" + compactionPlan);
if (compactionPlan.getOperations().isEmpty()) {
LOG.warn("After filtering, Nothing to compact for " + metaClient.getBasePath());
}
return compactionPlan;
}
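生成的compaction plan会序列化保存在requested状态的compaction instant中。下面是一个读取指定instant对应compaction plan的简单示意(假设性代码,basePath等为虚构,builder与getter的具体签名以实际版本为准;CompactionUtils.getCompactionPlan在后文的执行流程中也会用到):
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
    .setConf(hadoopConf)                        // Hadoop Configuration
    .setBasePath("hdfs://.../table_base_path")  // 虚构路径
    .build();
HoodieCompactionPlan plan = CompactionUtils.getCompactionPlan(metaClient, compactionInstantTime);
// 每个operation对应一个file slice:一个base file(可能为空)加上若干log文件
for (HoodieCompactionOperation op : plan.getOperations()) {
    System.out.println(op.getPartitionPath() + " base=" + op.getDataFilePath()
        + " logs=" + op.getDeltaFilePaths());
}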
以上是生成compaction plan的全过程。下面我们分析compaction的执行过程。
Compact执行过程
入口调用链
入口从BaseHoodieWriteClient::compact开始。一路涉及到的逻辑不多,这里列出调用链。
public HoodieWriteMetadata<O> compact(String compactionInstantTime) {
if (shouldDelegateToTableServiceManager(config, ActionType.compaction)) {
throw new UnsupportedOperationException("Compaction should be delegated to table service manager instead of direct run.");
}
return compact(compactionInstantTime, config.shouldAutoCommit());
}
HoodieFlinkWriteClient::compact方法:
@Override
protected HoodieWriteMetadata<List<WriteStatus>> compact(String compactionInstantTime, boolean shouldComplete) {
// only used for metadata table, the compaction happens in single thread
return tableServiceClient.compact(compactionInstantTime, shouldComplete);
}
HoodieFlinkTableServiceClient::compact方法在执行完压缩之后调用commitCompaction,将compaction instant从inflight状态转换为completed。
@Override
protected HoodieWriteMetadata<List<WriteStatus>> compact(String compactionInstantTime, boolean shouldComplete) {
// only used for metadata table, the compaction happens in single thread
HoodieWriteMetadata<List<WriteStatus>> compactionMetadata = getHoodieTable().compact(context, compactionInstantTime);
commitCompaction(compactionInstantTime, compactionMetadata.getCommitMetadata().get(), Option.empty());
return compactionMetadata;
}
HoodieFlinkMergeOnReadTable::compact方法:
@Override
public HoodieWriteMetadata<List<WriteStatus>> compact(
HoodieEngineContext context, String compactionInstantTime) {
RunCompactionActionExecutor compactionExecutor = new RunCompactionActionExecutor(
context, config, this, compactionInstantTime, new HoodieFlinkMergeOnReadTableCompactor(),
new HoodieFlinkCopyOnWriteTable(config, context, getMetaClient()), WriteOperationType.COMPACT);
return convertMetadata(compactionExecutor.execute());
}
最终到达compaction执行器RunCompactionActionExecutor,接下来详细展开分析。
RunCompactionActionExecutor
RunCompactionActionExecutor负责执行compaction计划。execute方法内容如下:
@Override
public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
// 获取pending的compaction timeline
HoodieTimeline pendingMajorOrMinorCompactionTimeline = WriteOperationType.COMPACT.equals(operationType)
? table.getActiveTimeline().filterPendingCompactionTimeline()
: table.getActiveTimeline().filterPendingLogCompactionTimeline();
// 回滚其中inflight状态的compaction
compactor.preCompact(table, pendingMajorOrMinorCompactionTimeline, this.operationType, instantTime);
HoodieWriteMetadata<HoodieData<WriteStatus>> compactionMetadata = new HoodieWriteMetadata<>();
try {
// generate compaction plan
// should support configurable commit metadata
// 获取前面生成的compaction计划
HoodieCompactionPlan compactionPlan = operationType.equals(WriteOperationType.COMPACT)
? CompactionUtils.getCompactionPlan(table.getMetaClient(), instantTime)
: CompactionUtils.getLogCompactionPlan(table.getMetaClient(), instantTime);
// try to load internalSchema to support schema Evolution
// 读取schema
HoodieWriteConfig configCopy = config;
Pair<Option<String>, Option<String>> schemaPair = InternalSchemaCache
.getInternalSchemaAndAvroSchemaForClusteringAndCompaction(table.getMetaClient(), instantTime);
if (schemaPair.getLeft().isPresent() && schemaPair.getRight().isPresent()) {
// should not influence the original config, just copy it
configCopy = HoodieWriteConfig.newBuilder().withProperties(config.getProps()).build();
configCopy.setInternalSchemaString(schemaPair.getLeft().get());
configCopy.setSchema(schemaPair.getRight().get());
}
// 使用HoodieCompactor执行压缩过程,后面分析
HoodieData<WriteStatus> statuses = compactor.compact(
context, compactionPlan, table, configCopy, instantTime, compactionHandler);
// 对于Flink,无需持久化写入状态
compactor.maybePersist(statuses, config);
context.setJobStatus(this.getClass().getSimpleName(), "Preparing compaction metadata: " + config.getTableName());
List<HoodieWriteStat> updateStatusMap = statuses.map(WriteStatus::getStat).collectAsList();
HoodieCommitMetadata metadata = new HoodieCommitMetadata(true);
for (HoodieWriteStat stat : updateStatusMap) {
metadata.addWriteStat(stat.getPartitionPath(), stat);
}
metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, config.getSchema());
if (schemaPair.getLeft().isPresent()) {
metadata.addMetadata(SerDeHelper.LATEST_SCHEMA, schemaPair.getLeft().get());
metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, schemaPair.getRight().get());
}
metadata.setOperationType(operationType);
compactionMetadata.setWriteStatuses(statuses);
compactionMetadata.setCommitted(false);
compactionMetadata.setCommitMetadata(Option.of(metadata));
} catch (Exception e) {
throw new HoodieCompactionException("Could not compact " + config.getBasePath(), e);
}
return compactionMetadata;
}
执行压缩的步骤交给HoodieCompactor完成。
HoodieCompactor
HoodieCompactor的compact方法实现了MOR表的压缩逻辑。该方法将compaction instant从requested转换为inflight状态,然后执行compaction计划。
public HoodieData<WriteStatus> compact(
HoodieEngineContext context, HoodieCompactionPlan compactionPlan,
HoodieTable table, HoodieWriteConfig config, String compactionInstantTime,
HoodieCompactionHandler compactionHandler) {
// 没有压缩计划?返回
if (compactionPlan == null || (compactionPlan.getOperations() == null)
|| (compactionPlan.getOperations().isEmpty())) {
return context.emptyHoodieData();
}
CompactionExecutionHelper executionHelper = getCompactionExecutionStrategy(compactionPlan);
// Transition requested to inflight file.
// 将compaction instant状态转换为inflight
executionHelper.transitionRequestedToInflight(table, compactionInstantTime);
table.getMetaClient().reloadActiveTimeline();
HoodieTableMetaClient metaClient = table.getMetaClient();
TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
// Here we firstly use the table schema as the reader schema to read
// log file.That is because in the case of MergeInto, the config.getSchema may not
// the same with the table schema.
// 获取table schema,用来读取log内的数据
try {
if (StringUtils.isNullOrEmpty(config.getInternalSchema())) {
Schema readerSchema = schemaResolver.getTableAvroSchema(false);
config.setSchema(readerSchema.toString());
}
} catch (Exception e) {
// If there is no commit in the table, just ignore the exception.
}
// Compacting is very similar to applying updates to existing file
List<CompactionOperation> operations = compactionPlan.getOperations().stream()
.map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
LOG.info("Compactor compacting " + operations + " files");
String maxInstantTime = getMaxInstantTime(metaClient);
context.setJobStatus(this.getClass().getSimpleName(), "Compacting file slices: " + config.getTableName());
TaskContextSupplier taskContextSupplier = table.getTaskContextSupplier();
// 遍历CompactionOperation,执行compact方法
return context.parallelize(operations).map(operation -> compact(
compactionHandler, metaClient, config, operation, compactionInstantTime, maxInstantTime, taskContextSupplier, executionHelper))
.flatMap(List::iterator);
}
单个operation的compact方法扫描compaction plan中需要合并的log文件,并与base file执行数据合并。方法内容如下:
/**
* Execute a single compaction operation and report back status.
*/
public List<WriteStatus> compact(HoodieCompactionHandler compactionHandler,
HoodieTableMetaClient metaClient,
HoodieWriteConfig config,
CompactionOperation operation,
String instantTime,
String maxInstantTime,
TaskContextSupplier taskContextSupplier,
CompactionExecutionHelper executionHelper) throws IOException {
// 获取表所在的文件系统
FileSystem fs = metaClient.getFs();
Schema readerSchema;
Option<InternalSchema> internalSchemaOption = Option.empty();
// 获取最新的表schema(schema evolution)
if (!StringUtils.isNullOrEmpty(config.getInternalSchema())) {
readerSchema = new Schema.Parser().parse(config.getSchema());
internalSchemaOption = SerDeHelper.fromJson(config.getInternalSchema());
// its safe to modify config here, since we running in task side.
// 配置compactionHandler,实际类型为HoodieTable
((HoodieTable) compactionHandler).getConfig().setDefault(config);
} else {
// 获取write schema
readerSchema = HoodieAvroUtils.addMetadataFields(
new Schema.Parser().parse(config.getSchema()), config.allowOperationMetadataField());
}
LOG.info("Compaction operation started for base file: " + operation.getDataFileName() + " and delta files: " + operation.getDeltaFileNames()
+ " for commit " + instantTime);
// TODO - FIX THIS
// Reads the entire avro file. Always only specific blocks should be read from the avro file
// (failure recover).
// Load all the delta commits since the last compaction commit and get all the blocks to be
// loaded and load it using CompositeAvroLogReader
// Since a DeltaCommit is not defined yet, reading all the records. revisit this soon.
// 获取compaction可用的最大内存数
// 如果配置了hoodie.memory.compaction.max.size,使用该值
// 如果没有,使用hoodie.memory.compaction.fraction
long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(taskContextSupplier, config);
LOG.info("MaxMemoryPerCompaction => " + maxMemoryPerCompaction);
// 获取需要压缩的log文件全路径
List<String> logFiles = operation.getDeltaFileNames().stream().map(
p -> new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), operation.getPartitionPath()), p).toString())
.collect(toList());
// 构造log数据合并扫描器
HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
// 配置文件系统(例如HDFS等)
.withFileSystem(fs)
// 配置table的base path
.withBasePath(metaClient.getBasePath())
// 配置需要压缩的log文件路径
.withLogFilePaths(logFiles)
// 配置schema
.withReaderSchema(readerSchema)
// 配置最近的instant time,为commit, delta commit和rollback commit中时间最晚的completed状态的instant time
// log文件中在这个instant time之后的log block不会被扫描到
.withLatestInstantTime(executionHelper.instantTimeToUseForScanning(instantTime, maxInstantTime))
// 配置internal schema
.withInternalSchema(internalSchemaOption.orElse(InternalSchema.getEmptyInternalSchema()))
// 配置压缩可用的最大内存
// scanner缓存了扫描到的HoodieRecord
// 如果超过内存限制,会spill到磁盘上
.withMaxMemorySizeInBytes(maxMemoryPerCompaction)
// 使用启用log block lazy read,默认启用,会减少内存消耗
// 对应配置项为hoodie.compaction.lazy.block.read
.withReadBlocksLazily(config.getCompactionLazyBlockReadEnabled())
// 是从头到尾读取log file还是反过来,默认是正向
// 对应配置项为hoodie.compaction.reverse.log.read
.withReverseReader(config.getCompactionReverseLogReadEnabled())
// 读取文件系统的buffer size,对应配置项hoodie.memory.dfs.buffer.max.size
// 默认值为16 * 1024 * 1024
.withBufferSize(config.getMaxDFSStreamBufferSize())
// 前面提到的spill路径配置
// 对应配置项为hoodie.memory.spillable.map.path,默认值是/tmp
.withSpillableMapBasePath(config.getSpillableMapBasePath())
// 溢写磁盘的数据组织形式
// 对应配置项hoodie.common.spillable.diskmap.type。默认为BITCASK
.withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
// bitcask形式是否开启数据压缩
// 对应配置项hoodie.common.diskmap.compression.enabled。默认为true
.withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
// 设置是否在meta字段中包含_hoodie_operation
// 对应配置项hoodie.allow.operation.metadata.field,默认为false
.withOperationField(config.allowOperationMetadataField())
// 设置分区
.withPartition(operation.getPartitionPath())
// 是否启用优化log block扫描
// 对应配置项hoodie.optimized.log.blocks.scan.enable。默认为false
.withOptimizedLogBlocksScan(executionHelper.enableOptimizedLogBlockScan(config))
// 配置record合并器
// 根据配置的合并器class和支持的合并策略筛选出可用的合并器
// 默认为HoodieAvroRecordMerger
// 通过hoodie.datasource.write.record.merger.impls配置合并器class
// 通过hoodie.datasource.write.record.merger.strategy配置合并策略
.withRecordMerger(config.getRecordMerger())
.build();
// 获取现有的base file
Option<HoodieBaseFile> oldDataFileOpt =
operation.getBaseFile(metaClient.getBasePath(), operation.getPartitionPath());
// Considering following scenario: if all log blocks in this fileSlice is rollback, it returns an empty scanner.
// But in this case, we need to give it a base file. Otherwise, it will lose base file in following fileSlice.
if (!scanner.iterator().hasNext()) {
// 如果没有log file
// file slice回滚的时候可能会发生这种情况
if (!oldDataFileOpt.isPresent()) {
// 关闭scanner
// 写入状态返回空集合
scanner.close();
return new ArrayList<>();
} else {
// TODO: we may directly rename original parquet file if there is not evolution/devolution of schema
/*
TaskContextSupplier taskContextSupplier = hoodieCopyOnWriteTable.getTaskContextSupplier();
String newFileName = FSUtils.makeDataFileName(instantTime,
FSUtils.makeWriteToken(taskContextSupplier.getPartitionIdSupplier().get(), taskContextSupplier.getStageIdSupplier().get(), taskContextSupplier.getAttemptIdSupplier().get()),
operation.getFileId(), hoodieCopyOnWriteTable.getBaseFileExtension());
Path oldFilePath = new Path(oldDataFileOpt.get().getPath());
Path newFilePath = new Path(oldFilePath.getParent(), newFileName);
FileUtil.copy(fs,oldFilePath, fs, newFilePath, false, fs.getConf());
*/
}
}
// Compacting is very similar to applying updates to existing file
Iterator<List<WriteStatus>> result;
// 执行合并操作,和数据的更新和插入操作非常相似
// 如果base file存在,执行数据更新
// 如果base file不存在,写入数据到新的base file
// 对于Flink而言这里调用的是HoodieFlinkCopyOnWrite的handleUpdate或HandleInsert方法
result = executionHelper.writeFileAndGetWriteStats(compactionHandler, operation, instantTime, scanner, oldDataFileOpt);
// 关闭scanner
scanner.close();
// 组装写入状态并返回
Iterable<List<WriteStatus>> resultIterable = () -> result;
return StreamSupport.stream(resultIterable.spliterator(), false).flatMap(Collection::stream).peek(s -> {
s.getStat().setTotalUpdatedRecordsCompacted(scanner.getNumMergedRecordsInLog());
s.getStat().setTotalLogFilesCompacted(scanner.getTotalLogFiles());
s.getStat().setTotalLogRecords(scanner.getTotalLogRecords());
s.getStat().setPartitionPath(operation.getPartitionPath());
s.getStat()
.setTotalLogSizeCompacted(operation.getMetrics().get(CompactionStrategy.TOTAL_LOG_FILE_SIZE).longValue());
s.getStat().setTotalLogBlocks(scanner.getTotalLogBlocks());
s.getStat().setTotalCorruptLogBlock(scanner.getTotalCorruptBlocks());
s.getStat().setTotalRollbackBlocks(scanner.getTotalRollbacks());
RuntimeStats runtimeStats = new RuntimeStats();
runtimeStats.setTotalScanTime(scanner.getTotalTimeTakenToReadAndMergeBlocks());
s.getStat().setRuntimeStats(runtimeStats);
}).collect(toList());
}
这里的数据更新和插入操作可参考:Hudi 源码之数据写入逻辑的BaseFlinkCommitActionExecutor 章节。
到这里compaction的逻辑已经分析完毕。但是需要回过头来关注下HoodieMergedLogRecordScanner。
HoodieMergedLogRecordScanner
HoodieMergedLogRecordScanner默认情况下在创建的时候执行full scan(全扫描),读取所有log文件的record并放入缓存。它在扫描并缓存log文件的时候会合并log文件中的重复数据。接下来我们分析合并重复数据的相关逻辑。
private HoodieMergedLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths, Schema readerSchema,
String latestInstantTime, Long maxMemorySizeInBytes, boolean readBlocksLazily,
boolean reverseReader, int bufferSize, String spillableMapBasePath,
Option<InstantRange> instantRange,
ExternalSpillableMap.DiskMapType diskMapType,
boolean isBitCaskDiskMapCompressionEnabled,
boolean withOperationField, boolean forceFullScan,
Option<String> partitionName,
InternalSchema internalSchema,
Option<String> keyFieldOverride,
boolean enableOptimizedLogBlocksScan, HoodieRecordMerger recordMerger) {
super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize,
instantRange, withOperationField, forceFullScan, partitionName, internalSchema, keyFieldOverride, enableOptimizedLogBlocksScan, recordMerger);
try {
this.maxMemorySizeInBytes = maxMemorySizeInBytes;
// Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize
this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(),
new HoodieRecordSizeEstimator(readerSchema), diskMapType, isBitCaskDiskMapCompressionEnabled);
this.scannedPrefixes = new HashSet<>();
} catch (IOException e) {
throw new HoodieIOException("IOException when creating ExternalSpillableMap at " + spillableMapBasePath, e);
}
// 默认需要执行完整扫描
// 扫描合并所有待压缩log文件中的record,保存在缓存中
if (forceFullScan) {
performScan();
}
}
performScan方法调用了scanInternal。我们查看它的代码:
protected final void scanInternal(Option<KeySpec> keySpecOpt, boolean skipProcessingBlocks) {
synchronized (this) {
// 是否启用优化的log block扫描(hoodie.optimized.log.blocks.scan.enable),默认未开启
if (enableOptimizedLogBlocksScan) {
scanInternalV2(keySpecOpt, skipProcessingBlocks);
} else {
scanInternalV1(keySpecOpt);
}
}
}
我们分析默认的scanInternalV1方法。它扫描所有正常的log block,如果该log block是data block,则调用processQueuedBlocksForInstant方法,将此log block中的所有record写入缓存,同时将重复数据与缓存中已有的记录合并。
private void scanInternalV1(Option<KeySpec> keySpecOpt) {
currentInstantLogBlocks = new ArrayDeque<>();
// ...
try {
// Iterate over the paths
logFormatReaderWrapper = new HoodieLogFormatReader(fs,
logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile))).collect(Collectors.toList()),
readerSchema, readBlocksLazily, reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema);
Set<HoodieLogFile> scannedLogFiles = new HashSet<>();
while (logFormatReaderWrapper.hasNext()) {
HoodieLogFile logFile = logFormatReaderWrapper.getLogFile();
LOG.info("Scanning log file " + logFile);
scannedLogFiles.add(logFile);
totalLogFiles.set(scannedLogFiles.size());
// Use the HoodieLogFileReader to iterate through the blocks in the log file
HoodieLogBlock logBlock = logFormatReaderWrapper.next();
final String instantTime = logBlock.getLogBlockHeader().get(INSTANT_TIME);
totalLogBlocks.incrementAndGet();
// 不处理写入失败和corrupt的block
if (logBlock.getBlockType() != CORRUPT_BLOCK
&& !HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), HoodieTimeline.LESSER_THAN_OR_EQUALS, this.latestInstantTime
)) {
// hit a block with instant time greater than should be processed, stop processing further
break;
}
if (logBlock.getBlockType() != CORRUPT_BLOCK && logBlock.getBlockType() != COMMAND_BLOCK) {
if (!completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime)
|| inflightInstantsTimeline.containsInstant(instantTime)) {
// hit an uncommitted block possibly from a failed write, move to the next one and skip processing this one
continue;
}
if (instantRange.isPresent() && !instantRange.get().isInRange(instantTime)) {
// filter the log block by instant range
continue;
}
}
switch (logBlock.getBlockType()) {
case HFILE_DATA_BLOCK:
case AVRO_DATA_BLOCK:
case PARQUET_DATA_BLOCK:
LOG.info("Reading a data block from file " + logFile.getPath() + " at instant "
+ logBlock.getLogBlockHeader().get(INSTANT_TIME));
// 对于数据类型的block,如果还没被处理过
if (isNewInstantBlock(logBlock) && !readBlocksLazily) {
// If this is an avro data block belonging to a different commit/instant,
// then merge the last blocks and records into the main result
// 将该log block中的record读入缓存,和缓存中的重复数据合并
processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
}
// store the current block
currentInstantLogBlocks.push(logBlock);
break;
// ...
}
}
// merge the last read block when all the blocks are done reading
// 合并最后一次读入的数据
if (!currentInstantLogBlocks.isEmpty()) {
LOG.info("Merging the final data blocks");
processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
}
// Done
progress = 1.0f;
} catch (IOException e) {
LOG.error("Got IOException when reading log file", e);
throw new HoodieIOException("IOException when reading log file ", e);
} catch (Exception e) {
LOG.error("Got exception when reading log file", e);
throw new HoodieException("Exception when reading log file ", e);
} finally {
try {
if (null != logFormatReaderWrapper) {
logFormatReaderWrapper.close();
}
} catch (IOException ioe) {
// Eat exception as we do not want to mask the original exception that can happen
LOG.error("Unable to close log format reader", ioe);
}
}
}
processQueuedBlocksForInstant方法对data block调用了processDataBlock方法,processDataBlock再逐条调用processNextRecord。两个方法的代码如下:
private void processQueuedBlocksForInstant(Deque<HoodieLogBlock> logBlocks, int numLogFilesSeen,
Option<KeySpec> keySpecOpt) throws Exception {
while (!logBlocks.isEmpty()) {
LOG.info("Number of remaining logblocks to merge " + logBlocks.size());
// poll the element at the bottom of the stack since that's the order it was inserted
HoodieLogBlock lastBlock = logBlocks.pollLast();
switch (lastBlock.getBlockType()) {
case AVRO_DATA_BLOCK:
case HFILE_DATA_BLOCK:
case PARQUET_DATA_BLOCK:
processDataBlock((HoodieDataBlock) lastBlock, keySpecOpt);
break;
case DELETE_BLOCK:
Arrays.stream(((HoodieDeleteBlock) lastBlock).getRecordsToDelete()).forEach(this::processNextDeletedRecord);
break;
case CORRUPT_BLOCK:
LOG.warn("Found a corrupt block which was not rolled back");
break;
default:
break;
}
}
// At this step the lastBlocks are consumed. We track approximate progress by number of log-files seen
progress = (numLogFilesSeen - 1) / logFilePaths.size();
}
private void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec> keySpecOpt) throws Exception {
checkState(partitionNameOverrideOpt.isPresent() || partitionPathFieldOpt.isPresent(),
"Either partition-name override or partition-path field had to be present");
Option<Pair<String, String>> recordKeyPartitionPathFieldPair = populateMetaFields
? Option.empty()
: Option.of(Pair.of(recordKeyField, partitionPathFieldOpt.orElse(null)));
try (ClosableIteratorWithSchema<HoodieRecord> recordIterator = getRecordsIterator(dataBlock, keySpecOpt)) {
while (recordIterator.hasNext()) {
HoodieRecord completedRecord = recordIterator.next()
.wrapIntoHoodieRecordPayloadWithParams(recordIterator.getSchema(),
hoodieTableMetaClient.getTableConfig().getProps(),
recordKeyPartitionPathFieldPair,
this.withOperationField,
this.partitionNameOverrideOpt,
populateMetaFields);
processNextRecord(completedRecord);
totalLogRecords.incrementAndGet();
}
}
}
最终到了HoodieMergedLogRecordScanner::processNextRecord方法。此方法将record放入缓存,如果缓存中已有相同key的数据,使用recordMerger合并重复数据。RecordMerger合并数据的分析可以参考:Hudi 源码之数据写入逻辑的HoodieAvroRecordMerger::merge 合并数据 章节。
@Override
protected <T> void processNextRecord(HoodieRecord<T> newRecord) throws IOException {
// 获取record key
String key = newRecord.getRecordKey();
// 从缓存中取出key相同的老数据。
HoodieRecord<T> prevRecord = records.get(key);
if (prevRecord != null) {
// Merge and store the combined record
// 如果存在老数据,合并之
// 可以认为是新数据中的字段值替换老数据的字段值
HoodieRecord<T> combinedRecord = (HoodieRecord<T>) recordMerger.merge(prevRecord, readerSchema,
newRecord, readerSchema, this.getPayloadProps()).get().getLeft();
// If pre-combine returns existing record, no need to update it
if (combinedRecord.getData() != prevRecord.getData()) {
// 如果合并后的数据确实有改动,需要更新这条数据
HoodieRecord latestHoodieRecord =
combinedRecord.newInstance(new HoodieKey(key, newRecord.getPartitionPath()), newRecord.getOperation());
latestHoodieRecord.unseal();
latestHoodieRecord.setCurrentLocation(newRecord.getCurrentLocation());
latestHoodieRecord.seal();
// NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
// payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
// it since these records will be put into records(Map).
records.put(key, latestHoodieRecord.copy());
}
} else {
// Put the record as is
// NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
// payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
// it since these records will be put into records(Map).
// 如果缓存中没有找到老数据,直接放入缓存
records.put(key, newRecord.copy());
}
}
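processNextRecord的按key去重效果可以用下面的极简示意来理解(假设性代码,非Hudi源码;这里用Map按record key去重,后到的记录覆盖先前的记录,只是对默认合并行为的近似):
import java.util.LinkedHashMap;
import java.util.Map;

// key为record key,value简化为该record的最新内容
static Map<String, String> mergeByKey(Iterable<Map.Entry<String, String>> logRecordsInCommitOrder) {
    Map<String, String> merged = new LinkedHashMap<>();
    for (Map.Entry<String, String> rec : logRecordsInCommitOrder) {
        // 按commit顺序遍历,同一个record key后写入的内容覆盖先前的内容
        merged.merge(rec.getKey(), rec.getValue(), (oldVal, newVal) -> newVal);
    }
    return merged;
}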
Inline Compaction
HoodieFlinkWriteClient的commit方法会调用commitStats。commitStats逐个检查是否满足触发各个table service的条件,如果满足则运行对应的table service,其中就包含compaction。
public boolean commit(String instantTime, List<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata,
String commitActionType, Map<String, List<String>> partitionToReplacedFileIds,
Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc) {
List<HoodieWriteStat> writeStats = writeStatuses.parallelStream().map(WriteStatus::getStat).collect(Collectors.toList());
// for eager flush, multiple write stat may share one file path.
List<HoodieWriteStat> merged = writeStats.stream()
.collect(Collectors.groupingBy(writeStat -> writeStat.getPartitionPath() + writeStat.getPath()))
.values().stream()
.map(duplicates -> duplicates.stream().reduce(WriteStatMerger::merge).get())
.collect(Collectors.toList());
return commitStats(instantTime, merged, extraMetadata, commitActionType, partitionToReplacedFileIds, extraPreCommitFunc);
}
BaseHoodieWriteClient::commitStats方法调用了runTableServicesInline方法,一路追踪到BaseHoodieTableServiceClient::runTableServicesInline方法。这个方法中涉及inline compaction的配置有3个:
- hoodie.compact.inline:每次写入之后是否触发compaction。默认为false,启用会增大写入延迟。
- hoodie.compact.schedule.inline:每次写入之后是否会schedule compaction。只是尝试schedule,前面提到的compaction条件仍然是有效的。默认为false。
- hoodie.log.compaction.inline:每次写入之后是否会触发log compaction。默认为false,启用会增大写入延迟。
protected void runTableServicesInline(HoodieTable table, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
// 如果没有启用table service,不执行
if (!tableServicesEnabled(config)) {
return;
}
// ...
// Do an inline compaction if enabled
// 如果启用的inline compaction
// hoodie.compact.inline
if (config.inlineCompactionEnabled()) {
metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT.key(), "true");
// 尝试schedule然后执行compaction
inlineCompaction(table, extraMetadata);
} else {
metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT.key(), "false");
}
// if just inline schedule is enabled
// 如果只是inline schedule
// hoodie.compact.schedule.inline
if (!config.inlineCompactionEnabled() && config.scheduleInlineCompaction()
&& table.getActiveTimeline().getWriteTimeline().filterPendingCompactionTimeline().empty()) {
// proceed only if there are no pending compactions
metadata.addMetadata(HoodieCompactionConfig.SCHEDULE_INLINE_COMPACT.key(), "true");
// schedule一个compaction
inlineScheduleCompaction(extraMetadata);
}
// Do an inline log compaction if enabled
// 如果启用了inline log compaction
// hoodie.log.compaction.inline
if (config.inlineLogCompactionEnabled()) {
runAnyPendingLogCompactions(table);
metadata.addMetadata(HoodieCompactionConfig.INLINE_LOG_COMPACT.key(), "true");
inlineLogCompact(extraMetadata);
} else {
metadata.addMetadata(HoodieCompactionConfig.INLINE_LOG_COMPACT.key(), "false");
}
// ...
}