6、Flume拦截器_Flume自定义拦截器

作者: 非勤能补拙 | 来源:发表于2019-07-05 00:16 被阅读0次

    1.背景介绍

    Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。Flume有各种自带的拦截器,比如:TimestampInterceptor、HostInterceptor等,通过使用不同的拦截器,实现不同的功能。但是以上的这些拦截器,不能改变原有日志数据的内容或者对日志信息添加一定的处理逻辑,当一条日志信息有几十个甚至上百个字段的时候,在传统的Flume处理下,收集到的日志还是会有对应这么多的字段,也不能对你想要的字段进行对应的处理。

    2.自定义拦截器

    根据实际业务的需求,为了更好的满足数据在应用层的处理,通过自定义Flume拦截器,过滤掉不需要的字段,并对指定字段加密处理,将源数据进行预处理。减少了数据的传输量,降低了存储的开销。

    3.功能实现

    1.定义一个类CustomParameterInterceptor实现Interceptor接口。
    2.在CustomParameterInterceptor类中定义变量,这些变量是需要到 Flume的配置文件中进行配置使用的。每一行字段间的分隔符(fields_separator)、通过分隔符分隔后,所需要列字段的下标(indexs)、多个下标使用的分隔符(indexs_separator)、多个下标使用的分隔符(indexs_separator)。
    3.添加CustomParameterInterceptor的有参构造方法。并对相应的变量进行处理。将配置文件中传过来的unicode编码进行转换为字符串。
    4.写具体的要处理的逻辑intercept()方法,一个是单个处理的,一个是批量处理。
    5.接口中定义了一个内部接口Builder,在configure方法中,进行一些参数配置。并给出,在flume的conf中没配置一些参数时,给出其默认值。通过其builder方法,返回一个CustomParameterInterceptor对象。
    6.定义一个静态类,类中封装MD5加密方法


    image.png

    7.自定义拦截器的代码开发已完成,然后打包成jar, 放到Flume的根目录下的lib中
    8.修改Flume的配置信息

    新增配置文件spool-interceptor-hdfs.conf,内容为:
    a1.channels = c1
    a1.sources = r1
    a1.sinks = s1
    
    #channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity=100000
    a1.channels.c1.transactionCapacity=50000
    
    #source
    a1.sources.r1.channels = c1
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /root/data/
    a1.sources.r1.batchSize= 50
    a1.sources.r1.inputCharset = UTF-8
    
    a1.sources.r1.interceptors =i1 i2
    a1.sources.r1.interceptors.i1.type =cn.itcast.interceptor.CustomParameterInterceptor$Builder
    a1.sources.r1.interceptors.i1.fields_separator=\\u0009
    a1.sources.r1.interceptors.i1.indexs =0,1,3,5,6
    a1.sources.r1.interceptors.i1.indexs_separator =\\u002c
    a1.sources.r1.interceptors.i1.encrypted_field_index =0
    
    a1.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
    
    
    #sink
    a1.sinks.s1.channel = c1
    a1.sinks.s1.type = hdfs
    a1.sinks.s1.hdfs.path =hdfs://192.168.200.101:9000/flume/%Y%m%d
    a1.sinks.s1.hdfs.filePrefix = event
    a1.sinks.s1.hdfs.fileSuffix = .log
    a1.sinks.s1.hdfs.rollSize = 10485760
    a1.sinks.s1.hdfs.rollInterval =20
    a1.sinks.s1.hdfs.rollCount = 0
    a1.sinks.s1.hdfs.batchSize = 1500
    a1.sinks.s1.hdfs.round = true
    a1.sinks.s1.hdfs.roundUnit = minute
    a1.sinks.s1.hdfs.threadsPoolSize = 25
    a1.sinks.s1.hdfs.useLocalTimeStamp = true
    a1.sinks.s1.hdfs.minBlockReplicas = 1
    a1.sinks.s1.hdfs.fileType =DataStream
    a1.sinks.s1.hdfs.writeFormat = Text
    a1.sinks.s1.hdfs.callTimeout = 60000
    a1.sinks.s1.hdfs.idleTimeout =60
    

    9启动:

    bin/flume-ng agent -c conf -f conf/spool-interceptor-hdfs.conf -name a1 -Dflume.root.logger=DEBUG,console   
    

    相关文章

      网友评论

        本文标题:6、Flume拦截器_Flume自定义拦截器

        本文链接:https://www.haomeiwen.com/subject/smcjhctx.html