备注:flume采集文件到hdfs还报错,后续在更新。
自己还是喜欢+适合用写文章的方式来学习,很就没更新文章了。加油成为更好的自己,努力学习、努力赚钱、努力理财
flume是一个分布式的、高可靠的、高可用的将大批量的不同数据源的日志数据收集、集合、移动到数据中心进行存储的系统。即
日志采集和汇总工具。
一、监控端口数据
案例需求:使用flume监控一个端口号,并打印到控制台
1、cd /usr/flume/conf 在conf目录下新建netcat-flume-logger.conf文件
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
整个配置文件内容分为三个大部分:
- 从整体上描述代理agent中sources、sinks、channels所涉及到的组件;
- 详细描述agent中每一个
source、sink与channel
的具体实现;Source使用了netcat,指定绑定的主机以及端口号;Sink按照logger的方式进行配置 - 通过channel将source与sink连接起来
2、启动Flume
cd /usr/flume,在flume目录下执行命名
bin/flume-ng agent --conf conf --name a1 --conf-file job/netcat-flume-logger.conf -Dflume.root.logger=INFO,console
参数说明:
--conf : 表示配置文件存储在conf目录
--name : 表示给agent取一个名字
--conf-file : flume本次启动读取的配置文件是在job文件夹下面的netcat-flume-logger.conf
文件
-Dflume.root.logger=INFO,console : -D
表示flume运行时动态修改flume.root.logger
参数属性值,并将控制台日志打印级别设置为INFO
级别。
日志级别包括:log、info 、warn
3、监听端口并输入一些测试信息
nc localhost 44444
二、实时监控单个追加的文件,并上传到HDFS
案例需求:实时监控hive日志并上传到hdfs
步骤操作分为两步:首先把动态的文件打印到控制台,因为打印到控制台比较熟悉;然后在监控动态的文件把数据传到HDFS
第一步:把动态的文件打印到控制台
1、cd /usr/flume/conf 在conf目录下新建file-flume-logger.conf文件
#name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#source config
a1.sources.r1.type = exec
#监控本地文件tail -f,tail -f默认读取文件的后10行。
a1.sources.r1.command = tail -f /home/atguigu/data/flume_test.txt
#channels config
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#sink cinfig
a1.sinks.k1.type = logger
#bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- 可自行查看
tail -f
与tail -F
的区别
2 、启动flume
1)cd /usr/flume,在flume目录下执行命名
bin/flume-ng agent --conf conf --conf-file job/file-flume-logger.conf --name a1 -Dflume.root.logger=INFO,console
3、往文件追加内容 ,并输入一些测试信息
echo "bbccdd" >> flume_test.txt
查看flume
第二步:监控动态的文件把数据传到HDFS
有问题,上传不到hdfs,还没找到原因
flume想把数据写到hdfs上,需要持有hadoop相关的jar包。flume启动时会加载lib目录下的所有jar包到环境变量中。
1、cd /usr/flume/conf 在conf目录下新建file-flume-hdfs.conf文件
#1)name
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#2)sources configure
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/atguigu/data/flume_test.txt
#3)sinks configure
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://bigdata02:9092/flume/events/%y-%m-%d/%H
#文件的前缀
a1.sinks.k1.hdfs.filePrefix = log-
#是否按照时间滚动文件夹,设置为按照时间滚动文件夹
a1.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a1.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a1.sinks.k1.hdfs.roundUnit = hour
#积攒多少个event才刷新到hdfs
a1.sinks.k1.hdfs.batchSize = 10
#文件的压缩格式
a1.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新文件,因为是测试环境,设置为60秒生成一个新文件
a1.sinks.k1.hdfs.rollInterval = 60
#设置每个文件的滚动大小
a1.sinks.k1.hdfs.hdfs.rollSize = 13421770
#设置文件的滚动与数量无关
a1.sinks.k1.hdfs.rollCount = 0
#是否使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#4)channels configure
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#5)bind configure
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
hdfs sink配置参数
参考文章:hdfs sink 使用技巧
1)文件滚动参数设置
如果要配置文件滚动,hdfs.rollInterval
、hdfs.rollSize
、hdfs.rollCount
这三个参数需要一起配置
hdfs.rollInterval :
基于时间间隔来进行文件滚动(多久生成一个新文件),默认是30,即每隔30秒滚动一个文件。0就是不使用这个策略。
hdfs.rollSize :
基于文件大小进行文件滚动,默认是1024,即当文件大于1024个字节时,关闭当前文件,创建新的文件。0就是不使用这个策略。
一般文件大小设置要比块小
hdfs.rollCount
基于event数量进行文件滚动。默认是10,即event个数达到10时进行文件滚动。0就是不使用这个策略。
hdfs.idleTimeout
闲置N秒后,关闭当前文件(去掉.tmp后缀)
2)文件名策略
hdfs.filePrefix
文件前缀,默认是FlumeData
hdfs.fileSuffix
文件后缀,默认没有
三、实时监控文件夹,并上传到HDFS
使用组件:taildir source、memroy channel、hdfs sink
1、cd /usr/flume/conf 在conf目录下新建taildir-file-hdfs.conf文件
#每行注释在代码下方
a3.sources = r3
#定义sources
a3.sinks = k3
#定义sinks
a3.channels = c3
#定义channels
# Describe/configure the source
a3.sources.r3.type = spooldir
#表示定义source的类型是目录类型
a3.sources.r3.spoolDir = /home/study/hive_project/logstart
#定义监控目录的具体位置
a3.sources.r3.fileSuffix = .COMPLETED
#文件上传完毕后的后缀
a3.sources.r3.fileHeader = true
#表示是否有文件头
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
#忽略所有以tmp结尾的文件,不上传
# Describe the sink
a3.sinks.k3.type = hdfs
#sink的类型是hdfs
a3.sinks.k3.hdfs.path = /origin_data/mall/logstart/%Y-%m-%d
#数据目的地的具体路径
a3.sinks.k3.hdfs.filePrefix = logstart-
#上传的文件以logstart前缀
a3.sinks.k3.hdfs.round = true
#是否按照时间滚动文件
a3.sinks.k3.hdfs.roundValue = 1
#多少时间单位创建一个新的文件
a3.sinks.k3.hdfs.roundUnit = hour
#时间的单位 小时
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#是否使用本地的时间戳
a3.sinks.k3.hdfs.batchSize = 100
#积累了多少event才能刷写到hdfs一次
a3.sinks.k3.hdfs.fileType = DataStream
#设置文件类型
a3.sinks.k3.hdfs.rollInterval = 600
#多久生成新文件
a3.sinks.k3.hdfs.rollSize = 134217700
#生成多大的新文件
a3.sinks.k3.hdfs.rollCount = 0
#多少enevt生成新文件
a3.sinks.k3.hdfs.minBlockReplicas = 1
#多少副本数
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
#表示a3的类型是channel类型是menory(内存)类型
a3.channels.c3.capacity = 1000
#表示a3的channel的总容量为1000个enevt
a3.channels.c3.transactionCapacity = 100
#表示a3的channel在传输的时候收集到100个enevt再去提交事务
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
#表示把r3和c3连接起来
a3.sinks.k3.channel = c3
#表示将K3和c3连接起来
2 、启动flume
cd /usr/flume,在flume目录下执行命名
bin/flume-ng agent --conf conf --conf-file job/taildir-file-hdfs.conf --name a3
3、查看采集信息
1)监控文件夹
2)HDFS上传数据
网友评论