美文网首页
Flink CDC 源码学习(五)

Flink CDC 源码学习(五)

作者: 无色的叶 | 来源:发表于2023-11-15 20:27 被阅读0次

    MySqlSourceReader类核心功能

    数据分片

    执行start方法: 当reader当前已分配的分片数不超过1时, 向MySqlSourceEnumerator发起分片请求, 触发MySqlSourceEnumerator类的handleSplitRequest方法, 由其向reader分配分片

     /**
      * Sends a split request through the reader context when this reader currently has at most
      * one assigned split; otherwise does nothing.
      */
     public void start() {
            final boolean wantsMoreSplits = getNumberOfCurrentlyAssignedSplits() <= 1;
            if (!wantsMoreSplits) {
                return;
            }
            context.sendSplitRequest();
        }
    

    分片处理

    执行addSplits方法, 处理MySqlSourceEnumerator分配的chunk分片

     /**
      * Sorts the incoming splits and forwards the ones that still need reading.
      *
      * <p>Snapshot splits that are already read are recorded in {@code finishedUnackedSplits};
      * unread snapshot splits are collected only if their table passes the configured table
      * filter, otherwise they are skipped with a debug log. A binlog split is (optionally)
      * filtered with {@code filterOutdatedSplitInfos} and then either remembered as the
      * suspended binlog split, tracked in {@code uncompletedBinlogSplits}, or passed through
      * {@code discoverTableSchemasForBinlogSplit} and collected. A {@code
      * BinlogSplitAssignedEvent} is sent to the coordinator for every binlog split.
      *
      * <p>After the loop, finished snapshot splits are reported if needed. The collected splits
      * are then handed to {@code super.addSplits}; if nothing was collected and a suspended
      * binlog split exists, a new split request is sent instead.
      *
      * @param splits the splits to add to this reader
      * @param checkTableChangeForBinlogSplit whether a binlog split should first be filtered
      *     with {@code filterOutdatedSplitInfos}
      */
     private void addSplits(List<MySqlSplit> splits, boolean checkTableChangeForBinlogSplit) {
            // restore for finishedUnackedSplits
            // Collects the splits that still have to be processed.
            List<MySqlSplit> unfinishedSplits = new ArrayList<>();
            for (MySqlSplit split : splits) {
                LOG.info("Source reader {} adds split {}", subtaskId, split);
                // Snapshot (full-load) phase
                if (split.isSnapshotSplit()) {
                    MySqlSnapshotSplit snapshotSplit = split.asSnapshotSplit();
                    // A snapshot split whose read has already finished is stored in the
                    // finishedUnackedSplits map instead of being read again.
                    if (snapshotSplit.isSnapshotReadFinished()) {
                        finishedUnackedSplits.put(snapshotSplit.splitId(), snapshotSplit);
                    } else if (sourceConfig
                            .getTableFilters()
                            .dataCollectionFilter()
                            .isIncluded(split.asSnapshotSplit().getTableId())) {
                        // The split's table passes the configured table filter, so queue it
                        // as an unfinished split.
                        unfinishedSplits.add(split);
                    } else {
                        LOG.debug(
                                "The subtask {} is skipping split {} because it does not match new table filter.",
                                subtaskId,
                                split.splitId());
                    }
                } else {
                    // Binlog phase
                    MySqlBinlogSplit binlogSplit = split.asBinlogSplit();
                    // When restore from a checkpoint, the finished split infos may contain some splits
                    // for the deleted tables.
                    // We need to remove these splits for the deleted tables at the finished split
                    // infos.
                    // i.e. on checkpoint restore, drop split infos of tables that may have been
                    // removed from the set of captured tables.
                    if (checkTableChangeForBinlogSplit) {
                        binlogSplit =
                                filterOutdatedSplitInfos(
                                        binlogSplit,
                                        sourceConfig
                                                .getMySqlConnectorConfig()
                                                .getTableFilters()
                                                .dataCollectionFilter());
                    }
    
                    // Try to discovery table schema once for newly added tables when source reader
                    // start or restore
                    // True only if no binlog split has been assigned before and scanning of newly
                    // added tables is enabled. Must be computed before the flag is set below.
                    boolean checkNewlyAddedTableSchema =
                            !mySqlSourceReaderContext.isHasAssignedBinlogSplit()
                                    && sourceConfig.isScanNewlyAddedTableEnabled();
                    mySqlSourceReaderContext.setHasAssignedBinlogSplit(true);
    
                    // the binlog split is suspended
                    if (binlogSplit.isSuspended()) {
                        suspendedBinlogSplit = binlogSplit;
                    } else if (!binlogSplit.isCompletedSplit()) {
                        // Not yet complete: keep track of it and request its meta if needed.
                        uncompletedBinlogSplits.put(binlogSplit.splitId(), binlogSplit);
                        requestBinlogSplitMetaIfNeeded(binlogSplit);
                    } else {
                        // Complete: stop tracking it as uncompleted and queue the split returned
                        // by the schema discovery step.
                        uncompletedBinlogSplits.remove(binlogSplit.splitId());
                        MySqlBinlogSplit mySqlBinlogSplit =
                                discoverTableSchemasForBinlogSplit(
                                        binlogSplit, sourceConfig, checkNewlyAddedTableSchema);
                        unfinishedSplits.add(mySqlBinlogSplit);
                    }
                    LOG.info(
                            "Source reader {} received the binlog split : {}.", subtaskId, binlogSplit);
                    context.sendSourceEventToCoordinator(new BinlogSplitAssignedEvent());
                }
            }
            // notify split enumerator again about the finished unacked snapshot splits
            reportFinishedSnapshotSplitsIfNeed();
            // add all un-finished splits (including binlog split) to SourceReaderBase
            // Delegates to the parent class's addSplits to register the unfinished splits.
            if (!unfinishedSplits.isEmpty()) {
                super.addSplits(unfinishedSplits);
            } else if (suspendedBinlogSplit
                    != null) { // only request new snapshot split if the binlog split is suspended
                context.sendSplitRequest();
            }
        }
    
    

    继续调用SourceReaderBase.addSplits方法

     /**
      * Registers a state entry for every given split and then passes the splits to the split
      * fetcher manager.
      *
      * @param splits the splits to add
      */
     @Override
        public void addSplits(List<SplitT> splits) {
            LOG.info("Adding split(s) to reader: {}", splits);
            // Register a fresh SplitContext, keyed by split id, for every incoming split.
            for (SplitT split : splits) {
                splitStates.put(
                        split.splitId(),
                        new SplitContext<>(split.splitId(), initializedState(split)));
            }
            // Only once all state is registered are the splits passed on for fetching.
            splitFetcherManager.addSplits(splits);
        }
    

    继续调用splitFetcherManager.addSplits方法, 其中splitFetcherManager为SingleThreadFetcherManager对象, 在MySqlSourceReader构造函数中进行的初始化

     /**
      * Hands the given splits to the running fetcher. If no fetcher is running, one is created,
      * given the splits, and then started.
      *
      * @param splitsToAdd the splits to pass to the fetcher
      */
     @Override
        public void addSplits(List<SplitT> splitsToAdd) {
            SplitFetcher<E, SplitT> fetcher = getRunningFetcher();
            // Remember whether the fetcher had to be created here so it can be started below.
            final boolean createdHere = fetcher == null;
            if (createdHere) {
                fetcher = createSplitFetcher();
            }
            // The splits are handed over before a newly created fetcher is started.
            fetcher.addSplits(splitsToAdd);
            if (createdHere) {
                startFetcher(fetcher);
            }
        }
    

    继续查看SplitFetcher run方法

     /**
      * Main loop of the split fetcher: repeatedly calls {@code runOnce()} until it returns
      * {@code false}.
      *
      * <p>Any {@link Throwable} escaping the loop is passed to {@code errorHandler}. On exit the
      * {@code splitReader} is closed (a failure to close is also passed to {@code
      * errorHandler}), and finally {@code shutdownHook} is run.
      */
     @Override
        public void run() {
            LOG.info("Starting split fetcher {}", id);
            try {
                while (runOnce()) {
                    // nothing to do, everything is inside #runOnce.
                }
            } catch (Throwable t) {
                errorHandler.accept(t);
            } finally {
                try {
                    splitReader.close();
                } catch (Exception e) {
                    // A close failure is reported through the same handler as loop failures.
                    errorHandler.accept(e);
                } finally {
                    LOG.info("Split fetcher {} exited.", id);
                    // This executes after possible errorHandler.accept(t). If these operations bear
                    // a happens-before relation, then we can checking side effect of
                    // errorHandler.accept(t)
                    // to know whether it happened after observing side effect of shutdownHook.run().
                    shutdownHook.run();
                }
            }
        }
    

    继续调用runOnce()

    /**
     * Runs a single iteration of the fetcher loop.
     *
     * <p>Under the lock, picks the next task and records it as {@code runningTask}; runs the
     * task outside the lock; then re-acquires the lock to clear {@code runningTask} and
     * post-process the result.
     *
     * @return {@code false} if the fetcher is closed; {@code true} otherwise (including when no
     *     task was available)
     * @throws RuntimeException wrapping any {@link Exception} thrown by the task's {@code run()}
     */
    boolean runOnce() {
            // first blocking call = get next task. blocks only if there are no active splits and queued
            // tasks.
            SplitFetcherTask task;
            lock.lock();
            try {
                if (closed) {
                    return false;
                }
                // Obtain the next task to execute.
                task = getNextTaskUnsafe();
                if (task == null) {
                    // (spurious) wakeup, so just repeat
                    return true;
                }
    
                LOG.debug("Prepare to run {}", task);
                // store task for #wakeUp
                this.runningTask = task;
            } finally {
                lock.unlock();
            }
    
            // execute the task outside of lock, so that it can be woken up
            boolean taskFinished;
            try {
                // Invoke the SplitFetcherTask's run method.
                taskFinished = task.run();
            } catch (Exception e) {
                throw new RuntimeException(
                        String.format(
                                "SplitFetcher thread %d received unexpected exception while polling the records",
                                id),
                        e);
            }
    
            // re-acquire lock as all post-processing steps, need it
            lock.lock();
            try {
                this.runningTask = null;
                processTaskResultUnsafe(task, taskFinished);
            } finally {
                lock.unlock();
            }
            return true;
        }
    

    继续调用AddSplitsTask run方法, 这里的splitReader对象是MySqlSplitReader, 执行其handleSplitsChanges方法:

     /**
      * Records every split to add in {@code assignedSplits} under its split id, then notifies the
      * split reader of the addition.
      *
      * @return always {@code true}
      */
     @Override
        public boolean run() {
            // Track each new split by id before telling the reader about the addition.
            splitsToAdd.forEach(split -> assignedSplits.put(split.splitId(), split));
            splitReader.handleSplitsChanges(new SplitsAddition<>(splitsToAdd));
            return true;
        }
    

    handleSplitsChanges方法只是把MySqlSplit添加到对应的snapshotSplits或者binlogSplits列表中

    /**
     * Adds each split of the given change to {@code snapshotSplits} or {@code binlogSplits},
     * depending on whether it is a snapshot split.
     *
     * @param splitsChanges the change to apply; must be a {@code SplitsAddition}
     * @throws UnsupportedOperationException if the change is not a {@code SplitsAddition}
     */
    @Override
        public void handleSplitsChanges(SplitsChange<MySqlSplit> splitsChanges) {
            // Only additions are understood; any other kind of change is rejected.
            final boolean isAddition = splitsChanges instanceof SplitsAddition;
            if (!isAddition) {
                final String message =
                        String.format(
                                "The SplitChange type of %s is not supported.",
                                splitsChanges.getClass());
                throw new UnsupportedOperationException(message);
            }

            LOG.info("Handling split change {}", splitsChanges);
            for (MySqlSplit incoming : splitsChanges.splits()) {
                if (!incoming.isSnapshotSplit()) {
                    binlogSplits.add(incoming.asBinlogSplit());
                    continue;
                }
                snapshotSplits.add(incoming.asSnapshotSplit());
            }
        }
    

    在前面createSplitFetcher方法创建SplitFetcher对象时, 其构造函数中会创建FetchTask

    SplitFetcher(
                int id,
                FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
                SplitReader<E, SplitT> splitReader,
                Consumer<Throwable> errorHandler,
                Runnable shutdownHook,
                Consumer<Collection<String>> splitFinishedHook,
                boolean allowUnalignedSourceSplits) {
            this.id = id;
            this.elementsQueue = checkNotNull(elementsQueue);
            this.splitReader = checkNotNull(splitReader);
            this.errorHandler = checkNotNull(errorHandler);
            this.shutdownHook = checkNotNull(shutdownHook);
            this.allowUnalignedSourceSplits = allowUnalignedSourceSplits;
    
            // 创建fetchTask, 其中run 方法中会调用splitReader.fetch()方法
            this.fetchTask =
                    new FetchTask<>(
                            splitReader,
                            elementsQueue,
                            ids -> {
                                ids.forEach(assignedSplits::remove);
                                splitFinishedHook.accept(ids);
                                LOG.info("Finished reading from splits {}", ids);
                            },
                            id);
        }
    

    最终调用MySqlSplitReader.fetch()方法去拉取数据, 由DebeziumReader读取数据

    相关文章

      网友评论

          本文标题:Flink CDC 源码学习(五)

          本文链接:https://www.haomeiwen.com/subject/mbgswdtx.html