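Background
Unaligned checkpoints are a feature added in Flink 1.11. In earlier versions, checkpoint alignment buffered all data arriving on an input channel after that channel had received the barrier, and released it to the operator only once every input channel had received the checkpoint barrier and the checkpoint operation had finished. This guarantees exactly-once semantics, but it noticeably increases latency and lowers throughput. Under back pressure the situation gets even worse.
Unaligned checkpoints address a problem of the traditional aligned checkpoint under heavy back pressure: one slow input stream holds up the other input streams. They change the previous checkpoint logic in the following ways:
- As soon as one input channel receives a barrier, the checkpoint starts and the checkpoint id is recorded.
- A new checkpoint barrier is inserted at the head of the operator's output buffer queue (the position that leaves the queue first) so that it can be broadcast downstream.
- Data buffers are read from each input channel and written to the checkpoint until the barrier carrying the recorded checkpoint id is read. The input channel from the first step has already delivered its barrier, so the data behind that barrier is not recorded in the checkpoint.
- An aligned checkpoint is triggered after all input channels have received the barrier, whereas an unaligned checkpoint is triggered when any input channel receives the first barrier.
- An unaligned checkpoint does not block any input channel.
The figure from the official Flink documentation illustrates these steps:
[Figure: Unaligned Checkpoint]
The parts highlighted in yellow must be written to the checkpoint: the data buffers of every input channel that the checkpoint barrier overtakes (the buffers queued ahead of it), the operator's internal state, and the output buffers.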
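To make the difference in trigger timing concrete, here is a minimal sketch in plain Java. It is a toy model, not Flink code: the class name, the method names and the arrival times are all made up for illustration. It assumes each input channel is described only by the time at which its barrier arrives.

```java
import java.util.Arrays;

/** Toy model (not Flink code): when does a checkpoint trigger, given barrier arrival times? */
class TriggerTimingSketch {

    /** Aligned: the snapshot starts only after the barrier has arrived on every channel. */
    public static long alignedTriggerTime(long[] barrierArrivalMillis) {
        return Arrays.stream(barrierArrivalMillis).max().getAsLong();
    }

    /** Unaligned: the first barrier on any channel triggers the checkpoint. */
    public static long unalignedTriggerTime(long[] barrierArrivalMillis) {
        return Arrays.stream(barrierArrivalMillis).min().getAsLong();
    }

    /** Aligned mode blocks each channel from its own barrier until the last barrier arrives. */
    public static long[] alignedBlockedMillis(long[] barrierArrivalMillis) {
        long last = alignedTriggerTime(barrierArrivalMillis);
        return Arrays.stream(barrierArrivalMillis).map(t -> last - t).toArray();
    }

    public static void main(String[] args) {
        // Hypothetical arrival times; the third channel is back-pressured.
        long[] arrivals = {100, 250, 900};
        System.out.println("aligned trigger:   " + alignedTriggerTime(arrivals));   // 900
        System.out.println("unaligned trigger: " + unalignedTriggerTime(arrivals)); // 100
        System.out.println("aligned blocking:  " + Arrays.toString(alignedBlockedMillis(arrivals))); // [800, 650, 0]
    }
}
```

In this model the slow channel alone decides when the aligned checkpoint can start, while the fast channels sit blocked. The unaligned variant starts at the first barrier and blocks nothing, at the price of persisting the overtaken buffers.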
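This post concentrates on the code and does not go into the characteristics of unaligned checkpoints in depth. For a more detailed explanation, see: https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints
For an analysis in Chinese, see: https://developer.aliyun.com/article/768710
For an analysis of the complete checkpoint process, see the post: Flink 源码之快照 (Flink source code: snapshots)
For checkpoint barriers in versions before Flink 1.10, including barrier alignment and the non-aligned mode (which cannot guarantee exactly-once), see the post: Flink 源码之分布式快照 (Flink source code: distributed snapshots)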
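Source Code Analysis
The unaligned checkpoint logic lives mainly in CheckpointBarrierUnaligner and SubtaskCheckpointCoordinatorImpl. We start from the point where the configuration is read and work through how unaligned checkpoints are implemented.
InputProcessorUtil
CheckpointBarrierUnaligner is created in the createCheckpointBarrierHandler method of InputProcessorUtil. InputProcessorUtil is responsible for creating the CheckpointedInputGate for the InputProcessor.
The code that creates the barrier handler, with annotations, is shown below: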
private static CheckpointBarrierHandler createCheckpointBarrierHandler(
        StreamConfig config,
        InputGate[] inputGates,
        SubtaskCheckpointCoordinator checkpointCoordinator,
        String taskName,
        AbstractInvokable toNotifyOnCheckpoint) {
    // Read the checkpoint mode from the configuration
    switch (config.getCheckpointMode()) {
        // Exactly-once mode
        case EXACTLY_ONCE:
            if (config.isUnalignedCheckpointsEnabled()) {
                // Unaligned checkpoints are enabled: create an AlternatingCheckpointBarrierHandler.
                // AlternatingCheckpointBarrierHandler is a composite that picks the matching
                // CheckpointBarrierHandler according to the barrier type (checkpoint or savepoint).
                // It is analysed in detail below.
                return new AlternatingCheckpointBarrierHandler(
                    new CheckpointBarrierAligner(taskName, toNotifyOnCheckpoint, inputGates),
                    new CheckpointBarrierUnaligner(checkpointCoordinator, taskName, toNotifyOnCheckpoint, inputGates),
                    toNotifyOnCheckpoint);
            }
            // Unaligned checkpoints are not enabled: return a CheckpointBarrierAligner
            return new CheckpointBarrierAligner(taskName, toNotifyOnCheckpoint, inputGates);
        case AT_LEAST_ONCE:
            // At-least-once mode combined with unaligned checkpoints is not supported and throws
            if (config.isUnalignedCheckpointsEnabled()) {
                throw new IllegalStateException("Cannot use unaligned checkpoints with AT_LEAST_ONCE " +
                    "checkpointing mode");
            }
            // Total number of input channels across all input gates
            int numInputChannels = Arrays.stream(inputGates).mapToInt(InputGate::getNumberOfInputChannels).sum();
            // Create a CheckpointBarrierTracker: the older mode that tracks barriers without
            // aligning them and therefore cannot guarantee exactly-once
            return new CheckpointBarrierTracker(numInputChannels, toNotifyOnCheckpoint);
        default:
            throw new UnsupportedOperationException("Unrecognized Checkpointing Mode: " + config.getCheckpointMode());
    }
}
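The selection above boils down to a small decision table. The sketch below reproduces only that table in plain Java so the four combinations can be tried out. The class name and the `Mode` enum are stand-ins invented for this example, and the handlers are represented by their names as strings rather than by real Flink objects.

```java
/** Decision-table sketch of the handler selection (stand-in types, not Flink classes). */
class BarrierHandlerSelectionSketch {

    enum Mode { EXACTLY_ONCE, AT_LEAST_ONCE }

    /** Returns the name of the handler that the listing above would create. */
    public static String selectHandler(Mode mode, boolean unalignedEnabled) {
        switch (mode) {
            case EXACTLY_ONCE:
                return unalignedEnabled
                    ? "AlternatingCheckpointBarrierHandler"
                    : "CheckpointBarrierAligner";
            case AT_LEAST_ONCE:
                if (unalignedEnabled) {
                    throw new IllegalStateException(
                        "Cannot use unaligned checkpoints with AT_LEAST_ONCE checkpointing mode");
                }
                return "CheckpointBarrierTracker";
            default:
                throw new UnsupportedOperationException("Unrecognized mode: " + mode);
        }
    }

    public static void main(String[] args) {
        for (Mode mode : Mode.values()) {
            for (boolean unaligned : new boolean[] {false, true}) {
                try {
                    System.out.println(mode + ", unaligned=" + unaligned + " -> " + selectHandler(mode, unaligned));
                } catch (IllegalStateException e) {
                    System.out.println(mode + ", unaligned=" + unaligned + " -> rejected: " + e.getMessage());
                }
            }
        }
    }
}
```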
AlternatingCheckpointBarrierHandler
The annotations in the previous listing already explain what this class does. Next we look at how it handles a CheckpointBarrier in its processBarrier method:
@Override
public void processBarrier(CheckpointBarrier receivedBarrier, InputChannelInfo channelInfo) throws Exception {
    // A barrier whose id is smaller than the last seen barrier id arrived late and is ignored
    if (receivedBarrier.getId() < lastSeenBarrierId) {
        return;
    }
    // Remember the id of the most recent barrier
    lastSeenBarrierId = receivedBarrier.getId();
    // Remember the handler used so far
    CheckpointBarrierHandler previousHandler = activeHandler;
    // A barrier of type checkpoint uses the unaligned handler; the other type, savepoint, uses the aligned handler
    activeHandler = receivedBarrier.isCheckpoint() ? unalignedHandler : alignedHandler;
    // If the handler changed, a barrier of a different type arrived, so the checkpoint
    // still pending in the previous handler is aborted
    if (previousHandler != activeHandler) {
        previousHandler.abortPendingCheckpoint(
            lastSeenBarrierId,
            new CheckpointException(format("checkpoint subsumed by %d", lastSeenBarrierId), CHECKPOINT_DECLINED_SUBSUMED));
    }
    // Delegate to processBarrier of the active handler (the unaligned handler for a checkpoint barrier)
    activeHandler.processBarrier(receivedBarrier, channelInfo);
}
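The switching behaviour can be tried out with a small stand-alone model. Everything in it is a simplification invented for this example: the `Handler` class only records calls, barriers are reduced to an id plus a checkpoint/savepoint flag, and the assumption that the aligned handler is active initially is mine rather than something shown in the listing.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-alone model of the alternating handler (hypothetical types, not Flink classes). */
class AlternatingHandlerSketch {

    /** Minimal stand-in for a barrier handler; it only records what happened to it. */
    static final class Handler {
        final List<String> log = new ArrayList<>();
        void processBarrier(long id) { log.add("process " + id); }
        void abortPendingCheckpoint(long id) { log.add("abort, subsumed by " + id); }
    }

    final Handler aligned = new Handler();
    final Handler unaligned = new Handler();
    Handler active = aligned; // assumption: the aligned handler is active at the start
    long lastSeenBarrierId = -1;

    /** isCheckpoint = false models a savepoint barrier. */
    void processBarrier(long id, boolean isCheckpoint) {
        if (id < lastSeenBarrierId) {
            return; // late barrier, ignored
        }
        lastSeenBarrierId = id;
        Handler previous = active;
        active = isCheckpoint ? unaligned : aligned;
        if (previous != active) {
            previous.abortPendingCheckpoint(id); // barrier type changed
        }
        active.processBarrier(id);
    }

    public static void main(String[] args) {
        AlternatingHandlerSketch sketch = new AlternatingHandlerSketch();
        sketch.processBarrier(1, true);  // checkpoint barrier -> unaligned handler
        sketch.processBarrier(2, false); // savepoint barrier  -> aligned handler
        sketch.processBarrier(1, true);  // late barrier, ignored
        System.out.println("unaligned: " + sketch.unaligned.log); // [process 1, abort, subsumed by 2]
        System.out.println("aligned:   " + sketch.aligned.log);   // [abort, subsumed by 1, process 2]
    }
}
```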
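CheckpointBarrierUnaligner
CheckpointBarrierUnaligner tracks which input channels have received the checkpoint barrier and triggers the checkpoint when the first barrier arrives.
The per-channel state is kept in the hasInflightBuffers field, defined as follows:
private final Map<InputChannelInfo, Boolean> hasInflightBuffers;
This field stores, for each input channel, whether it still has in-flight buffers.
What is an in-flight buffer? In-flight buffers are the data buffers in an input channel's receivedBuffers queue that sit ahead of the barrier carrying the current checkpoint id, that is, the buffers the channel received before that barrier.
CheckpointedInputGate calls the processBarrier method of CheckpointBarrierUnaligner every time it receives a barrier. The method is shown below: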
@Override
public void processBarrier(CheckpointBarrier receivedBarrier, InputChannelInfo channelInfo) throws Exception {
    // Id of the received barrier
    long barrierId = receivedBarrier.getId();
    // Ignore barriers of old or cancelled checkpoints
    if (currentConsumedCheckpointId > barrierId || (currentConsumedCheckpointId == barrierId && !isCheckpointPending())) {
        // ignore old and cancelled barriers
        return;
    }
    // The current checkpoint id is smaller than the barrier id: a new checkpoint has to be handled
    if (currentConsumedCheckpointId < barrierId) {
        // Update the current checkpoint id
        currentConsumedCheckpointId = barrierId;
        // Reset the number of consumed barriers to 0
        numBarrierConsumed = 0;
        // hasInflightBuffers is a map that stores whether each channel has in-flight buffers.
        // An in-flight buffer is a buffer received before the barrier of the current checkpoint.
        // Here every input channel is marked as having in-flight buffers.
        hasInflightBuffers.entrySet().forEach(hasInflightBuffer -> hasInflightBuffer.setValue(true));
    }
    // The current checkpoint id equals the barrier id: the in-flight buffers of this channel have been processed
    // (the first input channel to receive the barrier of a given checkpoint also sets its flag to false here)
    if (currentConsumedCheckpointId == barrierId) {
        // The channel has no more in-flight buffers
        hasInflightBuffers.put(channelInfo, false);
        // Increase the number of consumed barriers
        numBarrierConsumed++;
    }
    // Call threadSafeUnaligner.notifyBarrierReceived
    threadSafeUnaligner.notifyBarrierReceived(receivedBarrier, channelInfo);
}
The main job of this code is to keep track, in real time, of whether each input channel still has in-flight buffers.
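As a rough illustration of this bookkeeping, the sketch below models the flag map with channel names as plain strings. It is a simplification under stated assumptions: the class name is made up, the `isCheckpointPending()` check from the listing is left out, and nothing is forwarded to a thread-safe unaligner.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Simplified model of the hasInflightBuffers bookkeeping (not Flink code). */
class InflightFlagsSketch {

    final Map<String, Boolean> hasInflightBuffers = new LinkedHashMap<>();
    long currentConsumedCheckpointId = -1;
    int numBarrierConsumed = 0;

    InflightFlagsSketch(String... channels) {
        for (String channel : channels) {
            hasInflightBuffers.put(channel, false);
        }
    }

    void processBarrier(long barrierId, String channel) {
        if (currentConsumedCheckpointId > barrierId) {
            return; // barrier of an older checkpoint, ignored
        }
        if (currentConsumedCheckpointId < barrierId) {
            // First barrier of a new checkpoint: every channel may still hold in-flight buffers.
            currentConsumedCheckpointId = barrierId;
            numBarrierConsumed = 0;
            hasInflightBuffers.replaceAll((ch, flag) -> true);
        }
        // The barrier of this channel has been consumed, so its in-flight buffers are done.
        hasInflightBuffers.put(channel, false);
        numBarrierConsumed++;
    }

    public static void main(String[] args) {
        InflightFlagsSketch sketch = new InflightFlagsSketch("a", "b", "c");
        sketch.processBarrier(5, "b");
        System.out.println(sketch.hasInflightBuffers); // {a=true, b=false, c=true}
        sketch.processBarrier(5, "a");
        sketch.processBarrier(4, "c"); // old barrier, ignored
        System.out.println(sketch.hasInflightBuffers); // {a=false, b=false, c=true}
    }
}
```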
At its end the method calls notifyBarrierReceived on threadSafeUnaligner. Before looking at that call, we need to get familiar with some important fields of threadSafeUnaligner.
The fields and their meaning are as follows:
/**
 * Tag the state of which input channel has not received the barrier, such that newly arriving buffers need
 * to be written in the unaligned checkpoint.
 */
// Marks whether a channel has not yet received the barrier; true means not received
private final Map<InputChannelInfo, Boolean> storeNewBuffers;
/** The number of input channels which has received or processed the barrier. */
// Counts how many barriers have been received
private int numBarriersReceived;
/** A future indicating that all barriers of the a given checkpoint have been read. */
// Completes once all input channels have received the barrier; code that waits on it
// with whenComplete and similar calls is notified at that point
private CompletableFuture<Void> allBarriersReceivedFuture = FutureUtils.completedVoidFuture();
Now let's look at the code of the notifyBarrierReceived method:
@Override
public synchronized void notifyBarrierReceived(CheckpointBarrier barrier, InputChannelInfo channelInfo) throws IOException {
    long barrierId = barrier.getId();
    // A new checkpoint has to be handled
    if (currentReceivedCheckpointId < barrierId) {
        // Handle the new checkpoint, analysed below
        handleNewCheckpoint(barrier);
        // In the task thread, call the handler's notifyCheckpoint to announce the start of the checkpoint
        handler.executeInTaskThread(() -> handler.notifyCheckpoint(barrier), "notifyCheckpoint");
    }
    // The current checkpoint id equals the barrier id
    // and this channel has not received the barrier yet
    if (barrierId == currentReceivedCheckpointId && storeNewBuffers.get(channelInfo)) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("{}: Received barrier from channel {} @ {}.", handler.taskName, channelInfo, barrierId);
        }
        // Mark the channel as having received the barrier
        storeNewBuffers.put(channelInfo, false);
        // Increment the received-barrier counter.
        // When it equals the number of channels, complete allBarriersReceivedFuture to signal
        // that all input channels have received the barrier
        if (++numBarriersReceived == numOpenChannels) {
            allBarriersReceivedFuture.complete(null);
        }
    }
}
This method calls handleNewCheckpoint, which we look at next:
private synchronized void handleNewCheckpoint(CheckpointBarrier barrier) throws IOException {
    long barrierId = barrier.getId();
    // If some input channel never received the barrier of the previous checkpoint, that checkpoint did not finish normally
    if (!allBarriersReceivedFuture.isDone()) {
        // Create the exception
        CheckpointException exception = new CheckpointException("Barrier id: " + barrierId, CHECKPOINT_DECLINED_SUBSUMED);
        // If a checkpoint is in progress, abort it and notify the handler about the abort
        if (isCheckpointPending()) {
            // we did not complete the current checkpoint, another started before
            LOG.warn("{}: Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +
                    "Skipping current checkpoint.",
                handler.taskName,
                barrierId,
                currentReceivedCheckpointId);
            // let the task know we are not completing this
            final long currentCheckpointId = currentReceivedCheckpointId;
            handler.executeInTaskThread(() -> handler.notifyAbort(currentCheckpointId, exception), "notifyAbort");
        }
        // Complete allBarriersReceivedFuture exceptionally
        allBarriersReceivedFuture.completeExceptionally(exception);
    }
    // Mark the start of the checkpoint
    handler.markCheckpointStart(barrier.getTimestamp());
    // Update the current checkpoint id
    currentReceivedCheckpointId = barrierId;
    // Initialise storeNewBuffers: no input channel has received the barrier yet
    storeNewBuffers.entrySet().forEach(storeNewBuffer -> storeNewBuffer.setValue(true));
    // Reset the received-barrier counter
    numBarriersReceived = 0;
    // Create a fresh allBarriersReceivedFuture
    allBarriersReceivedFuture = new CompletableFuture<>();
    // Tell the checkpoint coordinator to initialise a checkpoint; the checkpoint starts
    checkpointCoordinator.initCheckpoint(barrierId, barrier.getCheckpointOptions());
}
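The life cycle of the future across notifyBarrierReceived and handleNewCheckpoint can be sketched with nothing but `java.util.concurrent.CompletableFuture`. This is a reduced model under my own assumptions: the class name is made up, channels are plain indices, the handler callbacks are left out, and an `IllegalStateException` stands in for Flink's CheckpointException.

```java
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;

/** Reduced model of the "all barriers received" future (not Flink code). */
class BarrierFutureSketch {

    private final boolean[] barrierReceived; // inverse of storeNewBuffers in the listing
    private long currentCheckpointId = -1;
    private int numBarriersReceived;
    private CompletableFuture<Void> allBarriersReceived = CompletableFuture.completedFuture(null);

    BarrierFutureSketch(int numChannels) {
        this.barrierReceived = new boolean[numChannels];
    }

    /** Returns the future of the checkpoint that is current after this barrier was handled. */
    synchronized CompletableFuture<Void> notifyBarrierReceived(long barrierId, int channel) {
        if (currentCheckpointId < barrierId) {
            // New checkpoint: a still-unfinished previous one is subsumed and fails.
            if (!allBarriersReceived.isDone()) {
                allBarriersReceived.completeExceptionally(new IllegalStateException(
                    "checkpoint " + currentCheckpointId + " subsumed by " + barrierId));
            }
            currentCheckpointId = barrierId;
            Arrays.fill(barrierReceived, false);
            numBarriersReceived = 0;
            allBarriersReceived = new CompletableFuture<>();
        }
        if (barrierId == currentCheckpointId && !barrierReceived[channel]) {
            barrierReceived[channel] = true;
            if (++numBarriersReceived == barrierReceived.length) {
                allBarriersReceived.complete(null); // every channel has delivered its barrier
            }
        }
        return allBarriersReceived;
    }

    public static void main(String[] args) {
        BarrierFutureSketch sketch = new BarrierFutureSketch(2);
        CompletableFuture<Void> cp1 = sketch.notifyBarrierReceived(1, 0);
        CompletableFuture<Void> cp2 = sketch.notifyBarrierReceived(2, 1); // checkpoint 1 is subsumed
        sketch.notifyBarrierReceived(2, 0);
        System.out.println("cp1 failed: " + cp1.isCompletedExceptionally()); // true
        System.out.println("cp2 done:   " + cp2.isDone());                   // true
    }
}
```

Replacing the future on every new checkpoint, instead of reusing one, lets callers that still hold the old future observe the subsumption as an exceptional completion.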
checkpointCoordinator gets its own section below. Because the initCheckpoint method contains little logic, we cover it here in advance.
The implementation class of checkpointCoordinator is SubtaskCheckpointCoordinatorImpl. Its initCheckpoint method looks like this:
@Override
public void initCheckpoint(long id, CheckpointOptions checkpointOptions) {
    // If this is an unaligned checkpoint, call the start method of ChannelStateWriter
    if (checkpointOptions.isUnalignedCheckpoint()) {
        channelStateWriter.start(id, checkpointOptions);
    }
}
ChannelStateWriter asynchronously writes channel state during a checkpoint or savepoint. It is the class that records the in-flight buffers of each input channel and the contents of the operator's output buffers (ResultSubpartition) in the checkpoint.
The start method of ChannelStateWriter begins recording a new checkpoint and initialises the write state. The implementation of ChannelStateWriter is fairly complex and is not analysed in this post.
ThreadSafeUnaligner implements the BufferReceivedListener interface and therefore has a notifyBufferReceived method. That method is called from the onBuffer method of RemoteInputChannel:
public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOException {
    boolean recycleBuffer = true;
    try {
        // ...
        if (notifyReceivedBarrier != null) {
            receivedCheckpointId = notifyReceivedBarrier.getId();
            if (notifyReceivedBarrier.isCheckpoint()) {
                listener.notifyBarrierReceived(notifyReceivedBarrier, channelInfo);
            }
        } else if (notifyReceivedBuffer != null) {
            // notifyBufferReceived is called here
            listener.notifyBufferReceived(notifyReceivedBuffer, channelInfo);
        }
    } finally {
        if (recycleBuffer) {
            buffer.recycleBuffer();
        }
    }
}
In other words, whenever the input channel receives a data buffer it calls the listener's notifyBufferReceived method, and ThreadSafeUnaligner is such a listener.
Next we look at the source of that method:
@Override
public synchronized void notifyBufferReceived(Buffer buffer, InputChannelInfo channelInfo) {
    // If this channel has not received the checkpoint barrier yet,
    // the buffers that arrive ahead of the barrier must be written to the checkpoint
    if (storeNewBuffers.get(channelInfo)) {
        // Write the buffer contents through ChannelStateWriter.
        // addInputData hands the buffer to ChannelStateWriter, which writes it to the checkpoint later
        checkpointCoordinator.getChannelStateWriter().addInputData(
            currentReceivedCheckpointId,
            channelInfo,
            ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
            ofElement(buffer, Buffer::recycleBuffer));
    } else {
        buffer.recycleBuffer();
    }
}
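SubtaskCheckpointCoordinatorImpl
This class coordinates the checkpoint flow on the subtask side. Unlike CheckpointCoordinator, which coordinates checkpoints for the whole job, it only handles the work of a single subtask. What matters for this post are the steps it adds for the new unaligned checkpoint feature.
The complete checkpoint procedure of SubtaskCheckpointCoordinatorImpl is in the checkpointState method, shown below: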
@Override
public void checkpointState(
        CheckpointMetaData metadata,
        CheckpointOptions options,
        CheckpointMetrics metrics,
        OperatorChain<?, ?> operatorChain,
        Supplier<Boolean> isCanceled) throws Exception {
    checkNotNull(options);
    checkNotNull(metrics);
    // All of the following steps happen as an atomic step from the perspective of barriers and
    // records/watermarks/timers/callbacks.
    // We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
    // checkpoint alignments
    // Check that the id of the last checkpoint is smaller than the checkpoint id in the metadata.
    // Otherwise the barriers may be out of order, and the checkpoint for metadata.getCheckpointId() is aborted
    if (lastCheckpointId >= metadata.getCheckpointId()) {
        LOG.info("Out of order checkpoint barrier (aborted previously?): {} >= {}", lastCheckpointId, metadata.getCheckpointId());
        channelStateWriter.abort(metadata.getCheckpointId(), new CancellationException(), true);
        checkAndClearAbortedStatus(metadata.getCheckpointId());
        return;
    }
    // The actual checkpoint flow starts here
    // Step (0): Record the last triggered checkpointId and abort the sync phase of checkpoint if necessary.
    // Step 0: update lastCheckpointId.
    // If the current checkpoint was cancelled, broadcast a CancelCheckpointMarker downstream to signal the abort
    lastCheckpointId = metadata.getCheckpointId();
    if (checkAndClearAbortedStatus(metadata.getCheckpointId())) {
        // broadcast cancel checkpoint marker to avoid downstream back-pressure due to checkpoint barrier align.
        operatorChain.broadcastEvent(new CancelCheckpointMarker(metadata.getCheckpointId()));
        LOG.info("Checkpoint {} has been notified as aborted, would not trigger any checkpoint.", metadata.getCheckpointId());
        return;
    }
    // Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
    //           The pre-barrier work should be nothing or minimal in the common case.
    // Step 1: call prepareSnapshotPreBarrier on the operatorChain to run the pre-checkpoint logic
    operatorChain.prepareSnapshotPreBarrier(metadata.getCheckpointId());
    // Step (2): Send the checkpoint barrier downstream
    // Step 2: broadcast the CheckpointBarrier downstream.
    // Key point: the second argument is isPriorityEvent. Following the call chain leads to the add method of
    // PipelinedSubpartition; if isPriorityEvent is true, the barrier is inserted at the head of the ResultSubpartition
    operatorChain.broadcastEvent(
        new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options),
        options.isUnalignedCheckpoint());
    // Step (3): Prepare to spill the in-flight buffers for input and output
    // Step 3 is a key point: for an unaligned checkpoint, the in-flight buffers of the output and of all
    // input channels (the buffers ahead of the checkpoint barrier) are written to the checkpoint.
    // This method is analysed below
    if (options.isUnalignedCheckpoint()) {
        prepareInflightDataSnapshot(metadata.getCheckpointId());
    }
    // Step (4): Take the state snapshot. This should be largely asynchronous, to not impact progress of the
    // streaming topology
    // Step 4: run the checkpoint asynchronously and persist the checkpoint data
    Map<OperatorID, OperatorSnapshotFutures> snapshotFutures = new HashMap<>(operatorChain.getNumberOfOperators());
    try {
        if (takeSnapshotSync(snapshotFutures, metadata, metrics, options, operatorChain, isCanceled)) {
            finishAndReportAsync(snapshotFutures, metadata, metrics, options);
        } else {
            cleanup(snapshotFutures, metadata, metrics, new Exception("Checkpoint declined"));
        }
    } catch (Exception ex) {
        cleanup(snapshotFutures, metadata, metrics, ex);
        throw ex;
    }
}
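The effect of the isPriorityEvent flag in step 2 comes down to where an element enters the output queue. The sketch below shows only that queue-position idea with a `java.util.Deque`. It assumes nothing about the real PipelinedSubpartition beyond what the comment above states, and the class and element names are invented for the example.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

/** Queue-position sketch for priority events (not the real PipelinedSubpartition). */
class PriorityBarrierSketch {

    /** Priority events jump to the head of the queue; everything else is appended at the tail. */
    public static void add(Deque<String> subpartition, String element, boolean isPriorityEvent) {
        if (isPriorityEvent) {
            subpartition.addFirst(element);
        } else {
            subpartition.addLast(element);
        }
    }

    public static void main(String[] args) {
        Deque<String> unalignedQueue = new ArrayDeque<>(Arrays.asList("record-1", "record-2"));
        add(unalignedQueue, "barrier-7", true);
        System.out.println(unalignedQueue); // [barrier-7, record-1, record-2]

        Deque<String> alignedQueue = new ArrayDeque<>(Arrays.asList("record-1", "record-2"));
        add(alignedQueue, "barrier-7", false);
        System.out.println(alignedQueue); // [record-1, record-2, barrier-7]
    }
}
```

In the unaligned case the barrier overtakes record-1 and record-2, so those queued output buffers are exactly what step 3 then has to persist as in-flight output data.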
The operator's output buffers are written to the checkpoint from the prepareInflightDataSnapshot method of SubtaskCheckpointCoordinatorImpl:
private void prepareInflightDataSnapshot(long checkpointId) throws IOException {
    // Get the ResultPartitionWriters
    ResultPartitionWriter[] writers = env.getAllWriters();
    for (ResultPartitionWriter writer : writers) {
        for (int i = 0; i < writer.getNumberOfSubpartitions(); i++) {
            // Iterate over the ResultSubpartitions of each writer
            ResultSubpartition subpartition = writer.getSubpartition(i);
            // Hand the data of every subpartition to channelStateWriter, which writes it to the checkpoint later
            channelStateWriter.addOutputData(
                checkpointId,
                subpartition.getSubpartitionInfo(),
                ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
                subpartition.requestInflightBufferSnapshot().toArray(new Buffer[0]));
        }
    }
    // finishOutput signals that all addOutputData calls are done
    channelStateWriter.finishOutput(checkpointId);
    // The returned future tells whether all input channels have received the barrier;
    // if some channel has not, ex below is not null.
    // apply invokes the prepareInputSnapshot method of StreamTask.
    // prepareInputSnapshot is initialised when StreamTask creates the subtaskCheckpointCoordinator
    prepareInputSnapshot.apply(channelStateWriter, checkpointId)
        .whenComplete((unused, ex) -> {
            if (ex != null) {
                // On completion, if some input channel did not receive the barrier, call abort
                channelStateWriter.abort(checkpointId, ex, false /* result is needed and cleaned by getWriteResult */);
            } else {
                // On completion, if all input channels received the barrier, call finishInput
                channelStateWriter.finishInput(checkpointId);
            }
        });
}
Let's look more closely at the call prepareInputSnapshot.apply(channelStateWriter, checkpointId). It actually invokes the prepareInputSnapshot method of StreamTask:
private CompletableFuture<Void> prepareInputSnapshot(ChannelStateWriter channelStateWriter, long checkpointId) throws IOException {
    if (inputProcessor == null) {
        return FutureUtils.completedVoidFuture();
    }
    // The InputProcessor prepares the state snapshot
    return inputProcessor.prepareSnapshot(channelStateWriter, checkpointId);
}
This calls the prepareSnapshot method of inputProcessor. inputProcessor has several implementations. For example, the prepareSnapshot method of StreamOneInputProcessor is:
@Override
public CompletableFuture<Void> prepareSnapshot(
        ChannelStateWriter channelStateWriter,
        long checkpointId) throws IOException {
    // Delegates to the prepareSnapshot method of StreamTaskInput
    return input.prepareSnapshot(channelStateWriter, checkpointId);
}
For the multi-input case, which is more representative for unaligned checkpoints, we look at the prepareSnapshot method of StreamTwoInputProcessor:
@Override
public CompletableFuture<Void> prepareSnapshot(
        ChannelStateWriter channelStateWriter,
        long checkpointId) throws IOException {
    return CompletableFuture.allOf(
        input1.prepareSnapshot(channelStateWriter, checkpointId),
        input2.prepareSnapshot(channelStateWriter, checkpointId));
}
It returns a CompletableFuture that completes only after the prepareSnapshot futures of both inputs have completed.
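The behaviour of `CompletableFuture.allOf` combined with a `whenComplete` callback can be checked in isolation. The sketch below uses only the JDK. The class name is made up, and the strings "abort" and "finishInput" merely mirror the two branches taken in prepareInflightDataSnapshot.

```java
import java.util.concurrent.CompletableFuture;

/** JDK-only demo of allOf + whenComplete for two inputs (names are illustrative). */
class AllOfSketch {

    /** Completes input 1 (optionally with a failure), then input 2, and reports what the callback saw. */
    public static String outcome(boolean failFirstInput) {
        CompletableFuture<Void> input1 = new CompletableFuture<>();
        CompletableFuture<Void> input2 = new CompletableFuture<>();
        StringBuilder result = new StringBuilder();

        CompletableFuture.allOf(input1, input2)
            .whenComplete((unused, ex) -> result.append(ex != null ? "abort" : "finishInput"));

        if (failFirstInput) {
            input1.completeExceptionally(new IllegalStateException("subsumed"));
        } else {
            input1.complete(null);
        }
        // Only one input is done, so the combined future has not fired yet.
        if (result.length() == 0) {
            result.append("pending->");
        }
        input2.complete(null); // the callback runs now, on this thread
        return result.toString();
    }

    public static void main(String[] args) {
        System.out.println(outcome(false)); // pending->finishInput
        System.out.println(outcome(true));  // pending->abort
    }
}
```

Note that `allOf` waits for both inputs even when one of them has already failed; the failure only surfaces in the callback once the second input is done as well.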
For the prepareSnapshot method of an input, we look at StreamTaskNetworkInput:
@Override
public CompletableFuture<Void> prepareSnapshot(
        ChannelStateWriter channelStateWriter,
        long checkpointId) throws IOException {
    // Iterate over all record deserializers and
    // store the buffers they have not fully consumed in the checkpoint
    for (int channelIndex = 0; channelIndex < recordDeserializers.length; channelIndex++) {
        final InputChannel channel = checkpointedInputGate.getChannel(channelIndex);
        // Assumption for retrieving buffers = one concurrent checkpoint
        RecordDeserializer<?> deserializer = recordDeserializers[channelIndex];
        if (deserializer != null) {
            // Hand the unconsumed buffer of the deserializer to channelStateWriter
            channelStateWriter.addInputData(
                checkpointId,
                channel.getChannelInfo(),
                ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
                deserializer.getUnconsumedBuffer());
        }
        // Write the in-flight buffers of the channel to the checkpoint
        checkpointedInputGate.spillInflightBuffers(checkpointId, channelIndex, channelStateWriter);
    }
    // Return a CompletableFuture that tells whether all input channels have received the barrier
    return checkpointedInputGate.getAllBarriersReceivedFuture(checkpointId);
}
We continue with the spillInflightBuffers method of CheckpointedInputGate:
public void spillInflightBuffers(
        long checkpointId,
        int channelIndex,
        ChannelStateWriter channelStateWriter) throws IOException {
    InputChannel channel = inputGate.getChannel(channelIndex);
    // If the channel has in-flight buffers, call the channel's spillInflightBuffers method
    if (barrierHandler.hasInflightData(checkpointId, channel.getChannelInfo())) {
        channel.spillInflightBuffers(checkpointId, channelStateWriter);
    }
}
Next comes the spillInflightBuffers method of RemoteInputChannel:
@Override
public void spillInflightBuffers(long checkpointId, ChannelStateWriter channelStateWriter) throws IOException {
    synchronized (receivedBuffers) {
        checkState(checkpointId > lastRequestedCheckpointId, "Need to request the next checkpointId");
        final List<Buffer> inflightBuffers = new ArrayList<>(receivedBuffers.size());
        // Iterate over all received buffers
        for (Buffer buffer : receivedBuffers) {
            // Determine whether the buffer carries data or an event; for an event, check whether it is a
            // CheckpointBarrier. Anything other than a CheckpointBarrier yields null
            CheckpointBarrier checkpointBarrier = parseCheckpointBarrierOrNull(buffer);
            // If this condition holds, the barrier of the current or a later checkpoint was found,
            // so no more buffers are added to inflightBuffers
            if (checkpointBarrier != null && checkpointBarrier.getId() >= checkpointId) {
                break;
            }
            // If the buffer carries data, add it to inflightBuffers
            if (buffer.isBuffer()) {
                inflightBuffers.add(buffer.retainBuffer());
            }
        }
        // Update lastRequestedCheckpointId to prevent the operation from running twice
        lastRequestedCheckpointId = checkpointId;
        // Hand the data in inflightBuffers to ChannelStateWriter
        channelStateWriter.addInputData(
            checkpointId,
            channelInfo,
            ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
            CloseableIterator.fromList(inflightBuffers, Buffer::recycleBuffer));
    }
}
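At this point I had a question: RemoteInputChannel has a spillInflightBuffers method that writes a channel's in-flight buffers to the checkpoint, so why is the notifyBufferReceived method of ThreadSafeUnaligner also needed, writing each buffer to the checkpoint as soon as it arrives?
After some thought I believe the reason is the following. Readers are welcome to correct me if this is wrong:
If the barrier has not yet entered the buffer queue of some input channel when the checkpoint runs, spillInflightBuffers is called first and writes all buffers currently in that channel to the checkpoint. Data buffers that arrive afterwards are handed to the notifyBufferReceived method, which writes them to the checkpoint until the barrier enters the input queue.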
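The interplay described in this conjecture can be played through with a small model. It is only a sketch of the conjecture, not verified Flink behaviour: buffers are plain strings, a barrier for checkpoint n is written as "barrier-n", and the class and method names are invented for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of spilling queued buffers plus storing later arrivals (not Flink code). */
class SpillAndNotifySketch {

    private static final String BARRIER_PREFIX = "barrier-";

    /** Mirrors the loop of spillInflightBuffers: collect data buffers until the barrier is found. */
    public static List<String> spillInflightBuffers(List<String> receivedBuffers, long checkpointId) {
        List<String> inflight = new ArrayList<>();
        for (String buffer : receivedBuffers) {
            boolean isBarrier = buffer.startsWith(BARRIER_PREFIX);
            if (isBarrier && Long.parseLong(buffer.substring(BARRIER_PREFIX.length())) >= checkpointId) {
                break; // barrier of the current or a later checkpoint: stop collecting
            }
            if (!isBarrier) {
                inflight.add(buffer);
            }
        }
        return inflight;
    }

    /** Everything persisted for one channel: the spilled queue plus arrivals ahead of the barrier. */
    public static List<String> persisted(List<String> queuedAtTrigger, List<String> arrivingLater, long checkpointId) {
        List<String> result = new ArrayList<>(spillInflightBuffers(queuedAtTrigger, checkpointId));
        // Plays the role of storeNewBuffers: true while the channel has not seen the barrier.
        boolean storeNewBuffers = !queuedAtTrigger.contains(BARRIER_PREFIX + checkpointId);
        for (String buffer : arrivingLater) {
            if (buffer.equals(BARRIER_PREFIX + checkpointId)) {
                storeNewBuffers = false; // barrier arrived, later buffers are not part of the checkpoint
            } else if (storeNewBuffers) {
                result.add(buffer);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // The barrier of checkpoint 3 has not reached this channel when the checkpoint starts.
        List<String> queued = Arrays.asList("a", "b");
        List<String> later = Arrays.asList("c", "barrier-3", "d");
        System.out.println(persisted(queued, later, 3)); // [a, b, c]
    }
}
```

In this model neither mechanism is sufficient on its own: the spill only sees "a" and "b", while the listener only sees "c". Together they capture everything ahead of the barrier.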
The takeSnapshotSync method is responsible for writing the checkpoint data. Its code is shown below:
private boolean takeSnapshotSync(
        Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress,
        CheckpointMetaData checkpointMetaData,
        CheckpointMetrics checkpointMetrics,
        CheckpointOptions checkpointOptions,
        OperatorChain<?, ?> operatorChain,
        Supplier<Boolean> isCanceled) throws Exception {
    // Iterate over all operators; if any operator is already closed, decline the checkpoint
    for (final StreamOperatorWrapper<?, ?> operatorWrapper : operatorChain.getAllOperators(true)) {
        if (operatorWrapper.isClosed()) {
            env.declineCheckpoint(checkpointMetaData.getCheckpointId(),
                new CheckpointException("Task Name" + taskName, CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_CLOSING));
            return false;
        }
    }
    long checkpointId = checkpointMetaData.getCheckpointId();
    long started = System.nanoTime();
    // Get the channelStateWriteResult from channelStateWriter
    ChannelStateWriteResult channelStateWriteResult = checkpointOptions.isUnalignedCheckpoint() ?
        channelStateWriter.getAndRemoveWriteResult(checkpointId) :
        ChannelStateWriteResult.EMPTY;
    // Create the CheckpointStreamFactory that persists the checkpoint data, based on the configured target location
    CheckpointStreamFactory storage = checkpointStorage.resolveCheckpointStorageLocation(checkpointId, checkpointOptions.getTargetLocation());
    // Trigger the persistence of the checkpoint
    try {
        for (StreamOperatorWrapper<?, ?> operatorWrapper : operatorChain.getAllOperators(true)) {
            if (!operatorWrapper.isClosed()) {
                operatorSnapshotsInProgress.put(
                    operatorWrapper.getStreamOperator().getOperatorID(),
                    buildOperatorSnapshotFutures(
                        checkpointMetaData,
                        checkpointOptions,
                        operatorChain,
                        operatorWrapper.getStreamOperator(),
                        isCanceled,
                        channelStateWriteResult,
                        storage));
            }
        }
    } finally {
        checkpointStorage.clearCacheFor(checkpointId);
    }
    LOG.debug(
        "{} - finished synchronous part of checkpoint {}. Alignment duration: {} ms, snapshot duration {} ms",
        taskName,
        checkpointId,
        checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
        checkpointMetrics.getSyncDurationMillis());
    checkpointMetrics.setSyncDurationMillis((System.nanoTime() - started) / 1_000_000);
    return true;
}
This post is the author's original work. Discussion and corrections are welcome. Please credit the source when reposting.