美文网首页大数据精进之路
flume自定义拦截器学习

flume自定义拦截器学习

作者: CarsonCao | 来源:发表于2017-03-28 14:23 被阅读1633次

备注:本文简单实现了一个计数功能的拦截器,针对每个event用线程安全的AtomicLong类进行计数,并将计数count写入到输出的header中。
flume版本:1.6
JDK:1.7

1编码

编写拦截器,只需要写一个实现Interceptor接口类,在该类中还要实现一个Builder的静态类,builder类用来实例化interceptor,并将Context实例配置给拦截器。
在idea中新建项目,依赖包导入:

Jdk要设置成1.7版本的,否则运行的时候会报错,因为flume1.6是jdk1.7编译的。
代码如下,实现Interceptor,然后实现静态类Builder:

package com.open01.flume.interceptors;

import org.apache.flume.interceptor.Interceptor;
import org.apache.flume.Context;
import org.apache.flume.Event;
import java.util.concurrent.atomic.AtomicLong;
import java.util.List;
/**
 * Created by caolch on 2017/3/9.
 */
public class TestInterceptor implements Interceptor{
    private final String headerKey;
    private static final String CONF_HEADER_KEY = "header";
    private static final String DEFAULT_HEADER = "count";
    private final AtomicLong currentCount;

    private TestInterceptor(Context ctx) {
        headerKey = ctx.getString(CONF_HEADER_KEY,DEFAULT_HEADER);
        currentCount = new AtomicLong(0);
    }

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        long count = currentCount.incrementAndGet();
        event.getHeaders().put(headerKey,String.valueOf(count));
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event e:events) {
            intercept(e);
        }
        return events;
    }

    @Override
    public void close() {
    }
    public static class CounterInterceptorBuilder implements Builder {
        private Context ctx;

        @Override
        public Interceptor build() {
            return new TestInterceptor(ctx);
        }

        @Override
        public void configure(Context context) {
            this.ctx = context;
        }
    }
}

方法intercept(Event event)是具体执行解析的方法,将count自增1,然后写入到该条event的headers中。

2配置

编译项目生成jar包,将jar包放入到flume的lib目录下。
配置conf文件如下:

为sources指定自定义的拦截器,配置的时候一定要写类的全路径,并且用”$“符号分割加上自定义Builder的类名。
执行flume的agent,在telnet端输入数据,可以看到每条event的header中都会添加count。

相关文章

  • 自定义flume拦截器-练习1

    参考文章1:Flume 自定义 Interceptor(拦截器)[https://www.cnblogs.com/...

  • flume:一个例子的分析(二)

    在 上篇中,flume 使用的是自定义拦截器:LogAnalysisInterceptor ,下面看下代码:

  • 【flume2】Flume拦截器

    Flume拦截器 1)拦截器注意事项项目中自定义了:ETL拦截器。采用两个拦截器的优缺点:优点,模块化开发和可移植...

  • flume自定义拦截器,步骤详解

    文章还没写好,哈哈哈。先别看喔-------------------- 一、flume自定义拦截器步骤如下 1)继...

  • 092-BigData-20Flume拦截器

    上一篇:091-BigData-19Flume与Flume之间数据传递 一、Flume拦截器 时间戳拦截器 Tim...

  • flume自定义拦截器学习

    备注:本文简单实现了一个计数功能的拦截器,针对每个event用线程安全的AtomicLong类进行计数,并将计数c...

  • flume 到 kafka 多个topic

    1. 日志数据分多个topic 2.2 实现 2.2.1 自定义拦截器 2.2.2 编写flume1的配置文件 2...

  • 6、Flume拦截器_Flume自定义拦截器

    1.背景介绍 Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。Flume有各种自带的拦截器...

  • flume自定义拦截器

    案例需求: 在数据采集之后,通过flume的拦截器,实现不需要的数据过滤掉,并将指定的第一个字段进行加密,加密之后...

  • 自定义flume拦截器

    需求:日志目录是 /data/logs/{instanceId}/xxx.log, 保证倒数第二级为 实例ID。f...

网友评论

    本文标题:flume自定义拦截器学习

    本文链接:https://www.haomeiwen.com/subject/rqujottx.html