Flume installation is straightforward: just extract the archive.
Notes:
1. Flume needs the Hadoop-related jars before it can write data to HDFS; upload the following jars to flume/lib.
The jars involved are listed below, using hadoop-2.9.2 as an example (a copy sketch follows the list):
commons-configuration-1.6.jar
commons-io-2.4.jar
hadoop-auth-2.9.2.jar
hadoop-common-2.9.2.jar
hadoop-hdfs-2.9.2.jar
hadoop-hdfs-client-2.9.2.jar
htrace-core4-4.1.0-incubating.jar
stax2-api-3.1.4.jar
woodstox-core-5.0.3.jar
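One way to stage these jars, as a minimal sketch assuming Hadoop is installed under /opt/hadoop-2.9.2 and Flume under /opt/flume (both install paths are assumptions; adjust to your layout):
HADOOP_HOME=/opt/hadoop-2.9.2    # assumed Hadoop install path
FLUME_HOME=/opt/flume            # assumed Flume install path
# locate each required jar inside the Hadoop distribution and copy it into flume/lib
for jar in commons-configuration-1.6 commons-io-2.4 hadoop-auth-2.9.2 \
           hadoop-common-2.9.2 hadoop-hdfs-2.9.2 hadoop-hdfs-client-2.9.2 \
           htrace-core4-4.1.0-incubating stax2-api-3.1.4 woodstox-core-5.0.3; do
  find "$HADOOP_HOME/share/hadoop" -name "${jar}.jar" -exec cp {} "$FLUME_HOME/lib/" \;
done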
2. Edit /etc/hosts and add the Hadoop host entries.
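For example (the IP address is an assumption; hadoop1 matches the NameNode hostname used in hdfs.path below):
# /etc/hosts on the Flume host
192.168.1.101   hadoop1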
3. Snappy compression is not supported in this setup for the time being.
From the official documentation: "File format: currently SequenceFile, DataStream or CompressedStream. (1) DataStream will not compress the output file, so please don't set codeC. (2) CompressedStream requires setting hdfs.codeC with an available codeC."
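If compressed output were needed, the official doc quoted above points to CompressedStream instead; a minimal sketch (gzip is shown only as one available codec, and this differs from the DataStream setup used below):
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip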
Configuration file contents:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = g1
a1.sources.r1.filegroups.g1 = /script/flume/logdata/random_log.log
a1.sources.r1.headers.g1.x = y
a1.sources.r1.fileHeader = true
a1.sources.r1.fileHeaderKey = filepath
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i1.headerName = timestamp
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000
a1.sinks.k1.channel = c1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop1:9000/flumedata/%Y-%m-%d/%H
a1.sinks.k1.hdfs.filePrefix = cc-log-
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.rollSize = 268435456
a1.sinks.k1.hdfs.rollInterval = 120
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.batchSize = 1000
a1.sinks.k1.hdfs.fileType = DataStream
# a1.sinks.k1.hdfs.codeC = snappy   (do not set codeC when fileType is DataStream; see note 3 above)
a1.sinks.k1.hdfs.useLocalTimeStamp = false
Start the agent:
bin/flume-ng agent -c conf -f /script/flume/conf/titan_flumn.conf -n a1 -Dflume.root.logger=DEBUG,console
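After some events have been delivered, the HDFS output can be checked; the directory layout follows the hdfs.path pattern above:
# list the date/hour partitions and rolled files created by the HDFS sink
hdfs dfs -ls -R /flumedata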
Configuration for collecting log files from multiple directories:
a1.sources = r1 r2
a1.sinks = k1 k2
a1.channels = c1 c2
Two sources are configured, each tailing files from a different path:
a1.sources.r1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = g1
a1.sources.r1.filegroups.g1 = /script/flume/logdata/random_log.log
a1.sources.r1.headers.g1.x = y
a1.sources.r1.fileHeader = true
a1.sources.r1.fileHeaderKey = filepath
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i1.headerName = timestamp
a1.sources.r2.channels = c2
a1.sources.r2.type = TAILDIR
a1.sources.r2.filegroups = g2
a1.sources.r2.filegroups.g2 = /script/flume/logdata/random_log_b.log
a1.sources.r2.headers.g2.x = y
a1.sources.r2.fileHeader = true
a1.sources.r2.fileHeaderKey = filepath
a1.sources.r2.interceptors = i1
a1.sources.r2.interceptors.i1.type = timestamp
a1.sources.r2.interceptors.i1.headerName = timestamp
The channels are defined in the same way, one per source:
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 1000
The sinks likewise, one per channel:
a1.sinks.k1.channel = c1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop1:9000/flumedata/%Y-%m-%d/%H
a1.sinks.k1.hdfs.filePrefix = cc-log-
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.rollSize = 268435456
a1.sinks.k1.hdfs.rollInterval = 120
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.batchSize = 1000
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.useLocalTimeStamp = false
a1.sinks.k2.channel = c2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = hdfs://hadoop1:9000/flumedata_b/%Y-%m-%d/%H
a1.sinks.k2.hdfs.filePrefix = cc-log-
a1.sinks.k2.hdfs.fileSuffix = .log
a1.sinks.k2.hdfs.rollSize = 268435456
a1.sinks.k2.hdfs.rollInterval = 120
a1.sinks.k2.hdfs.rollCount = 0
a1.sinks.k2.hdfs.batchSize = 1000
a1.sinks.k2.hdfs.fileType = DataStream
a1.sinks.k2.hdfs.useLocalTimeStamp = false
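The multi-source agent is started the same way as above; the config file path here is only a placeholder for wherever this configuration is saved:
bin/flume-ng agent -c conf -f /script/flume/conf/multi_source.conf -n a1 -Dflume.root.logger=DEBUG,console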