Connecting Flume to Kafka

Author: zhouhaolong1 | Published 2017-09-09 22:59

Use Flume to tail a text file on Linux and send the data to Kafka.

Prerequisites: a working ZooKeeper cluster, a Kafka cluster, and a Flume installation.
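
A quick sanity check before starting (assumes the Flume and Kafka bin directories are on your PATH, and that $FLUME_HOME/$KAFKA_HOME are set as used below):

    # Should print the installed Flume version
    flume-ng version
    # Should print the install locations set in your environment
    echo $FLUME_HOME $KAFKA_HOME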

  1. Create flume2kafka.conf under $FLUME_HOME/conf:

     # Name the components on this agent
     a1.sources = r1
     a1.sinks = k1
     a1.channels = c1
     
     # Describe/configure the source
     a1.sources.r1.type = exec
     a1.sources.r1.command = tail -F /home/hadoop/log/test.log
     
     
     # Describe the sink: publish events to Kafka
     a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
     # Kafka topic to write to
     a1.sinks.k1.kafka.topic = mytopic
     # Broker list; one host:port is enough, more give failover
     a1.sinks.k1.kafka.bootstrap.servers = mini1:9092
     # Buffer up to 20 events before sending a batch
     a1.sinks.k1.kafka.flumeBatchSize = 20
     # ack as soon as the partition leader has written the message
     a1.sinks.k1.kafka.producer.acks = 1
     a1.sinks.k1.kafka.producer.linger.ms = 1
     a1.sinks.k1.kafka.producer.compression.type = snappy
     
     # Use a channel which buffers events in memory
     a1.channels.c1.type = memory
     a1.channels.c1.capacity = 1000
     a1.channels.c1.transactionCapacity = 100
     
     # Bind the source and sink to the channel
     a1.sources.r1.channels = c1
     a1.sinks.k1.channel = c1
    
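     Note: the kafka.* property names above follow the Flume 1.7+ Kafka sink. On Flume 1.6 the sink uses older, unprefixed names instead; a sketch based on the Flume 1.6 documentation:

         # Flume 1.6-style sink properties (replace the kafka.* lines above)
         a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
         a1.sinks.k1.topic = mytopic
         a1.sinks.k1.brokerList = mini1:9092
         a1.sinks.k1.batchSize = 20
         a1.sinks.k1.requiredAcks = 1
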
  2. Create the (initially empty) test file:

     touch /home/hadoop/log/test.log
    
  3. Create a script, testlog.sh, that appends a timestamp to the log once a second:

          #!/bin/bash
          # Append the current timestamp to the test log once per second
          while true
          do
              echo "$(date)" >> /home/hadoop/log/test.log
              sleep 1
          done
    
  4. Start the ZooKeeper cluster (run on every ZooKeeper node):

     zkServer.sh start
    
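     Once all nodes are up, verify the ensemble; one node should report "leader" and the rest "follower":

         zkServer.sh status
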
  5. Start the Kafka cluster (run on every broker):

     kafka-server-start.sh $KAFKA_HOME/config/server.properties
    
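     The command above runs in the foreground. To start the broker in the background instead, the start script accepts a -daemon flag:

         # Start Kafka detached from the terminal
         kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
         # A "Kafka" process should now appear
         jps
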
  6. Create a topic:

     kafka-topics.sh --create --zookeeper mini1:2181 --replication-factor 1 --partitions 1 --topic mytopic
    
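     To confirm the topic was created and inspect its partition assignment:

         kafka-topics.sh --list --zookeeper mini1:2181
         kafka-topics.sh --describe --zookeeper mini1:2181 --topic mytopic
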
  7. Start a console consumer:

     kafka-console-consumer.sh --zookeeper mini1:2181 --topic mytopic
    
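     By default the console consumer only prints messages that arrive after it starts; add --from-beginning to also replay what is already in the topic:

         kafka-console-consumer.sh --zookeeper mini1:2181 --topic mytopic --from-beginning
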
  8. Start the Flume agent:

     flume-ng agent -n a1 -c conf -f conf/flume2kafka.conf -Dflume.root.logger=INFO,console
    
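     The flags break down as follows:

         # -n a1   : agent name; must match the "a1" prefix in the config file
         # -c conf : configuration directory (flume-env.sh, log4j.properties)
         # -f ...  : path to the agent configuration file
         # -Dflume.root.logger=INFO,console : log to the console at INFO level
         flume-ng agent -n a1 -c conf -f conf/flume2kafka.conf -Dflume.root.logger=INFO,console
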
  9. Run the testlog.sh script:

     sh testlog.sh
    
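     To keep the generator running after you close the shell, a sketch using nohup:

         nohup sh testlog.sh > /dev/null 2>&1 &
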

The Kafka consumer should now print a new timestamp every second.
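
Illustrative output (the exact format follows your locale's date formatting):

    Sat Sep  9 22:59:01 CST 2017
    Sat Sep  9 22:59:02 CST 2017
    Sat Sep  9 22:59:03 CST 2017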

