1、需求1:采集指定文件内容到HDFS
技术选型:exec --- memory --- hdfs
agent的写法(官网上查对应的) vi exec-hdfs.conf
#agent_name
a1.sources=r1
a1.sinks=k1
a1.channels=c1
#source的配置
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/data/data.log
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop001:9000/data/flume/tail
#channel的配置
a1.channels.c1.type = memory
#用channel链接source和sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel =c1
运行
./flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/exec-memory-hdfs.conf \
-Dflume.root.logger=INFO,console \
-Dflume.monitoring.type=http \
-Dflume.monitoring.port=34343
2、小文件处理
以上案例运行完后发现hdfs中产生了很多小文件,由于hdfs的block大小默认是128M,这样就造成了空间浪费
通过修改参数 :三个参数当满足其中一项时进行回滚操作,通常情况下三者搭配使用
rollInterval = 0 :滚动当前文件前等待的秒数,设置为零不根据时间间隔滚动
rollSize = 10485760 :根据文件大小触发滚动,以字节为单位,设置零表示不根据大小来触发滚动
rollCount = 0 :根据event个数来触发滚动,设置零表示不根据event个数来触发
3、需求2:采集指定文件夹的内容到控制台
选型:spooling - memory - logger
#agent_name
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#configure the sources
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/hadoop/tmp/flume
a1.sources.r1.fileHeader = true
#configure the sinks
a1.sinks.k1.type = logger
#configure the channels
a1.channels.c1.type = memory
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
运行:
./flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/spooling-memory-logger.conf \
-Dflume.root.logger=INFO,console \
-Dflume.monitoring.type=http \
-Dflume.monitoring.port=34343
然后在~/tmp/flume目录下执行 cp ~/data/ruozeinput.txt 1 、 cp ~/data/ruozeinput.txt 2 。。。。
在控制台就可以看到有信息输出
4、需求3 采集指定文件夹和文件的内容===>tailDir(生产上95%以上)
关键词:filegroups :以空格分隔的文件组列表。每个文件组表示要跟踪的一组文件。
filegroups.<filegroupName>:文件组的绝对路径。正则表达式(而不是文件系统模式)只能用于文件名
positionFile:~/.flume/taildir_position.json JSON格式的文件,以记录每个尾随文件的偏移量,支持断点续传
选型:taildir - memory - logger
配置文件:
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile=/home/hadoop/tmp/position/taildir_position.json
a1.sources.r1.filegroups=f1 f2
a1.sources.r1.filegroups.f1=//home/hadoop/tmp/flume//test1/example.log
a1.sources.r1.headers.f1.headerKey1=value1
a1.sources.r1.filegroups.f2=//home/hadoop/tmp/flume//test2/.*log.*
a1.sources.r1.headers.f2.headerKey1=value2
a1.sources.r1.headers.f2.headerKey2=value2-2
#configure the sinks
a1.sinks.k1.type = logger
#configure the channels
a1.channels.c1.type = memory
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
运行命令
./flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/taildir-memory-logger.conf \
-Dflume.root.logger=INFO,console \
-Dflume.monitoring.type=http \
-Dflume.monitoring.port=34343
查看偏移量 cat ~/tmp/position/taildir_position.json
坑 Apache flume 1.6版本没有偏移量这个功能
网友评论