美文网首页
Flume 示例

Flume 示例

作者: 歌哥居士 | 来源:发表于2019-03-29 16:08 被阅读0次

示例1 指定端口采集数据输出到控制台

# 命名组件
agent1.sources = source1
agent1.channels = channel1
agent1.sinks = sink1

# 设置sources
agent1.sources.source1.type = netcat
agent1.sources.source1.bind = host000
agent1.sources.source1.port = 44444

# 设置channel
agent1.channels.channel1.type = memory

# 设置sink
agent1.sinks.sink1.type = logger

# 绑定sources和sink到channel
# source可以绑定多个channel
agent1.sources.source1.channels = channel1
# sink只能绑定一个channel
agent1.sinks.sink1.channel = channel1

运行:# --conf 指定flume配置文件文件夹,--conf-file 指定我们自定义flume设置。

$ flume-ng agent \
 --name agent1 \
 --conf $FLUME_HOME/conf \
 --conf-file /home/user000/confs/flume_conf/example1.conf \
 -Dflume.root.logger=INFO,console

测试

$ telnet host000 44444
hello
baozi

再看看flume输出:Event是Flume处理数据的基础单元

2018-10-27 10:01:44,074 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 68 65 6C 6C 6F 0D                               hello. }
2018-10-27 10:01:45,126 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 62 61 6F 7A 69 0D                               baozi. }

示例2 监控文件新增数据到控制台

$ vim example2.conf

agent1.sources = source1
agent1.channels = channel1
agent1.sinks = sink1

agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -F ~/data/data.log
agent1.sources.source1.shell = /bin/sh -c

agent1.channels.channel1.type = memory

agent1.sinks.sink1.type = logger

agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1

运行

$ flume-ng agent \
 --name agent1 \
 --conf $FLUME_HOME/conf \
 --conf-file /home/user000/confs/flume_conf/example2.conf \
 -Dflume.root.logger=INFO,console

测试

$ echo hello >> ~/data/data.log
$ echo baozi >> ~/data/data.log
2018-10-27 10:24:40,884 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 68 65 6C 6C 6F                                  hello }
2018-10-27 10:24:52,548 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 62 61 6F 7A 69                                  baozi }

示例3 从一台服务器到另一台

$ vim exec-memory-avro.conf

exec-memory-avro.sources = exec-source
exec-memory-avro.channels = memory-channel
exec-memory-avro.sinks = avro-sink

exec-memory-avro.sources.exec-source.type = exec
exec-memory-avro.sources.exec-source.command = tail -F ~/data/data.log
exec-memory-avro.sources.exec-source.shell = /bin/sh -c

exec-memory-avro.channels.memory-channel.type = memory

exec-memory-avro.sinks.avro-sink.type = avro
exec-memory-avro.sinks.avro-sink.hostname = host000
exec-memory-avro.sinks.avro-sink.port = 44444

exec-memory-avro.sources.exec-source.channels = memory-channel
exec-memory-avro.sinks.avro-sink.channel = memory-channel

$ vim avro-memory-logger.conf

avro-memory-logger.sources = avro-source
avro-memory-logger.channels = memory-channel
avro-memory-logger.sinks = logger-sink

avro-memory-logger.sources.avro-source.type = avro
avro-memory-logger.sources.avro-source.bind = host000
avro-memory-logger.sources.avro-source.port = 44444

avro-memory-logger.channels.memory-channel.type = memory

avro-memory-logger.sinks.logger-sink.type = logger

avro-memory-logger.sources.avro-source.channels = memory-channel
avro-memory-logger.sinks.logger-sink.channel = memory-channel

运行:先启动 avro-memory-logger.conf再启动exec-memory-avro.conf

$ flume-ng agent \
--name avro-memory-logger \
--conf $FLUME_HOME/conf \
 --conf-file  /home/user000/confs/flume_conf/avro-memory-logger.conf  \
-Dflume.root.logger=INFO,console

$ flume-ng agent \
--name exec-memory-avro \
--conf $FLUME_HOME/conf \
 --conf-file  /home/user000/confs/flume_conf/exec-memory-avro.conf  \
-Dflume.root.logger=INFO,console

测试:之前写到data.log里的数据也一并被打印出来

echo hello2 >> ~/data/data.log
echo baozi2 >> ~/data/data.log

2018-10-27 11:14:25,841 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 68 65 6C 6C 6F                                  hello }
2018-10-27 11:14:25,841 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 62 61 6F 7A 69                                  baozi }
2018-10-27 11:15:06,146 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 68 65 6C 6C 6F 32                               hello2 }
2018-10-27 11:15:10,148 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 62 61 6F 7A 69 32                               baozi2 }

示例4 Flume 与Kafka整合示例

Flume相当于生产者,发送数据给Kafka,Kafka启动消费者。
Kafka的各项配置在Flume不同版本中可能不一致。
$ vim exec-memory-avro.conf

exec-memory-avro.sources = exec-source
exec-memory-avro.channels = memory-channel
exec-memory-avro.sinks = avro-sink

exec-memory-avro.sources.exec-source.type = exec
exec-memory-avro.sources.exec-source.command = tail -F ~/data/data.log
exec-memory-avro.sources.exec-source.shell = /bin/sh -c

exec-memory-avro.channels.memory-channel.type = memory

exec-memory-avro.sinks.avro-sink.type = avro
exec-memory-avro.sinks.avro-sink.hostname = host000
exec-memory-avro.sinks.avro-sink.port = 44444

exec-memory-avro.sources.exec-source.channels = memory-channel
exec-memory-avro.sinks.avro-sink.channel = memory-channel

$ vim avro-memory-kafka.conf

avro-memory-kafka.sources = avro-source
avro-memory-kafka.channels = memory-channel
avro-memory-kafka.sinks = kafka-sink

avro-memory-kafka.sources.avro-source.type = avro
avro-memory-kafka.sources.avro-source.bind = host000
avro-memory-kafka.sources.avro-source.port = 44444

avro-memory-kafka.channels.memory-channel.type = memory

avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafka-sink.brokerList = host000:9092
avro-memory-kafka.sinks.kafka-sink.topic = hello_test
avro-memory-kafka.sinks.kafka-sink.batchSize = 5
avro-memory-kafka.sinks.kafka-sink.requiredAcks = 1

avro-memory-kafka.sources.avro-source.channels = memory-channel
avro-memory-kafka.sinks.kafka-sink.channel = memory-channel

运行

$ flume-ng agent \
--name avro-memory-kafka \
--conf $FLUME_HOME/conf \
 --conf-file  /home/user000/confs/flume_conf/avro-memory-kafka.conf  \
-Dflume.root.logger=INFO,console

$ flume-ng agent \
--name exec-memory-avro \
--conf $FLUME_HOME/conf \
 --conf-file  /home/user000/confs/flume_conf/exec-memory-avro.conf  \
-Dflume.root.logger=INFO,console

$ kafka-console-consumer.sh --zookeeper host000:2181 --topic hello_test --from-beginning

测试

$ echo aaa >> ~/data/data.log
$ echo bbb >> ~/data/data.log

相关文章

  • Flume 示例

    示例1 指定端口采集数据输出到控制台 运行:# --conf 指定flume配置文件文件夹,--conf-file...

  • flume的Agent应用示例

    环境准备 安装rpm文件 重启xineted 解压hadoop-2.5.0-cdh5.3.6-src.tar.gz...

  • Flume01

    Flume架构组成 Flume 负载均衡 Flume Agent内部原理 启动 Flume 监听

  • Flume

    总结 一、Flume的定义 1、flume的优势 2、flume的组成 3、flume的架构 二、 flume部署...

  • 玩转大数据计算之Flume

    Flume版本:我们使用Flume最新的版本:Flume NG 1.7.0 Flume架构Flume是一个分布式的...

  • Flume 入门

    一:Flume是什么: 二:特点: 三:Flume版本介绍 四:Flume NG基本架构 五:Flume NG核心...

  • flume的部署和测试

    1 flume 安装 flume下载:http://flume.apache.org/download.htmlf...

  • 091-BigData-19Flume与Flume之间数据传递

    上一篇:090-BigData-18Flume Flume与Flume之间数据传递 一、单Flume多Channe...

  • java大数据之flume

    一、Flume简介 1.1 Flume的位置 1.2 Flume是什么 (1)Flume提供一种分布式的,可靠地,...

  • Flume(一)概述

    Flume图标 Flume图标 Flume定义 Apache Flume是一个分布式,可靠且可用的系统,用于有效地...

网友评论

      本文标题:Flume 示例

      本文链接:https://www.haomeiwen.com/subject/taatvqtx.html