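1. Requirement
Out of the box, the Flume taildir source only monitors files directly under the configured directory. Can it be made to monitor all files in subdirectories as well?
Flume version: 1.9.0
2. Approach
The relevant code is in org.apache.flume.source.taildir.TaildirMatcher: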
private List<File> getMatchingFilesNoCache() {
  List<File> result = Lists.newArrayList();
  try (DirectoryStream<Path> stream = Files.newDirectoryStream(parentDir.toPath(), fileFilter)) {
    for (Path entry : stream) {
      result.add(entry.toFile());
    }
  } catch (IOException e) {
    logger.error("I/O exception occurred while listing parent directory. " +
                 "Files already matched will be returned. " + parentDir.toPath(), e);
  }
  return result;
}
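This method collects the files that pass fileFilter and adds them to result. However, Files.newDirectoryStream only lists the entries directly inside the given directory and does not descend into subdirectories.
So we need to walk the directory tree recursively and collect the matching files from every subdirectory as well.
3. Implementation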
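To make the limitation concrete, here is a small standalone sketch, independent of Flume. The class name FlatListingDemo, the helper createSampleTree, and the filter (file name matches .*log.* and the entry is not a directory) are assumptions made for illustration; the filter only approximates what TaildirMatcher's fileFilter does.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

public class FlatListingDemo {

  // Hypothetical helper: creates a.log and sub/b.log under a fresh temp directory.
  static Path createSampleTree() {
    try {
      Path root = Files.createTempDirectory("taildir-demo");
      Files.createFile(root.resolve("a.log"));
      Files.createDirectory(root.resolve("sub"));
      Files.createFile(root.resolve("sub").resolve("b.log"));
      return root;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Lists matching regular files among the direct children of dir only.
  static List<String> listDirectChildren(Path dir) {
    // Assumed filter: name matches the regex and the entry is not a directory.
    DirectoryStream.Filter<Path> filter = entry ->
        entry.getFileName().toString().matches(".*log.*") && !Files.isDirectory(entry);
    List<String> names = new ArrayList<>();
    try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, filter)) {
      for (Path entry : stream) {
        names.add(entry.getFileName().toString());
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return names;
  }

  public static void main(String[] args) {
    Path root = createSampleTree();
    // sub/b.log is not reported because the stream never enters sub.
    System.out.println(listDirectChildren(root)); // prints [a.log]
  }
}
```

Only a.log is returned, which mirrors what the unmodified taildir source sees.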
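// Replaces the try block in getMatchingFilesNoCache().
// Additional imports needed: java.nio.file.FileVisitResult,
// java.nio.file.SimpleFileVisitor, java.nio.file.attribute.BasicFileAttributes
try {
  Files.walkFileTree(parentDir.toPath(), new SimpleFileVisitor<Path>() {
    @Override
    public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
      // Apply the existing filter to each directory in the tree.
      // try-with-resources closes the stream so directory handles are not leaked.
      try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, fileFilter)) {
        for (Path entry : stream) {
          result.add(entry.toFile());
        }
      }
      return FileVisitResult.CONTINUE;
    }
  });
} catch (IOException e) {
  logger.error("I/O exception occurred while listing parent directory. " +
               "Files already matched will be returned. " + parentDir.toPath(), e);
}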
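The same traversal can be tried outside Flume before rebuilding the module. The sketch below is a minimal standalone version of the idea; the class name RecursiveListingDemo, the helper createSampleTree, and the stand-in fileFilter are assumptions for illustration and are not part of the Flume code base.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.List;

public class RecursiveListingDemo {

  // Hypothetical helper: creates a.log and sub/b.log under a fresh temp directory.
  static Path createSampleTree() {
    try {
      Path root = Files.createTempDirectory("taildir-demo");
      Files.createFile(root.resolve("a.log"));
      Files.createDirectory(root.resolve("sub"));
      Files.createFile(root.resolve("sub").resolve("b.log"));
      return root;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Collects matching regular files from parentDir and every directory below it.
  static List<Path> listRecursively(Path parentDir) {
    // Assumed stand-in for TaildirMatcher's fileFilter.
    DirectoryStream.Filter<Path> fileFilter = entry ->
        entry.getFileName().toString().matches(".*log.*") && !Files.isDirectory(entry);
    List<Path> result = new ArrayList<>();
    try {
      Files.walkFileTree(parentDir, new SimpleFileVisitor<Path>() {
        @Override
        public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs)
            throws IOException {
          // Called once per directory, including parentDir itself.
          try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, fileFilter)) {
            for (Path entry : stream) {
              result.add(entry);
            }
          }
          return FileVisitResult.CONTINUE;
        }
      });
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return result;
  }

  public static void main(String[] args) {
    Path root = createSampleTree();
    for (Path match : listRecursively(root)) {
      System.out.println(root.relativize(match));
    }
  }
}
```

Both a.log and sub/b.log are returned. One caveat worth verifying in your own setup: when cachePatternMatching is enabled, TaildirMatcher decides whether to re-list files based on the parent directory's modification time, and creating a file inside a subdirectory usually does not change that timestamp, so newly created nested files may be picked up late.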
4. Testing
- Build
cd apache-flume-1.9.0-src\flume-ng-sources\flume-taildir-source
mvn clean package
- Test
#define agent
taildir-hdfs-agent.sources=taildir-source
taildir-hdfs-agent.channels=taildir-memory-channel
taildir-hdfs-agent.sinks=hdfs-sink
#define source
taildir-hdfs-agent.sources.taildir-source.type=TAILDIR
taildir-hdfs-agent.sources.taildir-source.filegroups=f1
taildir-hdfs-agent.sources.taildir-source.filegroups.f1=/root/data/.*log.*
taildir-hdfs-agent.sources.taildir-source.positionFile=/root/position/taildir_position.json
#define channel
taildir-hdfs-agent.channels.taildir-memory-channel.type=memory
#define sink
taildir-hdfs-agent.sinks.hdfs-sink.type=logger
#bind source and sink to channel
taildir-hdfs-agent.sources.taildir-source.channels=taildir-memory-channel
taildir-hdfs-agent.sinks.hdfs-sink.channel=taildir-memory-channel
./bin/flume-ng agent -n taildir-hdfs-agent -f /root/apache-flume-1.9.0-bin/bin/taildir-memory-logger.conf -c /root/apache-flume-1.9.0-bin/bin/conf -Dflume.root.logger=INFO,console
.
├── a.log
└── sub
    └── b.log
echo "2" >> b.log
2019-10-31 15:08:17,071 INFO sink.LoggerSink: Event: { headers:{} body: 32 2 }