美文网首页
自定义flume拦截器

自定义flume拦截器

作者: 嘻嘻是小猪 | 来源:发表于2021-03-09 15:58 被阅读0次

    需求:日志目录是 /data/logs/{instanceId}/xxx.log, 保证倒数第二级为 实例ID
    flume 为 taildir-source,要求事件按不同instanceId, sink到不同的hdfs目录下。

    思路:先通过taildirsource的fileHeader配置,将日志完整路径写入Event-Header, 然后用拦截器截取path得到instanceId

    • 自定义拦截器
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;
    
    import java.util.List;
    import java.util.Map;
    
    public class EventInterceptor implements Interceptor  {
        //单个事件拦截
        @Override
        public Event intercept(Event event) {
            Map<String, String> headers = event.getHeaders();
            String fileName = headers.getOrDefault("fileName", "");
    //        System.out.println("file name is: "+fileName);
    
            //约定的fileName格式 e.g /data/logs/10086/xxx.log
            if (!fileName.isEmpty()) {
                String[] strs = fileName.split("/");
                if (strs.length-2<0) {
                    return event;
                }else{
                    headers.put("instanceId", strs[strs.length-2]);
                }
            }
            return event;
        }
    
        @Override
        public List<Event> intercept(List<Event> list) {
            for (Event event : list) {
                intercept(event);
            }
            return list;
        }
    
        //内部类
        public static class Builder implements Interceptor.Builder{
            @Override
            public Interceptor build() {
                return new EventInterceptor();
            }
        }
    }
    
    • maven打包,放入flume安装位置的lib目录下

    • properties关键配置如下

    
    ## taildir-source 向header写入采集file路径
    rec.sources.r1.fileHeader = true
    rec.sources.r1.fileHeaderKey = fileName
    
    ##使用拦截器
    rec.sources.r1.interceptors = i1
    rec.sources.r1.interceptors.i1.type = com.rec.EventInterceptor$Builder
    
    ## %{xxx}方式取出header中指定value, 此处为:instanceId
    rec.sinks.k1.hdfs.path = hdfs://192.168.204.31:8020/flume/rec/%{instanceId}
    

    相关文章

      网友评论

          本文标题:自定义flume拦截器

          本文链接:https://www.haomeiwen.com/subject/sihbqltx.html