zookeeper集群
kafka集群
- 下载kafka_2.11-2.0.1.tgz包
- 解压包 tar -zxvf kafka_2.11-2.0.1.tgz
- 修改配置文件/kafka_2.11-2.0.1/config/server.properties
# 设置broker的id,每台机器的broker不同
broker.id=1
# 因为此处使用阿里云,所以需要配置上外网地址
host.name=内网地址
advertised.host.name=外网地址
message.max.byte=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880
# 设置zookeeper集群连接端口
zookeeper.connect=106.15.95.37:2181,47.99.197.133:2181,106.12.42.149:2181
- 启动kafka
/kafka_2.11-2.0.1/bin/kafka-server-start.sh -daemon /kafka_2.11-2.0.1/config/server.properties
结果:
创建主题
# 创建两个副本,一个分区的主题spy
./kafka-topics.sh --create --zookeeper 106.15.95.37:2181 --replication-factor 2 --partitions 1 --topic spy
查看
[root@izbp1a2dsv8lw7ik396vokz bin]# ./kafka-topics.sh --list --zookeeper 106.15.95.37:2181
spy
创建提供者
./kafka-console-producer.sh --broker-list 106.15.95.37:9092 --topic spy
创建消费者
./kafka-console-consumer.sh --bootstrap-server 106.15.95.37:9092 --topic spy
详细
./kafka-topics.sh --zookeeper 106.15.95.37:2181 --describe
PartitionCount :分区数量
ReplicationFactor:副本数量
Partition:当前分区
Leader:broker.id
Replicas:副本的broker.id
Isr:选举使用的broker.id
删除主题
./kafka-topics.sh --bootstrap-server 106.15.95.37:9092 --delete --topic spy
注意:
如果部署到云服务器,注意修改安全组中配置规则,以及内外网的转换。否则会出现org.apache.kafka.common.errors.TimeoutException异常。
网友评论