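Chunk splitting in the CDC snapshot (full-load) phase
As analyzed earlier, the entry point for chunk splitting is the start method of the MySqlSourceEnumerator class, and the actual work is done in the open method of the MySqlSnapshotSplitAssigner class: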
![](https://img.haomeiwen.com/i8024939/e61fb45a4dd66ab2.png)
```java
public void open() {
    chunkSplitter.open();
    discoveryCaptureTables();
    captureNewlyAddedTables();
    startAsynchronouslySplit();
}
```
The core of the work is in startAsynchronouslySplit:
```java
private void startAsynchronouslySplit() {
    if (chunkSplitter.hasNextChunk() || !remainingTables.isEmpty()) {
        if (executor == null) {
            ThreadFactory threadFactory =
                    new ThreadFactoryBuilder().setNameFormat("snapshot-splitting").build();
            this.executor = Executors.newSingleThreadExecutor(threadFactory);
        }
        // Split chunks asynchronously: run splitChunksForRemainingTables
        // on the single-thread pool created above
        executor.submit(this::splitChunksForRemainingTables);
    }
}
```
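The lazily created, named single-thread executor above can also be sketched with the JDK alone, for readers without Guava on the classpath. The class name SnapshotSplittingExecutor is hypothetical, and marking the worker thread as a daemon is an extra assumption of this sketch so that a demo JVM can exit (Guava's ThreadFactoryBuilder only does that when setDaemon(true) is called).

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;

// Hypothetical JDK-only stand-in for the Guava-based executor setup above.
class SnapshotSplittingExecutor {
    // Created lazily on first use, like `executor == null` in startAsynchronouslySplit.
    private ExecutorService executor;

    synchronized Future<?> submit(Runnable task) {
        if (executor == null) {
            // Every worker thread gets the fixed name "snapshot-splitting".
            ThreadFactory threadFactory = runnable -> {
                Thread thread = new Thread(runnable, "snapshot-splitting");
                thread.setDaemon(true); // assumption: lets a demo JVM exit
                return thread;
            };
            executor = Executors.newSingleThreadExecutor(threadFactory);
        }
        // A single worker thread: submitted tasks run one after another.
        return executor.submit(task);
    }

    synchronized void shutdown() {
        if (executor != null) {
            executor.shutdown();
        }
    }
}
```

Because there is only one worker thread, this assigner issues at most one splitting query to MySQL at a time.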
The submitted task is splitChunksForRemainingTables:
```java
private void splitChunksForRemainingTables() {
    try {
        // restore from a checkpoint and start to split the table from the previous
        // checkpoint
        // (the CDC job was restarted and is recovering from a checkpoint)
        if (chunkSplitter.hasNextChunk()) {
            LOG.info(
                    "Start splitting remaining chunks for table {}",
                    chunkSplitter.getCurrentSplittingTableId());
            splitTable(chunkSplitter.getCurrentSplittingTableId());
        }
        // split the remaining tables
        for (TableId nextTable : remainingTables) {
            // split this table into chunks
            splitTable(nextTable);
        }
    } catch (Throwable e) {
        synchronized (lock) {
            if (uncaughtSplitterException == null) {
                uncaughtSplitterException = e;
            } else {
                uncaughtSplitterException.addSuppressed(e);
            }
            // Release the potential waiting getNext() call
            lock.notify();
        }
    }
}
```
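The catch block and lock.notify() form a small handshake with the reader side. The first failure is kept, later ones are attached as suppressed exceptions, and any thread waiting for a split is woken up. The sketch below models that handshake with plain strings as splits. SplitHandoff, publish, fail and getNext are hypothetical simplifications, not the Flink CDC API. In particular, this getNext blocks forever if splitting finishes without producing anything.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical model of the lock/notify handshake between the splitting
// thread (producer) and a reader waiting for splits (consumer).
class SplitHandoff {
    private final Object lock = new Object();
    private final Queue<String> remainingSplits = new ArrayDeque<>();
    private Throwable uncaughtSplitterException;

    // Splitting thread: publish one split and wake a waiting reader.
    void publish(String split) {
        synchronized (lock) {
            remainingSplits.add(split);
            lock.notify();
        }
    }

    // Splitting thread: record a failure the same way as the catch block above.
    void fail(Throwable e) {
        synchronized (lock) {
            if (uncaughtSplitterException == null) {
                uncaughtSplitterException = e;
            } else {
                uncaughtSplitterException.addSuppressed(e);
            }
            lock.notify(); // release the potential waiting getNext() call
        }
    }

    // Reader: wait until a split is available or the splitter has failed.
    String getNext() throws InterruptedException {
        synchronized (lock) {
            while (remainingSplits.isEmpty() && uncaughtSplitterException == null) {
                lock.wait(); // loop guards against spurious wake-ups
            }
            if (uncaughtSplitterException != null) {
                throw new IllegalStateException("Chunk splitting failed", uncaughtSplitterException);
            }
            return remainingSplits.poll();
        }
    }
}
```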
This in turn calls splitTable:
```java
private void splitTable(TableId nextTable) {
    // ... (omitted)
    do {
        synchronized (lock) {
            List<MySqlSnapshotSplit> splits;
            try {
                splits = chunkSplitter.splitChunks(partition, nextTable);
            } catch (Exception e) {
                throw new IllegalStateException(
                        "Error when splitting chunks for " + nextTable, e);
            }
            // ... (omitted: the new splits are added to remainingSplits)
        }
    } while (chunkSplitter.hasNextChunk());
    // ... (omitted)
}
```
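The do/while loop keeps asking the splitter for chunks until hasNextChunk() turns false. Below is a sketch with a hypothetical SimpleChunkSplitter interface and a fake splitter that hands out one chunk per call, which is how the unevenly sized path behaves. Chunks are plain strings for brevity.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified splitter: hasNextChunk() reports whether the
// current table still has unsplit data.
interface SimpleChunkSplitter {
    List<String> splitChunks(String tableId);

    boolean hasNextChunk();
}

class SplitTableLoop {
    // Mirrors the do/while in splitTable: call splitChunks until the table is done.
    static List<String> splitTable(SimpleChunkSplitter splitter, String tableId) {
        List<String> remainingSplits = new ArrayList<>();
        do {
            remainingSplits.addAll(splitter.splitChunks(tableId));
        } while (splitter.hasNextChunk());
        return remainingSplits;
    }

    // Fake uneven splitter that produces `total` chunks, one per call.
    static SimpleChunkSplitter oneAtATime(int total) {
        return new SimpleChunkSplitter() {
            private int produced = 0;

            @Override
            public List<String> splitChunks(String tableId) {
                return Collections.singletonList(tableId + ":chunk-" + produced++);
            }

            @Override
            public boolean hasNextChunk() {
                return produced < total;
            }
        };
    }
}
```

An evenly sized splitter would instead return all chunks on the first call and report hasNextChunk() == false, so the loop body runs exactly once.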
splitTable then calls the splitChunks method of the MySqlChunkSplitter class to do the actual splitting:
```java
@Override
public List<MySqlSnapshotSplit> splitChunks(MySqlPartition partition, TableId tableId)
        throws Exception {
    if (!hasNextChunk()) {
        analyzeTable(partition, tableId);
        Optional<List<MySqlSnapshotSplit>> evenlySplitChunks =
                trySplitAllEvenlySizedChunks(partition, tableId);
        if (evenlySplitChunks.isPresent()) {
            return evenlySplitChunks.get();
        } else {
            synchronized (lock) {
                this.currentSplittingTableId = tableId;
                this.nextChunkStart = ChunkSplitterState.ChunkBound.START_BOUND;
                this.nextChunkId = 0;
                return Collections.singletonList(
                        splitOneUnevenlySizedChunk(partition, tableId));
            }
        }
    } else {
        Preconditions.checkState(
                currentSplittingTableId.equals(tableId),
                "Can not split a new table before the previous table splitting finish.");
        if (currentSplittingTable == null) {
            // analyzeTable does the following:
            // 1. If the table has no primary key, chunkKeyColumn must be set.
            // 2. If the table has a primary key, chunkKeyColumn must be one of its
            //    columns, or be left unset.
            // 3. If chunkKeyColumn is not set and the table has a primary key, the first
            //    primary key column is used (this matters for composite primary keys).
            // 4. Query the minimum and maximum values of the split column.
            // 5. Run SHOW TABLE STATUS LIKE 'TableName' to get the (approximate) row count.
            analyzeTable(partition, currentSplittingTableId);
        }
        synchronized (lock) {
            return Collections.singletonList(splitOneUnevenlySizedChunk(partition, tableId));
        }
    }
}
```
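Rules 1 to 3 in the comment above decide which column is used to split the table. Below is a minimal sketch of those rules. chooseSplitColumn is a hypothetical helper that works on column names only. Unlike a real implementation, it does not check that a user-supplied column actually exists in the table.

```java
import java.util.List;

// Hypothetical helper encoding rules 1-3 for choosing the split (chunk key) column.
class ChunkKeyColumnRules {
    static String chooseSplitColumn(List<String> primaryKeys, String chunkKeyColumn) {
        if (primaryKeys.isEmpty()) {
            // Rule 1: without a primary key, chunkKeyColumn is mandatory.
            if (chunkKeyColumn == null) {
                throw new IllegalArgumentException(
                        "chunkKeyColumn must be set when the table has no primary key");
            }
            return chunkKeyColumn;
        }
        if (chunkKeyColumn != null) {
            // Rule 2: with a primary key, chunkKeyColumn must be one of its columns.
            if (!primaryKeys.contains(chunkKeyColumn)) {
                throw new IllegalArgumentException(
                        chunkKeyColumn + " is not a primary key column");
            }
            return chunkKeyColumn;
        }
        // Rule 3: default to the first column of the (possibly composite) primary key.
        return primaryKeys.get(0);
    }
}
```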
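If the table cannot be split evenly, each call ends up in splitOneUnevenlySizedChunk, which returns one chunk at a time. The resulting MySqlSnapshotSplit list is stored in the remainingSplits collection. There are two cases:
- Evenly sized chunks
The split column (typically an auto-increment primary key) is numeric (INT, BIGINT or DECIMAL), and its values are evenly distributed. The minimum and maximum of the column are queried once, and the key range is divided into slices of chunkSize. Because the key is numeric, the end of each chunk is computed directly from its start and chunkSize, with no further queries.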
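```
-- compute the value range of the primary key column
SELECT MIN(`order_id`), MAX(`order_id`) FROM `demo_orders`;

-- divide the range into slices of chunkSize (null = unbounded)
chunk-0:    [null, min + chunkSize)
chunk-1:    [min + chunkSize, min + 2 * chunkSize)
...
chunk-last: [min + n * chunkSize, null)   -- the last boundary not greater than max
```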
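The even-split arithmetic can be sketched in a few lines. Assumptions: the key is a long, chunkSize is used directly as the step between boundaries (as described above), and a null bound means the range is open on that side. EvenChunkSplitter and Range are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of evenly sized splitting on a long key.
// Assumption: chunkSize is used directly as the step between chunk boundaries.
class EvenChunkSplitter {
    // Half-open key range [start, end); a null bound means "unbounded".
    static final class Range {
        final Long start;
        final Long end;

        Range(Long start, Long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    static List<Range> split(long min, long max, long chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<Range> ranges = new ArrayList<>();
        Long chunkStart = null; // first chunk: no lower bound
        Long chunkEnd = addOrNull(min, chunkSize); // boundaries are computed, not queried
        while (chunkEnd != null && chunkEnd <= max) {
            ranges.add(new Range(chunkStart, chunkEnd));
            chunkStart = chunkEnd;
            chunkEnd = addOrNull(chunkEnd, chunkSize);
        }
        ranges.add(new Range(chunkStart, null)); // last chunk: no upper bound
        return ranges;
    }

    // a + b, or null on long overflow so the loop stops instead of wrapping around.
    private static Long addOrNull(long a, long b) {
        try {
            return Math.addExact(a, b);
        } catch (ArithmeticException overflow) {
            return null;
        }
    }
}
```

For min = 1, max = 10 and chunkSize = 4, this yields [null, 5), [5, 9), [9, null). The first and last chunks are open-ended, so no key below min or above max is left uncovered.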
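- Unevenly sized chunks
This case applies when the split column is not of such a numeric type or its values are not evenly distributed (for example, a non-auto-increment or non-numeric primary key). Chunk ends can then no longer be computed. For every chunk, the not-yet-split rows are sorted by the split column in ascending order, and the maximum of the first chunkSize values becomes the end of the current chunk: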
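```sql
-- Sort the not-yet-split rows, take chunkSize of them, and use their maximum
-- as the end of the current chunk:
-- chunkEnd =
SELECT MAX(`order_id`) FROM (
    SELECT `order_id`
    FROM `demo_orders`
    WHERE `order_id` >= [start of the current chunk, i.e. end of the previous chunk]
    ORDER BY `order_id` ASC
    LIMIT [chunkSize]
) AS T;
```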
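When this query is issued from Java, the chunk start is best bound as a JDBC parameter rather than concatenated into the SQL string. Below is a sketch that builds the query text. The helper names and the backtick quoting of identifiers are assumptions, and the table name is taken unqualified.

```java
// Hypothetical helper mirroring the "next chunk end" query above.
class NextChunkEndQuery {
    // Assumption: MySQL backtick quoting, with embedded backticks doubled.
    static String quote(String identifier) {
        return "`" + identifier.replace("`", "``") + "`";
    }

    // The chunk start stays a '?' placeholder, to be bound via JDBC.
    static String build(String table, String splitColumn, int chunkSize) {
        String col = quote(splitColumn);
        return String.format(
                "SELECT MAX(%s) FROM (SELECT %s FROM %s WHERE %s >= ? ORDER BY %s ASC LIMIT %d) AS T",
                col, col, quote(table), col, col, chunkSize);
    }
}
```

The resulting string can be passed to Connection.prepareStatement, with the chunk start supplied through PreparedStatement.setObject(1, chunkStart).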
The uneven case is implemented in splitOneUnevenlySizedChunk:
```java
private MySqlSnapshotSplit splitOneUnevenlySizedChunk(MySqlPartition partition, TableId tableId)
        throws SQLException {
    final int chunkSize = sourceConfig.getSplitSize();
    final Object chunkStartVal = nextChunkStart.getValue();
    LOG.info(
            "Use unevenly-sized chunks for table {}, the chunk size is {} from {}",
            tableId,
            chunkSize,
            nextChunkStart == ChunkSplitterState.ChunkBound.START_BOUND
                    ? "null"
                    : chunkStartVal.toString());
    // we start from [null, min + chunk_size) and avoid [null, min)
    Object chunkEnd =
            nextChunkEnd(
                    jdbcConnection,
                    nextChunkStart == ChunkSplitterState.ChunkBound.START_BOUND
                            ? minMaxOfSplitColumn[0]
                            : chunkStartVal,
                    tableId,
                    splitColumn.name(),
                    minMaxOfSplitColumn[1],
                    chunkSize);
    // may sleep a while to avoid DDOS on MySQL server
    maySleep(nextChunkId, tableId);
    if (chunkEnd != null && ObjectUtils.compare(chunkEnd, minMaxOfSplitColumn[1]) <= 0) {
        nextChunkStart = ChunkSplitterState.ChunkBound.middleOf(chunkEnd);
        return createSnapshotSplit(
                jdbcConnection,
                partition,
                tableId,
                nextChunkId++,
                splitType,
                chunkStartVal,
                chunkEnd);
    } else {
        currentSplittingTableId = null;
        nextChunkStart = ChunkSplitterState.ChunkBound.END_BOUND;
        return createSnapshotSplit(
                jdbcConnection,
                partition,
                tableId,
                nextChunkId++,
                splitType,
                chunkStartVal,
                null);
    }
}
```
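To see how the uneven loop walks through a table, the sketch below replays it against an in-memory, ascending list of keys instead of MySQL. The body of nextChunkEnd is not shown above, so its behaviour here is an assumption. It returns null once the next end would reach max, which makes the final chunk [start, null). It also skips to the next larger key when many duplicate keys would otherwise make a chunk's end equal its start. UnevenChunkSimulator is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory replay of unevenly sized splitting.
// `keys` stands in for the split column, sorted ascending; null bounds are open.
class UnevenChunkSimulator {
    // Emulates the MAX over the first chunkSize keys >= start (ORDER BY ASC LIMIT).
    static <T extends Comparable<T>> T queryNextChunkMax(List<T> keys, T start, int chunkSize) {
        T result = null;
        int taken = 0;
        for (T key : keys) {
            if (taken == chunkSize) {
                break;
            }
            if (key.compareTo(start) >= 0) {
                result = key; // keys are ascending, so the last one taken is the MAX
                taken++;
            }
        }
        return result;
    }

    // Smallest key strictly greater than `bound`, or null if there is none.
    static <T extends Comparable<T>> T minGreaterThan(List<T> keys, T bound) {
        for (T key : keys) {
            if (key.compareTo(bound) > 0) {
                return key;
            }
        }
        return null;
    }

    // Assumed behaviour of nextChunkEnd (its body is not shown in the article).
    static <T extends Comparable<T>> T nextChunkEnd(List<T> keys, T previousEnd, T max, int chunkSize) {
        T end = queryNextChunkMax(keys, previousEnd, chunkSize);
        if (end != null && end.compareTo(previousEnd) == 0) {
            end = minGreaterThan(keys, previousEnd); // duplicates: never end == start
        }
        // Reaching max means the rest of the table goes into the final open-ended chunk.
        return (end == null || end.compareTo(max) >= 0) ? null : end;
    }

    // Calls the "split one chunk" step until the table is finished.
    static <T extends Comparable<T>> List<String> split(List<T> sortedKeys, int chunkSize) {
        List<String> chunks = new ArrayList<>();
        if (sortedKeys.isEmpty()) {
            chunks.add("[null, null)"); // empty table: one unbounded chunk
            return chunks;
        }
        T min = sortedKeys.get(0);
        T max = sortedKeys.get(sortedKeys.size() - 1);
        T chunkStart = null; // plays the role of START_BOUND
        while (true) {
            // The first query starts at min, giving [null, end) instead of [null, min).
            T end = nextChunkEnd(sortedKeys, chunkStart == null ? min : chunkStart, max, chunkSize);
            if (end != null && end.compareTo(max) <= 0) {
                chunks.add("[" + chunkStart + ", " + end + ")");
                chunkStart = end; // next chunk starts where this one ends
            } else {
                chunks.add("[" + chunkStart + ", null)"); // like END_BOUND
                return chunks;
            }
        }
    }
}
```

With keys 1 to 7 and chunkSize = 3, the replay yields [null, 3), [3, 5), [5, null). With keys a to e and chunkSize = 2, it yields [null, b), [b, c), [c, d), [d, null).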