FLINK CDC 源码
时序文件在 https://www.processon.com/view/623d93751efad40756c5ab8b
SourceEvent
SuspendBinlogReaderEvent
FinishedSnapshotSplitsRequestEvent
-
WakeupReaderEvent(WakeupReaderEvent.WakeUpTarget.BINLOG_READER))
WakeUpTarget: BINLOG_READER / SNAPSHOT_READER
FinishedSnapshotSplitsReportEvent
FinishedSnapshotSplitsAckEvent
BinlogSplitMetaRequestEvent
BinlogSplitMetaEvent
SuspendBinlogReaderAckEvent
LatestFinishedSplitsSizeRequestEvent
LatestFinishedSplitsSizeEvent
SourceSplit
SplitEnumerator
-
SplitEnumerator<SplitT extends SourceSplit, CheckpointT> extends AutoCloseable, CheckpointListener
-
功能
-
产生 splits 供 SourceReader 进行去读
-
分配 split 给 SourceReader
-
-
start()
-
handleSplitRequest(int subtaskId, @Nullable String requesterHostname)
处理 reader 的 split 请求,请求是通过SourceReaderContext#sendSplitRequest()
发出的 -
addSplitsBack(List<SplitT> splits, int subtaskId)
仅仅在当某个 SourceReader 失败时,会从上一个成功的 checkpoint 把失败 taskid 的 SourceReader 对应的 split 集合放回 SplitEnumerater。 -
void addReader(int subtaskId)
添加一个新的 reader -
CheckpointT snapshotState(long checkpointId) throws Exception
-
void close() throws IOException
-
default void notifyCheckpointComplete(long checkpointId) throws Exception {}
-
default void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {}
处理来自 SourceReader 的请求,一般情况下不需要覆盖次方法,只有 Reader 和 Enumerate 有某种约定的时候才需要重写次方法,通常 SourceReader 的新增和 SourceReader 请求 Split 都不会走这个方法,新增 Reader 走的是addReader
请求 Split 走的是handleSplitRequest
MySqlSourceEnumerator
-
MySqlSourceEnumerator implements SplitEnumerator<MySqlSplit, PendingSplitsState>
-
start()
splitAssigner.open()
-
suspendBinlogReaderIfNeed()
判断 Assigner 的状态是否为AssignerStatus#SUSPENDED
如果是则从SplitEnumeratorContext
获取所有的 SoruceReader 每个下发一个SuspendBinlogReaderEvent
时间阻塞 binlog 读取, 将binlogReaderIsSuspended = true
设置为 true 默认是 false - 周期调度
- 定期去获取注册的 SourceReader,
- 并且判断 splitAssigner 是否还有未上报完成的 split 如果有 给每个 subtask 下发一个
FinishedSnapshotSplitsRequestEvent
要求 reader 上报 finish 状态. -
suspendBinlogReaderIfNeed()
逻辑同上 -
wakeupBinlogReaderIfNeed
判断 Assigner 的状态是否为AssignerStatus#INITIAL_ASSIGNING_FINISHED || NEWLY_ADDED_ASSIGNING_FINISHED
如果完成并且之前binlogReaderIsSuspended = true
则给 subtask 发送唤醒 binlog 事件WakeupReaderEvent(WakeupReaderEvent.WakeUpTarget.BINLOG_READER))
-
handleSplitRequest(int subtaskId, @Nullable String requesterHostname)
- 判断这个 subtaskid 是否注册了,如果注册了,则将 subtaskId 添加到
readersAwaitingSplit = new TreeSet<>()
中, -
assignSplits()
- 循环 readersAwaitingSplit 拿出 taskid 判断 taskid 是否注册了,如果没注册就移除掉,如果注册了就从
splitAssigner.getNext()
获取一个 split - 如果还有 split 则
context.assignSplit(SplitT split, int subtask)
把 split 分发下去. 并且把获得 split 的 taskid 移除掉。 - 如果没有 split 则退出循环
- 循环 readersAwaitingSplit 拿出 taskid 判断 taskid 是否注册了,如果没注册就移除掉,如果注册了就从
- 判断这个 subtaskid 是否注册了,如果注册了,则将 subtaskId 添加到
-
addSplitsBack(List<MySqlSplit> splits, int subtaskId)
-
splitAssigner.addSplits(splits);
把恢复的 split 信息添加到 splitAssigner
-
-
addReader(int subtaskId)
空实现 -
handleSourceEvent(int subtaskId, SourceEvent sourceEvent)
- 处理四种 event
FinishedSnapshotSplitsReportEvent
、BinlogSplitMetaRequestEvent
、SuspendBinlogReaderAckEvent
、LatestFinishedSplitsSizeRequestEvent
-
FinishedSnapshotSplitsReportEvent
: 把完成的 split 告诉 Assigner,尝试唤醒阻塞的 binlogReaderwakeupBinlogReaderIfNeed
同上- 发送
FinishedSnapshotSplitsAckEvent
告诉相应的 Reader 已经收到 split 完结消息
- 发送
-
BinlogSplitMetaRequestEvent
:- 第一次
List<List<FinishedSnapshotSplitInfo>> binlogSplitMeta
为空, 初始值为splitAssigner.getFinishedSplitInfos()
- binlogSplitMeta 的序号就是 MetaGroupId
- 把 MetaGroupId 对应的
List<FinishedSnapshotSplitInfo>
封装成BinlogSplitMetaEvent
发送给 Reader.
- 第一次
-
SuspendBinlogReaderAckEvent
: reader 告诉 enumerate 已经收到 binlog 阻塞消息,-
splitAssigner.wakeup()
唤醒 splitAssigner - 如果 Assigner 是
MySqlHybridSplitAssigner
则给 Reader 发送WakeupReaderEvent(WakeupReaderEvent.WakeUpTarget.SNAPSHOT_READER)
消息进行获取 split 等
-
-
LatestFinishedSplitsSizeRequestEvent
: 如果 Assigner 是MySqlHybridSplitAssigner
则给 Reader 发送LatestFinishedSplitsSizeEvent
告诉 Reader 完成了多少 split
- 处理四种 event
-
PendingSplitsState snapshotState(long checkpointId)
splitAssigner.snapshotState(checkpointId)
-
notifyCheckpointComplete(long checkpointId)
splitAssigner.notifyCheckpointComplete(checkpointId);
-
assignSplits()
同上给 reader 发送 split 如果有的话
-
close()
splitAssigner.close()
MySqlSplitAssigner
- split 的产生 和分配
-
open()
在MySqlSourceEnumerator#start
方法中调用 -
Optional<MySqlSplit> getNext()
在MySqlSourceEnumerator#assignSplits
方法中调用 获取以一个 split -
boolean waitingForFinishedSplits()
是否存在有未上报完成的 split, 如果有的化MySqlSourceEnumerator
会给 reader 发送上报 split 完成的消息 -
List<FinishedSnapshotSplitInfo> getFinishedSplitInfos();
获取已经完成的 split -
void onFinishedSplits(Map<String, BinlogOffset> splitFinishedOffsets)
在MySqlSourceEnumerator#handleSourceEvent
处理FinishedSnapshotSplitsReportEvent
时调用 -
addSplits(Collection<MySqlSplit> splits)
当 reader 处理失败,有 split 需要重处理时由MySqlSourceEnumerator#addSplitsBack
被调用 PendingSplitsState snapshotState(long checkpointId);
notifyCheckpointComplete(long checkpointId);
AssignerStatus getAssignerStatus()
-
suspend()
在AssignerStatus#INITIAL_ASSIGNING_FINISHED 或 AssignerStatus#NEWLY_ADDED_ASSIGNING_FINISHED
下挂起 Assigner -
wakeup()
在AssignerStatus#SUSPENDED
下唤醒 Assigner ,在MySqlSourceEnumerator#handleSourceEvent
处理SuspendBinlogReaderAckEvent
时调用 -
close()
在MySqlSourceEnumerator#close
中调用
-
MySqlBinlogSplitAssigner
- 构造方法有两个
-
MySqlBinlogSplitAssigner(MySqlSourceConfig sourceConfig)
直接 new的boolean isBinlogSplitAssigned = false
-
MySqlBinlogSplitAssigner(MySqlSourceConfig sourceConfig, BinlogPendingSplitsState checkpoint)
从MySqlSource#restoreEnumerator
中恢复boolean isBinlogSplitAssigned = checkpoint.isBinlogSplitAssigned()
-
-
Optional<MySqlSplit> getNext()
- 如果
isBinlogSplitAssigned true
则返回空 - 否则通过
createBinlogSplit()
创建一个MySqlBinlogSplit
并将isBinlogSplitAssigned = true;
- 如果
waitingForFinishedSplits() { return false; }
List<FinishedSnapshotSplitInfo> getFinishedSplitInfos(){ return Collections.EMPTY_LIST; }
addSplits(Collection<MySqlSplit> splits){ isBinlogSplitAssigned = false; }
snapshotState(long checkpointId){ return new BinlogPendingSplitsState(isBinlogSplitAssigned); }
AssignerStatus getAssignerStatus(){ return AssignerStatus.INITIAL_ASSIGNING_FINISHED }
MySqlSnapshotSplitAssigner
-
final boolean isRemainingTablesCheckpointed
貌似一直都为true -
final List<TableId> alreadyProcessedTables
-
final List<TableId> remainingTables
-
final List<MySqlSnapshotSplit> remainingSplits
-
final Map<String, MySqlSnapshotSplit> assignedSplits
-
Map<String, BinlogOffset> splitFinishedOffsets
-
open()
- 初始化 chunkSplitter
- 发现新表
- 起一个异步线程对
remainingTables
的表进行 split 切分 - 把切分好的 splits 放入
remainingSplits
- 把切分好的 tableId 从
remainingTables
移除
-
Optional<MySqlSplit> getNext()
- 如果
remainingSplits
有数据,有现成的 split- 从
remainingSplits
里移除一个 split - 放到
assignedSplits
里面 - 把 tableId 放到
alreadyProcessedTables
里面
- 从
- 如果
remainingSplits
没有数据,则等待,等被 notify 之后就机继续调用getNext()
- 否则就代表表需要切分 split,把异步线程销毁掉,返回
Optional.empty()
- 如果
-
boolean waitingForFinishedSplits()
remainingTables.isEmpty() && remainingSplits.isEmpty() && assignedSplits.size() == splitFinishedOffsets.size()
-
List<FinishedSnapshotSplitInfo> getFinishedSplitInfos()
- 如果上面
waitingForFinishedSplits()
不为 true 则报错 - 从
assignedSplits
里面获取所有的 split - 从
splitFinishedOffsets
里面获取每个 split 的BinlogOffset
- 根据
MySqlSnapshotSplit
和BinlogOffset
进行包装成FinishedSnapshotSplitInfo
- 如果上面
-
onFinishedSplits(Map<String, BinlogOffset> splitFinishedOffsets)
- 当有reader 上报 split 完成时 会把上报的 split 信息存入
splitFinishedOffsets.putAll(splitFinishedOffsets)
- 如果所有的split 都上报完成了 并且 assigner 的状态为
assignerStatus == INITIAL_ASSIGNING || assignerStatus == NEWLY_ADDED_ASSIGNING;
- 如果并行度为1 则设置
assignerStatus
为INITIAL_ASSIGNING_FINISHED
或者NEWLY_ADDED_ASSIGNING_FINISHED
- 如果并行度不为1 则要等到
notifyCheckpointComplete
的时候再设置
- 当有reader 上报 split 完成时 会把上报的 split 信息存入
-
addSplits(Collection<MySqlSplit> splits)
把恢复的 split 加入remainingSplits
并且从assignedSplits
里移除splitFinishedOffsets
里也移除 -
SnapshotPendingSplitsState snapshotState(long checkpointId)
- 对现有的一些变量封装成
SnapshotPendingSplitsState
- 如果所有的split 都上报完成了 并且 assigner 的状态为
assignerStatus == INITIAL_ASSIGNING || assignerStatus == NEWLY_ADDED_ASSIGNING
- 并且
checkpointIdToFinish == null
则设置checkpointIdToFinish = checkpointId
- 对现有的一些变量封装成
-
notifyCheckpointComplete(long checkpointId
- 如果所有的split 都上报完成了 并且 assigner 的状态为
assignerStatus == INITIAL_ASSIGNING || assignerStatus == NEWLY_ADDED_ASSIGNING
- 并且
checkpointId >= checkpointIdToFinish
- 设置
assignerStatus
为INITIAL_ASSIGNING_FINISHED
或者NEWLY_ADDED_ASSIGNING_FINISHED
- 如果所有的split 都上报完成了 并且 assigner 的状态为
-
AssignerStatus getAssignerStatus() { return assignerStatus;}
-
suspend()
检测并设置SUSPENDED
-
wakeup()
检测并设置NEWLY_ADDED_ASSIGNING
MySqlHybridSplitAssigner
final int splitMetaGroupSize; 来源于 chunk-meta.group.size 这个配置 默认是 1000
boolean isBinlogSplitAssigned;
final MySqlSnapshotSplitAssigner snapshotSplitAssigner;
-
open()
同MySqlSnapshotSplitAssigner
-
Optional<MySqlSplit> getNext()
- 如果
snapshotSplitAssigner.getAssignerStatus() 状态是 SUSPENDED
则返回Optional.empty()
- 如果
snapshotSplitAssigner.noMoreSplits(){remainingTables.isEmpty() && remainingSplits.isEmpty()}
- 如果
isBinlogSplitAssigned
为真 则返回Optional.empty()
- 如果状态是
INITIAL_ASSIGNING_FINISHED
isBinlogSplitAssigned = true;
- 创建返回
MySqlBinlogSplit
- 如果状态是
NEWLY_ADDED_ASSIGNING_FINISHED
isBinlogSplitAssigned = true;
- 返回
Optional.empty()
- 否则返回
Optional.empty()
- 如果
- 如果有 table 还要处理
return snapshotSplitAssigner.getNext();
- 如果
-
boolean waitingForFinishedSplits()
同MySqlSnapshotSplitAssigner
-
List<FinishedSnapshotSplitInfo> getFinishedSplitInfos()
同MySqlSnapshotSplitAssigner
-
onFinishedSplits(Map<String, BinlogOffset> splitFinishedOffsets)
同MySqlSnapshotSplitAssigner
-
addSplits(Collection<MySqlSplit> splits)
- 对于
MySqlSnapshotSplit
的split 处理逻辑同MySqlSnapshotSplitAssigner
- 对于
MySqlBinlogSplit
的split 处理逻辑同MySqlBinlogSplitAssigner
- 对于
-
SnapshotPendingSplitsState snapshotState(long checkpointId)
- 对现有的一些变量封装成
HybridPendingSplitsState
- 对现有的一些变量封装成
-
notifyCheckpointComplete(long checkpointId
同MySqlSnapshotSplitAssigner
-
AssignerStatus getAssignerStatus()
同MySqlSnapshotSplitAssigner
-
suspend()
同MySqlSnapshotSplitAssigner
-
wakeup()
同MySqlSnapshotSplitAssigner
createBinlogSplit 异同
private MySqlBinlogSplit createBinlogSplit() {
try (JdbcConnection jdbc = DebeziumUtils.openJdbcConnection(sourceConfig)) {
return new MySqlBinlogSplit(
BINLOG_SPLIT_ID,
currentBinlogOffset(jdbc),
BinlogOffset.NO_STOPPING_OFFSET,
new ArrayList<>(),
new HashMap<>(),
0);
} catch (Exception e) {
throw new FlinkRuntimeException("Read the binlog offset error", e);
}
}
private MySqlBinlogSplit createBinlogSplit() {
final List<MySqlSnapshotSplit> assignedSnapshotSplit =
snapshotSplitAssigner.getAssignedSplits().values().stream()
.sorted(Comparator.comparing(MySqlSplit::splitId))
.collect(Collectors.toList());
Map<String, BinlogOffset> splitFinishedOffsets =
snapshotSplitAssigner.getSplitFinishedOffsets();
final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new ArrayList<>();
BinlogOffset minBinlogOffset = null;
for (MySqlSnapshotSplit split : assignedSnapshotSplit) {
// find the min binlog offset
BinlogOffset binlogOffset = splitFinishedOffsets.get(split.splitId());
if (minBinlogOffset == null || binlogOffset.isBefore(minBinlogOffset)) {
minBinlogOffset = binlogOffset;
}
finishedSnapshotSplitInfos.add(
new FinishedSnapshotSplitInfo(
split.getTableId(),
split.splitId(),
split.getSplitStart(),
split.getSplitEnd(),
binlogOffset));
}
// the finishedSnapshotSplitInfos is too large for transmission, divide it to groups and
// then transfer them
boolean divideMetaToGroups = finishedSnapshotSplitInfos.size() > splitMetaGroupSize;
return new MySqlBinlogSplit(
BINLOG_SPLIT_ID,
minBinlogOffset == null ? BinlogOffset.INITIAL_OFFSET : minBinlogOffset,
BinlogOffset.NO_STOPPING_OFFSET,
divideMetaToGroups ? new ArrayList<>() : finishedSnapshotSplitInfos,
new HashMap<>(),
finishedSnapshotSplitInfos.size());
}
SourceReader
-
负责读取 SplitEnumerator 分配的 SourceSplit
-
start()
-
InputStatus pollNext(ReaderOutput<T> output)
-
实现必须确保此方法是非阻塞的。
-
尽管实现可以将多条记录发送到给定的SourceOutput中,但它是
建议不要这样做。相反,将一条记录发送到SourceOutput并返回一个{@link
InputStatus#MORE_AVAILABLE}让调用者线程知道有更多可用记录
-
-
List<SplitT> snapshotState(long checkpointId)
-
CompletableFuture<Void> isAvailable()
- Future 标识 reader 有可用的数据
- 当 Future 完成时,flink 会继续 调用 pollNext 直到 pollNext 返回一个 非 MORE_AVAILABLE 状态
-
addSplits(List<SplitT> splits)
- 给 reader 添加 split, 当 SplitEnumeratorContext#assignSplit(SourceSplit, int) 调用时就会触发此方法
-
void notifyNoMoreSplits();
- 当SplitEnumeratorContext#signalNoMoreSplits(int) 调用时会触发此方法来通知 reader 在未来将不会接收到其他的 split
-
default void handleSourceEvents(SourceEvent sourceEvent) {}
- 处理来自SplitEnumerator 的时间,事件通过SplitEnumeratorContext#sendEventToSourceReader(int, SourceEvent) 来发送
-
default void notifyCheckpointComplete(long checkpointId) {}
InputStatus
-
异步数据可用性的状态
-
MORE_AVAILABLE : 有数据可用
-
NOTHING_AVAILABLE: 意味着此刻没有数据可用,不代表将来没有可用的数据,当有数据可用时通常会通知发出新的数据可用通知
-
END_OF_RECOVERY: 表明数据成功恢复
-
END_OF_INPUT: 数据不再可用,已经结束了
SourceReaderBase
-
FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue
- 缓存 fetcher 线程获取的数据
-
Map<String, SplitContext<T, SplitStateT>> splitStates
-
RecordEmitter<E, T, SplitStateT> recordEmitter 发射器,对SplitReaders读到的数据进行输出
-
SplitFetcherManager<E, SplitT> splitFetcherManager
- fetcher manager 来运行 split fetchers
-
RecordsWithSplitIds<E> currentFetch 从SplitReader 里面获取的最新的一批数据
-
boolean noMoreSplitsAssignment 是否将为SourceReader分配更多 split
-
InputStatus pollNext(ReaderOutput<T> output)
- 当 currentFetch 为空则尝试 getNextFetch
- getNextFetch
- 从 elementsQueue里面拿一个 split
- 如果 elementsQueue 里面拿到的 null 或者 将对列拿到的 split moveToNextSplit 返回 false 则 getNextFetch 返回 null
- moveToNextSplit
- 如果 split 的 nextSplitId == null 说明 Current fetch is finished
- 把 currentFetch,currentSplitContext, currentSplitOutput 置空
- 从 split 中获取 finishedSplits
- 在 state 中移除 splitStates
- 释放 output
- 调用子类 的 onSplitFinished
- fetch.recycle()
- 返回 false
- 如果不为空则从 splitStates 中拿出 currentSplitContext
- 设置 currentSplitContext,创建新的 currentSplitOutput
- 返回 true
- 如果 split 的 nextSplitId == null 说明 Current fetch is finished
- 否则用对列中拿到的 fetch 置换 currentFetch
- 返回 currentFetch
- moveToNextSplit
- getNextFetch
- 如果 getNextFetch 为空
- 如果 noMoreSplitsAssignment == false || splitFetcherManager 的 fetch 还有未完成的
- 返回 InputStatus.NOTHING_AVAILABLE
- 如果elementsQueue 为空 返回 InputStatus.END_OF_INPUT
- 否则返回 InputStatus.MORE_AVAILABLE
- 如果 noMoreSplitsAssignment == false || splitFetcherManager 的 fetch 还有未完成的
- 如果 currentFetch 或者 getNextFetch 不为空
- 从 fetch 里面取一个 record
- record 不为空 则把 record 发射出去 返回 InputStatus.MORE_AVAILABLE
- 如果 moveToNextSplit 为 false 则 递归调用 pollNext
- 否则 moveToNextSplit 为真 ,怎么退出循环???????????????????????????????????????????
- 从 fetch 里面取一个 record
- 当 currentFetch 为空则尝试 getNextFetch
-
CompletableFuture<Void> isAvailable()
- currentFetch != null 返回 FutureCompletingBlockingQueue.AVAILABLE 否则 elementsQueue.getAvailabilityFuture()
-
List<SplitT> snapshotState(long checkpointId)
- 对 splitStates 的 SplitT 进行存储
-
addSplits(List<SplitT> splits)
- 把 splits 放入 splitStates
- 把 splits 放入 splitFetcherManager 启动 fetch
-
notifyNoMoreSplits()
- noMoreSplitsAssignment = true
- 没有 split 的时候对列是可用的 设置对列的 future 为 AVAILABLE 并完成 future
SingleThreadMultiplexSourceReaderBase
- SingleThreadMultiplexSourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT> extends SourceReaderBase<E, T, SplitT, SplitStateT>
MySqlSourceReader
-
MySqlSourceReader<T> extends SingleThreadMultiplexSourceReaderBase< SourceRecord, T, MySqlSplit, MySqlSplitState>
-
Map<String, MySqlSnapshotSplit> finishedUnackedSplits = new HashMap<>();
-
Map<String, MySqlBinlogSplit> uncompletedBinlogSplits = new HashMap<>();
-
MySqlBinlogSplit suspendedBinlogSplit = null;
-
splitFetcherManager 是 SingleThreadFetcherManager
-
start()
- 当 splitStates 为 0 的时候请求 SplitEnumerator 获取 split
-
MySqlSplitState initializedState(MySqlSplit split)
- 根据 split 的类型进行包装成 MySqlSnapshotSplitState 或者 MySqlBinlogSplitState
-
snapshotState(long checkpointId)
- 调用 SourceReaderBase 的 snapshotState 方法返回 stateSplits
- stateSplits 中过滤出不在 finishedUnackedSplits 集合中的 得到 unfinishedSplits
- 然后 又把 finishedUnackedSplits 添加到 unfinishedSplits 这里先排除后加入,可能是优先 finishedUnackedSplits 的一个逻辑
- uncompletedBinlogSplits 也添加到 unfinishedSplits
- 如果 suspendedBinlogSplit 不为空也添加到 unfinishedSplits
- 返回 unfinishedSplits
-
onSplitFinished(Map<String, MySqlSplitState> finishedSplitIds)
- 在 SourceReaderBase finishCurrentFetch 时被调用
- 对 finishedSplitIds 进行遍历
- 如果是 isBinlogSplit
- 代表 由于新添加了表,binlog split reader 已暂停
- MySqlSourceReaderContext.stopBinlogSplitReader = false
- 把这个 binlogSplit 设置成 suspendedBinlogSplit 。 在 snapshotState 加入 state
- 发送 SuspendBinlogReaderAckEvent 给到 SplitEnumerator 标识已收到暂停 binlog
- 如果不是 BinlogSplit
- 把完成的 mySqlSplit 加入 finishedUnackedSplits
- 如果是 isBinlogSplit
- 把 finishedUnackedSplits 的所有 split 的高水位 BinlogOffset 信息 包装成 FinishedSnapshotSplitsReportEvent 报告给 SplitEnumerator
- 发送请求 split 请求 sendSplitRequest
-
addSplits(List<MySqlSplit> splits)
- 遍历split
- 如果 isSnapshotSplit
- 如果 split 完成 ; highWatermark != null SnapshotSplit 拿到了高水位就说明搞完了
- 把 split 加入 finishedUnackedSplits
- 否则就加入临时变量 unfinishedSplits
- 如果 split 完成 ; highWatermark != null SnapshotSplit 拿到了高水位就说明搞完了
- 如果 isBinlogSplit
- isSuspended
- suspendedBinlogSplit = binlogSplit
- 如果 BinlogSplit 里面的 totalFinishedSplitSize != finishedSnapshotSplitInfos.size()
- 把 binlogSplit 加入 uncompletedBinlogSplits
- requestBinlogSplitMetaIfNeeded
-
如果BinlogSplit 里面的 totalFinishedSplitSize != finishedSnapshotSplitInfos.size()
-
- isSuspended
RecordsWithSplitIds
fetchers 和 source reader 之间数据传递的接口
- String nextSplit() 下一个 split, 获取第一个 split 时 也需要调用此方法,如果没有 split 返回 null
- E nextRecordFromSplit() 获取 split 的下一个 record, 返回 null 代表 split 数据读取完成
- Set<String> finishedSplits() 完成的 split
- default void recycle() {} 当这个 split 的记录都已发出时将会被调用,可用做对 split 进行重置。
MySqlRecords
- private String splitId
- MySqlRecords implements RecordsWithSplitIds<SourceRecord>
- Iterator<SourceRecord> recordsForCurrentSplit
- Iterator<SourceRecord> recordsForSplit
- final Set<String> finishedSnapshotSplits
MySqlSplitState
- final MySqlSplit split
MySqlBinlogSplitState
- MySqlBinlogSplitState extends MySqlSplitState
- private BinlogOffset startingOffset
- private BinlogOffset endingOffset
- Map<TableId, TableChange> tableSchemas
MySqlSnapshotSplitState
- MySqlSnapshotSplitState extends MySqlSplitState
- private BinlogOffset highWatermark;
SourceSplit
- String splitId()
MySqlSplit
- MySqlSplit implements SourceSplit
- Map<TableId, TableChanges.TableChange> getTableSchemas()
MySqlBinlogSplit
- MySqlBinlogSplit extends MySqlSplit
- final BinlogOffset startingOffset;final BinlogOffset endingOffset;
- final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos;
- final Map<TableId, TableChange> tableSchemas;
- final int totalFinishedSplitSize;final boolean isSuspended;
- transient byte[] serializedFormCache;
MySqlSnapshotSplit
- MySqlSnapshotSplit extends MySqlSplit
- final TableId tableId
- final RowType splitKeyType;
- final Map<TableId, TableChange> tableSchemas;
- private final Object[] splitStart;
- private final Object[] splitEnd;
- private final BinlogOffset highWatermark;
- transient byte[] serializedFormCache;
SplitReader
SplitFetcher
Source
- Source<T, SplitT extends SourceSplit, EnumChkT>
- 用来创建
SplitEnumerator
、SourceReader
、 序列化器 - T : source 发出去的数据类型
- SplitT: source 处理的 split 类型
- EnumChkT: enumerator checkpoints 的数据类型
- 用来创建
- Boundedness getBoundedness()
- SourceReader<T, SplitT> createReader(SourceReaderContext readerContext)
- 创建 reader
- SplitEnumerator<SplitT, EnumChkT> createEnumerator(SplitEnumeratorContext<SplitT> enumContext)
- 创建 SplitEnumerator
- SplitEnumerator<SplitT, EnumChkT> restoreEnumerator( SplitEnumeratorContext<SplitT> enumContext, EnumChkT checkpoint)
- 从 checkpoint 中恢复 SplitEnumerator
- SimpleVersionedSerializer<SplitT> getSplitSerializer()
- 创建 split 的序列化器
- SimpleVersionedSerializer<EnumChkT> getEnumeratorCheckpointSerializer()
- 创建 SplitEnumerator checkpoint 数据的序列化器
MySqlSource
- MySqlSource<T> implements Source<T, MySqlSplit, PendingSplitsState>,ResultTypeQueryable<T>
- SourceReader<T, MySqlSplit> createReader(SourceReaderContext readerContext)
- 创建 MySqlSourceReader 里面包含
- FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>>
- MySqlSplitReader
- MySqlSourceConfig
- MySqlSourceReaderContext
- SourceReaderContext
- MySqlRecordEmitter
- DebeziumDeserializationSchema
- MySqlSourceReaderMetrics
- MySqlSourceReaderContext
- MySqlSourceConfig
- 创建 MySqlSourceReader 里面包含
- SplitEnumerator<MySqlSplit, PendingSplitsState> createEnumerator( SplitEnumeratorContext<MySqlSplit> enumContext)
- 创建 splitAssigner
- 如果 StartupMode.INITIAL
- discoverCapturedTables 获取要同步的表
- 创建 MySqlHybridSplitAssigner
- 创建 MySqlBinlogSplitAssigner
- 如果 StartupMode.INITIAL
- 创建 MySqlSourceEnumerator 里面包含
- SplitEnumeratorContext<MySqlSplit>
- MySqlSourceConfig
- MySqlSplitAssigner
- 创建 splitAssigner
- SplitEnumerator<MySqlSplit, PendingSplitsState> restoreEnumerator( SplitEnumeratorContext<MySqlSplit> enumContext, PendingSplitsState checkpoint)
- 根据 checkpoint 数据来恢复 splitAssigner
- HybridPendingSplitsState -> MySqlHybridSplitAssigner
- BinlogPendingSplitsState -> MySqlBinlogSplitAssigner
- new MySqlSourceEnumerator(enumContext, sourceConfig, splitAssigner)
- 根据 checkpoint 数据来恢复 splitAssigner
- SimpleVersionedSerializer<MySqlSplit> getSplitSerializer()
- MySqlSplitSerializer.INSTANCE
- SimpleVersionedSerializer<PendingSplitsState> getEnumeratorCheckpointSerializer()
- new PendingSplitsStateSerializer(MySqlSplitSerializer.INSTANCE))
WatermarkOutput
- emitWatermark(Watermark watermark)
- 发出水印也会隐式地将流标记为 active,结束之前标记为空闲状态
- markIdle()
- 将此输出标记为空闲,这意味着下游操作不需要等待来自此 output 的水印。
SourceOutput
-
SourceOutput<T> extends WatermarkOutput
-
对 SourceReader 产生的数据进行发送 一个 SourceReader 可以有多个 SourceOutputs, 每 SourceOutputs 作用于一个 Source Splits, 所以不同的 split 流可以被区别对待,例如水印生成或事件时间偏移处理
-
void collect(T record)
- 如果源系统没有时间戳的概念,请使用此方法
- 事件可以通过{@link TimestampAssigner} 产生时间附加到记录上。例如,JOSN 格式的记录,在JSON解析过程中没有通用的时间戳,因此可以使用这方法初始化生成一个没有时间戳的记录。 在下一步中,将使用 TimestampAssigner 从JSON对象的字段中提取时间戳。
-
void collect(T record, long timestamp)
- 如果源系统有时间戳的概念。典型例子可以是日志、PubSub或消息队列,比如Kafka或Kinesis,它们自带时间戳
ReaderOutput
- ReaderOutput<T> extends SourceOutput<T>
- 对于只有一个 split 或者不需要切 split 的 SourceReader 建议使用这种方法
- 对于大多数的流,有 split 的场景,应该 split 分配特定的 SourceOutput,用来处理每个 split watermark等的生成逻辑
- 可以通过 ReaderOutput#createOutputForSplit(String)来给一个 split 创建一个ReaderOutput,当 source 处理完整个 split 的时候要确保释放这个ReaderOutput
- void collect(T record);
- void collect(T record, long timestamp);
- void emitWatermark(Watermark watermark);
- void markIdle();
- SourceOutput<T> createOutputForSplit(String splitId)
- 给 Source Split 创建特定的 SourceOutput 如果已经给 splitId 分配过 SourceOutput 就会直接返回之前创建的 SourceOutput,所以一个 splitId 只有一个 SourceOutput ,当 split 完成后需要释放 SourceOutput,否则,它将继续像一个永远停滞的 source split 一样,生产 watermark ,并可能无限期地抑制 watermark
- void releaseOutputForSplit(String splitId); 释放 SourceOutput
MySqlSplitSerializer
- 序列化和反序列化 MySqlSplit
- implements SimpleVersionedSerializer<MySqlSplit>
- public byte[] serialize(MySqlSplit split)
- MySqlSplit deserialize(int version, byte[] serialized)
SplitEnumeratorContext
PendingSplitsState
BinlogPendingSplitsState
private final boolean isBinlogSplitAssigned;
SnapshotPendingSplitsState
final List<TableId> remainingTables;
final List<TableId> alreadyProcessedTables
final List<MySqlSnapshotSplit> remainingSplits
final Map<String, MySqlSnapshotSplit> assignedSplits
final Map<String, BinlogOffset> splitFinishedOffsets
AssignerStatus assignerStatus
HybridPendingSplitsState
-
BinlogPendingSplitsState
与SnapshotPendingSplitsState
的混合 -
final SnapshotPendingSplitsState snapshotPendingSplits
-
final boolean isBinlogSplitAssigned
ChunkSplitter
AssignerStatus
-
AssignerStatus
状态流转
/**
* The state of split assigner finite state machine, tips: we use word status instead of word state
* to avoid conflict with Flink state keyword. The assigner finite state machine goes this way.
*
* <pre>
* INITIAL_ASSIGNING(start)
* |
* |
* onFinish()
* |
* ↓
* INITIAL_ASSIGNING_FINISHED(end)
* |
* |
* suspend() // found newly added tables
* |
* ↓
* SUSPENDED --- wakeup() --→ NEWLY_ADDED_ASSIGNING --- onFinish() --→ NEWLY_ADDED_ASSIGNING_FINISHED(end)
* ↑ |
* | |
* |----------------- suspend() //found newly added tables -----------|
* </pre>
*/
网友评论