美文网首页大数据flume玩转大数据
Flume学习系列(四)---- Interceptors(拦截

Flume学习系列(四)---- Interceptors(拦截

作者: 小北觅 | 来源:发表于2018-08-21 09:40 被阅读80次

    前言:flume通过使用Interceptors(拦截器)实现修改和过滤事件的功能。举个栗子,一个网站每天产生海量数据,但是可能会有很多数据是不完整的(缺少重要字段),或冗余的,如果不对这些数据进行特殊处理,那么会降低系统的效率。这时候拦截器就派上用场了。


    一、flume内置的拦截器

    先列个flume内置拦截器的表:


    table1.png

        由于拦截器一般针对Event的Header进行处理,那我先介绍一Event吧。Event结构如下图:


    001.jpg
    • event是flume中处理消息的基本单元,由零个或者多个header和正文body组成。
    • Header 是 key/value 形式的,可以用来制造路由决策或携带其他结构化信息(如事件的时间戳或事件来源的服务器主机名)。你可以把它想象成和 HTTP 头一样提供相同的功能——通过该方法来传输正文之外的额外信息。
    • Body是一个字节数组,包含了实际的内容。
    • flume提供的不同source会给其生成的event添加不同的header。

        这些拦截器实际都是往Event的header里插数据,比如Timestamp Interceptor拦截器就是可以往event的header中插入关键词为timestamp的时间戳。

    1.1 timestamp拦截器

    下面是官网的timestamp的配置:

    a1.sources = r1
    a1.channels = c1
    a1.sources.r1.channels =  c1
    a1.sources.r1.type = seq
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = timestamp
    

    测试例子:
        在flume的conf目录下新建timestamp.conf文件,输入以下代码:

    #配置文件:timestamp.conf
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
     
    # Describe/configure the source
    a1.sources.r1.type = syslogtcp
    a1.sources.r1.port = 50000
    a1.sources.r1.host = 0.0.0.0
    a1.sources.r1.channels = c1
     
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.preserveExisting= false
    a1.sources.r1.interceptors.i1.type = timestamp
     
     
    # Describe the sink
    a1.sinks.k1.type = logger
    a1.sinks.k1.channel = c1
     
    # Use a channel which buffers events inmemory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    

        打开一个终端,进入flume的bin目录,输入./flume-ng agent -c ../conf -f ../conf/timestamp.conf -Dflume.root.logger=INFO,console -n a1
        启动成功后,flume开始监控本主机上的所有IP地址,再开一个终端,向50000端口发TCP数据,命令如下:echo "TimestampInterceptor" | nc 127.0.0.1 50000
        此时,flume的终端会接收到这个消息,如下:

    002.png
    我们可以清楚地看到header中有timestamp的影子。成功了。 003.jpg

    1.2 host拦截器

        再说一个host interceptor,该拦截器可以往event的header中插入关键词默认为host的主机名或者ip地址(注意是agent运行的机器的主机名或者ip地址)。官网配置如下:

    a1.sources = r1
    a1.channels = c1
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = host
    

    测试例子

    #配置文件:host.conf
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
     
    # Describe/configure the source
    a1.sources.r1.type = syslogtcp
    a1.sources.r1.port = 50000
    a1.sources.r1.host = 0.0.0.0
    a1.sources.r1.channels = c1
     
    a1.sources.r1.interceptors = i2
    # a1.sources.r1.interceptors.i1.preserveExisting= false
    # a1.sources.r1.interceptors.i1.type =timestamp
    a1.sources.r1.interceptors.i2.type = host
    a1.sources.r1.interceptors.i2.hostHeader =hostname
    a1.sources.r1.interceptors.i2.useIP = false
     
    # Describe the sink
    a1.sinks.k1.type = logger
    a1.sinks.k1.channel = c1
     
    # Use a channel which buffers events inmemory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    

        进入flume的bin目录,执行./flume-ng agent -c ../conf -f ../conf/host.conf -Dflume.root.logger=INFO,console -n a1
    然后在新的终端输入echo "HostInterceptor" | nc 127.0.0.1 50000
        结果如下:

    004.jpg
        可以发现header中增加了一个hostname的值:192.168.1.105。使用ifconfig命令,正是我电脑的IP地址,因为flume的agent运行在这个IP上嘛。

    1.3 Regex Filtering Interceptor拦截器

        还有比较重要的Regex Filtering Interceptor,Regex Filtering Interceptor拦截器用于过滤事件,筛选出与配置的正则表达式相匹配的事件。可以用于包含事件和排除事件。常用于数据清洗,通过正则表达式把数据过滤出来。

    测试例子:

    #配置文件:regex_filter.conf
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
     
    # Describe/configure the source
    a1.sources.r1.type = syslogtcp
    a1.sources.r1.port = 50000
    a1.sources.r1.host = 0.0.0.0
    a1.sources.r1.channels = c1
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type =regex_filter
    #全部是数字的数据
    a1.sources.r1.interceptors.i1.regex =^[0-9]*$
    #排除符合正则表达式的数据
    a1.sources.r1.interceptors.i1.excludeEvents =true
     
    # Describe the sink
    a1.sinks.k1.type = logger
     
    # Use a channel which buffers events inmemory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
     
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    

        这样的配置将会过滤掉所有纯数字的数据。
        进入bin目录启动flume,执行./flume-ng agent -c ../conf -f ../conf/regex_filter.conf -Dflume.root.logger=INFO,console -n a1
        新开终端输入
    echo "1234" | nc 127.0.0.1 50000
    echo "1234" | nc 127.0.0.1 50000
    echo "1234" | nc 127.0.0.1 50000
        在flume的终端会输出如下内容,但是没有数据,因为被我们过滤掉了。

    005.png

    二、总结

        本文介绍了flume中的interceptor拦截器,并针对具体的三个拦截器进行了测试,验证其功能。后面我们还会编写符合自己需求的自定义拦截器。敬请期待。

    相关文章

      网友评论

        本文标题:Flume学习系列(四)---- Interceptors(拦截

        本文链接:https://www.haomeiwen.com/subject/ghldiftx.html