有的时候希望通过Flume将读取的文件再细分存储,比如讲source的数据按照业务类型分开存储,具体一点比如类似:将source中web、wap、media等的内容分开存储;比如丢弃或修改一些数据。就可以考虑使用interceptors。Flume中的拦截器(interceptor),用户Source读取events发送到Sink的时候,在events header中加入一些有用的信息,或者对events的内容进行过滤,完成初步的数据清洗。
interceptors 是 source 的拦截器 。主要的作用就是对于一个source 可以指定一个或者多个interceptors 按先后的顺序对数据进行处理 。比如在收集数据的event的hander中加入处理的时间戳,agent的主机或者IP,固定的key-valueg。
常见的interceptors 有: host interceptor,static interceptor,UUID interceptor, timestamp interceptor,morphline interceptor, search and replace interceptor,regex filtering interecptor,regex extractor interceptor
使用 host interceptor 的例子
1.第一步 将所用的flume 自带的拦截器copy出来
复制要用到的拦截器cp -r conf conf_HostInterceptor
备注 :建议在使用flume的时候要复制出来,这样就是每个拦截器都可以有自己的log4j.properties 文件 ,有利于错误日志的排查 。
第二 修改名字 修改配置文件(log4j.properties)
修改名字命令 : cd_conf_HostInterceptor
mv flume-conf.properties.template flume-conf.properties
修改 log4j.properties命令 : vi log4j.properties
第三 添加配置
添加的配置文件
agent1.sources =r1
agent1.sinks=k1
agent1.channels=c1
# Describe/configure the source
agent1.sources.r1.type=netcat
agent1.sources.r1.bind=localhost
agent1.sources.r1.port=44444
agent1.sources.r1.interceptors = i1
agent1.sources.r1.interceptors.i1.type = host
agent1.sources.r1.interceptors.i1.hostHeader =hostname
# Use a channel which buffers eventsinmemory
agent1.channels.c1.type=memory
agent1.channels.c1.capacity=1agent1.channels.c1.transactionCapacity=1
# Bind the source and sink to the channel
agent1.sources.r1.channels=c1
agent1.sinks.k1.channel=c1
# Describe the sink
agent1.sinks.k1.type= logger
第四 启动
启动启动命令:bin/flume-ng agent --conf conf_HostInterceptor/ --conf-file conf_HostInterceptor/flume-conf.properties --name agent1 -Dflume.root.logger=INFO,console
成功的标志第五 克隆这个虚拟机 创建端口:44444,开始发送数据。
开始处理数据
网友评论