美文网首页
Flume 自定义拦截器

Flume 自定义拦截器

作者: lei_charles | 来源:发表于2019-10-16 15:30 被阅读0次
    package com.cloudera.flume;
    
    import com.google.common.collect.Lists;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;
    
    import java.nio.charset.Charset;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    
    
    public class LogInterceptor implements Interceptor {
    
        private final boolean preserveExisting;
    
        private final String fields_separator;
    
    
        public LogInterceptor(boolean preserveExisting, String fields_separator) {
            this.preserveExisting = preserveExisting;
            this.fields_separator = fields_separator;
        }
    
        @Override
        public void initialize() {
    
        }
    
        @Override
        public Event intercept(Event event) {
            if (event == null) {
                return null;
            }
            try {
                // 获取flume接收消息头
                Map<String, String> headers = event.getHeaders();
                // 通过获取event数据,转化成字符串
                String line = new String(event.getBody(), Charset.forName("UTF-8"));
                // 通过分隔符分割数据
                String[] fields_spilts = line.split(fields_separator);
                // 不符合业务条件的,返回null
                if (...){
                    return null;
                }
                headers.put(LogInterceptor.Constants.LOG_TYPE,fields_spilts[0]);
                headers.put(LogInterceptor.Constants.TIMESTAMP, fields_spilts[1]);
                return event;
            } catch (Exception e) {
                e.printStackTrace();
                return null;
            }
        }
    
        @Override
        public List<Event> intercept(List<Event> events) {
            List<Event> out = Lists.newArrayList();
            Iterator i$ = events.iterator();
            while (i$.hasNext()) {
                Event event = (Event) i$.next();
                Event outEvent = this.intercept(event);
                if (outEvent != null) {
                    out.add(outEvent);
                }
            }
            return out;
        }
    
        @Override
        public void close() {
    
        }
    
        public static class Constants {
            public static String TIMESTAMP = "timestamp";
            public static String PRESERVE = "preserveExisting";
            public static boolean PRESERVE_DFLT = false;
            public static final String FIELD_SEPARATOR = "fields_separator";
            public static final String LOG_TYPE = "log_type";
    
            public Constants() {
            }
        }
    
        public static class Builder implements org.apache.flume.interceptor.Interceptor.Builder {
            private boolean preserveExisting;
            private String fields_separator;
    
            public Builder() {
                this.preserveExisting = LogInterceptor.Constants.PRESERVE_DFLT;
                this.fields_separator = Constants.FIELD_SEPARATOR;
            }
    
            public Interceptor build() {
                return new LogInterceptor(this.preserveExisting, fields_separator);
            }
    
            public void configure(Context context) {
                this.preserveExisting = context.getBoolean(LogInterceptor.Constants.PRESERVE, LogInterceptor.Constants.PRESERVE_DFLT);
                this.fields_separator = context.getString(Constants.FIELD_SEPARATOR);
            }
        }
    }
    

    相关文章

      网友评论

          本文标题:Flume 自定义拦截器

          本文链接:https://www.haomeiwen.com/subject/qoxvmctx.html