示例1 指定端口采集数据输出到控制台
# Example 1 agent "agent1": netcat source -> memory channel -> logger sink.
# Name the components (the "agent1" prefix must match --name on the command line)
agent1.sources = source1
agent1.channels = channel1
agent1.sinks = sink1
# Source: listen for TCP text on host000:44444; each received line becomes one event
agent1.sources.source1.type = netcat
agent1.sources.source1.bind = host000
agent1.sources.source1.port = 44444
# Channel: buffer events in memory between the source and the sink
agent1.channels.channel1.type = memory
# Sink: write every event to the Flume log (shows up as LoggerSink INFO lines)
agent1.sinks.sink1.type = logger
# Bind the source and the sink to the channel
# A source may write to several channels (plural key `channels`, space-separated list)
agent1.sources.source1.channels = channel1
# A sink reads from exactly one channel (singular key `channel`)
agent1.sinks.sink1.channel = channel1
运行:--name 指定要启动的 agent 名称(须与配置文件中的前缀 agent1 一致),--conf 指定 Flume 的配置目录,--conf-file 指定我们自定义的 agent 配置文件,-Dflume.root.logger=INFO,console 让日志直接输出到控制台。
# Start agent1 in the foreground. --conf is Flume's config directory,
# --conf-file is the agent definition, and the console logger prints events.
# "$FLUME_HOME" is quoted so an install path containing spaces stays one argument.
$ flume-ng agent \
--name agent1 \
--conf "$FLUME_HOME"/conf \
--conf-file /home/user000/confs/flume_conf/example1.conf \
-Dflume.root.logger=INFO,console
测试
# Connect to the netcat source, then type the two lines below; each line is sent as one event.
$ telnet host000 44444
hello
baozi
再看看 Flume 的输出:Event 是 Flume 处理数据的基本单元,每输入一行就生成一个 Event。body 末尾多出的 0D 是 telnet 在每行结尾发送的回车符(\r)。
2018-10-27 10:01:44,074 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 68 65 6C 6C 6F 0D hello. }
2018-10-27 10:01:45,126 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 62 61 6F 7A 69 0D baozi. }
示例2 监控文件新增数据到控制台
$ vim example2.conf
# Example 2 agent "agent1": exec source (tail -F) -> memory channel -> logger sink.
agent1.sources = source1
agent1.channels = channel1
agent1.sinks = sink1
# Source: run the command via `/bin/sh -c` so the shell expands `~`; each line the
# command prints becomes one event. `tail -F` keeps following the file by name and,
# on startup, first prints the file's last existing lines.
# NOTE(review): the exec source gives no delivery guarantee - fine for a demo only.
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -F ~/data/data.log
agent1.sources.source1.shell = /bin/sh -c
# Channel: buffer events in memory
agent1.channels.channel1.type = memory
# Sink: write each event to the Flume log
agent1.sinks.sink1.type = logger
# Wire source -> channel -> sink
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
运行
# Start the exec-source agent in the foreground with console logging.
# "$FLUME_HOME" is quoted so an install path containing spaces stays one argument.
$ flume-ng agent \
--name agent1 \
--conf "$FLUME_HOME"/conf \
--conf-file /home/user000/confs/flume_conf/example2.conf \
-Dflume.root.logger=INFO,console
测试
# Append two lines to the watched file; tail -F picks up each one as a separate event.
$ printf '%s\n' hello >> ~/data/data.log
$ printf '%s\n' baozi >> ~/data/data.log
2018-10-27 10:24:40,884 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 68 65 6C 6C 6F hello }
2018-10-27 10:24:52,548 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 62 61 6F 7A 69 baozi }
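示例3 从一台服务器传输到另一台服务器(exec source → avro sink → 另一个 agent 的 avro source → logger sink;为方便演示,本例两个 agent 都运行在 host000 上)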
$ vim exec-memory-avro.conf
# Upstream agent "exec-memory-avro": tail a local file and forward the events over
# Avro RPC to host000:44444, where the downstream agent's avro source listens.
exec-memory-avro.sources = exec-source
exec-memory-avro.channels = memory-channel
exec-memory-avro.sinks = avro-sink
# Source: follow ~/data/data.log; `/bin/sh -c` is needed so `~` is expanded
exec-memory-avro.sources.exec-source.type = exec
exec-memory-avro.sources.exec-source.command = tail -F ~/data/data.log
exec-memory-avro.sources.exec-source.shell = /bin/sh -c
exec-memory-avro.channels.memory-channel.type = memory
# Sink: send events to the downstream avro source (hostname/port must match its bind/port)
exec-memory-avro.sinks.avro-sink.type = avro
exec-memory-avro.sinks.avro-sink.hostname = host000
exec-memory-avro.sinks.avro-sink.port = 44444
# Wire source -> channel -> sink
exec-memory-avro.sources.exec-source.channels = memory-channel
exec-memory-avro.sinks.avro-sink.channel = memory-channel
$ vim avro-memory-logger.conf
# Downstream agent "avro-memory-logger": receive events over Avro RPC and log them.
avro-memory-logger.sources = avro-source
avro-memory-logger.channels = memory-channel
avro-memory-logger.sinks = logger-sink
# Source: listen on host000:44444 - must match the upstream avro sink's hostname/port
avro-memory-logger.sources.avro-source.type = avro
avro-memory-logger.sources.avro-source.bind = host000
avro-memory-logger.sources.avro-source.port = 44444
avro-memory-logger.channels.memory-channel.type = memory
# Sink: print each received event to the Flume log
avro-memory-logger.sinks.logger-sink.type = logger
# Wire source -> channel -> sink
avro-memory-logger.sources.avro-source.channels = memory-channel
avro-memory-logger.sinks.logger-sink.channel = memory-channel
运行:先启动 avro-memory-logger.conf,再启动 exec-memory-avro.conf。下游 agent 的 avro source 必须先在 host000:44444 上监听,上游的 avro sink 才能连接成功;两个 agent 都在前台运行,需要分别在两个终端中启动。
# Terminal 1: start the downstream agent first so its avro source is already
# listening on host000:44444 when the upstream avro sink connects.
# "$FLUME_HOME" is quoted so an install path containing spaces stays one argument.
$ flume-ng agent \
--name avro-memory-logger \
--conf "$FLUME_HOME"/conf \
--conf-file /home/user000/confs/flume_conf/avro-memory-logger.conf \
-Dflume.root.logger=INFO,console
# Terminal 2: start the upstream agent (each agent stays in the foreground).
$ flume-ng agent \
--name exec-memory-avro \
--conf "$FLUME_HOME"/conf \
--conf-file /home/user000/confs/flume_conf/exec-memory-avro.conf \
-Dflume.root.logger=INFO,console
测试:之前写入 data.log 的数据也会一并打印出来,因为 tail -F 启动时会先输出文件末尾已有的内容(默认最后 10 行)。
# Append two new lines; they travel exec source -> avro sink -> avro source -> logger.
$ echo hello2 >> ~/data/data.log
$ echo baozi2 >> ~/data/data.log
2018-10-27 11:14:25,841 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 68 65 6C 6C 6F hello }
2018-10-27 11:14:25,841 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 62 61 6F 7A 69 baozi }
2018-10-27 11:15:06,146 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 68 65 6C 6C 6F 32 hello2 }
2018-10-27 11:15:10,148 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 62 61 6F 7A 69 32 baozi2 }
示例4 Flume 与Kafka整合示例
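Flume 相当于生产者,把数据发送到 Kafka;再启动一个 Kafka 控制台消费者查看收到的数据。
Kafka Sink 的配置项名称在不同 Flume 版本中不一致:本例使用的 brokerList、topic、batchSize、requiredAcks 是 Flume 1.6 的写法;Flume 1.7 及以后对应 kafka.bootstrap.servers、kafka.topic、flumeBatchSize、kafka.producer.acks。旧名称在新版本中仍被兼容,但已标记为弃用。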
$ vim exec-memory-avro.conf
# Upstream agent "exec-memory-avro" (same as example 3): tail a local file and forward
# events over Avro RPC to host000:44444, where the avro-memory-kafka agent listens.
exec-memory-avro.sources = exec-source
exec-memory-avro.channels = memory-channel
exec-memory-avro.sinks = avro-sink
# Source: follow ~/data/data.log; `/bin/sh -c` is needed so `~` is expanded
exec-memory-avro.sources.exec-source.type = exec
exec-memory-avro.sources.exec-source.command = tail -F ~/data/data.log
exec-memory-avro.sources.exec-source.shell = /bin/sh -c
exec-memory-avro.channels.memory-channel.type = memory
# Sink: send events to the downstream avro source (hostname/port must match its bind/port)
exec-memory-avro.sinks.avro-sink.type = avro
exec-memory-avro.sinks.avro-sink.hostname = host000
exec-memory-avro.sinks.avro-sink.port = 44444
# Wire source -> channel -> sink
exec-memory-avro.sources.exec-source.channels = memory-channel
exec-memory-avro.sinks.avro-sink.channel = memory-channel
$ vim avro-memory-kafka.conf
# Downstream agent "avro-memory-kafka": receive events over Avro RPC and publish them to Kafka.
avro-memory-kafka.sources = avro-source
avro-memory-kafka.channels = memory-channel
avro-memory-kafka.sinks = kafka-sink
# Source: listen on host000:44444 - must match the upstream avro sink's hostname/port
avro-memory-kafka.sources.avro-source.type = avro
avro-memory-kafka.sources.avro-source.bind = host000
avro-memory-kafka.sources.avro-source.port = 44444
avro-memory-kafka.channels.memory-channel.type = memory
# Sink: publish events to topic hello_test on the broker at host000:9092.
# NOTE(review): brokerList / topic / batchSize / requiredAcks are the Flume 1.6-era
# property names. Flume 1.7+ still accepts them as deprecated aliases of
# kafka.bootstrap.servers / kafka.topic / flumeBatchSize / kafka.producer.acks -
# confirm against the Flume version actually installed.
avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafka-sink.brokerList = host000:9092
avro-memory-kafka.sinks.kafka-sink.topic = hello_test
# Number of events the sink sends to Kafka per batch
avro-memory-kafka.sinks.kafka-sink.batchSize = 5
# acks = 1: wait only for the partition leader's acknowledgement
avro-memory-kafka.sinks.kafka-sink.requiredAcks = 1
# Wire source -> channel -> sink
avro-memory-kafka.sources.avro-source.channels = memory-channel
avro-memory-kafka.sinks.kafka-sink.channel = memory-channel
运行
# Terminal 1: start the Kafka-facing agent first so its avro source is listening
# on host000:44444 before the upstream avro sink connects.
# "$FLUME_HOME" is quoted so an install path containing spaces stays one argument.
$ flume-ng agent \
--name avro-memory-kafka \
--conf "$FLUME_HOME"/conf \
--conf-file /home/user000/confs/flume_conf/avro-memory-kafka.conf \
-Dflume.root.logger=INFO,console
# Terminal 2: start the upstream exec agent (each agent stays in the foreground).
$ flume-ng agent \
--name exec-memory-avro \
--conf "$FLUME_HOME"/conf \
--conf-file /home/user000/confs/flume_conf/exec-memory-avro.conf \
-Dflume.root.logger=INFO,console
# Terminal 3: read topic hello_test from the beginning to see what Flume produced.
# NOTE(review): --zookeeper only works with older Kafka releases; it was removed from
# the console consumer in Kafka 2.0 - there use --bootstrap-server host000:9092 instead.
$ kafka-console-consumer.sh --zookeeper host000:2181 --topic hello_test --from-beginning
测试
# Append two lines; they should appear in the Kafka console consumer's output.
$ printf '%s\n' aaa >> ~/data/data.log
$ printf '%s\n' bbb >> ~/data/data.log
网友评论