一、概述
- Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统,通常用来做日志的采集。
- Flume的数据源非常丰富,可以采集文件,socket数据包等各种形式源数据,又可以将采集到的数据输出到HDFS、hbase、hive、kafka等众多外部存储系统中;
- Flume针对特殊场景也具备良好的自定义扩展能力,因此,flume可以适用于大部分的日常数据采集场景;

二、原理


三、安装配置
启动 Flume
bin/flume-ng agent -n agent -c ./conf -f conf/flume-conf.properties -Dflume.root.logger=DEBUG,console
编写启动脚本并指定日志输出
nohup /usr/libra/flume/bin/flume-ng agent -n agent -c /usr/libra/flume/conf -f /usr/libra/flume/conf/kafka-flume-hdfs.properties -Dflume.root.logger=INFO,console >/usr/libra/flume/logs/flume.log 2>&1 &vi
cdh 版本下载
flume-ng-1.5.0-cdh5.3.6.tar.gz
在下面的网站中找到对应版本得到字符串填入上述链接中即可下载对应的版本
http://archive.cloudera.com/cdh5/cdh/5/
四、拦截器
-
Timestamp Interceptor
:在event的header中添加一个key叫:timestamp:value
为当前的时间戳。这个拦截器在sink为hdfs 时很有用,后面会举例说到 -
Host Interceptor
:在event的header中添加一个key叫:host:value
为当前机器的hostname或者ip。 -
Static Interceptor
:可以在event的header中添加自定义的key:value
。 -
Regex Filtering Interceptor
:通过正则来清洗或包含匹配的events。 -
Regex Extractor Interceptor
:通过正则表达式来在header中添加指定的key:value
则为正则匹配的部分 - 自定义拦截器
五、Channel Selectors
-
Replicating Channel Selector
(default): Replicating 会将source过来的events发往所有channel -
Multiplexing Channel Selector
: 这两种selector的区别是: ,而Multiplexing 可以选择该发往哪些channel
六、特殊需求
- 前一段时间上线除了个事故,问题在于 flume 写入 HDFS 的时候产生大量的小文件,基本上2 个 json 就会生成一个文件,看文件的时候惊讶道我了,一个文件大部分都小于 3kb,造成存储系统的 NameNode 压力暴增,不断的 GC,告警。。。
- 事故就说到这里,问题来了:奈这么多的小文件何?就想怎么能够不让出现这么多的小文件?
- 于是开始探寻之路,到 flume 的官网,发现了几个参数:
参数 默认值 说明 hdfs.rollInterval 30 Number of seconds to wait before rolling current file (0 = never roll based on time interval) hdfs.rollSize 1024 File size to trigger roll, in bytes (0: never roll based on file size) hdfs.rollCount 10 Number of events written to file before it rolled (0 = never roll based on number of events) hdfs.idleTimeout 0 Timeout after which inactive files get closed (0 = disable automatic closing of idle files) - 咋一看,貌似能解决我的问题,但是老大发话了,一天写一个文件,于是再看看这几个参数,也翻了一下其他的,表示没有找到
- 到网上一搜,出现 0 点滚动文件的标题,于是点进去看了看,这个应该可以,打开看了看发现是修改源码的,于是就开始下载源码开始了修改源码的道路。。。
- 修改源码💞好累,就没有其他办法了吗?最后还是搞了出来:把这四个参数全部关闭,然后配置写入文件的分区路径按照时间分区,问题解决了,只要时间(分区就是按照业务时间分区的)变化,那么就会切换到新的分区中。终于告一段落
- 但是,紧接着发现一个问题:这个分区中的文件始终不会关闭,即文件的后缀始终是
.tmp
,但是不用紧张,只需要打开hdfs.idleTimeout这个属性即可,可以设置为 10 分钟,那么 10 分钟内不写文件就会关闭这个文件。
- 于是开始探寻之路,到 flume 的官网,发现了几个参数:
- 事故就说到这里,问题来了:奈这么多的小文件何?就想怎么能够不让出现这么多的小文件?
- 业务服务已然上线,但是数据服务由于各种问题,导致不能正常上线,但是软终端的领导撂话:数据不能丢,但是我的 Flume 还有问题没有解决就不能冒然启动,否则会造成各种问题。但是 Flume 的上游是 kafka,kafka 可以保留最近 7 天的数据,也就是说,我们还有 7 天的缓冲期,当然了不能死扣着这个 7 天,尽早上线为好
- 问题就说到这里,那么如何解决历史数据问题呢
网友评论