BucketingSink是Flink连接外部文件系统的Sink的实现,支持Hadoop文件系统支持的所有文件系统,提供了多种文件滚的策略,支持exactly-once语义
。虽然在新版本中已经过期,但它仍然有源码阅读的价值。
前言
首先来看 BucketingSink都实现了哪些类:
public class BucketingSink<T>
extends RichSinkFunction<T>
implements InputTypeConfigurable, CheckpointedFunction, CheckpointListener, ProcessingTimeCallback
- 继承了
RichSinkFunction
- 实现了
InputTypeConfigurable,CheckpointedFunction,CheckpointListener, ProcessingTimeCallback
InputTypeConfigurable
就比较简单,设置输入元素的类型。其它我会对每一个类的实现做解析:
RichSinkFunction::invoke()
invoke
是BucketingSink
的入口方法,当收到数据时调用此方法,下面是它的实现:
public void invoke(T value) throws Exception {
// 得到数据输出的路径
Path bucketPath = bucketer.getBucketPath(clock, new Path(basePath), value);
//获取当前的ProcessTime
long currentProcessingTime = processingTimeService.getCurrentProcessingTime();
// 1. 从state中得到BucketState
BucketState<T> bucketState = state.getBucketState(bucketPath);
if (bucketState == null) {
// 如果状态为空(也就是处理inprogress的bucket),则新建一个实例
bucketState = new BucketState<>(currentProcessingTime);
//加入到state中
state.addBucketState(bucketPath, bucketState);
}
// 2. 判断是否可以关闭(滚动)现在的bucket。简单来说就是判断是否还有处理inprogress的bucket,并且是否可以关闭
if (shouldRoll(bucketState, currentProcessingTime)) {
// 3. 如果可以则新建一个inprogress的文件
openNewPartFile(bucketPath, bucketState);
}
//数据写入bucket中
bucketState.writer.write(value);
//更新最后一次的写入时间
bucketState.lastWrittenToTime = currentProcessingTime;
}
代码逻辑都比较简单明了,最主要的是理解里面的一些类的结构和一些方法: BucketState
的结构, shoudRoll和openNewPartFile
方法的实现。
-
BucketState
- 用于记录当前处理inprogress的文件,以及当checkpoint完成时,由in-progress变为pendding状态的文件。下面是主要的类的属性,
static final class BucketState<T> implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 当前正在处理的文件(in-progress)
*/
String currentFile;
/**
* 上一个checkpoing的in-progress文件的有效长度
*/
long currentFileValidLength = -1;
/**
* bucket上一次写入数据的时间
*/
long lastWrittenToTime;
/**
* 文件创建的时间
*/
long creationTime;
/**
* 关闭Bucket时生成的pending file
*/
List<String> pendingFiles = new ArrayList<>();
/**
* snapshotState时,把pendinbgFile加入到此map,每个checkpointId对应的 pending file
*/
final Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>();
......
-
shouldRoll()
- 判断是否应该关闭现在的bucket(in-progress),满足以下三个条件之一:- 还没有文件被创建过,这里不能单纯的理解为第一条数据来的时候。应该理解为路径下还没有In-progress文件。
- 当前的文件已经达到最大大小的配置(默认384M)
- 达到了最大的文件滚动时间
private boolean shouldRoll(BucketState<T> bucketState, long currentProcessingTime) throws IOException {
boolean shouldRoll = false;
int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
if (!bucketState.isWriterOpen) {
// 满足第一个条件,还没有in-progress的文件,一般是启动任务或者,上一个in-progress文件被关闭后
shouldRoll = true;
LOG.debug("BucketingSink {} starting new bucket.", subtaskIndex);
} else {
//获取当前文件的大小,如果大于则返回true
long writePosition = bucketState.writer.getPos();
if (writePosition > batchSize) {
shouldRoll = true;
LOG.debug(
"BucketingSink {} starting new bucket because file position {} is above batch size {}.",
subtaskIndex,
writePosition,
batchSize);
} else {
// 当前处理时间减去文件创建时间,大于设置的最大滚动时间,返回true
if (currentProcessingTime - bucketState.creationTime > batchRolloverInterval) {
shouldRoll = true;
LOG.debug(
"BucketingSink {} starting new bucket because file is older than roll over interval {}.",
subtaskIndex,
batchRolloverInterval);
}
}
}
return shouldRoll;
}
-
openNewPartFile()
- 关闭当前(in-progress)文件,并创建一个新的in-progress文件
private void openNewPartFile(Path bucketPath, BucketState<T> bucketState) throws Exception {
//关闭当前(in-progress)的文件,并且把.in-progress的文件改名成.pending后缀的文件
closeCurrentPartFile(bucketState);
if (!fs.exists(bucketPath)) {
try {
if (fs.mkdirs(bucketPath)) {
LOG.debug("Created new bucket directory: {}", bucketPath);
}
} catch (IOException e) {
throw new RuntimeException("Could not create new bucket path.", e);
}
}
/**
* 下面代码块的逻辑是按升序获取partCounter值,直到达到尚未使用的最小值。
* 因为不同的并行子任务是单线程的,所以这里不会有并发问题。所以我们可以看到文件名称为
* part-1-0,part-1-1=》第一个数字是subtaskIndex,第二个数字是partCounter
*/
int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask(); // 获取子任务的index
//得到文件名 String.format("%s-%s-%s%s", partPrefix, subtaskIndex, partCounter, partSuffix)
Path partPath = assemblePartPath(bucketPath, subtaskIndex, bucketState.partCounter);
// 循环检查路径下是否已经存在同名文件,或者同名pending的文件前缀,同名in-progress文件前缀
while (fs.exists(partPath) ||
fs.exists(getPendingPathFor(partPath)) ||
fs.exists(getInProgressPathFor(partPath))) {
bucketState.partCounter++;
partPath = assemblePartPath(bucketPath, subtaskIndex, bucketState.partCounter);
}
// 记录文件创建时间
bucketState.creationTime = processingTimeService.getCurrentProcessingTime();
// 再次加1,因为上面的while循环如果不成立是不加1的,下一次进来的时候还得从头开始
bucketState.partCounter++;
LOG.debug("Next part path is {}", partPath.toString());
bucketState.currentFile = partPath.toString();
// 加上.in-progress的后缀
Path inProgressPath = getInProgressPathFor(partPath);
// 设置writer,默认用StringWriter
if (bucketState.writer == null) {
//每个并行子任务,使用不同的Writer实例。要重新new
bucketState.writer = writerTemplate.duplicate();
if (bucketState.writer == null) {
throw new UnsupportedOperationException(
"Could not duplicate writer. " +
"Class '" + writerTemplate.getClass().getCanonicalName() + "' must implement the 'Writer.duplicate()' method."
);
}
}
// 打开文件
bucketState.writer.open(fs, inProgressPath);
bucketState.isWriterOpen = true;
}
closeCurrentPartFile
方法比较简单,就不用多说。openNewPartFile
方法的逻辑也比较清晰,首先关闭当前处理的文件,并且重命名为pending文件。然后创建新的progress文件,打开bucket。
总结下invoke的流程:
1.获取当前子任务当前分区路径下的文件状态
2.判断当前的state是否可以关闭。
3.如果满足关闭条件,则先关闭,再新建一个bucket.
4.最后写入数据到bucket.
CheckpointedFunction
接着看BucketingSink
的状态管理,BucketingSink使用的是ListState,里面主要管理的是当处理的文件的各种信息,包括当前的inprogress文件,pending文件列表,文件创建时间等信息,具体内容可以结合上面的BucketState类观看阅读。
-
initializeState()
- 初始化状态,任务启动时,从checkpoint恢复状态
public void initializeState(FunctionInitializationContext context) throws Exception {
Preconditions.checkArgument(this.restoredBucketStates == null, "The operator has already been initialized.");
//初始化FileSystem,简单的理解就是像连接Mysql一样,初始化连接
try {
initFileSystem();
} catch (IOException e) {
LOG.error("Error while creating FileSystem when initializing the state of the BucketingSink.", e);
throw new RuntimeException("Error while creating FileSystem when initializing the state of the BucketingSink.", e);
}
//反射获取truncate方法,如果hadoop版本没有truncate方法,从checkpoint恢复时,用最近成功一次的checkpoint创建一个.valid-length的文件
if (this.refTruncate == null) {
this.refTruncate = reflectTruncate(fs);
}
OperatorStateStore stateStore = context.getOperatorStateStore();
restoredBucketStates = stateStore.getSerializableListState("bucket-states");
int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
if (context.isRestored()) {
LOG.info("Restoring state for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIndex);
for (State<T> recoveredState : restoredBucketStates.get()) {
/*
* 1.peding状态的文件全部恢复成part file
* 2.in-progress的文件全部恢复成part file,如果有truncate方法,则恢复的文件则是不包含失效的部分,如果没有,则新建一个同名的.valid-length,
* 但是,使用时,需要自己处理,只读取有效的部分。
*/
handleRestoredBucketState(recoveredState);
if (LOG.isDebugEnabled()) {
LOG.debug("{} idx {} restored {}", getClass().getSimpleName(), subtaskIndex, recoveredState);
}
}
} else {
LOG.info("No state to restore for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIndex);
}
}
比较重要的是handleRestoredBucketState
方法,它做了这样几件事情
1. 清除掉pendingFiles的list,因为pendingFiles的list只是关闭文件时由progress重命名而来,它会在执行checkpoint时,放到pendingFile对应的map中,所以只需要恢复map中的pendingFile.
2. 处理in-progress文件:首先判断是否有同名的pending和in-progress文件,如果存在, 则rename成partfile文件,接着是恢复上一次完成的checkpoint,如果支持truncate方法,则直接把多余的截取掉,但是如果不支持truncate方法,则新建一个同名的.valid-length文件,里面记录了有效的长度,后续读取这个文件时需要自己处理只读取有效的部分。
我还是举个例子来说明这里吧:
- 假如part-1.0.in-progress文件里已经写入了以下发条数据,此时做了checkpoint,那么成功后state里的有效长度是: 25
flink is nice
java is nice
- chekpoint后,又来了一条数据,此时的有效长度是39,但是还没有做checkpoint,系统就挂了此时part-1-0.in-progress文件里有三条数据。
scala is nice
- 从checkpoint恢复时,会先把part-1-0.in-progress文件变成part-1-0,里面依然有三条数据,但是最近成功的checkpoint里只承认两条是有效的,也就是valid-length是25。如果hadoop支持truncate方法,则直接会把part-1-0里的第三条数据截取的,只保留有效的数据
flink is nice
java is nice
反之,如果 hadoop 不支持truncate方法,则新建一个part-1-0.valid-length文件,里面存入有效长度值25,此时part-1-0文件还是三条数据
flink is nice
java is nice
scala is nice
所以这需要后续读取这个文件时,先读取part-1-0.valid-length文件,拿到有效长度,再从part-1-0文件里拿到有效数据。
-
snapshotState()
- 保存当前子任务的快照,执行checkpoint,注意这里执行后变不是立马成功,要notifyCheckpointComplete之后才算成功。先上代码:
public void snapshotState(FunctionSnapshotContext context) throws Exception {
Preconditions.checkNotNull(restoredBucketStates, "The operator has not been properly initialized.");
//先先掉上一次的state
restoredBucketStates.clear();
synchronized (state.bucketStates) {
int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
//遍历bucketStates
for (Map.Entry<String, BucketState<T>> bucketStateEntry : state.bucketStates.entrySet()) {
BucketState<T> bucketState = bucketStateEntry.getValue();
// 如果文件还没有关闭,要再取一次有效长度,确保是最新的有效长度
if (bucketState.isWriterOpen) {
bucketState.currentFileValidLength = bucketState.writer.flush();
}
synchronized (bucketState.pendingFilesPerCheckpoint) {
// 把pending文件放入pendingFilesPerCheckpoint的map中,
// key是checkpointId,value是pending文件列表(因为如果checkpoint时间设置过长,有可能有多个pending文件)
// 等待notifyCheckpointComplete之后,才把里面的文件变成partfile
bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), bucketState.pendingFiles);
}
bucketState.pendingFiles = new ArrayList<>();
}
restoredBucketStates.add(state);
if (LOG.isDebugEnabled()) {
LOG.debug("{} idx {} checkpointed {}.", getClass().getSimpleName(), subtaskIdx, state);
}
}
}
snapshotState的逻辑比较简单,简单来说就是保存状态,然后把pending文件按checkpointId存入,等待notifyCheckpointComplete后,再把pending文件变成partfile文件。
CheckpointListener
上面提到的snapshotState后,等待notifyCheckpointComplete通知完成,把pending文件变成partfile。
-
notifyCheckpointComplete()
- 通知checkpoint执行完成,把pending文件变成partfile文件,整个流程结束
public void notifyCheckpointComplete(long checkpointId) throws Exception {
synchronized (state.bucketStates) {
//遍历bucketStates
Iterator<Map.Entry<String, BucketState<T>>> bucketStatesIt = state.bucketStates.entrySet().iterator();
while (bucketStatesIt.hasNext()) {
BucketState<T> bucketState = bucketStatesIt.next().getValue();
synchronized (bucketState.pendingFilesPerCheckpoint) {
Iterator<Map.Entry<Long, List<String>>> pendingCheckpointsIt =
bucketState.pendingFilesPerCheckpoint.entrySet().iterator();
// 再遍历pendingFilesPerCheckpoint,
while (pendingCheckpointsIt.hasNext()) {
Map.Entry<Long, List<String>> entry = pendingCheckpointsIt.next();
//拿到checkpointId
Long pastCheckpointId = entry.getKey();
List<String> pendingPaths = entry.getValue();
// 和当前完成的checkpointId比较,小于等于则可以处理
if (pastCheckpointId <= checkpointId) {
LOG.debug("Moving pending files to final location for checkpoint {}", pastCheckpointId);
for (String filename : pendingPaths) {
Path finalPath = new Path(filename);
Path pendingPath = getPendingPathFor(finalPath);
fs.rename(pendingPath, finalPath);
LOG.debug(
"Moving pending file {} to final location having completed checkpoint {}.",
pendingPath,
pastCheckpointId);
}
//用迭代器,确保删除不报错
pendingCheckpointsIt.remove();
}
}
// 这里的场景暂时没想到,它这里说是Writer被关闭,pendingFiles却是空的。但是pendingFiles为空
//就说明in-progress的文件没有被关闭,就说是Writer就不会被关闭,所以这里没弄明白。
if (!bucketState.isWriterOpen &&
bucketState.pendingFiles.isEmpty() &&
bucketState.pendingFilesPerCheckpoint.isEmpty()) {
// We've dealt with all the pending files and the writer for this bucket is not currently open.
// Therefore this bucket is currently inactive and we can remove it from our state.
bucketStatesIt.remove();
}
}
}
}
}
逻辑都写在代码里了,这里就不再多说。
ProcessingTimeCallback
最后再来说 ProcessingTimeCallback的onProcessingTime方法,前面我们提到关闭文件要满足三个条件之一:
- 还没有文件被创建过,这里不能单纯的理解为第一条数据来的时候。应该理解为路径下还没有In-progress文件。
- 当前的文件已经达到最大大小的配置(默认384M)
- 达到了最大的文件滚动时间
那么onProcessingTime就是第4条件,它会定期检查in-progress的Bucket,当发现文件处理非活跃状态的时间达到配置的时间后,就会关闭这个文件。
public void onProcessingTime(long timestamp) throws Exception {
long currentProcessingTime = processingTimeService.getCurrentProcessingTime();
// 定时关闭文件
closePartFilesByTime(currentProcessingTime);
//注册下一次的Timer
processingTimeService.registerTimer(currentProcessingTime + inactiveBucketCheckInterval, this);
}
private void closePartFilesByTime(long currentProcessingTime) throws Exception {
synchronized (state.bucketStates) {
for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) {
// 两个条件:
// 1. 当前时间减去配置的非活跃阈值大于 最后一次数据写入时间
// 2. 当前时间减去最大滚动阈值大于文件创建时间(这个和前面的条件一样,放在这里是因为这里不需要数据来驱动。)
if ((entry.getValue().lastWrittenToTime < currentProcessingTime - inactiveBucketThreshold)
|| (entry.getValue().creationTime < currentProcessingTime - batchRolloverInterval)) {
LOG.debug("BucketingSink {} closing bucket due to inactivity of over {} ms.",
getRuntimeContext().getIndexOfThisSubtask(), inactiveBucketThreshold);
closeCurrentPartFile(entry.getValue());
}
}
}
}
总结
以上就是BucketingSink的源码解析,总的来说,阅读BucketingSink的源码还是能学到很多东西的,比如Flink的state用法以及于如何运用Flink的Checkpoint机制来保证exactly-once
,但是它还是有个弊端就是,它的状态管理用的是ListState, 也必须是ListState(因为你无法保证用户在使用BucketingSink是key by的,要通用必须用ListState),而文件的命名方式借助于subTaskIndex,所以当更改并行度时,数据不能正常恢复。
后续我本来打算再写一个StreamingFileSink的源码解析的,但是我发现StreamingFileSink的总体思想和BucketingSink的思想是差不多的,只是StreamingFileSink的实现比较灵活,对各个模块组件化了(代码很绕),代码也很容易看懂,但是很费时间,也比较容易忘记,个人觉得StreamingFileSink的代码阅读价值并不高于BucketingSink,所以我就没有写StreamingFileSink的源码解析。在这里就简单比较下两者的差别吧。
- BucketingSink和StreamingFileSink的state管理都使用了ListState,不同在于StreamingFileSink存的是序列化后的对象(byte),所以StreamingFileSink在checkpoint对网络宽带占用较少,但checkpoint时,耗时可能会比BucketingSink多。还有StreamingFileSink的state还多了partCounter的计数,这个很重要,这样就不用每次遍历目录下的文件去拿partCounter了,直接从state里取。
- 从Checkpoint恢复时,两者对in-progress文件的rollback也有所不同,BucketingSink是兼容了hadoop版本对truncate方法的支持,而StreamingFileSink只适用于hadoop2.7或者以上的版本,也就是说,StreamingFileSink不会有.valid-length的文件了。
- StreamingFileSink的滚动策略更丰富,用户也可以自己实现自己的滚动策略, 所以StreamingFileSink更灵活。
以上3点是BucketingSink和StreamingFileSink比较明显的区别。我个人觉得,在使用上,如果你的hadoop是2.7或者以上的版本,我推荐用StreamingFileSink,反之使用BucketingSink; 在代码阅读学习上,比较推荐阅读BucketingSink,因为思想是差不多,而且阅读起来比StreamingFileSink轻松,代码逻辑更加清晰。
网友评论