Modifying the Flume Taildir Source: Monitoring Subdirectories

Author: 吃货大米饭 | Published 2019-10-31 15:09

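    1. Requirement

    Out of the box, the Flume Taildir source only monitors files directly under the configured directory. The goal is to make it also pick up every file in the subdirectories.
    Flume version: 1.9.0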

    2. Approach

    The relevant code is in org.apache.flume.source.taildir.TaildirMatcher:

    private List<File> getMatchingFilesNoCache() {
      List<File> result = Lists.newArrayList();
      try (DirectoryStream<Path> stream = Files.newDirectoryStream(parentDir.toPath(), fileFilter)) {
        for (Path entry : stream) {
          result.add(entry.toFile());
        }
      } catch (IOException e) {
        logger.error("I/O exception occurred while listing parent directory. " +
                     "Files already matched will be returned. " + parentDir.toPath(), e);
      }
      return result;
    }
    

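    This method collects the files accepted by the filter and adds them to result. However, Files.newDirectoryStream only lists the entries directly under the given directory; it does not descend into subdirectories.
    So we need to traverse the directory tree recursively to pick up the files in subdirectories as well.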
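The limitation described above can be reproduced outside Flume. The following is a minimal sketch, not Flume code: the class name FlatListingDemo, its helper methods, and the temporary directory layout (a.log plus sub/b.log) are made up for illustration. It only shows that Files.newDirectoryStream returns the direct children of a directory.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class FlatListingDemo {

  /** Lists the names of the non-directory entries directly under dir. */
  static List<String> listDirectChildren(Path dir) {
    List<String> names = new ArrayList<>();
    // The lambda is a DirectoryStream.Filter; it only rejects directories.
    try (DirectoryStream<Path> stream =
             Files.newDirectoryStream(dir, entry -> !Files.isDirectory(entry))) {
      for (Path entry : stream) {
        names.add(entry.getFileName().toString());
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    Collections.sort(names);
    return names;
  }

  /** Builds a throwaway tree (a.log, sub/b.log) and lists its top level. */
  static List<String> demo() {
    try {
      Path root = Files.createTempDirectory("taildir-demo");
      Files.createFile(root.resolve("a.log"));
      Path sub = Files.createDirectory(root.resolve("sub"));
      Files.createFile(sub.resolve("b.log"));
      return listDirectChildren(root);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    // Only a.log is listed; sub/b.log is never seen by the stream.
    System.out.println(demo());
  }
}
```

In this sketch b.log stays invisible because the stream never opens sub, which is the same reason the unmodified matcher misses files in subdirectories.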

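    3. Implementation

    The listing in getMatchingFilesNoCache() is changed to walk the whole directory tree and apply the same fileFilter in every directory: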

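    // Additional imports needed in TaildirMatcher:
    //   java.nio.file.FileVisitResult, java.nio.file.SimpleFileVisitor,
    //   java.nio.file.attribute.BasicFileAttributes
    private List<File> getMatchingFilesNoCache() {
      final List<File> result = Lists.newArrayList();
      try {
        // Visit parentDir itself and every directory below it.
        Files.walkFileTree(parentDir.toPath(), new SimpleFileVisitor<Path>() {
          @Override
          public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs)
              throws IOException {
            // Apply the existing filter per directory; try-with-resources closes the stream.
            try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, fileFilter)) {
              for (Path entry : stream) {
                result.add(entry.toFile());
              }
            }
            return FileVisitResult.CONTINUE;
          }
        });
      } catch (IOException e) {
        logger.error("I/O exception occurred while listing parent directory. " +
                     "Files already matched will be returned. " + parentDir.toPath(), e);
      }
      return result;
    }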
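For experimenting with the recursive walk without building Flume, here is a self-contained sketch of a variant of the same idea. It is not the code from this article: instead of opening a DirectoryStream in preVisitDirectory, it applies the filter in visitFile, and it skips unreadable entries via visitFileFailed. The class RecursiveMatchDemo, the method names, the sample filter, and the temporary files are all hypothetical.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class RecursiveMatchDemo {

  /** Walks root recursively and returns every visited file accepted by filter. */
  static List<File> listRecursively(Path root, DirectoryStream.Filter<Path> filter) {
    final List<File> result = new ArrayList<>();
    try {
      Files.walkFileTree(root, new SimpleFileVisitor<Path>() {
        @Override
        public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
            throws IOException {
          if (filter.accept(file)) {
            result.add(file.toFile());
          }
          return FileVisitResult.CONTINUE;
        }

        @Override
        public FileVisitResult visitFileFailed(Path file, IOException exc) {
          // Skip entries that cannot be read instead of aborting the whole walk.
          return FileVisitResult.CONTINUE;
        }
      });
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return result;
  }

  /** Builds a throwaway tree and returns the matched paths relative to its root. */
  static List<String> demo() {
    try {
      Path root = Files.createTempDirectory("taildir-demo");
      Files.createFile(root.resolve("a.log"));
      Files.createFile(root.resolve("notes.txt"));
      Path sub = Files.createDirectory(root.resolve("sub"));
      Files.createFile(sub.resolve("b.log"));

      // Hypothetical filter: the file name matches .*log.* and the entry is not a directory.
      DirectoryStream.Filter<Path> filter =
          p -> p.getFileName().toString().matches(".*log.*") && !Files.isDirectory(p);

      List<String> names = new ArrayList<>();
      for (File f : listRecursively(root, filter)) {
        names.add(root.relativize(f.toPath()).toString().replace(File.separatorChar, '/'));
      }
      Collections.sort(names);
      return names;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(demo()); // a.log and sub/b.log are matched, notes.txt is not
  }
}
```

Whether to filter per directory (as in the article's change) or per file (as here) is mostly a matter of taste; the per-file form avoids opening a second stream per directory, at the cost of no longer reusing Files.newDirectoryStream.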
    

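    4. Testing

    • Build (afterwards, copy the jar produced under target/ over the flume-taildir-source jar in the lib directory of the Flume installation)
    cd apache-flume-1.9.0-src\flume-ng-sources\flume-taildir-source
    mvn clean package
    
    • Test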
    #define agent
    taildir-hdfs-agent.sources=taildir-source
    taildir-hdfs-agent.channels=taildir-memory-channel
    taildir-hdfs-agent.sinks=hdfs-sink
    
    #define source
    taildir-hdfs-agent.sources.taildir-source.type=TAILDIR
    taildir-hdfs-agent.sources.taildir-source.filegroups=f1
    taildir-hdfs-agent.sources.taildir-source.filegroups.f1=/root/data/.*log.*
    taildir-hdfs-agent.sources.taildir-source.positionFile=/root/position/taildir_position.json
    
    #define channel
    taildir-hdfs-agent.channels.taildir-memory-channel.type=memory
    
    #define sink 
    taildir-hdfs-agent.sinks.hdfs-sink.type=logger
    
    
    #bind source and sink to channel
    taildir-hdfs-agent.sources.taildir-source.channels=taildir-memory-channel
    taildir-hdfs-agent.sinks.hdfs-sink.channel=taildir-memory-channel
    
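    Start the agent:
    ./bin/flume-ng agent -n taildir-hdfs-agent -f /root/apache-flume-1.9.0-bin/bin/taildir-memory-logger.conf -c /root/apache-flume-1.9.0-bin/bin/conf -Dflume.root.logger=INFO,console

    Directory layout of the monitored directory:
    .
    ├── a.log
    └── sub
        └── b.log

    Append a line to b.log in the subdirectory:
    echo "2" >> b.log

    The logger sink prints the event, which shows that the file in the subdirectory is being tailed:
    2019-10-31 15:08:17,071 INFO sink.LoggerSink: Event: { headers:{} body: 32                                              2 }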
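As a side note on the filegroup value /root/data/.*log.* used in the test configuration: the Flume user guide states that the regular expression applies to the file name only. The sketch below illustrates that split with plain JDK calls. It is an assumption-laden illustration rather than Flume's actual code; the class FilegroupPatternDemo and its methods are hypothetical, and the expected parent directory assumes a Unix-style path.

```java
import java.io.File;
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;

class FilegroupPatternDemo {

  /** Directory part of the filegroup pattern, i.e. the directory that is monitored. */
  static String parentDir(String filePattern) {
    return new File(filePattern).getParent();
  }

  /** True if fileName matches the last segment of filePattern, read as a regex. */
  static boolean matchesFileName(String filePattern, String fileName) {
    String regex = new File(filePattern).getName(); // ".*log.*" for the config above
    PathMatcher matcher = FileSystems.getDefault().getPathMatcher("regex:" + regex);
    return matcher.matches(Paths.get(fileName));
  }

  public static void main(String[] args) {
    String pattern = "/root/data/.*log.*";
    System.out.println(parentDir(pattern));
    System.out.println(matchesFileName(pattern, "a.log"));
    System.out.println(matchesFileName(pattern, "b.log"));
    System.out.println(matchesFileName(pattern, "notes.txt"));
  }
}
```

Because only the last segment is treated as a regex, the directory part cannot express "any subdirectory", which is why picking up sub/b.log requires the code change rather than a different pattern.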
    
