美文网首页
Flink中与流广播配合使用的readFile解析

Flink中与流广播配合使用的readFile解析

作者: LZhan | 来源:发表于2019-12-01 18:10 被阅读0次
1. 前言

最近有需求要定时监控文件,如果文件内容发生变化,就要动态地获取新内容,于是就准备使用 env.readFile方法,
(1)当你监控一个文件时,当文件内容发生变化,会将文件的整个内容作为流输出。
(2)当你监控一个目录的时候,发现当你复制一个文件,而没有改变这个文件的内容,并不会监控到也不会输出内容,只有某文件内容发生改变时,才会将该文件的所有内容输出(也不是这个目录下的所有文件内容输出)

因此,上述内容底层到底是如何实现的呢?是根据什么判定该文件或者该目录发生了变化呢?

2.源码分析

(1) env.readFile方法
readFile(FileInputFormat<OUT> inputFormat, String filePath, FileProcessingMode watchType, long interval, TypeInformation<OUT> typeInformation)
具体实现中是调用方法
createFileInput(FileInputFormat<OUT> inputFormat, TypeInformation<OUT> typeInfo, String sourceName, FileProcessingMode monitoringMode, long interval)


(2)ContinuousFileMonitoringFunction<OUT> monitoringFunction = new ContinuousFileMonitoringFunction(inputFormat, monitoringMode, this.getParallelism(), interval);
ContinuousFileMonitoringFunction类的作用:
这是一个单个(非并行)监视任务,主要负责:
a、监视用户提供的路径
b、确定应该进一步读取和处理哪些文件
c、创建与那些文件相对应的文件输入片
d、将它们分配给下游任务进一步处理(分片分配到下游任务,可以超过单个并行度)
注意:分片被转发到下游时,基于它们所属文件的修改时间,以便按修改时间升序进行读取。

2.1 该类的构造器


其中,定义了分片后下游读取的并行度readerParallelism(算子定义的并行度和1中取最大值);
还有默认的全局修改时间globalModificationTime(初始是无穷小值)
另外,一次只能监控一个路径,不能同时监控多个不同的路径。

2.2 run方法
run方法是实现接口SourceFunction实现的方法,是用来向下游输出元素的。

a、首先是根据文件路径进行初始化,获取checkpoint锁
这个锁的作用是保证对state进行checkpoint和update的操作,与元素的输出不是同步完成的,需要在synchronized块中。

@Override
    public void run(SourceFunction.SourceContext<TimestampedFileInputSplit> context) throws Exception {
        Path p = new Path(path);
        FileSystem fileSystem = FileSystem.get(p.toUri());
        if (!fileSystem.exists(p)) {
            throw new FileNotFoundException("The provided file path " + path + " does not exist.");
        }

b、根据watchType,决定如何向下游输出元素。

  • PROCESS_CONTINUOUSLY类型:
    while循环,在获得checkpointLock时,调用monitorDirAndForwardSplits方法,并且线程睡眠interval毫秒,再去再次调用monitorDirAndForwardSplits方法。

  • PROCESS_ONCE类型:
    调用一次monitorDirAndForwardSplits方法,将isRunning置为false。

switch (watchType) {
            case PROCESS_CONTINUOUSLY:
                while (isRunning) {
                    synchronized (checkpointLock) {
                        monitorDirAndForwardSplits(fileSystem, context);
                    }
                    Thread.sleep(interval);
                }

                break;
            case PROCESS_ONCE:
                synchronized (checkpointLock) {
                    if (globalModificationTime == Long.MIN_VALUE) {
                        monitorDirAndForwardSplits(fileSystem, context);
                        globalModificationTime = Long.MAX_VALUE;
                    }
                    isRunning = false;
                }
                break;
            default:
                isRunning = false;
                throw new RuntimeException("Unknown WatchType" + watchType);

2.3 monitorDirAndForwardSplits方法

        Map<Path, FileStatus> eligibleFiles = listEligibleFiles(fs, new Path(path));
        Map<Long, List<TimestampedFileInputSplit>> splitsSortedByModTime = getInputSplitsSortedByModTime(eligibleFiles);

        for (Map.Entry<Long, List<TimestampedFileInputSplit>> splits: splitsSortedByModTime.entrySet()) {
            long modificationTime = splits.getKey();
            for (TimestampedFileInputSplit split: splits.getValue()) {
                LOG.info("Forwarding split: " + split);
                context.collect(split);
            }
            // update the global modification time
            globalModificationTime = Math.max(globalModificationTime, modificationTime);
        }

a、首先调用listEligibleFiles方法
FileStatus[] statuses=fileSystem.listStatus(path)获取路径下所有文件夹/文件的状态;

  • 文件类型,获取最新的修改时间,调用shouldIgnore方法(该方法里判定modificationTime <= globalModificationTime时,返回true,应该忽略)
    就是说某个文件的修改时间,是小于等于全局最近修改时间的时候,就会忽略了,认为该文件并没有再次被修改。
    所以说,在路径下新复制一个文件,取决于这个被复制的文件的上次修改时间,与全局修改时间的大小,如果更大,则不会被忽略,相反会被忽略。

  • 目录类型,递归调用listEligibleFiles方法


b、从a步骤中获取内容又发生变化的文件后,调用getInputSplitsSortedByModTime


注意这里使用的数据结果为TreeMap结构,key就是修改时间的时间戳,value是文件输入分片 TimestampedFileInputSplit实例的列表。
因为TreeMap存储,所以会将最新时间戳放在最前面。

c、获取到TreeMap之后,进行遍历,将分片分配给下游,更新全局更新时间。



(3)根据文件格式创建ContinuousFileReaderOperator实例
读取从前面ContinuousFileMonitoringFunction返回的的修改的文件分片(TimestampedFileInputSplit实例);
与ContinuousFileMonitoringFunction相反并行度为1,此运算符的并行度>1;
一旦收到文件splits描述符,它就会被放入队列中,并具有另一个线程读取拆分的实际数据。 这种架构允许将从发出检查点障碍的那一侧读取线程,从而消除任何潜在的背压。

(4)最后返回数据源


3.总结

flink定时监控文件在某些场景十分有用,比如说动态修改某配置文件,在风险监控场景动态修改告警规则等,再与流广播配合使用那就更香了,达到不需要停止实时任务而能够改变某些配置。
至于问题所说,作者在进行实践中,复制某个文件之后,目录下明明多了一个文件,却并没有将该文件的内容读取输出,相信大家看了源码分析的内容应该有了答案:
因为复制的文件,该文件的最新修改时间是与源文件保持一致的,并不是你复制的时间,所以这个修改时间可能就小于全局更新时间(globalModificationTime),而不被认为是新文件,所以在listEligibleFiles那一步就被忽略了。
如果你是
(1)新创建的文件,
(2)或者说复制的源文件的修改时间是大于globalModificationTime,
(3)再或者你复制文件后,编辑下文件,更新下文件修改时间
最后肯定会输出该文件的内容。

相关文章

网友评论

      本文标题:Flink中与流广播配合使用的readFile解析

      本文链接:https://www.haomeiwen.com/subject/wtvbwctx.html