What Flume does
- Collects files from disk and ships them to HDFS
- Data collection sources: system log files, Python crawler data, port data
- Data delivery targets: HDFS, Kafka
Flume components
- Agent: an independent Flume process containing the Source, Channel, and Sink components. (An agent runs Flume in a JVM; each machine runs one agent, but a single agent can contain multiple sources and sinks.)
- Source: captures events, applies a specific format to them, and pushes them into one or more Channels.
- Channel: a buffer that holds events until the Sink has processed them.
- Sink: persists the log data or pushes events on to another Source.
- Client: produces the data; runs in an independent thread.
- Event: a unit of data, made up of a header and a body. (Events can be log records, Avro objects, etc.)
- Flow: an abstraction of an Event's journey from its origin to its destination.
Flume sources
- Taildir: data from local files/directories (see the configuration sketch after this list)
- Avro: a lightweight RPC framework, used to connect one Flume agent to another
- Kafka
- NetCat: a network communication tool on Linux (the nc command)
- Exec: a shell command, e.g. tail -f on a data file
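A minimal Taildir source sketch, assuming an agent named a1; the position file path and the file group pattern below are illustrative assumptions:
a1.sources.r1.type = TAILDIR
# Records how far each file has been read, so tailing resumes after a restart
a1.sources.r1.positionFile = /usr/local/flume/taildir_position.json
# One file group matching all .log files under an assumed application log directory
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /var/log/app/.*\.log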
Flume sinks
- Logger: writes to the console, mostly used for debugging
- Avro: a lightweight RPC framework, used to connect one Flume agent to another (see the sketch after this list)
- Kafka
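A minimal sketch of connecting two agents over Avro; the hostname hadoop02 and port 4141 are assumptions for illustration:
# Upstream agent a1: Avro sink pointing at the downstream agent
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop02
a1.sinks.k1.port = 4141
# Downstream agent a2: Avro source listening on the same port
a2.sources.r1.type = avro
a2.sources.r1.bind = 0.0.0.0
a2.sources.r1.port = 4141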
Flume channels
- Memory
- File (see the sketch after this list)
- Kafka
- JDBC
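A minimal File channel sketch; the checkpoint and data directories below are assumptions:
a1.channels.c1.type = file
# Checkpoint metadata for the channel (assumed directory)
a1.channels.c1.checkpointDir = /usr/local/flume/checkpoint
# Directory where the events themselves are written to disk (assumed directory)
a1.channels.c1.dataDirs = /usr/local/flume/data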
Flume installation
- Flume 1.9.0 download link
tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /usr/local/
mv /usr/local/apache-flume-1.9.0-bin /usr/local/flume
cp /usr/local/flume/conf/flume-env.sh.template /usr/local/flume/conf/flume-env.sh
- Add the environment variables:
vim /etc/profile
Add the two lines below, then run source /etc/profile
export FLUME_HOME=/usr/local/flume
export PATH=$PATH:$FLUME_HOME/bin
- Check the Flume version:
flume-ng version
Case 1: listen on a port and output to the console
- Create the agent configuration file
mkdir -p /usr/local/flume/job
touch /usr/local/flume/job/netcat-flume-logger.conf
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- Start the agent:
flume-ng agent -n a1 -c /usr/local/flume/conf -f /usr/local/flume/job/netcat-flume-logger.conf -Dflume.root.logger=INFO,console
- Send test data to the source with netcat
yum -y install netcat
nc localhost 44444
Type some content and send it, then check in the Flume console that the received content matches.
Case 2: monitor a single file for appended content and output to the console
- Create the agent configuration file
touch /usr/local/flume/job/file-flume-logger.conf
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /var/log/hive.log
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- Start the agent:
flume-ng agent -n a1 -c /usr/local/flume/conf -f /usr/local/flume/job/file-flume-logger.conf -Dflume.root.logger=INFO,console
- Write some data to the log file to test.
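For example, append a test line to the monitored file (assuming /var/log/hive.log exists and is writable):
echo "hello flume" >> /var/log/hive.log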
Case 3: monitor a single, continuously growing file and output to HDFS
- Add the required jar packages (dependency packages) to Flume's lib directory
- Create the agent configuration file
touch /usr/local/flume/job/file-flume-hdfs.conf
# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -f /var/log/hive.log
# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop01:9000/flume/%Y%m%d/%H
# Prefix for uploaded files
a2.sinks.k2.hdfs.filePrefix = logs-
# Whether to roll folders based on time
a2.sinks.k2.hdfs.round = true
# Number of time units before a new folder is created
a2.sinks.k2.hdfs.roundValue = 1
# Time unit for rolling folders
a2.sinks.k2.hdfs.roundUnit = hour
# Whether to use the local timestamp
a2.sinks.k2.hdfs.useLocalTimeStamp = true
# Number of Events to accumulate before flushing to HDFS
a2.sinks.k2.hdfs.batchSize = 1000
# File type; compression is supported
a2.sinks.k2.hdfs.fileType = DataStream
# How often a new file is rolled; here a file is rolled to HDFS every 30 seconds, and before that it is a temporary file
a2.sinks.k2.hdfs.rollInterval = 30
# Roll size for each file
a2.sinks.k2.hdfs.rollSize = 134217700
# Rolling is independent of the number of Events
a2.sinks.k2.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
- Start the agent:
flume-ng agent -n a2 -c /usr/local/flume/conf -f /usr/local/flume/job/file-flume-hdfs.conf
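After appending data to /var/log/hive.log, check whether files are landing under the HDFS path from the configuration above:
hdfs dfs -ls -R /flume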
Case 4: monitor new files in a directory and output to HDFS
- Create the agent configuration file
touch /usr/local/flume/job/dir-flume-hdfs.conf
# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# Describe/configure the source
a2.sources.r2.type = spooldir
a2.sources.r2.spoolDir = /home/hadoop/upload
a2.sources.r2.ignorePattern = ([^ ]*\.tmp)
# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop01:9000/flume/%Y%m%d/%H
# Prefix for uploaded files
a2.sinks.k2.hdfs.filePrefix = logs-
# Whether to roll folders based on time
a2.sinks.k2.hdfs.round = true
# Number of time units before a new folder is created
a2.sinks.k2.hdfs.roundValue = 1
# Time unit for rolling folders
a2.sinks.k2.hdfs.roundUnit = hour
# Whether to use the local timestamp
a2.sinks.k2.hdfs.useLocalTimeStamp = true
# Number of Events to accumulate before flushing to HDFS
a2.sinks.k2.hdfs.batchSize = 1000
# File type; compression is supported
a2.sinks.k2.hdfs.fileType = DataStream
# How often a new file is rolled
a2.sinks.k2.hdfs.rollInterval = 30
# Roll size for each file
a2.sinks.k2.hdfs.rollSize = 134217700
# Rolling is independent of the number of Events
a2.sinks.k2.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
- Start the agent:
flume-ng agent -n a2 -c /usr/local/flume/conf -f /usr/local/flume/job/dir-flume-hdfs.conf
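To test, copy a new file into the spooled directory; once Flume has uploaded it, the file is renamed with a .COMPLETED suffix (the sample file below is an assumption):
cp /var/log/hive.log /home/hadoop/upload/
ls /home/hadoop/upload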