Kafka(四)集群之kafka

作者: 我犟不过你 | 来源:发表于2021-02-02 15:09 被阅读0次

在章节二(https://www.jianshu.com/p/d9fefdf2db85)中,我们部署了单机的kafka,现在我们部署一套集群模式的kafka。

这里我准备了三台虚拟机:
192.168.184.134
192.168.184.135
192.168.184.136
每台机器部署一个zk和kafka。

上一章节中zk集群已经部署完毕。

一、上传tar包到每台服务器

在章节二中,134这台机器已经有kafka存在了,我们在另外两台机器上安装kafka:

[root@bogon ~]# cd /opt/
[root@bogon opt]# tar -xvf kafka_2.12-2.7.0.tgz 

二、修改配置文件

[root@bogon opt]# cd kafka_2.12-2.7.0
[root@bogon kafka_2.12-2.7.0]# cd config/
[root@bogon config]# vi server.properties 

在上面的文件中有几个关键点,我们一一进行配置,我会对配置中的说明翻译:

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
#broker的id,必须为每个broker设置唯一的正整形
broker.id=0

以下这两个listeners,advertised_listeners 是对外暴露的服务端口,真正建立连接用的是 listeners。
在内网中我们使用listenners就可以了,在docker等容器或云中使用advertised。

############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# socket服务监听这个地址。如果不配置将会从java.net.InetAddress.getCanonicalHostName()获取。该方法获取的是当前主机名。
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
# broker将主机名通知给生产者和消费者,如果不设置,它将获取listeners的值。否者获取当前主机名
advertised.listeners=PLAINTEXT://192.168.184.134:9092

下面这个是日志路径的配置

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
# 以逗号分隔的,存在日志文件路径
log.dirs=/opt/kafka_2.12-2.7.0/kafka-logs

下面这个是个重点的东西,topic在磁盘上会分为多个partitions存储,相比单一文件存储,增加了并行性,在后续文章中会详细去讲解:

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
# topic的默认日志页数。更多的页将会带来更好的并行性,但是会导致文件的数量跨度更大。
num.partitions=10

日志的保存时间:

# The minimum age of a log file to be eligible for deletion due to age
# 日志文件可删除的最小年龄
log.retention.hours=168

以下是zookeeper的配置:

#zk的链接地址,逗号分隔
zookeeper.connect=192.168.184.134:2181,192.168.184.135:2181,192.168.184.136:2181

# 链接zk的超时间,单位是毫秒
zookeeper.connection.timeout.ms=18000

三、启动

这里我们直接设置后台启动,三个节点都是如此:

nohup ./bin/kafka-server-start.sh config/server.properties >/dev/null &

这里面有个小坑,还记得之前我们搭建的单机环境吗?那时候默认的日志文件夹在/tmp/kafka-logs下面,生成了很多内容,导致我们134这个节点无法启动成功,报错如下:

[2021-02-02 14:38:24,741] ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
kafka.common.InconsistentClusterIdException: The Cluster ID UpEhBVQ6TkW5Wd_VQKDD6g doesn't match stored clusterId Some(FTThHFq_RzWNZKsduj880A) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.
        at kafka.server.KafkaServer.startup(KafkaServer.scala:252)
        at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
        at kafka.Kafka$.main(Kafka.scala:82)
        at kafka.Kafka.main(Kafka.scala)
[2021-02-02 14:38:24,744] INFO shutting down (kafka.server.KafkaServer)

解决这个问题只需要把/tmp/kafka-logs文件删除就好了。

看到日志出现这一句表明启动成功了:

[2021-02-02 14:47:45,172] INFO [KafkaServer id=1] started (kafka.server.KafkaServer)

四、验证

下面我们验证下是否搭建成功了,首先使用kafkatool工机具连接看下:

kafkatool kafkatool kafkatool

我们在134节点创建一个topic:

[root@bogon kafka_2.12-2.7.0]# bin/kafka-topics.sh --create --zookeeper 192.168.184.134:2181 --replication-factor 1 --partitions 1 --topic test-kafka
Created topic test-kafka.

查看topic列表:

[root@bogon kafka_2.12-2.7.0]# bin/kafka-topics.sh --list --zookeeper 192.168.184.134:2181
test-kafka

在kafkatool中查看:


kafkatool

创建生产者:

[root@bogon kafka_2.12-2.7.0]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-kafka
>

创建消费者:

[root@bogon kafka_2.12-2.7.0]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-kafka --from-beginning

生成者发送消息:

[root@bogon kafka_2.12-2.7.0]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-kafka
>hello kafka
>

消费者接收消息:

[root@bogon kafka_2.12-2.7.0]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-kafka --from-beginning
hello kafka

到此为止,kafka的集群搭建已经完成了。在后面的文章我们会去学习如何在springboot中集成kafka。

相关文章

  • Kafka相关文集

    Kafka 运维 kafka集群配置 kafka的安装(包括zookeeper) 原理及使用 Kafka之sync...

  • kafka cluster

    一、说明 二、kafka是什么 三、kafka作用 四、kafka结构和术语 五、kafka 集群搭建 5.1、环...

  • Kafka(四)集群之kafka

    在章节二(https://www.jianshu.com/p/d9fefdf2db85[https://www.j...

  • kafka命令行的管理使用

    启动kafka集群 首先要启动好kafka集群1、集群时间同步2、启动zookeeper集群3、启动kafka集群...

  • kafka集群管理工具kafka-manager安装部署

    kafka集群管理工具kafka-manager安装部署 kafka安装 Kafka集群环境需要以下环境支持: J...

  • Zookeeper集群、Kafka集群

    Zookeeper集群、Kafka集群 共三台服务器,分别为:kafka-01、kafka-02、kafka-03...

  • Kafka服务器端

    目录 kafka是如何维护集群成员关系 kafka集群中分区首领是如何选取的 kafka如何进行复制 kafka是...

  • kafka集群维护

    【kafka集群维护】 【kafka集群分区日志迁移】(热部署)迁移topic数据到其他broker,请遵循下面四步:

  • kafka安装

    kafka 集群安装 Kafka常用操作命令

  • spark streaming + kafka +python(

    一、环境部署 hadoop集群2.7.1zookeerper集群kafka集群:kafka_2.11-0.10.0...

网友评论

    本文标题:Kafka(四)集群之kafka

    本文链接:https://www.haomeiwen.com/subject/cwhytltx.html