美文网首页Flink
Flink在加载文件数据源时,如何创建分片呢?

Flink在加载文件数据源时,如何创建分片呢?

作者: LZhan | 来源:发表于2019-12-01 17:17 被阅读0次

    主要分析FileInputFormat类的createInputSplits方法

    参数minNumSplits,通常是readFile等读取文件操作的并行度决定的。

    (1)minNumSplits = Math.max(minNumSplits, this.numSplits);
    先获取这个minNumSplits变量,在并行度与numSplits中取最大值(numSplits通常是在FileInputFormat类的configure方法中定义)

    (2)获取这个路径下的所有文件的总长度 totalLength


    (3)之后进行unsplittable分支判断
    这个unsplittable用来判断文件格式是否能够spliit,有些文件格式如avro,deflate等在块级别上是不能分割的(are not splittable),只能整个文件读取。


    • 文件格式不支持在块级别上分割

      final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, file.getLen());
      一个文件可能占据多个block块。
      FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(), 0, len, hosts.toArray(new String[hosts.size()]));
      一个文件就是一个split,这个split可能是占据多个block块的。

    所以,如果该文件格式是不支持split的,那么分片的数目就等于该路径下文件的数目。


    • 文件格式支持在块级别上分割
      a、final long maxSplitSize = totalLength / minNumSplits + (totalLength % minNumSplits == 0 ? 0 : 1);
      先计算分片最大大小maxSplitSize ,totalLength/minNumSplits(不能够整除就加1)。
      b、获取splitSize即分片大小

    this.minSplitSize是在FileInputFormat类configure方法中定义的,如果该值小于等于block大小,
    那么minSplitSize就是configure中定义的minSplitSize;
    否则的话minSplitSize就是block size。

    splitSize = Math.max(minSplitSize, Math.min(maxSplitSize, blockSize));
    halfSplit就是splitSize右移1位(相当于除以2)

    定义最后一个分片的大小,可以是一般的1.1倍
    final long maxBytesForLastSplit = (long) (splitSize * MAX_SPLIT_SIZE_DISCREPANCY);

    c、开始对一个文件开始分片

    • 文件长度大于0
      <1> 先获取当前文件所占据的block,并进行排序
      final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, len); Arrays.sort(blocks);
      <2> 进行循环,文件的总字节为bytesUnassigned,每个split的字节为splitSize,最后一个split的最大字节为maxBytesForLastSplit

      <2>的第一部分:
      先分析下getBlockIndexForPosition方法:
      该方法检索BlockLocation的索引,该索引包含给定文件描述的文件部分偏移量。
      四个参数:
      blocks:该文件的所有block块(按照offset排序)
      offset:文件当前内容位置的offset
      startIndex:block的index顺序

    先去遍历该文件的每一个block(按照offset排序过),获取该block的start位置和end位置;
    如果当前文件内容offset在start和end之间,则进入if分支;
    接下来再去判断当前block的分片内容是不是超过一半了,没有超过一半在返回下一个block的索引位置,否则返回当前索引位置(即返回包含大部分数据的block索引位置)。

    <2>的第二部分:
    blockIndex = getBlockIndexForPosition(blocks, position, halfSplit, blockIndex);
    返回当前split超过一半内容所在的block,并进行初始化分片即FileInputSplit实例

    <3> 分配最后一个分片split


    相关文章

      网友评论

        本文标题:Flink在加载文件数据源时,如何创建分片呢?

        本文链接:https://www.haomeiwen.com/subject/aupgwctx.html