概要
Flume是一个高可用的,分布式的实时的日志采集系统。
Flume分为三个组件,Ource组件,负责信息的采集,并将采集的信息发送诶Channel。Channel组件提供临时的存储,保存Source组件发送过来的信息,Sink负责读取Channel中临时存储的信息,保存到HDFS等,并删除Channel中的临时信息。
Source, Channel, Sink称为一个Agen。架构如下图所示:

Source组件官方提供了好多个:
Avro Source: 监听Avro端口并接受客户端的事件流。
JMS Source: 从JMS采集数据
Kafka Source, NetCat TCP Source , Syslog TCP Source,HTTP Source等等,还有好多具体可以看官网
sink作为将数据存储到目的地,官方提供了HDFS Sink, Hive Sink, Logger Sink, Avro Sink, Thrift Sink, HBaseSink, Kafka Sink等等。
Flume Channels作为一个临时存储容器,官方提供了Memory Channel内存通道这个是最常用的,JDBC Channel存储到关系型数据库,Kafka Channel, File Channel等
如果官方提供的agent组件不满足需求,可以自定义组件‘’
配置
下载Flume,并解压到tar包。 tar -zxvf apache-flume-1.7.0-bin.tar.gz -C 目录
环境变量可以配置,也可以不配置,我没有配置,执行的时候只需要到其bin目录下即可
例子
- 一个简单的例子
首先我们先按照官方给的例子,来监听一下本地的某一个端口,并将数据打印到命令行:
官方文档
# 这个Agent的名字,包含3个组件,source,channel和sink
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
#source组件监听本地网络端口号444444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink sink组件只是打印出来
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory 缓存组件
#定义缓存的类型为内存
a1.channels.c1.type = memory
#最大存储量
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel 组装起来
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
将此文件放到flume的解压目录下,为了好管理,新建了一个文件夹agent,然后 vi a1.conf 将代码保存a1.conf中。
进到flume下,执行 :
./bin/flume-ng agent --conf conf
--conf-file myagent/a1.conf --name a1 -Dflume.root.logger=INFO,console
先看下面的已经监听了本地的端口了,

在浏览器里面访问以下44444端口,http://localhost:44444/test?test 将会有日志打出:

- 我们在写一个稍微复杂点的例子,监听本地的一个目录,并将日志保存到hdfs上:
#定义agent名, source、channel、sink的名称
a2.sources = r1
a2.channels = c1
a2.sinks = k1
#具体定义source
a2.sources.r1.type = spooldir
a2.sources.r1.spoolDir = /root/temp/logs
#具体定义channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100
#具体定义sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://192.168.246.111:9000/flume/test
a2.sinks.k1.hdfs.filePrefix = events-
a2.sinks.k1.hdfs.fileType = DataStream
#不按照条数生成文件
a2.sinks.k1.hdfs.rollCount = 0
#HDFS上的文件达到128M时生成一个文件
a2.sinks.k1.hdfs.rollSize = 134217728
#HDFS上的文件达到60秒生成一个文件
a2.sinks.k1.hdfs.rollInterval = 60
#组装source、channel、sink
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
监控/root/temp/logs 这个目录下的所有的文件,并保存到flume文件夹下面hdfs://192.168.157.111:9000/flume/test,启动:
./bin/flume-ng agent -n a2 -f myagent/a2.conf -c conf -Dflume.root.logger=INFO,console

启动之后,我们领启动一个命令行,在/root/temp/logs 创建一个文件,test.txt,保存内容为 Hello world!flume会采集日志到hdfs的flume/test文件夹下。
欢迎关注我的微信公众号: manong_xiaodong

网友评论