美文网首页
自定义flume拦截器

自定义flume拦截器

作者: 嘻嘻是小猪 | 来源:发表于2021-03-09 15:58 被阅读0次

需求:日志目录是 /data/logs/{instanceId}/xxx.log, 保证倒数第二级为 实例ID
flume 为 taildir-source,要求事件按不同instanceId, sink到不同的hdfs目录下。

思路:先通过taildirsource的fileHeader配置,将日志完整路径写入Event-Header, 然后用拦截器截取path得到instanceId

  • 自定义拦截器
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.List;
import java.util.Map;

public class EventInterceptor implements Interceptor  {
    //单个事件拦截
    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        String fileName = headers.getOrDefault("fileName", "");
//        System.out.println("file name is: "+fileName);

        //约定的fileName格式 e.g /data/logs/10086/xxx.log
        if (!fileName.isEmpty()) {
            String[] strs = fileName.split("/");
            if (strs.length-2<0) {
                return event;
            }else{
                headers.put("instanceId", strs[strs.length-2]);
            }
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        for (Event event : list) {
            intercept(event);
        }
        return list;
    }

    //内部类
    public static class Builder implements Interceptor.Builder{
        @Override
        public Interceptor build() {
            return new EventInterceptor();
        }
    }
}
  • maven打包,放入flume安装位置的lib目录下

  • properties关键配置如下


## taildir-source 向header写入采集file路径
rec.sources.r1.fileHeader = true
rec.sources.r1.fileHeaderKey = fileName

##使用拦截器
rec.sources.r1.interceptors = i1
rec.sources.r1.interceptors.i1.type = com.rec.EventInterceptor$Builder

## %{xxx}方式取出header中指定value, 此处为:instanceId
rec.sinks.k1.hdfs.path = hdfs://192.168.204.31:8020/flume/rec/%{instanceId}

相关文章

网友评论

      本文标题:自定义flume拦截器

      本文链接:https://www.haomeiwen.com/subject/sihbqltx.html