package com.cloudera.flume;
import com.google.common.collect.Lists;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
public class LogInterceptor implements Interceptor {
private final boolean preserveExisting;
private final String fields_separator;
public LogInterceptor(boolean preserveExisting, String fields_separator) {
this.preserveExisting = preserveExisting;
this.fields_separator = fields_separator;
}
@Override
public void initialize() {
}
@Override
public Event intercept(Event event) {
if (event == null) {
return null;
}
try {
// 获取flume接收消息头
Map<String, String> headers = event.getHeaders();
// 通过获取event数据,转化成字符串
String line = new String(event.getBody(), Charset.forName("UTF-8"));
// 通过分隔符分割数据
String[] fields_spilts = line.split(fields_separator);
// 不符合业务条件的,返回null
if (...){
return null;
}
headers.put(LogInterceptor.Constants.LOG_TYPE,fields_spilts[0]);
headers.put(LogInterceptor.Constants.TIMESTAMP, fields_spilts[1]);
return event;
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
@Override
public List<Event> intercept(List<Event> events) {
List<Event> out = Lists.newArrayList();
Iterator i$ = events.iterator();
while (i$.hasNext()) {
Event event = (Event) i$.next();
Event outEvent = this.intercept(event);
if (outEvent != null) {
out.add(outEvent);
}
}
return out;
}
@Override
public void close() {
}
public static class Constants {
public static String TIMESTAMP = "timestamp";
public static String PRESERVE = "preserveExisting";
public static boolean PRESERVE_DFLT = false;
public static final String FIELD_SEPARATOR = "fields_separator";
public static final String LOG_TYPE = "log_type";
public Constants() {
}
}
public static class Builder implements org.apache.flume.interceptor.Interceptor.Builder {
private boolean preserveExisting;
private String fields_separator;
public Builder() {
this.preserveExisting = LogInterceptor.Constants.PRESERVE_DFLT;
this.fields_separator = Constants.FIELD_SEPARATOR;
}
public Interceptor build() {
return new LogInterceptor(this.preserveExisting, fields_separator);
}
public void configure(Context context) {
this.preserveExisting = context.getBoolean(LogInterceptor.Constants.PRESERVE, LogInterceptor.Constants.PRESERVE_DFLT);
this.fields_separator = context.getString(Constants.FIELD_SEPARATOR);
}
}
}
// "Reader comments" — page-footer text captured along with the listing; not part of the class.