为什么要有Flume?
大量的日志在不同的服务器上,要对这些日志进行分析的话,需要通过编写脚本将这些日志文件传输到HDFS上,太麻烦了。
什么是Flume?
Flume基于流式架构的是一个分布式海量日志采集、传输系统,具有容错性强、可靠、灵活、简单等优点在。
Flume可以采集文件,Socket数据包等各种形式的源数据,将这些数据输出到HDFS、HBase等存储系统,或传输到Spark、Storm等进行实时分析。
说白就就是通过监控某个东西,然后设置一堆的条件,达到条件就传输到指定的地方。
Flume最核心的就是agent,每一个agent相当于数据传递员。
![](https://img.haomeiwen.com/i15547715/f769836931906d17.png)
Agent组成
Source
数据源的产生地,同时Source会将产生的数据流传输到Channel。
就是要数据产生源头,可以是端口、文件、文件夹等,端口的话则可以监控访问(进出)的日志等,文件则可以通过配置,文件达到了一定的大小就上传到那里去等…
Sink
从Channel中收集数据,将数据写出到目的地,目的地可以是下一个Source或Hdfs、Storm、Spark等…
Channel
就是Source和Sink的传输通道。
传输过程
![](https://img.haomeiwen.com/i15547715/7cf7b1cb1ba7ef79.png)
案例
官方案例:http://flume.apache.org/
基础配置
flume-env.sh
telnet案例
$ vi flume-telnet. conf
#内部调用命名,多个用空格隔开
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#源数据监控类型
a1.sources.r1.type = netcat
#监控地址
a1.sources.r1.bind = localhost
#端口号
a1.sources.r1.port = 44444
#输出类型
a1.sinks.k1.type = logger
#传输通道
a1.channels.c1.type = memory
#通道最大Event数量
a1.channels.c1.capacity = 1000
#通道数量达到100刷新出去
a1.channels.c1.transactionCapacity = 100
#绑定使用的通道
a1.sources.r1.channels = c1
$ bin/flume-ng agent --conf conf/ --name a1--conf-file conf/flume-telnet.conf -Dflume.root.logger==INFO,console
监听文件案例
$ vi flume-hadoop.properties
# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# Describe/configure the source
#设置类型为执行
a2.sources.r2.type = exec
#要执行的命令
a2.sources.r2.command = tail -f /opt/module/apache-flume-1.5.0-cdh5.3.6-bin/test.log
#shell执行
a2.sources.r2.shell = /bin/bash -c
#目的地类型
a2.sinks.k2.type = hdfs
#目的地址
a2.sinks.k2.hdfs.path =hdfs://hadoop-senior00-levi.com:8082/flume/%Y%m%d/%H
#文件的前缀
a2.sinks.k2.hdfs.filePrefix = my-log-
#是否按照时间滚动文件夹
a2.sinks.k2.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k2.hdfs.roundValue = 1
#上传的时间
a2.sinks.k2.hdfs.roundUnit = minute
#使用本地时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#缓存中最大的事件存放数量则刷新到hdfs
a2.sinks.k2.hdfs.batchSize = 1000
#文件类型
a2.sinks.k2.hdfs.fileType = DataStream
#多久生成一个文件
a2.sinks.k2.hdfs.rollInterval = 600
#设置每个文件的滚动大小
a2.sinks.k2.hdfs.rollSize = 1024
#文件的滚动与Event数量无关
a2.sinks.k2.hdfs.rollCount = 0
#最小冗余数
a2.sinks.k2.hdfs.minBlockReplicas = 1
# Use a channel which buffers events inmemory
#通道类型,缓存
a2.channels.c2.type = memory
#在通道中最大的事件数量
a2.channels.c2.capacity = 1000
#从源接收或发送给接收方的事件的最大数量
a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
$ bin/flume-ng agent --conf conf/ --name a2--conf-file conf/flume- hadoop.conf -Dflume.root.logger==INFO,console
监听文件文件案例
$ vi flume-dir.conf
# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# Describe/configure the source
#设置类型为文件夹
a2.sources.r2.type = spooldir
#要执行的命令
a2.sources.r2.spoolDir =/opt/module/apache-flume-1.5.0-cdh5.3.6-bin/testdir
#文件
a2.sources.r2.fileHeader = true
#忽略上传的文件,因为正在操作的文件一般后缀为.tmp
a3.sources.r3.ignorePattern = ([^]*\.COMPLETED)
#目的地类型
a2.sinks.k2.type = hdfs
#目的地址
a2.sinks.k2.hdfs.path =hdfs://hadoop-senior00-levi.com:8082/flume/dir2/%Y%m%d/%H
#文件的前缀
a2.sinks.k2.hdfs.filePrefix = my-log-
#是否按照时间滚动文件夹
a2.sinks.k2.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k2.hdfs.roundValue = 1
#上传的时间
a2.sinks.k2.hdfs.roundUnit = minute
#使用本地时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#缓存中最大的事件存放数量则刷新到hdfs
a2.sinks.k2.hdfs.batchSize = 1000
#文件类型
a2.sinks.k2.hdfs.fileType = DataStream
#多久生成一个文件
a2.sinks.k2.hdfs.rollInterval = 600
#设置每个文件的滚动大小
a2.sinks.k2.hdfs.rollSize = 1024
#文件的滚动与Event数量无关
a2.sinks.k2.hdfs.rollCount = 0
#最小冗余数
a2.sinks.k2.hdfs.minBlockReplicas = 1
# Use a channel which buffers events inmemory
#通道类型,缓存
a2.channels.c2.type = memory
#在通道中最大的事件数量
a2.channels.c2.capacity = 1000
#从源接收或发送给接收方的事件的最大数量
a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
$ bin/flume-ng agent --conf conf/ --name a2--conf-file conf/flume- dir.conf -Dflume.root.logger==INFO,console
备注:
1.不要再监控目录下创建并持续修改文件
2.上传完成的文件会以.COMPLETED结束
3.被监控的文件夹每600毫秒扫描一次
4.只要不同时运行,agent的名重复都可以,同时运行就要重命名为,如a2/a3等不同名,同一个配置文件里面有多个就用空格隔开(a1 a2 a3)
网友评论