启动kafka
先启动zookeeper:
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
然后启动kafka:
bin/kafka-server-start.sh -daemon config/server.properties
创建topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看创建的topic:
bin/kafka-topics.sh --list --zookeeper localhost:2181
启动消费者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
配置flume
创建配置文件kafak.conf
a1.sources=r1
a1.channels=c1
a1.sinks=k1
a1.sources.r1.type=netcat
a1.sources.r1.bind=localhost
a1.sources.r1.port=44444
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = test
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.ki.kafka.producer.compression.type = snappy
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
启动flume
flume-ng agent -n a1 -c conf/ -f conf/kafka.conf -Dflume.root.logger=INFO, console
发送消息
telnet localhost 44444
随意输入几个字符串,然后再消费者页面将看到传过来的数据
网友评论