flume 使用 kafka作为 sink ,exec 执行命令作为source
1.1 安装kafka和flume
1.2 配置文件
在 flume home 目录下的conf 创建 kafka.properties
命令: vim /opt/data/apache-flume-1.7.0-bin/conf/kafka.properties
agent.sources = s1
agent.channels = c1
agent.sinks = k1
agent.sources.s1.type=exec
#设置执行命令
agent.sources.s1.command=tail -F /opt/data/kafka.log
agent.sources.s1.channels=c1
agent.channels.c1.type=memory
agent.channels.c1.capacity=10000
agent.channels.c1.transactionCapacity=100
#设置Kafka接收器
agent.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink #设置Kafka的broker地址和端口号
agent.sinks.k1.brokerList=localhost:9092
#设置Kafka的Topic
agent.sinks.k1.topic=test
#设置序列化方式
agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder
agent.sinks.k1.channel = c1
1.3 编写shell 脚本
命令:vim kafka.sh
for((i=0;i<=1000;i++));
do echo "kafka-"+$i>>/opt/data/kafka.log;
done
创建 /opt/data/kafka.log 文件
1.3 启动服务
后台启动zookeeper
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties &
后台启动kafka
bin/kafka-server-start.sh -daemon config/server.properties &
启动消费者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
1.4 执行sh脚本,
进入脚本目录 ./kafka.sh,查看消费者
网友评论