美文网首页
flume使用kafka作为sink

flume使用kafka作为sink

作者: nummycode | 来源:发表于2017-05-24 15:04 被阅读1393次

启动kafka

先启动zookeeper:

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

然后启动kafka:

bin/kafka-server-start.sh -daemon config/server.properties

创建topic

 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

查看创建的topic:

bin/kafka-topics.sh --list --zookeeper localhost:2181

启动消费者

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

配置flume

创建配置文件kafak.conf

a1.sources=r1
a1.channels=c1
a1.sinks=k1


a1.sources.r1.type=netcat
a1.sources.r1.bind=localhost
a1.sources.r1.port=44444

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = test
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.ki.kafka.producer.compression.type = snappy

a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

启动flume

flume-ng agent -n a1 -c conf/ -f conf/kafka.conf -Dflume.root.logger=INFO, console

发送消息

telnet localhost 44444

随意输入几个字符串,然后再消费者页面将看到传过来的数据

相关文章

网友评论

      本文标题:flume使用kafka作为sink

      本文链接:https://www.haomeiwen.com/subject/kucoxxtx.html