美文网首页
Flume 自定义拦截器

Flume 自定义拦截器

作者: lei_charles | 来源:发表于2019-10-16 15:30 被阅读0次
package com.cloudera.flume;

import com.google.common.collect.Lists;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.List;
import java.util.Map;


public class LogInterceptor implements Interceptor {

    private final boolean preserveExisting;

    private final String fields_separator;


    public LogInterceptor(boolean preserveExisting, String fields_separator) {
        this.preserveExisting = preserveExisting;
        this.fields_separator = fields_separator;
    }

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
        if (event == null) {
            return null;
        }
        try {
            // 获取flume接收消息头
            Map<String, String> headers = event.getHeaders();
            // 通过获取event数据,转化成字符串
            String line = new String(event.getBody(), Charset.forName("UTF-8"));
            // 通过分隔符分割数据
            String[] fields_spilts = line.split(fields_separator);
            // 不符合业务条件的,返回null
            if (...){
                return null;
            }
            headers.put(LogInterceptor.Constants.LOG_TYPE,fields_spilts[0]);
            headers.put(LogInterceptor.Constants.TIMESTAMP, fields_spilts[1]);
            return event;
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> out = Lists.newArrayList();
        Iterator i$ = events.iterator();
        while (i$.hasNext()) {
            Event event = (Event) i$.next();
            Event outEvent = this.intercept(event);
            if (outEvent != null) {
                out.add(outEvent);
            }
        }
        return out;
    }

    @Override
    public void close() {

    }

    public static class Constants {
        public static String TIMESTAMP = "timestamp";
        public static String PRESERVE = "preserveExisting";
        public static boolean PRESERVE_DFLT = false;
        public static final String FIELD_SEPARATOR = "fields_separator";
        public static final String LOG_TYPE = "log_type";

        public Constants() {
        }
    }

    public static class Builder implements org.apache.flume.interceptor.Interceptor.Builder {
        private boolean preserveExisting;
        private String fields_separator;

        public Builder() {
            this.preserveExisting = LogInterceptor.Constants.PRESERVE_DFLT;
            this.fields_separator = Constants.FIELD_SEPARATOR;
        }

        public Interceptor build() {
            return new LogInterceptor(this.preserveExisting, fields_separator);
        }

        public void configure(Context context) {
            this.preserveExisting = context.getBoolean(LogInterceptor.Constants.PRESERVE, LogInterceptor.Constants.PRESERVE_DFLT);
            this.fields_separator = context.getString(Constants.FIELD_SEPARATOR);
        }
    }
}

相关文章

网友评论

      本文标题:Flume 自定义拦截器

      本文链接:https://www.haomeiwen.com/subject/qoxvmctx.html