我们都知道,使用flume收集数据,flume内部数据传输都是以event为基本单位进行传输的,他会把收集的数据划分为一个个的event进行传输。
Spooling Directory Source
我们一般情况下使用flume监控文件(文件夹)一类的数据并进行采集时,他会按行进行划分并创建event,如果一行数据超过2500(可以修改)个字节,他会把剩下的放入到下一个event里。
但有时候按行来创建event并不会满足我们的需求。我们就需要修改他的划分event的方式。具体方法直接看代码就一目了然。
1.下载flume源码包,将源码包导入IDEA。导入方法:将源码导入到IDEA
2.找到 org.apache.flume.serialization.LineDeserializer这个类
里面有readLine这样的一个方法
private StringreadLine()throws IOException {
StringBuilder sb =new StringBuilder();
int c;
int readChars =0;
while ((c =in.readChar()) != -1) {
readChars++;
// 按行分割
if (c =='\n') {
break;
}
sb.append((char)c);
if (readChars >=maxLineLength) {//event的最大字节数
logger.warn("Line length exceeds max ({}), truncating line!", maxLineLength);
break;
}
}
if (readChars >0) {
return sb.toString();
}else {
return null;
}
}
我们可以根据自己需求去修改源代码了。修改完成后将源码打包,替换原来的包即可。
Syslog Sources
但是以上的方式并不适用于flume收集TCP
UDP发来的数据。但是不要紧,两者的解决方式还是差不多的。
我们需要找到org.apache.flume.source.SyslogUtils这个类
里面有extractEvent这样的一个方法。
我们根据自己的需求修改源代码,再打包,替换即可。
安利一个特别热心的编程乐园群:624108656
超级热心的群
网友评论