flume+kafka

作者: Mervyn_2014 | 来源:发表于2017-05-19 17:41 被阅读41次
flume 使用 kafka作为 sink ,exec 执行命令作为source
1.1 安装kafka和flume

安装flume
安装kafka

1.2 配置文件

在 flume home 目录下的conf 创建 kafka.properties
命令: vim /opt/data/apache-flume-1.7.0-bin/conf/kafka.properties

agent.sources = s1                                                                                                                  
agent.channels = c1                                                                                                                 
agent.sinks = k1                                                                                                                    
agent.sources.s1.type=exec               
#设置执行命令                                                                                          
agent.sources.s1.command=tail -F /opt/data/kafka.log                                                                               
agent.sources.s1.channels=c1                                                                                                        
agent.channels.c1.type=memory                                                                                                       
agent.channels.c1.capacity=10000                                                                                                    
agent.channels.c1.transactionCapacity=100                                                                                           
#设置Kafka接收器                                                                                                                    
agent.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink                                                                 #设置Kafka的broker地址和端口号                                                                                                      
agent.sinks.k1.brokerList=localhost:9092                                                                                              
#设置Kafka的Topic                                                                                                                   
agent.sinks.k1.topic=test                                                                                                      
#设置序列化方式                                                                                                                     
agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder 
agent.sinks.k1.channel = c1
1.3 编写shell 脚本

命令:vim kafka.sh

 for((i=0;i<=1000;i++));
do echo "kafka-"+$i>>/opt/data/kafka.log;
done

创建 /opt/data/kafka.log 文件

1.3 启动服务
后台启动zookeeper
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties &
后台启动kafka
bin/kafka-server-start.sh -daemon config/server.properties &
启动消费者
 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
1.4 执行sh脚本,

进入脚本目录 ./kafka.sh,查看消费者

相关文章

网友评论

    本文标题:flume+kafka

    本文链接:https://www.haomeiwen.com/subject/clwyxxtx.html