美文网首页
FLINK CDC 源码 & 时序图 一

FLINK CDC 源码 & 时序图 一

作者: loukey_j | 来源:发表于2022-03-24 14:09 被阅读0次

    FLINK CDC 源码
    时序文件在 https://www.processon.com/view/623d93751efad40756c5ab8b

    FLINK CDC 源码时序图-对外.png

    SourceEvent

    • SuspendBinlogReaderEvent
    • FinishedSnapshotSplitsRequestEvent
    • WakeupReaderEvent(WakeupReaderEvent.WakeUpTarget.BINLOG_READER)) WakeUpTarget: BINLOG_READER / SNAPSHOT_READER
    • FinishedSnapshotSplitsReportEvent
    • FinishedSnapshotSplitsAckEvent
    • BinlogSplitMetaRequestEvent
    • BinlogSplitMetaEvent
    • SuspendBinlogReaderAckEvent
    • LatestFinishedSplitsSizeRequestEvent
    • LatestFinishedSplitsSizeEvent

    SourceSplit

    SplitEnumerator

    • SplitEnumerator<SplitT extends SourceSplit, CheckpointT> extends AutoCloseable, CheckpointListener

    • 功能

      • 产生 splits 供 SourceReader 进行去读

      • 分配 split 给 SourceReader

    • start()

    • handleSplitRequest(int subtaskId, @Nullable String requesterHostname) 处理 reader 的 split 请求,请求是通过 SourceReaderContext#sendSplitRequest() 发出的

    • addSplitsBack(List<SplitT> splits, int subtaskId) 仅仅在当某个 SourceReader 失败时,会从上一个成功的 checkpoint 把失败 taskid 的 SourceReader 对应的 split 集合放回 SplitEnumerater。

    • void addReader(int subtaskId) 添加一个新的 reader

    • CheckpointT snapshotState(long checkpointId) throws Exception

    • void close() throws IOException

    • default void notifyCheckpointComplete(long checkpointId) throws Exception {}

    • default void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {} 处理来自 SourceReader 的请求,一般情况下不需要覆盖次方法,只有 Reader 和 Enumerate 有某种约定的时候才需要重写次方法,通常 SourceReader 的新增和 SourceReader 请求 Split 都不会走这个方法,新增 Reader 走的是 addReader 请求 Split 走的是 handleSplitRequest

    MySqlSourceEnumerator

    • MySqlSourceEnumerator implements SplitEnumerator<MySqlSplit, PendingSplitsState>

    • start()

      • splitAssigner.open()
      • suspendBinlogReaderIfNeed() 判断 Assigner 的状态是否为 AssignerStatus#SUSPENDED 如果是则从 SplitEnumeratorContext 获取所有的 SoruceReader 每个下发一个 SuspendBinlogReaderEvent 时间阻塞 binlog 读取, 将 binlogReaderIsSuspended = true 设置为 true 默认是 false
      • 周期调度
        • 定期去获取注册的 SourceReader,
        • 并且判断 splitAssigner 是否还有未上报完成的 split 如果有 给每个 subtask 下发一个 FinishedSnapshotSplitsRequestEvent 要求 reader 上报 finish 状态.
        • suspendBinlogReaderIfNeed()逻辑同上
        • wakeupBinlogReaderIfNeed 判断 Assigner 的状态是否为 AssignerStatus#INITIAL_ASSIGNING_FINISHED || NEWLY_ADDED_ASSIGNING_FINISHED 如果完成并且之前 binlogReaderIsSuspended = true 则给 subtask 发送唤醒 binlog 事件 WakeupReaderEvent(WakeupReaderEvent.WakeUpTarget.BINLOG_READER))
    • handleSplitRequest(int subtaskId, @Nullable String requesterHostname)

      • 判断这个 subtaskid 是否注册了,如果注册了,则将 subtaskId 添加到 readersAwaitingSplit = new TreeSet<>() 中,
      • assignSplits()
        • 循环 readersAwaitingSplit 拿出 taskid 判断 taskid 是否注册了,如果没注册就移除掉,如果注册了就从 splitAssigner.getNext() 获取一个 split
        • 如果还有 split 则 context.assignSplit(SplitT split, int subtask) 把 split 分发下去. 并且把获得 split 的 taskid 移除掉。
        • 如果没有 split 则退出循环
    • addSplitsBack(List<MySqlSplit> splits, int subtaskId)

      • splitAssigner.addSplits(splits); 把恢复的 split 信息添加到 splitAssigner
    • addReader(int subtaskId) 空实现

    • handleSourceEvent(int subtaskId, SourceEvent sourceEvent)

      • 处理四种 event FinishedSnapshotSplitsReportEventBinlogSplitMetaRequestEventSuspendBinlogReaderAckEventLatestFinishedSplitsSizeRequestEvent
      • FinishedSnapshotSplitsReportEvent: 把完成的 split 告诉 Assigner,尝试唤醒阻塞的 binlogReader wakeupBinlogReaderIfNeed 同上
        • 发送 FinishedSnapshotSplitsAckEvent 告诉相应的 Reader 已经收到 split 完结消息
      • BinlogSplitMetaRequestEvent:
        • 第一次 List<List<FinishedSnapshotSplitInfo>> binlogSplitMeta 为空, 初始值为 splitAssigner.getFinishedSplitInfos()
        • binlogSplitMeta 的序号就是 MetaGroupId
        • 把 MetaGroupId 对应的 List<FinishedSnapshotSplitInfo> 封装成 BinlogSplitMetaEvent 发送给 Reader.
      • SuspendBinlogReaderAckEvent: reader 告诉 enumerate 已经收到 binlog 阻塞消息,
        • splitAssigner.wakeup() 唤醒 splitAssigner
        • 如果 Assigner 是 MySqlHybridSplitAssigner 则给 Reader 发送 WakeupReaderEvent(WakeupReaderEvent.WakeUpTarget.SNAPSHOT_READER) 消息进行获取 split 等
      • LatestFinishedSplitsSizeRequestEvent: 如果 Assigner 是 MySqlHybridSplitAssigner 则给 Reader 发送 LatestFinishedSplitsSizeEvent 告诉 Reader 完成了多少 split
    • PendingSplitsState snapshotState(long checkpointId)

      • splitAssigner.snapshotState(checkpointId)
    • notifyCheckpointComplete(long checkpointId)

      • splitAssigner.notifyCheckpointComplete(checkpointId);
      • assignSplits() 同上给 reader 发送 split 如果有的话
    • close()

      • splitAssigner.close()

    MySqlSplitAssigner

    • split 的产生 和分配
      • open()MySqlSourceEnumerator#start 方法中调用
      • Optional<MySqlSplit> getNext()MySqlSourceEnumerator#assignSplits 方法中调用 获取以一个 split
      • boolean waitingForFinishedSplits() 是否存在有未上报完成的 split, 如果有的化MySqlSourceEnumerator 会给 reader 发送上报 split 完成的消息
      • List<FinishedSnapshotSplitInfo> getFinishedSplitInfos(); 获取已经完成的 split
      • void onFinishedSplits(Map<String, BinlogOffset> splitFinishedOffsets)MySqlSourceEnumerator#handleSourceEvent 处理FinishedSnapshotSplitsReportEvent 时调用
      • addSplits(Collection<MySqlSplit> splits) 当 reader 处理失败,有 split 需要重处理时由 MySqlSourceEnumerator#addSplitsBack 被调用
      • PendingSplitsState snapshotState(long checkpointId);
      • notifyCheckpointComplete(long checkpointId);
      • AssignerStatus getAssignerStatus()
      • suspend()AssignerStatus#INITIAL_ASSIGNING_FINISHED 或 AssignerStatus#NEWLY_ADDED_ASSIGNING_FINISHED 下挂起 Assigner
      • wakeup()AssignerStatus#SUSPENDED 下唤醒 Assigner ,在 MySqlSourceEnumerator#handleSourceEvent 处理 SuspendBinlogReaderAckEvent 时调用
      • close()MySqlSourceEnumerator#close 中调用

    MySqlBinlogSplitAssigner

    • 构造方法有两个
      • MySqlBinlogSplitAssigner(MySqlSourceConfig sourceConfig) 直接 new的
        • boolean isBinlogSplitAssigned = false
      • MySqlBinlogSplitAssigner(MySqlSourceConfig sourceConfig, BinlogPendingSplitsState checkpoint)MySqlSource#restoreEnumerator 中恢复
        • boolean isBinlogSplitAssigned = checkpoint.isBinlogSplitAssigned()
    • Optional<MySqlSplit> getNext()
      • 如果 isBinlogSplitAssigned true 则返回空
      • 否则通过createBinlogSplit() 创建一个MySqlBinlogSplit 并将 isBinlogSplitAssigned = true;
    • waitingForFinishedSplits() { return false; }
    • List<FinishedSnapshotSplitInfo> getFinishedSplitInfos(){ return Collections.EMPTY_LIST; }
    • addSplits(Collection<MySqlSplit> splits){ isBinlogSplitAssigned = false; }
    • snapshotState(long checkpointId){ return new BinlogPendingSplitsState(isBinlogSplitAssigned); }
    • AssignerStatus getAssignerStatus(){ return AssignerStatus.INITIAL_ASSIGNING_FINISHED }

    MySqlSnapshotSplitAssigner

    • final boolean isRemainingTablesCheckpointed 貌似一直都为true

    • final List<TableId> alreadyProcessedTables

    • final List<TableId> remainingTables

    • final List<MySqlSnapshotSplit> remainingSplits

    • final Map<String, MySqlSnapshotSplit> assignedSplits

    • Map<String, BinlogOffset> splitFinishedOffsets

    • open()

      • 初始化 chunkSplitter
      • 发现新表
      • 起一个异步线程对remainingTables 的表进行 split 切分
      • 把切分好的 splits 放入 remainingSplits
      • 把切分好的 tableId 从 remainingTables 移除
    • Optional<MySqlSplit> getNext()

      • 如果 remainingSplits 有数据,有现成的 split
        • remainingSplits 里移除一个 split
        • 放到 assignedSplits 里面
        • 把 tableId 放到 alreadyProcessedTables 里面
      • 如果remainingSplits 没有数据,则等待,等被 notify 之后就机继续调用 getNext()
      • 否则就代表表需要切分 split,把异步线程销毁掉,返回 Optional.empty()
    • boolean waitingForFinishedSplits()

      • remainingTables.isEmpty() && remainingSplits.isEmpty() && assignedSplits.size() == splitFinishedOffsets.size()
    • List<FinishedSnapshotSplitInfo> getFinishedSplitInfos()

      • 如果上面 waitingForFinishedSplits() 不为 true 则报错
      • assignedSplits 里面获取所有的 split
      • splitFinishedOffsets 里面获取每个 split 的 BinlogOffset
      • 根据 MySqlSnapshotSplitBinlogOffset 进行包装成 FinishedSnapshotSplitInfo
    • onFinishedSplits(Map<String, BinlogOffset> splitFinishedOffsets)

      • 当有reader 上报 split 完成时 会把上报的 split 信息存入 splitFinishedOffsets.putAll(splitFinishedOffsets)
      • 如果所有的split 都上报完成了 并且 assigner 的状态为 assignerStatus == INITIAL_ASSIGNING || assignerStatus == NEWLY_ADDED_ASSIGNING;
      • 如果并行度为1 则设置 assignerStatusINITIAL_ASSIGNING_FINISHED 或者 NEWLY_ADDED_ASSIGNING_FINISHED
      • 如果并行度不为1 则要等到 notifyCheckpointComplete的时候再设置
    • addSplits(Collection<MySqlSplit> splits) 把恢复的 split 加入 remainingSplits 并且从 assignedSplits 里移除 splitFinishedOffsets 里也移除

    • SnapshotPendingSplitsState snapshotState(long checkpointId)

      • 对现有的一些变量封装成 SnapshotPendingSplitsState
      • 如果所有的split 都上报完成了 并且 assigner 的状态为 assignerStatus == INITIAL_ASSIGNING || assignerStatus == NEWLY_ADDED_ASSIGNING
      • 并且 checkpointIdToFinish == null 则设置 checkpointIdToFinish = checkpointId
    • notifyCheckpointComplete(long checkpointId

      • 如果所有的split 都上报完成了 并且 assigner 的状态为 assignerStatus == INITIAL_ASSIGNING || assignerStatus == NEWLY_ADDED_ASSIGNING
      • 并且 checkpointId >= checkpointIdToFinish
      • 设置 assignerStatusINITIAL_ASSIGNING_FINISHED 或者 NEWLY_ADDED_ASSIGNING_FINISHED
    • AssignerStatus getAssignerStatus() { return assignerStatus;}

    • suspend() 检测并设置 SUSPENDED

    • wakeup() 检测并设置 NEWLY_ADDED_ASSIGNING

    MySqlHybridSplitAssigner

    final int splitMetaGroupSize; 来源于 chunk-meta.group.size 这个配置 默认是 1000
    boolean isBinlogSplitAssigned;
    final MySqlSnapshotSplitAssigner snapshotSplitAssigner;
    
    • open()MySqlSnapshotSplitAssigner

    • Optional<MySqlSplit> getNext()

      • 如果 snapshotSplitAssigner.getAssignerStatus() 状态是 SUSPENDED 则返回 Optional.empty()
      • 如果snapshotSplitAssigner.noMoreSplits(){remainingTables.isEmpty() && remainingSplits.isEmpty()}
        • 如果isBinlogSplitAssigned 为真 则返回 Optional.empty()
        • 如果状态是 INITIAL_ASSIGNING_FINISHED
          • isBinlogSplitAssigned = true;
          • 创建返回 MySqlBinlogSplit
        • 如果状态是 NEWLY_ADDED_ASSIGNING_FINISHED
          • isBinlogSplitAssigned = true;
          • 返回 Optional.empty()
        • 否则返回 Optional.empty()
      • 如果有 table 还要处理
        • return snapshotSplitAssigner.getNext();
    • boolean waitingForFinishedSplits()MySqlSnapshotSplitAssigner

    • List<FinishedSnapshotSplitInfo> getFinishedSplitInfos()MySqlSnapshotSplitAssigner

    • onFinishedSplits(Map<String, BinlogOffset> splitFinishedOffsets)MySqlSnapshotSplitAssigner

    • addSplits(Collection<MySqlSplit> splits)

      • 对于MySqlSnapshotSplit 的split 处理逻辑同 MySqlSnapshotSplitAssigner
      • 对于MySqlBinlogSplit的split 处理逻辑同 MySqlBinlogSplitAssigner
    • SnapshotPendingSplitsState snapshotState(long checkpointId)

      • 对现有的一些变量封装成 HybridPendingSplitsState
    • notifyCheckpointComplete(long checkpointIdMySqlSnapshotSplitAssigner

    • AssignerStatus getAssignerStatus()MySqlSnapshotSplitAssigner

    • suspend()MySqlSnapshotSplitAssigner

    • wakeup()MySqlSnapshotSplitAssigner

    createBinlogSplit 异同

    private MySqlBinlogSplit createBinlogSplit() {
            try (JdbcConnection jdbc = DebeziumUtils.openJdbcConnection(sourceConfig)) {
                return new MySqlBinlogSplit(
                        BINLOG_SPLIT_ID,
                        currentBinlogOffset(jdbc),
                        BinlogOffset.NO_STOPPING_OFFSET,
                        new ArrayList<>(),
                        new HashMap<>(),
                        0);
            } catch (Exception e) {
                throw new FlinkRuntimeException("Read the binlog offset error", e);
            }
        }
    
     private MySqlBinlogSplit createBinlogSplit() {
            final List<MySqlSnapshotSplit> assignedSnapshotSplit =
                    snapshotSplitAssigner.getAssignedSplits().values().stream()
                            .sorted(Comparator.comparing(MySqlSplit::splitId))
                            .collect(Collectors.toList());
    
            Map<String, BinlogOffset> splitFinishedOffsets =
                    snapshotSplitAssigner.getSplitFinishedOffsets();
            final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new ArrayList<>();
    
            BinlogOffset minBinlogOffset = null;
            for (MySqlSnapshotSplit split : assignedSnapshotSplit) {
                // find the min binlog offset
                BinlogOffset binlogOffset = splitFinishedOffsets.get(split.splitId());
                if (minBinlogOffset == null || binlogOffset.isBefore(minBinlogOffset)) {
                    minBinlogOffset = binlogOffset;
                }
                finishedSnapshotSplitInfos.add(
                        new FinishedSnapshotSplitInfo(
                                split.getTableId(),
                                split.splitId(),
                                split.getSplitStart(),
                                split.getSplitEnd(),
                                binlogOffset));
            }
    
            // the finishedSnapshotSplitInfos is too large for transmission, divide it to groups and
            // then transfer them
    
            boolean divideMetaToGroups = finishedSnapshotSplitInfos.size() > splitMetaGroupSize;
            return new MySqlBinlogSplit(
                    BINLOG_SPLIT_ID,
                    minBinlogOffset == null ? BinlogOffset.INITIAL_OFFSET : minBinlogOffset,
                    BinlogOffset.NO_STOPPING_OFFSET,
                    divideMetaToGroups ? new ArrayList<>() : finishedSnapshotSplitInfos,
                    new HashMap<>(),
                    finishedSnapshotSplitInfos.size());
        }
    

    SourceReader

    • 负责读取 SplitEnumerator 分配的 SourceSplit

    • start()

    • InputStatus pollNext(ReaderOutput<T> output)

      • 实现必须确保此方法是非阻塞的。

      • 尽管实现可以将多条记录发送到给定的SourceOutput中,但它是

        建议不要这样做。相反,将一条记录发送到SourceOutput并返回一个{@link

        InputStatus#MORE_AVAILABLE}让调用者线程知道有更多可用记录

    • List<SplitT> snapshotState(long checkpointId)

    • CompletableFuture<Void> isAvailable()

      • Future 标识 reader 有可用的数据
      • 当 Future 完成时,flink 会继续 调用 pollNext 直到 pollNext 返回一个 非 MORE_AVAILABLE 状态
    • addSplits(List<SplitT> splits)

      • 给 reader 添加 split, 当 SplitEnumeratorContext#assignSplit(SourceSplit, int) 调用时就会触发此方法
    • void notifyNoMoreSplits();

      • 当SplitEnumeratorContext#signalNoMoreSplits(int) 调用时会触发此方法来通知 reader 在未来将不会接收到其他的 split
    • default void handleSourceEvents(SourceEvent sourceEvent) {}

      • 处理来自SplitEnumerator 的时间,事件通过SplitEnumeratorContext#sendEventToSourceReader(int, SourceEvent) 来发送
    • default void notifyCheckpointComplete(long checkpointId) {}

    InputStatus

    • 异步数据可用性的状态

    • MORE_AVAILABLE : 有数据可用

    • NOTHING_AVAILABLE: 意味着此刻没有数据可用,不代表将来没有可用的数据,当有数据可用时通常会通知发出新的数据可用通知

    • END_OF_RECOVERY: 表明数据成功恢复

    • END_OF_INPUT: 数据不再可用,已经结束了

    SourceReaderBase

    • FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue

      • 缓存 fetcher 线程获取的数据
    • Map<String, SplitContext<T, SplitStateT>> splitStates

    • RecordEmitter<E, T, SplitStateT> recordEmitter 发射器,对SplitReaders读到的数据进行输出

    • SplitFetcherManager<E, SplitT> splitFetcherManager

      • fetcher manager 来运行 split fetchers
    • RecordsWithSplitIds<E> currentFetch 从SplitReader 里面获取的最新的一批数据

    • boolean noMoreSplitsAssignment 是否将为SourceReader分配更多 split

    • InputStatus pollNext(ReaderOutput<T> output)

      • 当 currentFetch 为空则尝试 getNextFetch
        • getNextFetch
          • 从 elementsQueue里面拿一个 split
          • 如果 elementsQueue 里面拿到的 null 或者 将对列拿到的 split moveToNextSplit 返回 false 则 getNextFetch 返回 null
            • moveToNextSplit
              • 如果 split 的 nextSplitId == null 说明 Current fetch is finished
                • 把 currentFetch,currentSplitContext, currentSplitOutput 置空
                • 从 split 中获取 finishedSplits
                • 在 state 中移除 splitStates
                • 释放 output
                • 调用子类 的 onSplitFinished
                • fetch.recycle()
                • 返回 false
              • 如果不为空则从 splitStates 中拿出 currentSplitContext
              • 设置 currentSplitContext,创建新的 currentSplitOutput
              • 返回 true
            • 否则用对列中拿到的 fetch 置换 currentFetch
            • 返回 currentFetch
      • 如果 getNextFetch 为空
        • 如果 noMoreSplitsAssignment == false || splitFetcherManager 的 fetch 还有未完成的
          • 返回 InputStatus.NOTHING_AVAILABLE
        • 如果elementsQueue 为空 返回 InputStatus.END_OF_INPUT
        • 否则返回 InputStatus.MORE_AVAILABLE
      • 如果 currentFetch 或者 getNextFetch 不为空
        • 从 fetch 里面取一个 record
          • record 不为空 则把 record 发射出去 返回 InputStatus.MORE_AVAILABLE
          • 如果 moveToNextSplit 为 false 则 递归调用 pollNext
          • 否则 moveToNextSplit 为真 ,怎么退出循环???????????????????????????????????????????
    • CompletableFuture<Void> isAvailable()

      • currentFetch != null 返回 FutureCompletingBlockingQueue.AVAILABLE 否则 elementsQueue.getAvailabilityFuture()
    • List<SplitT> snapshotState(long checkpointId)

      • 对 splitStates 的 SplitT 进行存储
    • addSplits(List<SplitT> splits)

      • 把 splits 放入 splitStates
      • 把 splits 放入 splitFetcherManager 启动 fetch
    • notifyNoMoreSplits()

      • noMoreSplitsAssignment = true
      • 没有 split 的时候对列是可用的 设置对列的 future 为 AVAILABLE 并完成 future

    SingleThreadMultiplexSourceReaderBase

    • SingleThreadMultiplexSourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT> extends SourceReaderBase<E, T, SplitT, SplitStateT>

    MySqlSourceReader

    • MySqlSourceReader<T> extends SingleThreadMultiplexSourceReaderBase< SourceRecord, T, MySqlSplit, MySqlSplitState>

    • Map<String, MySqlSnapshotSplit> finishedUnackedSplits = new HashMap<>();

    • Map<String, MySqlBinlogSplit> uncompletedBinlogSplits = new HashMap<>();

    • MySqlBinlogSplit suspendedBinlogSplit = null;

    • splitFetcherManager 是 SingleThreadFetcherManager

    • start()

      • 当 splitStates 为 0 的时候请求 SplitEnumerator 获取 split
    • MySqlSplitState initializedState(MySqlSplit split)

      • 根据 split 的类型进行包装成 MySqlSnapshotSplitState 或者 MySqlBinlogSplitState
    • snapshotState(long checkpointId)

      • 调用 SourceReaderBase 的 snapshotState 方法返回 stateSplits
      • stateSplits 中过滤出不在 finishedUnackedSplits 集合中的 得到 unfinishedSplits
      • 然后 又把 finishedUnackedSplits 添加到 unfinishedSplits 这里先排除后加入,可能是优先 finishedUnackedSplits 的一个逻辑
      • uncompletedBinlogSplits 也添加到 unfinishedSplits
      • 如果 suspendedBinlogSplit 不为空也添加到 unfinishedSplits
      • 返回 unfinishedSplits
    • onSplitFinished(Map<String, MySqlSplitState> finishedSplitIds)

      • 在 SourceReaderBase finishCurrentFetch 时被调用
      • 对 finishedSplitIds 进行遍历
        • 如果是 isBinlogSplit
          • 代表 由于新添加了表,binlog split reader 已暂停
          • MySqlSourceReaderContext.stopBinlogSplitReader = false
          • 把这个 binlogSplit 设置成 suspendedBinlogSplit 。 在 snapshotState 加入 state
          • 发送 SuspendBinlogReaderAckEvent 给到 SplitEnumerator 标识已收到暂停 binlog
        • 如果不是 BinlogSplit
          • 把完成的 mySqlSplit 加入 finishedUnackedSplits
      • 把 finishedUnackedSplits 的所有 split 的高水位 BinlogOffset 信息 包装成 FinishedSnapshotSplitsReportEvent 报告给 SplitEnumerator
      • 发送请求 split 请求 sendSplitRequest
    • addSplits(List<MySqlSplit> splits)

      • 遍历split
      • 如果 isSnapshotSplit
        • 如果 split 完成 ; highWatermark != null SnapshotSplit 拿到了高水位就说明搞完了
          • 把 split 加入 finishedUnackedSplits
        • 否则就加入临时变量 unfinishedSplits
      • 如果 isBinlogSplit
        • isSuspended
          • suspendedBinlogSplit = binlogSplit
        • 如果 BinlogSplit 里面的 totalFinishedSplitSize != finishedSnapshotSplitInfos.size()
          • 把 binlogSplit 加入 uncompletedBinlogSplits
          • requestBinlogSplitMetaIfNeeded
            • 如果BinlogSplit 里面的 totalFinishedSplitSize != finishedSnapshotSplitInfos.size()

    RecordsWithSplitIds

    fetchers 和 source reader 之间数据传递的接口

    • String nextSplit() 下一个 split, 获取第一个 split 时 也需要调用此方法,如果没有 split 返回 null
    • E nextRecordFromSplit() 获取 split 的下一个 record, 返回 null 代表 split 数据读取完成
    • Set<String> finishedSplits() 完成的 split
    • default void recycle() {} 当这个 split 的记录都已发出时将会被调用,可用做对 split 进行重置。

    MySqlRecords

    • private String splitId
    • MySqlRecords implements RecordsWithSplitIds<SourceRecord>
    • Iterator<SourceRecord> recordsForCurrentSplit
    • Iterator<SourceRecord> recordsForSplit
    • final Set<String> finishedSnapshotSplits

    MySqlSplitState

    • final MySqlSplit split

    MySqlBinlogSplitState

    • MySqlBinlogSplitState extends MySqlSplitState
    • private BinlogOffset startingOffset
    • private BinlogOffset endingOffset
    • Map<TableId, TableChange> tableSchemas

    MySqlSnapshotSplitState

    • MySqlSnapshotSplitState extends MySqlSplitState
    • private BinlogOffset highWatermark;

    SourceSplit

    • String splitId()

    MySqlSplit

    • MySqlSplit implements SourceSplit
    • Map<TableId, TableChanges.TableChange> getTableSchemas()

    MySqlBinlogSplit

    • MySqlBinlogSplit extends MySqlSplit
    • final BinlogOffset startingOffset;final BinlogOffset endingOffset;
    • final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos;
    • final Map<TableId, TableChange> tableSchemas;
    • final int totalFinishedSplitSize;final boolean isSuspended;
    • transient byte[] serializedFormCache;

    MySqlSnapshotSplit

    • MySqlSnapshotSplit extends MySqlSplit
    • final TableId tableId
    • final RowType splitKeyType;
    • final Map<TableId, TableChange> tableSchemas;
    • private final Object[] splitStart;
    • private final Object[] splitEnd;
    • private final BinlogOffset highWatermark;
    • transient byte[] serializedFormCache;

    SplitReader

    SplitFetcher

    Source

    • Source<T, SplitT extends SourceSplit, EnumChkT>
      • 用来创建 SplitEnumeratorSourceReader、 序列化器
      • T : source 发出去的数据类型
      • SplitT: source 处理的 split 类型
      • EnumChkT: enumerator checkpoints 的数据类型
    • Boundedness getBoundedness()
    • SourceReader<T, SplitT> createReader(SourceReaderContext readerContext)
      • 创建 reader
    • SplitEnumerator<SplitT, EnumChkT> createEnumerator(SplitEnumeratorContext<SplitT> enumContext)
      • 创建 SplitEnumerator
    • SplitEnumerator<SplitT, EnumChkT> restoreEnumerator( SplitEnumeratorContext<SplitT> enumContext, EnumChkT checkpoint)
      • 从 checkpoint 中恢复 SplitEnumerator
    • SimpleVersionedSerializer<SplitT> getSplitSerializer()
      • 创建 split 的序列化器
    • SimpleVersionedSerializer<EnumChkT> getEnumeratorCheckpointSerializer()
      • 创建 SplitEnumerator checkpoint 数据的序列化器

    MySqlSource

    • MySqlSource<T> implements Source<T, MySqlSplit, PendingSplitsState>,ResultTypeQueryable<T>
    • SourceReader<T, MySqlSplit> createReader(SourceReaderContext readerContext)
      • 创建 MySqlSourceReader 里面包含
        • FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>>
        • MySqlSplitReader
          • MySqlSourceConfig
          • MySqlSourceReaderContext
            • SourceReaderContext
        • MySqlRecordEmitter
          • DebeziumDeserializationSchema
          • MySqlSourceReaderMetrics
        • MySqlSourceReaderContext
        • MySqlSourceConfig
    • SplitEnumerator<MySqlSplit, PendingSplitsState> createEnumerator( SplitEnumeratorContext<MySqlSplit> enumContext)
      • 创建 splitAssigner
        • 如果 StartupMode.INITIAL
          • discoverCapturedTables 获取要同步的表
          • 创建 MySqlHybridSplitAssigner
        • 创建 MySqlBinlogSplitAssigner
      • 创建 MySqlSourceEnumerator 里面包含
        • SplitEnumeratorContext<MySqlSplit>
        • MySqlSourceConfig
        • MySqlSplitAssigner
    • SplitEnumerator<MySqlSplit, PendingSplitsState> restoreEnumerator( SplitEnumeratorContext<MySqlSplit> enumContext, PendingSplitsState checkpoint)
      • 根据 checkpoint 数据来恢复 splitAssigner
        • HybridPendingSplitsState -> MySqlHybridSplitAssigner
        • BinlogPendingSplitsState -> MySqlBinlogSplitAssigner
      • new MySqlSourceEnumerator(enumContext, sourceConfig, splitAssigner)
    • SimpleVersionedSerializer<MySqlSplit> getSplitSerializer()
      • MySqlSplitSerializer.INSTANCE
    • SimpleVersionedSerializer<PendingSplitsState> getEnumeratorCheckpointSerializer()
      • new PendingSplitsStateSerializer(MySqlSplitSerializer.INSTANCE))

    WatermarkOutput

    • emitWatermark(Watermark watermark)
      • 发出水印也会隐式地将流标记为 active,结束之前标记为空闲状态
    • markIdle()
      • 将此输出标记为空闲,这意味着下游操作不需要等待来自此 output 的水印。

    SourceOutput

    • SourceOutput<T> extends WatermarkOutput

    • 对 SourceReader 产生的数据进行发送 一个 SourceReader 可以有多个 SourceOutputs, 每 SourceOutputs 作用于一个 Source Splits, 所以不同的 split 流可以被区别对待,例如水印生成或事件时间偏移处理

    • void collect(T record)

      • 如果源系统没有时间戳的概念,请使用此方法
      • 事件可以通过{@link TimestampAssigner} 产生时间附加到记录上。例如,JOSN 格式的记录,在JSON解析过程中没有通用的时间戳,因此可以使用这方法初始化生成一个没有时间戳的记录。 在下一步中,将使用 TimestampAssigner 从JSON对象的字段中提取时间戳。
    • void collect(T record, long timestamp)

      • 如果源系统有时间戳的概念。典型例子可以是日志、PubSub或消息队列,比如Kafka或Kinesis,它们自带时间戳

    ReaderOutput

    • ReaderOutput<T> extends SourceOutput<T>
    • 对于只有一个 split 或者不需要切 split 的 SourceReader 建议使用这种方法
    • 对于大多数的流,有 split 的场景,应该 split 分配特定的 SourceOutput,用来处理每个 split watermark等的生成逻辑
    • 可以通过 ReaderOutput#createOutputForSplit(String)来给一个 split 创建一个ReaderOutput,当 source 处理完整个 split 的时候要确保释放这个ReaderOutput
    • void collect(T record);
    • void collect(T record, long timestamp);
    • void emitWatermark(Watermark watermark);
    • void markIdle();
    • SourceOutput<T> createOutputForSplit(String splitId)
      • 给 Source Split 创建特定的 SourceOutput 如果已经给 splitId 分配过 SourceOutput 就会直接返回之前创建的 SourceOutput,所以一个 splitId 只有一个 SourceOutput ,当 split 完成后需要释放 SourceOutput,否则,它将继续像一个永远停滞的 source split 一样,生产 watermark ,并可能无限期地抑制 watermark
    • void releaseOutputForSplit(String splitId); 释放 SourceOutput

    MySqlSplitSerializer

    • 序列化和反序列化 MySqlSplit
    • implements SimpleVersionedSerializer<MySqlSplit>
    • public byte[] serialize(MySqlSplit split)
    • MySqlSplit deserialize(int version, byte[] serialized)

    SplitEnumeratorContext

    PendingSplitsState

    BinlogPendingSplitsState

    • private final boolean isBinlogSplitAssigned;

    SnapshotPendingSplitsState

    • final List<TableId> remainingTables;
    • final List<TableId> alreadyProcessedTables
    • final List<MySqlSnapshotSplit> remainingSplits
    • final Map<String, MySqlSnapshotSplit> assignedSplits
    • final Map<String, BinlogOffset> splitFinishedOffsets
    • AssignerStatus assignerStatus

    HybridPendingSplitsState

    • BinlogPendingSplitsStateSnapshotPendingSplitsState 的混合

    • final SnapshotPendingSplitsState snapshotPendingSplits

    • final boolean isBinlogSplitAssigned

    ChunkSplitter

    AssignerStatus

    • AssignerStatus 状态流转
    /**
     * The state of split assigner finite state machine, tips: we use word status instead of word state
     * to avoid conflict with Flink state keyword. The assigner finite state machine goes this way.
     *
     * <pre>
     *        INITIAL_ASSIGNING(start)
     *              |
     *              |
     *          onFinish()
     *              |
     *              ↓
     *    INITIAL_ASSIGNING_FINISHED(end)
     *              |
     *              |
     *        suspend() // found newly added tables
     *              |
     *              ↓
     *          SUSPENDED --- wakeup() --→ NEWLY_ADDED_ASSIGNING --- onFinish() --→ NEWLY_ADDED_ASSIGNING_FINISHED(end)
     *              ↑                                                                  |
     *              |                                                                  |
     *              |----------------- suspend() //found newly added tables -----------|
     * </pre>
     */
    

    相关文章

      网友评论

          本文标题:FLINK CDC 源码 & 时序图 一

          本文链接:https://www.haomeiwen.com/subject/jtnkjrtx.html