Flink在读取JDBC表时,为了加快速度,通常可以并发的方式读取,只需要增加以下几个参数:
'connector.read.partition.column'='id','connector.read.partition.lower-bound'='1','connector.read.partition.upper-bound'='1000','connector.read.partition.num'='10'
column:分区字段
lower-bound:分区字段值的下界
upper-bound:分区字段值的上界
num:分区数
根据以上参数,可以确定总的记录数(maxElemCount=maxVal-minVal),然后再基于分区数计算每个分区将要fetch的记录数(batchSize=maxElemCount / batchNum),即分区步长,接下来就是计算每个分区数据边界,算法很简单:
需要注意的是,分区列的最大值和最小值会作为过滤条件,因此如果设置的不合理,会导致数据查询不完整。
在Sqoop中,从mysql import,如果指定了并发数,即map task数,也要按照一定的分区方法,将数据split到多个map里。sqoop的算法和Flink的算法类似,本质上都是要对数据进行合理分片、分到多个task。sqoop算法如下,numSplits是数据分区数,同时也是map task数:
JDBC Table Source执行时,会先把split绑定到task、设置PreparedStatement where条件,接下来就是循环遍历ResultSet结果集了:
我们还要关注下算子的并行度,因为算子的并行度和数据的分区还是不一样的,所以这里还有一步,怎么把分区数据分配给并行化的算子。算子没有设置并行度时,就用默认并行度:
int vertexParallelism = jobVertex.getParallelism();int numTaskVertices = vertexParallelism > 0 ? vertexParallelism : defaultParallelism;
如果算子并行度设置的比分区数大,会有subtask空跑的情况,如果并行度设置的比分区数小,会有一个或多个subtask读取多个分区的情况。
Sqoop和Flink不同的一个点是,分区列的最大、最小值是运行时决定的,不是指定的,就是说sqoop开始执行时,会根据指定的sql查询出最值;而且sqoop的map task数就是分区数,不会有一个map拉取多个分区数据或一个map没有拉取到分区数据的情况。
网友评论