美文网首页
Flume之自定义Intercept

Flume之自定义Intercept

作者: 阿坤的博客 | 来源:发表于2018-08-17 10:51 被阅读65次

    Flume 有各种自带的拦截器,比如:TimestampInterceptor、HostInterceptor、RegexExtractorInterceptor 等,通过使用不同的拦截器,实现不同的功能。但是以上的这些拦截器,不能改变原有日志数据的内容或者对日志信息添加一定的处理逻辑,通过自定义Interceptor来实现对字段的控制

    主要内容:

    • 1.需求
    • 2.编码实现

    相关文章:
    1.CentOS7安装Nginx
    2.Flume之采集Nginx的日志
    3.Flume之自定义Intercept

    1.需求

    原始Nginx日志

    139.199.86.244|2018-08-17T05:35:45+08:00|104.194.100.87|GET /mysqladmin/index.php HTTP/1.1|0.003|404|76|104.128.100.46:8085|0.002|404|-|Mozilla/5.0
    

    采集后

    server1|nginx|access.log|139.199.86.244|2018-08-17T05:35:45+08:00|104.194.100.87|GET /mysqladmin/index.php HTTP/1.1|0.003|404|76|104.128.100.46:8085|0.002|404|-|Mozilla/5.0
    

    其中server1|nginx|access.log是自定义添加的信息(主机、应用名、监听的文件)

    2.编码实现

    2.1.定义配置常量

    public static class Constants {
        public static final String HOST = "host";
        public static final String HOST_DEFAULT = "UNKNOW";
        public static final String APP = "app";
        public static final String APP_DEFAULT = "UNKNOW";
        public static final String FILE_NAME = "fileName";
        public static final String FILE_NAME_DEFAULT = "UNKNOW";
        public static final String DELIMITER = "delimiter";
        public static final String DELIMITER_DEFAULT = "|";
    }
    

    其中host=主机名(默认UNKNOW),app=应用名(默认UNKNOW),fileName=文件名(默认UNKNOW),delimiter=分隔符(默认 "|")

    2.2.定义MyIntercepter实现Intercepter接口

    public class MyIntercepter implements Interceptor {
        private String host;
        private String app;
        private String fileName;
        private String delimiter;
    
        public MyIntercepter(String host, String app, String fileName, String delimiter) {
            this.host = host;
            this.app = app;
            this.fileName = fileName;
            this.delimiter = delimiter;
        }
    
        public void initialize() {
    
        }
    
        public Event intercept(Event event) {
            if (event == null) {
                return null;
            }
            try {
                String line = new String(event.getBody(), Charsets.UTF_8);
                StringBuilder sb = new StringBuilder();
                sb.append(host);
                sb.append(delimiter);
                sb.append(app);
                sb.append(delimiter);
                sb.append(fileName);
                sb.append(delimiter);
                sb.append(line);
                event.setBody(sb.toString().getBytes(Charsets.UTF_8));
                return event;
            } catch (Exception e) {
                return event;
            }
        }
    
        public List<Event> intercept(List<Event> list) {
            List<Event> out = new ArrayList<Event>(list.size());
            for (Event event : list) {
                Event outEvent = intercept(event);
                if (outEvent != null) {
                    out.add(outEvent);
                }
            }
            return out;
        }
    
        public void close() {
    
        }
    }
    

    重写intercept(Event event)和intercept(List<Event> list)方法

    2.3.MyIntercepter内部定义Builder

    /**
     * 获取配置文件
     */
    public static class Builder implements Interceptor.Builder {
    
      private String host;
      private String app;
      private String fileName;
      private String delimiter;
    
      /*
       * @see org.apache.flume.conf.Configurable#configure(org.apache.flume.Context)
       */
      public void configure(Context context) {
        host = context.getString(Constants.HOST, Constants.HOST_DEFAULT);
        app = context.getString(Constants.APP, Constants.APP_DEFAULT);
        fileName = context.getString(Constants.FILE_NAME, Constants.FILE_NAME_DEFAULT);
        delimiter = context.getString(Constants.DELIMITER, Constants.DELIMITER_DEFAULT);
      }
    
      /*
       * @see org.apache.flume.interceptor.Interceptor.Builder#build()
       */
      public Interceptor build() {
        return new MyIntercepter(host, app, fileName, delimiter);
      }
    }
    

    接口中定义了一个内部接口 Builder,在 configure 方法中,进行一些参数
    配置。并给出,在 flume 的 conf 中没配置一些参数时,给出其默认值。通
    过其 builder 方法,返回一个 MyIntercepter对象。

    完整代码如下:

    package me.jinkun.flume;
    
    import com.google.common.base.Charsets;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;
    
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * 自定义插件用于指定host和app
     * a1.sources.r1.interceptors = i1
     * a1.sources.r1.interceptors.i1.type = me.jinkun.flume.MyIntercepter$Builder
     * a1.sources.r1.interceptors.i1.host=server1
     * a1.sources.r1.interceptors.i1.app=nginx
     * a1.sources.r1.interceptors.i1.fileName=access.log
     * a1.sources.r1.interceptors.i1.delimiter=|
     *
     */
    public class MyIntercepter implements Interceptor {
        private String host;
        private String app;
        private String fileName;
        private String delimiter;
    
        public MyIntercepter(String host, String app, String fileName, String delimiter) {
            this.host = host;
            this.app = app;
            this.fileName = fileName;
            this.delimiter = delimiter;
        }
    
        public void initialize() {
    
        }
    
        public Event intercept(Event event) {
            if (event == null) {
                return null;
            }
            try {
                String line = new String(event.getBody(), Charsets.UTF_8);
                StringBuilder sb = new StringBuilder();
                sb.append(host);
                sb.append(delimiter);
                sb.append(app);
                sb.append(delimiter);
                sb.append(fileName);
                sb.append(delimiter);
                sb.append(line);
                event.setBody(sb.toString().getBytes(Charsets.UTF_8));
                return event;
            } catch (Exception e) {
                return event;
            }
        }
    
        public List<Event> intercept(List<Event> list) {
            List<Event> out = new ArrayList<Event>(list.size());
            for (Event event : list) {
                Event outEvent = intercept(event);
                if (outEvent != null) {
                    out.add(outEvent);
                }
            }
            return out;
        }
    
        public void close() {
    
        }
    
        /**
         *
         */
        public static class Constants {
            public static final String HOST = "host";
            public static final String HOST_DEFAULT = "UNKNOW";
            public static final String APP = "app";
            public static final String APP_DEFAULT = "UNKNOW";
            public static final String FILE_NAME = "fileName";
            public static final String FILE_NAME_DEFAULT = "UNKNOW";
            public static final String DELIMITER = "delimiter";
            public static final String DELIMITER_DEFAULT = "|";
        }
    
        /**
         * 获取配置文件
         */
        public static class Builder implements Interceptor.Builder {
    
            private String host;
            private String app;
            private String fileName;
            private String delimiter;
    
            /*
             * @see org.apache.flume.conf.Configurable#configure(org.apache.flume.Context)
             */
            public void configure(Context context) {
                host = context.getString(Constants.HOST, Constants.HOST_DEFAULT);
                app = context.getString(Constants.APP, Constants.APP_DEFAULT);
                fileName = context.getString(Constants.FILE_NAME, Constants.FILE_NAME_DEFAULT);
                delimiter = context.getString(Constants.DELIMITER, Constants.DELIMITER_DEFAULT);
            }
    
            /*
             * @see org.apache.flume.interceptor.Interceptor.Builder#build()
             */
            public Interceptor build() {
                return new MyIntercepter(host, app, fileName, delimiter);
            }
        }
    }
    

    2.4.打成jar文件

    将编写好的代码打成jar文件,并上传到flume/lib目录下


    jar文件
    flume/lib

    3.使用示例

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /opt/soft/nginx-1.14.0/logs/access.log
    
    #自定义拦截器
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = me.jinkun.flume.MyIntercepter$Builder
    a1.sources.r1.interceptors.i1.host=server1
    a1.sources.r1.interceptors.i1.app=nginx
    a1.sources.r1.interceptors.i1.fileName=access.log
    
    # Describe the sink
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = 0.0.0.0
    a1.sinks.k1.port = 41414
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    

    相关文章

      网友评论

          本文标题:Flume之自定义Intercept

          本文链接:https://www.haomeiwen.com/subject/zllzbftx.html