美文网首页
flume实践

flume实践

作者: 狼牙战士 | 来源:发表于2018-07-10 09:59 被阅读0次

Flume实践

实例一:

单机,监控指定端口,输出到控制台

一、步骤:

  • 1.编辑配置文件
  • 2.启动flume
  • 3.登录指定主机,指定端口,发送数据
  • 4.查看控制台输出

二、过程记录

1.example.conf内容

# 给agent的三个组件命名
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 配置source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# 配置sink
a1.sinks.k1.type = logger

# 配置channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# source、sink与channel之间的绑定连接
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2.启动flume

命令:flume-ng agent --conf conf --conf-file conf/example.conf --name a1 -Dflume.root.logger=INFO,console

3.登录主机发送数据

image

4.查看控制台输出

image

实例二:

示意图:

image

h1和h2监听指定端口的http请求,将数据发送给h3,h3把数据发送到HDFS

一、步骤:

  • 1.分别编辑h1,h2,h3配置文件
  • 2.分别启动三台机器的flume
  • 3.发送http请求给h1,h2
  • 4.查看HDFS目录

二、过程记录

1.h1和h2配置文件

# 给agent的三个组件命名
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 配置source
a1.sources.r1.type = http
a1.sources.r1.port = 8888

# 配置sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = h3
a1.sinks.k1.port = 4141

# 配置channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# source、sink与channel之间的绑定连接
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2.h3配置文件

# agent的三个组件命名
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 配置source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

# 配置sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path=hdfs://h1:9000/flumeData

# 配置channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# source、sink与channel之间的绑定连接
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3.分别启动flume

h3命令:flume-ng agent --conf conf --conf-file conf/example.conf --name a1 -Dflume.root.logger=INFO,console

h1和h2命令:flume-ng agent --conf conf --conf-file conf/example.conf --name a1 -Dflume.root.logger=INFO,console

4.发送http请求给h1,h2

image
image

5.查看HDFS目录文件内容

image

实例三:

示例图:

image

h1数据源:监听指定文件内容的变化。
h1输出:h2和HDFS。
h2输出:落地到本地文件系统。

一、步骤:

  • 1.分别编辑h1,h2配置文件
  • 2.分别启动两台机器的flume
  • 3.追加内容到被监听文件
  • 4.查看HDFS目录和h2上文件目录

二、过程记录

1.h1配置文件

# 给agent的三个组件命名
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# 配置source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/vagrant/testdir/flumeTestData
a1.sources.r1.channels = c1 c2

# 配置flow1的channel和sink
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = h2
a1.sinks.k1.port = 4141
a1.sinks.k1.channel = c1


# 配置flow2的channel和sink
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path=hdfs://h1:9000/flumeData
a1.sinks.k2.channel = c2

2.h2配置文件

# 给agent的三个组件命名
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 配置source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

# 配置sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /home/vagrant/testdir/flumelog

# 配置channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# source、sink与channel之间的绑定连接
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3.启动flume

h1和h2命令:flume-ng agent --conf conf --conf-file conf/example3.conf --name a1 -Dflume.root.logger=INFO,console

4.追加内容到被监听文件

[root@h1 testdir]# echo "123" >> flumeTestData
[root@h1 testdir]# echo "123456" >> flumeTestData

5.查看HDFS和h2文件目录变化

image image

实例四:

flume收集数据发送到kafka集群

一、步骤:

  • 1.编辑h1配置文件
  • 2.分别启动h1的flume,启动h1和h2、h3组成的kafka集群,启动消费者
  • 3.追加内容到被监听文件
  • 4.观察消费者接收的数据

二、过程记录

1.h1配置文件

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/vagrant/testdir/flumeTestData
a1.sources.r1.channels = c1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = first-topic
a1.sinks.k1.kafka.bootstrap.servers = h1:9092,h2:9092,h3:9092
a1.sinks.k1.kafka.flumeBatchSize = 10
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
a1.sinks.k1.channel = c1

2.启动h1的flume

flume-ng agent --conf conf --conf-file conf/kafka.conf --name a1 -Dflume.root.logger=INFO,console

3.追加数据到被监听文件,查看消费者

image

相关文章

  • flume实践

    Flume实践 实例一: 单机,监控指定端口,输出到控制台 一、步骤: 1.编辑配置文件 2.启动flume 3....

  • Flume实践

    Flume简介 flume 作为 cloudera 开发的实时日志收集系统,受到了业界的认可与广泛应用。Flume...

  • Flume架构与实践

    Flume架构与实践 Flume是一款在线数据采集的系统,典型的应用场景是作为数据的总线,在线的进行日志的采集、分...

  • Flume01

    Flume架构组成 Flume 负载均衡 Flume Agent内部原理 启动 Flume 监听

  • Flume

    总结 一、Flume的定义 1、flume的优势 2、flume的组成 3、flume的架构 二、 flume部署...

  • 玩转大数据计算之Flume

    Flume版本:我们使用Flume最新的版本:Flume NG 1.7.0 Flume架构Flume是一个分布式的...

  • Flume 入门

    一:Flume是什么: 二:特点: 三:Flume版本介绍 四:Flume NG基本架构 五:Flume NG核心...

  • flume的部署和测试

    1 flume 安装 flume下载:http://flume.apache.org/download.htmlf...

  • 数据采集工具Flume实践

    Flume简介 Flume 是一个分布式、可靠、高可用的海量日志聚合系统,支持在系统中定制各类数据发送 方, 用于...

  • 091-BigData-19Flume与Flume之间数据传递

    上一篇:090-BigData-18Flume Flume与Flume之间数据传递 一、单Flume多Channe...

网友评论

      本文标题:flume实践

      本文链接:https://www.haomeiwen.com/subject/uqqfpftx.html