美文网首页
Flink CDC 源码学习(四)

Flink CDC 源码学习(四)

作者: 无色的叶 | 来源:发表于2023-11-14 11:32 被阅读0次

CDC全量阶段chunk划分实现

前面分析到其主要划分chunk入口在MySqlSourceEnumerator类的start方法中, 最终实现在MySqlSnapshotSplitAssigner类的open方法


image.png
 /**
  * Entry point of the assigner shown in this excerpt.
  *
  * <p>Calls, in this order: {@code chunkSplitter.open()}, {@code discoveryCaptureTables()},
  * {@code captureNewlyAddedTables()} and finally {@code startAsynchronouslySplit()}, which
  * hands the actual chunk splitting over to a background executor.
  *
  * <p>NOTE(review): the bodies of the discovery/capture helpers are not visible here;
  * presumably they populate the set of tables to split before splitting is started — confirm
  * against the full class.
  */
 public void open() {
        chunkSplitter.open();
        discoveryCaptureTables();
        captureNewlyAddedTables();
        // Must come last: it only schedules work if there is something left to split.
        startAsynchronouslySplit();
    }

核心查看startAsynchronouslySplit方法

/**
 * Schedules {@link #splitChunksForRemainingTables()} on a background thread.
 *
 * <p>Nothing is scheduled when the chunk splitter has no pending chunk and no table remains to
 * be split. The single-thread executor (thread name format {@code "snapshot-splitting"}) is
 * created lazily on first use and reused afterwards.
 */
private void startAsynchronouslySplit() {
        final boolean nothingLeftToSplit =
                !chunkSplitter.hasNextChunk() && remainingTables.isEmpty();
        if (nothingLeftToSplit) {
            return;
        }

        if (executor == null) {
            // Create the single-thread pool only once; later calls reuse it.
            this.executor =
                    Executors.newSingleThreadExecutor(
                            new ThreadFactoryBuilder()
                                    .setNameFormat("snapshot-splitting")
                                    .build());
        }

        // Run the chunk splitting asynchronously so the caller is not blocked.
        executor.submit(this::splitChunksForRemainingTables);
    }

调用splitChunksForRemainingTables

/**
 * Splits every table that still needs splitting; intended to run on the splitting executor.
 *
 * <p>If the chunk splitter still has a pending chunk (the job was restored from a checkpoint in
 * the middle of a table), that table is finished first. Afterwards each entry of
 * {@code remainingTables} is split in iteration order. Any {@link Throwable} raised on the way
 * is recorded instead of being propagated, see {@link #rememberSplitterFailure(Throwable)}.
 */
private void splitChunksForRemainingTables() {
        try {
            // Restored from a checkpoint: resume the table that was being split when the
            // checkpoint was taken.
            final boolean resumingPreviousTable = chunkSplitter.hasNextChunk();
            if (resumingPreviousTable) {
                LOG.info(
                        "Start splitting remaining chunks for table {}",
                        chunkSplitter.getCurrentSplittingTableId());
                splitTable(chunkSplitter.getCurrentSplittingTableId());
            }

            // Then split all tables that have not been touched yet.
            for (TableId pendingTable : remainingTables) {
                splitTable(pendingTable);
            }
        } catch (Throwable failure) {
            rememberSplitterFailure(failure);
        }
    }

    /**
     * Stores a splitting failure under {@code lock} and wakes up one thread waiting on it.
     *
     * <p>The first failure is kept as {@code uncaughtSplitterException}; later ones are attached
     * to it as suppressed exceptions.
     */
    private void rememberSplitterFailure(Throwable failure) {
        synchronized (lock) {
            if (uncaughtSplitterException != null) {
                uncaughtSplitterException.addSuppressed(failure);
            } else {
                uncaughtSplitterException = failure;
            }
            // Release the potential waiting getNext() call
            lock.notify();
        }
    }

再调用splitTable 方法

 /**
  * Splits one table into snapshot chunks (abridged excerpt).
  *
  * <p>The visible part repeatedly asks {@code chunkSplitter.splitChunks(partition, nextTable)}
  * for the next batch of splits while holding {@code lock}, and keeps looping as long as
  * {@code chunkSplitter.hasNextChunk()} reports more work. Any {@link Exception} thrown by the
  * splitter is rethrown as an {@link IllegalStateException} naming the table.
  *
  * <p>NOTE(review): the article elides code before the loop, inside the synchronized block and
  * after the loop; {@code partition} is presumably set up in the omitted part — confirm
  * against the full source.
  *
  * @param nextTable the table to split
  */
 private void splitTable(TableId nextTable) {
       // ... (omitted in this excerpt)
        do {
            synchronized (lock) {
                List<MySqlSnapshotSplit> splits;
                try {
                    splits = chunkSplitter.splitChunks(partition, nextTable);
                } catch (Exception e) {
                    // Keep the original exception as the cause so the root failure stays visible.
                    throw new IllegalStateException(
                            "Error when splitting chunks for " + nextTable, e);
                }

                // NOTE(review): the rest of the synchronized block, including its closing
                // brace, appears to be elided here.
        } while (chunkSplitter.hasNextChunk());
         // ... (omitted in this excerpt)
    }

继续调用MySqlChunkSplitter类的splitChunks方法进行划分

/**
 * Produces the next batch of snapshot splits for {@code tableId}.
 *
 * <p>When no chunk is pending, the table is analyzed and an attempt is made to split it into
 * evenly sized chunks in one go; if that succeeds, all of those splits are returned. Otherwise
 * the splitter state is reset to the start of the table and a single unevenly sized chunk is
 * returned. When a chunk is pending, the call must target the table currently being split and
 * returns the next single unevenly sized chunk.
 *
 * @param partition the partition passed through to the analysis and splitting helpers
 * @param tableId the table to split
 * @return all evenly sized splits, or a singleton list holding one unevenly sized split
 * @throws Exception if analyzing or splitting the table fails
 */
@Override
    public List<MySqlSnapshotSplit> splitChunks(MySqlPartition partition, TableId tableId)
            throws Exception {
        if (hasNextChunk()) {
            // A table is already being split: only that same table may be continued.
            Preconditions.checkState(
                    currentSplittingTableId.equals(tableId),
                    "Can not split a new table before the previous table splitting finish.");
            if (currentSplittingTable == null) {
                // According to the original author's notes, analyzeTable:
                //   1. requires chunkKeyColumn to be set if the table has no primary key;
                //   2. requires chunkKeyColumn, when the table has a primary key, to be one of
                //      the key columns or to be left unset;
                //   3. uses the first primary-key column (composite key case) when
                //      chunkKeyColumn is unset and the table has a primary key;
                //   4. queries the minimum and maximum value of the split column;
                //   5. runs SHOW TABLE STATUS LIKE '<table name>' to obtain the table's
                //      data volume.
                analyzeTable(partition, currentSplittingTableId);
            }
            synchronized (lock) {
                final MySqlSnapshotSplit nextSplit =
                        splitOneUnevenlySizedChunk(partition, tableId);
                return Collections.singletonList(nextSplit);
            }
        }

        // No chunk pending: this is a fresh table.
        analyzeTable(partition, tableId);
        final Optional<List<MySqlSnapshotSplit>> allEvenSplits =
                trySplitAllEvenlySizedChunks(partition, tableId);
        if (allEvenSplits.isPresent()) {
            return allEvenSplits.get();
        }

        // Even splitting was not possible; start chunk-by-chunk splitting from the beginning.
        synchronized (lock) {
            this.currentSplittingTableId = tableId;
            this.nextChunkStart = ChunkSplitterState.ChunkBound.START_BOUND;
            this.nextChunkId = 0;
            final MySqlSnapshotSplit firstSplit = splitOneUnevenlySizedChunk(partition, tableId);
            return Collections.singletonList(firstSplit);
        }
    }

均匀分布时由trySplitAllEvenlySizedChunks方法一次性返回全部chunk; 非均匀分布时每次调用splitOneUnevenlySizedChunk方法划分出一个chunk。划分后的MySqlSnapshotSplit列表保存到remainingSplits集合中

  • 均匀分布

主键列自增且类型为整数类型(int,bigint,decimal)。查询出主键列的最小值,最大值,按 chunkSize 大小将数据均匀划分,因为主键为整数类型,根据当前chunk 起始位置、chunkSize大小,直接计算chunk 的结束位置。

// 计算主键列数据区间
select min(order_id), max(order_id) from demo_orders;
// 将数据划分为 chunkSize 大小的切片
chunk-0: [null, min + chunkSize)
chunk-1: [min + chunkSize, min + 2 * chunkSize)
.......
chunk-last: [min + n * chunkSize, null)

  • 非均匀分布
    主键列非自增或者类型为非整数类型。主键为非数值类型,每次划分需要对未划分的数据按主键进行升序排列,取出前 chunkSize 的最大值为当前 chunk 的结束位置

// 未拆分的数据排序后,取 chunkSize 条数据取最大值,作为切片的终止位置。
chunkend = SELECT MAX(order_id) FROM (
SELECT order_id FROM demo_orders
WHERE order_id >= [前一个切片的结束位置]
ORDER BY order_id ASC
LIMIT [chunkSize]
) AS T

 /**
  * Cuts the next unevenly sized chunk off the table and advances the splitter state.
  *
  * <p>The end of the chunk is obtained from {@code nextChunkEnd}, searching from the current
  * chunk start (or from the minimum of the split column for the very first chunk). If no end
  * is found, or the end lies beyond the maximum of the split column, this is the final chunk:
  * it is created with an open end ({@code null}), {@code currentSplittingTableId} is cleared
  * and {@code nextChunkStart} becomes {@code END_BOUND}. Otherwise the chunk ends at the
  * computed value, which also becomes the start of the following chunk.
  *
  * @param partition the partition passed through to {@code createSnapshotSplit}
  * @param tableId the table being split
  * @return the snapshot split for the chunk that was just cut
  * @throws SQLException declared for the database-backed helpers called here
  */
 private MySqlSnapshotSplit splitOneUnevenlySizedChunk(MySqlPartition partition, TableId tableId)
            throws SQLException {
        final int splitSize = sourceConfig.getSplitSize();
        final Object currentStart = nextChunkStart.getValue();
        final boolean atStartBound =
                nextChunkStart == ChunkSplitterState.ChunkBound.START_BOUND;

        final String startForLog = atStartBound ? "null" : currentStart.toString();
        LOG.info(
                "Use unevenly-sized chunks for table {}, the chunk size is {} from {}",
                tableId,
                splitSize,
                startForLog);

        // we start from [null, min + chunk_size) and avoid [null, min)
        final Object searchFrom = atStartBound ? minMaxOfSplitColumn[0] : currentStart;
        final Object candidateEnd =
                nextChunkEnd(
                        jdbcConnection,
                        searchFrom,
                        tableId,
                        splitColumn.name(),
                        minMaxOfSplitColumn[1],
                        splitSize);

        // may sleep a while to avoid DDOS on MySQL server
        maySleep(nextChunkId, tableId);

        final boolean reachedTableEnd =
                candidateEnd == null
                        || ObjectUtils.compare(candidateEnd, minMaxOfSplitColumn[1]) > 0;
        if (reachedTableEnd) {
            // Final chunk: open-ended, and the table is no longer "currently splitting".
            currentSplittingTableId = null;
            nextChunkStart = ChunkSplitterState.ChunkBound.END_BOUND;
            return createSnapshotSplit(
                    jdbcConnection,
                    partition,
                    tableId,
                    nextChunkId++,
                    splitType,
                    currentStart,
                    null);
        }

        // Regular chunk: its end is where the next chunk will start.
        nextChunkStart = ChunkSplitterState.ChunkBound.middleOf(candidateEnd);
        return createSnapshotSplit(
                jdbcConnection,
                partition,
                tableId,
                nextChunkId++,
                splitType,
                currentStart,
                candidateEnd);
    }

相关文章

网友评论

      本文标题:Flink CDC 源码学习(四)

      本文链接:https://www.haomeiwen.com/subject/rnpdwdtx.html