美文网首页
flume 拦截器 interceptors

flume 拦截器 interceptors

作者: 祎休 | 来源:发表于2017-10-09 23:45 被阅读0次

有的时候希望通过Flume将读取的文件再细分存储,比如讲source的数据按照业务类型分开存储,具体一点比如类似:将source中web、wap、media等的内容分开存储;比如丢弃或修改一些数据。就可以考虑使用interceptors。Flume中的拦截器(interceptor),用户Source读取events发送到Sink的时候,在events header中加入一些有用的信息,或者对events的内容进行过滤,完成初步的数据清洗。

interceptors 是 source 的拦截器 。主要的作用就是对于一个source 可以指定一个或者多个interceptors 按先后的顺序对数据进行处理 。比如在收集数据的event的hander中加入处理的时间戳,agent的主机或者IP,固定的key-valueg。

常见的interceptors  有: host interceptor,static interceptor,UUID interceptor, timestamp interceptor,morphline interceptor, search and replace interceptor,regex filtering interecptor,regex extractor interceptor

使用 host interceptor 的例子

1.第一步 将所用的flume 自带的拦截器copy出来

复制要用到的拦截器

 cp -r  conf conf_HostInterceptor

备注 :建议在使用flume的时候要复制出来,这样就是每个拦截器都可以有自己的log4j.properties  文件  ,有利于错误日志的排查 。

第二  修改名字  修改配置文件(log4j.properties)

修改名字

 命令 : cd_conf_HostInterceptor

              mv flume-conf.properties.template  flume-conf.properties

修改 log4j.properties

       命令 : vi log4j.properties

  第三  添加配置


    添加的配置文件

 agent1.sources =r1

agent1.sinks=k1

agent1.channels=c1

# Describe/configure the source

agent1.sources.r1.type=netcat

agent1.sources.r1.bind=localhost

agent1.sources.r1.port=44444

agent1.sources.r1.interceptors = i1

agent1.sources.r1.interceptors.i1.type = host

agent1.sources.r1.interceptors.i1.hostHeader =hostname

# Use a channel which buffers eventsinmemory

agent1.channels.c1.type=memory

agent1.channels.c1.capacity=1agent1.channels.c1.transactionCapacity=1

# Bind the source and sink to the channel

agent1.sources.r1.channels=c1

agent1.sinks.k1.channel=c1

# Describe the sink

agent1.sinks.k1.type= logger

第四 启动

启动

启动命令:bin/flume-ng agent --conf conf_HostInterceptor/  --conf-file conf_HostInterceptor/flume-conf.properties --name agent1  -Dflume.root.logger=INFO,console

成功的标志

第五 克隆这个虚拟机 创建端口:44444,开始发送数据。

  
开始处理数据

相关文章

网友评论

      本文标题:flume 拦截器 interceptors

      本文链接:https://www.haomeiwen.com/subject/owqmyxtx.html