Use Flume to read a text file on Linux and send the data to Kafka
Prerequisites: a ZooKeeper cluster, a Kafka cluster, and Flume are already installed.
-
Create flume2kafka.conf under $FLUME_HOME/conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/log/test.log

# Describe the sink: deliver events to Kafka
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# Target topic
a1.sinks.k1.topic = mytopic
# Broker address and port; list at least one
a1.sinks.k1.brokerList = mini1:9092
# Buffer up to 20 events before sending
a1.sinks.k1.batchSize = 20
# Ack as soon as the leader has received the message
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.linger.ms = 1
a1.sinks.k1.compression.type = snappy

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
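Note: the exec source with tail -F is simple but gives no delivery guarantee if the agent dies and restarts. If you run Flume 1.7 or newer, the Taildir source is a more robust alternative; a minimal sketch of the source section only (the positionFile path is just an example):
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /home/hadoop/log/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /home/hadoop/log/test.log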
-
Create a test file
touch /home/hadoop/log/test.log
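If the /home/hadoop/log directory does not exist yet, create it first:
mkdir -p /home/hadoop/log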
-
Create a script testlog.sh that appends the current date to the test file once per second
#!/bin/bash
while true
do
    echo $(date) >> /home/hadoop/log/test.log
    sleep 1
done
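The script is started with sh below, so it does not need to be executable; if you prefer to run it directly as ./testlog.sh, make it executable first:
chmod +x testlog.sh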
-
Start the ZooKeeper cluster
zkServer.sh start
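Run this on every node of the ZooKeeper cluster. You can confirm each node's role (leader or follower) with:
zkServer.sh status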
-
Start the Kafka cluster
kafka-server-start.sh $KAFKA_HOME/config/server.properties
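Run this on every broker. The command above keeps the broker in the foreground; kafka-server-start.sh also accepts a -daemon flag if you want it to run in the background:
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties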
-
Create a topic
kafka-topics.sh --create --zookeeper mini1:2181 --replication-factor 1 --partitions 1 --topic mytopic
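To confirm the topic exists, describe it against the same ZooKeeper address:
kafka-topics.sh --describe --zookeeper mini1:2181 --topic mytopic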
-
Start a console consumer
kafka-console-consumer.sh --zookeeper mini1:2181 --topic mytopic
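This form connects through ZooKeeper, which matches older Kafka releases. If your Kafka version rejects the --zookeeper option, point the consumer at a broker instead; adding --from-beginning also replays messages already in the topic:
kafka-console-consumer.sh --bootstrap-server mini1:9092 --topic mytopic --from-beginning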
-
Start the Flume agent
flume-ng agent -n a1 -c conf -f conf/flume2kafka.conf -Dflume.root.logger=INFO,console
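Run this from $FLUME_HOME so that the relative conf paths resolve. If you want the agent to keep running after the terminal closes, one option is to start it with nohup in the background (the flume.log file name is just an example):
nohup flume-ng agent -n a1 -c conf -f conf/flume2kafka.conf -Dflume.root.logger=INFO,console > flume.log 2>&1 &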
-
Run the testlog.sh script
sh testlog.sh
The Kafka consumer should now continuously print the current time.
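If nothing shows up on the consumer, first check that the script is actually appending to the file:
tail -f /home/hadoop/log/test.log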