Flume Examples

Author: 歌哥居士 | Published 2019-03-29 16:08

    Example 1: Collect data from a specified port and print it to the console
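
    The run command below assumes this configuration is saved as example1.conf, consistent with the later examples.

    $ vim example1.conf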

    # Name the components
    agent1.sources = source1
    agent1.channels = channel1
    agent1.sinks = sink1
    
    # Configure the source
    agent1.sources.source1.type = netcat
    agent1.sources.source1.bind = host000
    agent1.sources.source1.port = 44444
    
    # Configure the channel
    agent1.channels.channel1.type = memory
    
    # Configure the sink
    agent1.sinks.sink1.type = logger
    
    # Bind the source and the sink to the channel
    # A source can be bound to multiple channels
    agent1.sources.source1.channels = channel1
    # A sink can be bound to only one channel
    agent1.sinks.sink1.channel = channel1
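
    The memory channel above runs with Flume's defaults; if needed it can be tuned with the standard capacity and transactionCapacity properties. A minimal sketch (the values here are only illustrative):

    agent1.channels.channel1.capacity = 1000
    agent1.channels.channel1.transactionCapacity = 100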
    

    Run: --conf points to Flume's own configuration directory, and --conf-file points to our custom agent configuration file.

    $ flume-ng agent \
     --name agent1 \
     --conf $FLUME_HOME/conf \
     --conf-file /home/user000/confs/flume_conf/example1.conf \
     -Dflume.root.logger=INFO,console
    

    Test

    $ telnet host000 44444
    hello
    baozi
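
    If telnet is not installed, nc (netcat) can be used in the same way, for example:

    $ nc host000 44444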
    

    Now look at Flume's output. An Event is the basic unit of data Flume handles: optional headers plus a byte-array body. The trailing 0D in each body is the carriage return sent by telnet.

    2018-10-27 10:01:44,074 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 68 65 6C 6C 6F 0D                               hello. }
    2018-10-27 10:01:45,126 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 62 61 6F 7A 69 0D                               baozi. }
    

    Example 2: Monitor a file for newly appended data and print it to the console

    $ vim example2.conf

    agent1.sources = source1
    agent1.channels = channel1
    agent1.sinks = sink1
    
    agent1.sources.source1.type = exec
    agent1.sources.source1.command = tail -F ~/data/data.log
    agent1.sources.source1.shell = /bin/sh -c
    
    agent1.channels.channel1.type = memory
    
    agent1.sinks.sink1.type = logger
    
    agent1.sources.source1.channels = channel1
    agent1.sinks.sink1.channel = channel1
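
    Note that the exec source provides no delivery guarantee: lines written while the agent is down can be lost. On Flume 1.7+, the Taildir source records its read position and is usually preferred. A minimal sketch (the positionFile path is illustrative):

    agent1.sources.source1.type = TAILDIR
    agent1.sources.source1.positionFile = /home/user000/flume/taildir_position.json
    agent1.sources.source1.filegroups = f1
    agent1.sources.source1.filegroups.f1 = /home/user000/data/data.log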
    

    Run

    $ flume-ng agent \
     --name agent1 \
     --conf $FLUME_HOME/conf \
     --conf-file /home/user000/confs/flume_conf/example2.conf \
     -Dflume.root.logger=INFO,console
    

    Test

    $ echo hello >> ~/data/data.log
    $ echo baozi >> ~/data/data.log
    2018-10-27 10:24:40,884 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 68 65 6C 6C 6F                                  hello }
    2018-10-27 10:24:52,548 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 62 61 6F 7A 69                                  baozi }
    

    Example 3: Send data from one server to another

    $ vim exec-memory-avro.conf

    exec-memory-avro.sources = exec-source
    exec-memory-avro.channels = memory-channel
    exec-memory-avro.sinks = avro-sink
    
    exec-memory-avro.sources.exec-source.type = exec
    exec-memory-avro.sources.exec-source.command = tail -F ~/data/data.log
    exec-memory-avro.sources.exec-source.shell = /bin/sh -c
    
    exec-memory-avro.channels.memory-channel.type = memory
    
    exec-memory-avro.sinks.avro-sink.type = avro
    exec-memory-avro.sinks.avro-sink.hostname = host000
    exec-memory-avro.sinks.avro-sink.port = 44444
    
    exec-memory-avro.sources.exec-source.channels = memory-channel
    exec-memory-avro.sinks.avro-sink.channel = memory-channel
    

    $ vim avro-memory-logger.conf

    avro-memory-logger.sources = avro-source
    avro-memory-logger.channels = memory-channel
    avro-memory-logger.sinks = logger-sink
    
    avro-memory-logger.sources.avro-source.type = avro
    avro-memory-logger.sources.avro-source.bind = host000
    avro-memory-logger.sources.avro-source.port = 44444
    
    avro-memory-logger.channels.memory-channel.type = memory
    
    avro-memory-logger.sinks.logger-sink.type = logger
    
    avro-memory-logger.sources.avro-source.channels = memory-channel
    avro-memory-logger.sinks.logger-sink.channel = memory-channel
    

    Run: start the avro-memory-logger agent first and then exec-memory-avro, because the avro sink in exec-memory-avro needs the avro source in avro-memory-logger to already be listening on port 44444.

    $ flume-ng agent \
     --name avro-memory-logger \
     --conf $FLUME_HOME/conf \
     --conf-file /home/user000/confs/flume_conf/avro-memory-logger.conf \
     -Dflume.root.logger=INFO,console

    $ flume-ng agent \
     --name exec-memory-avro \
     --conf $FLUME_HOME/conf \
     --conf-file /home/user000/confs/flume_conf/exec-memory-avro.conf \
     -Dflume.root.logger=INFO,console
    

    Test: the data previously written to data.log is printed as well, because tail -F first emits the last 10 lines of the existing file when the source starts.

    $ echo hello2 >> ~/data/data.log
    $ echo baozi2 >> ~/data/data.log
    
    2018-10-27 11:14:25,841 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 68 65 6C 6C 6F                                  hello }
    2018-10-27 11:14:25,841 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 62 61 6F 7A 69                                  baozi }
    2018-10-27 11:15:06,146 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 68 65 6C 6C 6F 32                               hello2 }
    2018-10-27 11:15:10,148 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 62 61 6F 7A 69 32                               baozi2 }
    

    Example 4: Integrating Flume with Kafka

    Flume acts as the producer and sends data to Kafka; on the Kafka side we start a console consumer.
    The Kafka sink property names differ between Flume versions (see the sketch after the avro-memory-kafka configuration below).
    $ vim exec-memory-avro.conf

    exec-memory-avro.sources = exec-source
    exec-memory-avro.channels = memory-channel
    exec-memory-avro.sinks = avro-sink
    
    exec-memory-avro.sources.exec-source.type = exec
    exec-memory-avro.sources.exec-source.command = tail -F ~/data/data.log
    exec-memory-avro.sources.exec-source.shell = /bin/sh -c
    
    exec-memory-avro.channels.memory-channel.type = memory
    
    exec-memory-avro.sinks.avro-sink.type = avro
    exec-memory-avro.sinks.avro-sink.hostname = host000
    exec-memory-avro.sinks.avro-sink.port = 44444
    
    exec-memory-avro.sources.exec-source.channels = memory-channel
    exec-memory-avro.sinks.avro-sink.channel = memory-channel
    

    $ vim avro-memory-kafka.conf

    avro-memory-kafka.sources = avro-source
    avro-memory-kafka.channels = memory-channel
    avro-memory-kafka.sinks = kafka-sink
    
    avro-memory-kafka.sources.avro-source.type = avro
    avro-memory-kafka.sources.avro-source.bind = host000
    avro-memory-kafka.sources.avro-source.port = 44444
    
    avro-memory-kafka.channels.memory-channel.type = memory
    
    avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
    avro-memory-kafka.sinks.kafka-sink.brokerList = host000:9092
    avro-memory-kafka.sinks.kafka-sink.topic = hello_test
    avro-memory-kafka.sinks.kafka-sink.batchSize = 5
    avro-memory-kafka.sinks.kafka-sink.requiredAcks = 1
    
    avro-memory-kafka.sources.avro-source.channels = memory-channel
    avro-memory-kafka.sinks.kafka-sink.channel = memory-channel
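
    The brokerList / topic / batchSize / requiredAcks properties above are the ones used by older Flume releases (e.g. 1.6). On Flume 1.7 and later the Kafka sink uses renamed properties; an equivalent sketch with the same broker and topic as above:

    avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
    avro-memory-kafka.sinks.kafka-sink.kafka.bootstrap.servers = host000:9092
    avro-memory-kafka.sinks.kafka-sink.kafka.topic = hello_test
    avro-memory-kafka.sinks.kafka-sink.flumeBatchSize = 5
    avro-memory-kafka.sinks.kafka-sink.kafka.producer.acks = 1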
    

    Run

    $ flume-ng agent \
     --name avro-memory-kafka \
     --conf $FLUME_HOME/conf \
     --conf-file /home/user000/confs/flume_conf/avro-memory-kafka.conf \
     -Dflume.root.logger=INFO,console

    $ flume-ng agent \
     --name exec-memory-avro \
     --conf $FLUME_HOME/conf \
     --conf-file /home/user000/confs/flume_conf/exec-memory-avro.conf \
     -Dflume.root.logger=INFO,console
    
    $ kafka-console-consumer.sh --zookeeper host000:2181 --topic hello_test --from-beginning
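
    The --zookeeper option belongs to the old consumer and was removed in Kafka 2.0; on newer Kafka versions the console consumer connects to the broker directly (assuming it listens on host000:9092, as configured above):

    $ kafka-console-consumer.sh --bootstrap-server host000:9092 --topic hello_test --from-beginning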
    

    Test: the console consumer should print the appended lines.

    $ echo aaa >> ~/data/data.log
    $ echo bbb >> ~/data/data.log
    
