Flume in Practice
Example 1:
Single machine: monitor a specified port and print what is received to the console
I. Steps:
- 1. Edit the configuration file
- 2. Start Flume
- 3. Connect to the specified host and port and send data
- 4. Check the console output
II. Walkthrough
1. Contents of example.conf
# Name the three components of the agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Configure the sink
a1.sinks.k1.type = logger
# Configure the channel (capacity = max events buffered, transactionCapacity = max events per transaction)
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and the sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2. Start Flume
Command: flume-ng agent --conf conf --conf-file conf/example.conf --name a1 -Dflume.root.logger=INFO,console
3. Connect to the host and send data
![](https://img.haomeiwen.com/i3023966/52a425ec0c4603af.png)
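For reference, this step can be reproduced with telnet (or nc), pointed at the host and port bound by the netcat source above; the message text is just an illustrative example:

telnet localhost 44444
hello flume
OK

The netcat source acknowledges each received line with OK by default.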
4. Check the console output
![](https://img.haomeiwen.com/i3023966/ebffe6bc95dd5b23.png)
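The logger sink prints each event roughly in the following form, with the body shown both as hex bytes and as text (the exact layout may vary by Flume version):

Event: { headers:{} body: 68 65 6C 6C 6F 20 66 6C 75 6D 65 hello flume }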
Example 2:
Diagram:
![](https://img.haomeiwen.com/i3023966/564211f5e4252c2f.png)
h1 and h2 listen for HTTP requests on a specified port and forward the data to h3; h3 writes the data to HDFS
I. Steps:
- 1. Edit the configuration files on h1, h2, and h3
- 2. Start Flume on all three machines
- 3. Send HTTP requests to h1 and h2
- 4. Check the HDFS directory
II. Walkthrough
1. Configuration file for h1 and h2
# Name the three components of the agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Configure the source
a1.sources.r1.type = http
a1.sources.r1.port = 8888
# Configure the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = h3
a1.sinks.k1.port = 4141
# Configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and the sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2. Configuration file for h3
# Name the three components of the agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
# Configure the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://h1:9000/flumeData
# Configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and the sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
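Note that with only hdfs.path configured, the HDFS sink writes SequenceFile output by default. If plain text files are wanted in HDFS, the following two optional properties (not part of the original config) can be added:

a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text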
3. Start Flume on each machine
Command on h3 (started first, so that its avro source is already listening when h1 and h2 connect): flume-ng agent --conf conf --conf-file conf/example.conf --name a1 -Dflume.root.logger=INFO,console
Command on h1 and h2: flume-ng agent --conf conf --conf-file conf/example.conf --name a1 -Dflume.root.logger=INFO,console
4. Send HTTP requests to h1 and h2
![](https://img.haomeiwen.com/i3023966/058a06b6cdc51704.png)
![](https://img.haomeiwen.com/i3023966/b96b45bc2758ee39.png)
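For reference, a request like those in the screenshots can be sent with curl; the http source's default JSONHandler expects a JSON array of events, so the headers and body below are only illustrative:

curl -X POST -H 'Content-Type: application/json' -d '[{"headers":{"from":"test"},"body":"hello flume"}]' http://h1:8888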
5. Check the contents of the files in the HDFS directory
![](https://img.haomeiwen.com/i3023966/a4e6de5fe2026e8f.png)
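The same check can be done from the command line; file names follow the HDFS sink's default FlumeData.<timestamp> prefix:

hdfs dfs -ls /flumeData
hdfs dfs -cat /flumeData/FlumeData.*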
Example 3:
Diagram:
![](https://img.haomeiwen.com/i3023966/236da5d843ab373e.png)
Source on h1: watch a specified file for appended content.
Output from h1: to h2 and to HDFS.
Output from h2: written to the local file system.
I. Steps:
- 1. Edit the configuration files on h1 and h2
- 2. Start Flume on both machines
- 3. Append content to the watched file
- 4. Check the HDFS directory and the output directory on h2
II. Walkthrough
1. Configuration file for h1
# Name the three components of the agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Configure the source: follow content appended to the watched file
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/vagrant/testdir/flumeTestData
a1.sources.r1.channels = c1 c2
# Channel and sink for flow 1 (avro to h2)
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = h2
a1.sinks.k1.port = 4141
a1.sinks.k1.channel = c1
# Channel and sink for flow 2 (HDFS)
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = hdfs://h1:9000/flumeData
a1.sinks.k2.channel = c2
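Because r1 is bound to both c1 and c2, Flume's default replicating channel selector copies every event into both channels, which is what fans the same data out to h2 and to HDFS. The default can be made explicit with one optional line:

a1.sources.r1.selector.type = replicating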
2. Configuration file for h2
# Name the three components of the agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
# Configure the sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /home/vagrant/testdir/flumelog
# Configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and the sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
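Note that the file_roll sink does not create its output directory, so it must exist before startup (by default the sink rolls to a new file every 30 seconds, tunable via sink.rollInterval):

mkdir -p /home/vagrant/testdir/flumelog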
3. Start Flume
Command on h1 and h2: flume-ng agent --conf conf --conf-file conf/example3.conf --name a1 -Dflume.root.logger=INFO,console
4. Append content to the watched file
[root@h1 testdir]# echo "123" >> flumeTestData
[root@h1 testdir]# echo "123456" >> flumeTestData
5. Check the changes in HDFS and in the output directory on h2
![](https://img.haomeiwen.com/i3023966/c2821793e418d019.png)
![](https://img.haomeiwen.com/i3023966/c4633802116456bc.png)
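Equivalent command-line checks, using the paths configured above (HDFS file names are the sink's defaults):

hdfs dfs -cat /flumeData/FlumeData.*        # on h1
cat /home/vagrant/testdir/flumelog/*        # on h2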
Example 4:
Flume collects data and sends it to a Kafka cluster
I. Steps:
- 1. Edit the configuration file on h1
- 2. Start Flume on h1, start the Kafka cluster formed by h1, h2, and h3, and start a consumer
- 3. Append content to the watched file
- 4. Watch the data received by the consumer
II. Walkthrough
1. Configuration file for h1
# Name the three components of the agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Configure the source: follow content appended to the watched file
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/vagrant/testdir/flumeTestData
a1.sources.r1.channels = c1
# Configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Configure the Kafka sink; kafka.producer.* entries are passed through to the Kafka producer
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = first-topic
a1.sinks.k1.kafka.bootstrap.servers = h1:9092,h2:9092,h3:9092
a1.sinks.k1.kafka.flumeBatchSize = 10
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
a1.sinks.k1.channel = c1
2. Start Flume on h1
flume-ng agent --conf conf --conf-file conf/kafka.conf --name a1 -Dflume.root.logger=INFO,console
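The Kafka side also has to be ready, as listed in the steps above. A sketch of the topic creation and console consumer commands, assuming the stock Kafka CLI scripts are on the PATH (older Kafka versions use --zookeeper instead of --bootstrap-server):

kafka-topics.sh --create --bootstrap-server h1:9092 --replication-factor 3 --partitions 1 --topic first-topic
kafka-console-consumer.sh --bootstrap-server h1:9092 --topic first-topic --from-beginning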
3. Append data to the watched file and check the consumer
![](https://img.haomeiwen.com/i3023966/034458cb74b96670.png)