在三台服务器下分别安装好 kafka
一、配置一个 ZooKeeper 集群
修改三个服务器下的 config/zookeeper.properties 为:
dataDir=/Users/desire/Library/kafka_2.11/data
clientPort=2383
initLimit=5
syncLimit=2
server.0=192.168.165.205:3434:3535
server.1=192.168.171.50:3434:3535
server.2=192.168.171.51:3434:3535
然后再 dataDir 目录下写一个 myid 文件,内容为当前服务器的 id
二、配置 broker 集群
broker.id=0
delete.topic.enable=true
listeners=PLAINTEXT://192.168.165.205:9092
#advertised.listeners=PLAINTEXT://your.host.name:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
#log.flush.interval.messages=10000
#log.flush.interval.ms=1000
log.retention.hours=168
#log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.165.205:2383,192.168.171.50:2383,192.168.171.51:2383
zookeeper.connection.timeout.ms=10000
group.initial.rebalance.delay.ms=0
需要修改的地方为
- broker.id
- listeners
- log.dirs
- zookeeper.connect
第一个服务器
broker.id=0
listeners=PLAINTEXT://192.168.165.205:9092
log.dirs=/tmp/kafka-logs
zookeeper.connect=192.168.165.205:2383,192.168.171.50:2383,192.168.171.51:2383
第二个服务器
broker.id=1
listeners=PLAINTEXT://192.168.171.50:9092
log.dirs=/tmp/kafka-logs-1
zookeeper.connect=192.168.165.205:2383,192.168.171.50:2383,192.168.171.51:2383
第三个服务器
broker.id=2
listeners=PLAINTEXT://192.168.171.51:9092
log.dirs=/tmp/kafka-logs-2
zookeeper.connect=192.168.165.205:2383,192.168.171.50:2383,192.168.171.51:2383
三、生产者配置文件修改
bootstrap.servers=192.168.165.205:9092,192.168.171.50:9092,192.168.171.51:9092
四、消费者配置文件修改
zookeeper.connect=192.168.165.205:2383,192.168.171.50:2383,192.168.171.51:2383
zookeeper.connection.timeout.ms=10000
五、运行服务
分别开启三个服务器的 ZooKeeper 服务
./zookeeper-server-start.sh ../config/
zookeeper.properties
分别开启三个服务器的 kafka 服务
./kafka-server-start.sh ../config/server.properties
六、生产者发送消息
-
创建一个 topic
./kafka-topics.sh --create --zookeeper 192.168.165.205:2181 --replication-factor 3 --partitions 1 --topic test-replicated-topic
-
生产者发送消息
./kafka-console-producer.sh --broker-list 192.168.165.205:9092 --topic test-replicated-topic > hello,kafka! > bye,kafka!
七、消费者消费消息
-
第一台服务器
./kafka-console-consumer.sh --zookeeper 192.168.165.205:2181 --from-beginning --topic test-replicated-topic
-
第二胎服务器
./kafka-console-consumer.sh --zookeeper 192.168.171.50:2181 --from-beginning --topic test-replicated-topic
-
第三台服务器
./kafka-console-consumer.sh --zookeeper 192.168.171.51:2181 --from-beginning --topic test-replicated-topic
网友评论