美文网首页大数据精进之路
flume自定义拦截器学习

flume自定义拦截器学习

作者: CarsonCao | 来源:发表于2017-03-28 14:23 被阅读1633次

    备注:本文简单实现了一个计数功能的拦截器,针对每个event用线程安全的AtomicLong类进行计数,并将计数count写入到输出的header中。
    flume版本:1.6
    JDK:1.7

    1编码

    编写拦截器,只需要写一个实现Interceptor接口类,在该类中还要实现一个Builder的静态类,builder类用来实例化interceptor,并将Context实例配置给拦截器。
    在idea中新建项目,依赖包导入:

    Jdk要设置成1.7版本的,否则运行的时候会报错,因为flume1.6是jdk1.7编译的。
    代码如下,实现Interceptor,然后实现静态类Builder:

    package com.open01.flume.interceptors;
    
    import org.apache.flume.interceptor.Interceptor;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.List;
    /**
     * Created by caolch on 2017/3/9.
     */
    public class TestInterceptor implements Interceptor{
        private final String headerKey;
        private static final String CONF_HEADER_KEY = "header";
        private static final String DEFAULT_HEADER = "count";
        private final AtomicLong currentCount;
    
        private TestInterceptor(Context ctx) {
            headerKey = ctx.getString(CONF_HEADER_KEY,DEFAULT_HEADER);
            currentCount = new AtomicLong(0);
        }
    
        @Override
        public void initialize() {
        }
    
        @Override
        public Event intercept(Event event) {
            long count = currentCount.incrementAndGet();
            event.getHeaders().put(headerKey,String.valueOf(count));
            return event;
        }
    
        @Override
        public List<Event> intercept(List<Event> events) {
            for (Event e:events) {
                intercept(e);
            }
            return events;
        }
    
        @Override
        public void close() {
        }
        public static class CounterInterceptorBuilder implements Builder {
            private Context ctx;
    
            @Override
            public Interceptor build() {
                return new TestInterceptor(ctx);
            }
    
            @Override
            public void configure(Context context) {
                this.ctx = context;
            }
        }
    }
    
    

    方法intercept(Event event)是具体执行解析的方法,将count自增1,然后写入到该条event的headers中。

    2配置

    编译项目生成jar包,将jar包放入到flume的lib目录下。
    配置conf文件如下:

    为sources指定自定义的拦截器,配置的时候一定要写类的全路径,并且用”$“符号分割加上自定义Builder的类名。
    执行flume的agent,在telnet端输入数据,可以看到每条event的header中都会添加count。

    相关文章

      网友评论

        本文标题:flume自定义拦截器学习

        本文链接:https://www.haomeiwen.com/subject/rqujottx.html