美文网首页
Kafka组件部署-单节点-详细文档

Kafka组件部署-单节点-详细文档

作者: CoderInsight | 来源:发表于2022-08-11 10:42 被阅读0次

1.下载指定版本的安装包

下载目录

或者直接将下载完成的安装包上传到我们家目录下

★★★★★注意此时分多个版本:

  • 1,测试的安装包必须是这个版本的(“kafka_2.11-0.10.2.0.tgz ”),

    • 2.11是针对Scala 2.11版本
    • 0.10.2.0是针对kafka本身的版本
  • 2,测试版本是 kafka_2.11-2.4.1.tgz

    • 2.11是针对Scala 2.11的版本
    • 2.4.1 是针对kafka版本
  • 所以对其进行了不同的测试

    • 主机器(更改了主机名、hosts映射)
    • 测试机器1:2.11-0.10.2.0 版本
    • 测试机器2:2.11-2.4.1 版本

2.解压

tar -zxvf kafka_2.11-0.10.2.0.tgz -C /usr/local/
cd /usr/local
mv kafka_2.11-0.10.2.0/ kafka

# 我们在安装之前的时候可以创建一个新的账户(非必选)
# sudo chown -R hadoop ./kafka

3.测试

完全可以配置环境变量之后可以在任意目录下执行相应的代码,这里在测试的时候没有配置环境变量,所以一直需要先进去当前kafka的主目录下从而进行相应的操作。

(1).启动默认的 Zookeeper

这里为了方便,直接使用的是默认的zookeeper启动的,当然也可通过配置单机的 zookeeper来实现以下操作。

# 新建一个终端窗口
# 0.1,进入kafka所在的目录
cd /usr/local/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

(2.1),启动默认的 Kafka

# 0.2,新建一个终端窗口
cd /usr/local/kafka
bin/kafka-server-start.sh config/server.properties

(2.2),也可以修改Kafka的配置文件server.properties

# 确保每个机器上的id不一样
broker.id=0
# 配置服务端的监控地址
listeners=PLAINTEXT://192.168.51.128:9092
# kafka 日志目录
log.dirs=/data/servers/kafka_2.11-2.4.0/logs
# kafka设置的partitons的个数
num.partitions=1
# zookeeper的连接地址, 如果有自己的zookeeper集群, 请直接使用自己搭建的zookeeper集群,多个节点之间使用“,”隔开
zookeeper.connect=192.168.51.128:2181

(3).创建topic

kafka-topics.sh :基本脚本命令

命令参数详解:

--list :查看topic

--create : 创建Topic

--topic <String: topic> :指定 Topic(主题) 名,

--partitions :指定该Topic分区的个数,

--replication-factor :指定每个Partition的备份数

--zookeeper <String: host1:port,host2:port>:指定 Zookeeper 的连接地址参数提示可能并不赞成这样使用,那么就可以直接指定Kafka的连接地址(DEPRECATED, The connection string for the zookeeper connection in the form host:port. Multiple hosts can be given to allow fail-over.)

--bootstrap-server<String: host1:port,host2:port> :指定kafka的连接信息,采用Direct方式的时候推荐使用这个。

# 0.3,查看当前集群中的topic,在创建之前和之后都可以进行查看,根据自己的实际需要进行相应的修改
bin/kafka-topics.sh --list --zookeeper localhost:2181
# 新建一个终端窗口
# 1.1,主机器(也是就我最近常用的机器,主机名和hosts映射都重新做的)
# 创建一个名字为demo01的Topic,只有1个分区、每个Partition只有1个备份
# 这里是使用的Zookeeper作为连接
cd /usr/local/kafka
bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic demo01
# 2.1,测试机器1(也就 第一个版本的测试,第二个版本的测试也是一样的)
cd /usr/local/kafka
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic demo01
# 创建 topic  test1
# 存在的价值:1.在指定连接信息的时候可以采用直连(Direct)的方式,也就是说使用配置的Kafka的地址和端口;2.指定集群中的多个地址的时候可以使用","进行隔开;3.这里使用的ip地址指定的,也可以使用主机名的方式指定,但是前提要配置hosts映射
kafka-topics.sh --create --bootstrap-server=192.168.51.128:9092,10.231.128.96:9093,192.168.51.128:9094 --replication-factor 1 --partitions 1 --topic test1

(4).创建Producer

kafka-console-producer.sh 基本脚本命令

命令参数详解:

--broker-list <String: broker-list> :指定kafka的连接信息,集群中多个连接信息之间使用","隔开

--topic <String: topic> :指定Topic

--timeout <Integer: timeout_ms> :设置超时时间

--sync :异步发送信息

# 1.2,常用机器
# 在当前topic窗口执行以下命令来创建Producer
bin/kafka-console-producer.sh --broker-list master:9092 --topic demo01
# 此时可以在命令下边输入消息内容,每一行为一条消息,此时可以将消息推送到kafka服务器,以在consumer中进行查看
# 2.2,测试机器1和2
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic demo01
# 并在当前窗口运行成功后输入以下信息
hello hadoop
hello wyq

(5).创建Consumer

kafka-console-consumer.sh :基本脚本命令

命令参数详解:

--bootstrap-server :指定kafka的连接信息,集群中多台机器采用","进行分割

--topic <String: topic> :指定消费的Topic

--from-beginning: 是用来指定消费信息是从头开始的。

--group <String: consumer group id>:指定消费者组

# 新建一个终端窗口
# 1.3.1,常用机器
# 创建消费者,查看消费情况(此时为方法1),此时是通过间接访问 zookeeper的2181端口消费数据
cd /usr/local/kafka
bin/kafka-console-consumer.sh --zookeeper master:2181 --topic demo01 --from-beginning

# 1.3.2,也可以用方法2(说明: 因为此时的版本是旧的,会提示--zookeeper将会被取代,然后我们就可以使用--bootstrap-server去实现,但是指定的连接端口(此时是通过kafka的端口9092消费数据)不一样,而是直接指定连接Producer的端口)
bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic demo01 --new-consumer --from-beginning
# 测试机器1(也就是 2.11-0.10.2.0 版本)
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic demo01 --from-beginning
# 此时可以看到我们刚刚生产的三条消息,说明kafka安装成功
# 测试机器2(也就是 2.11-2.4.1 版本)
# 注意此时版本中将之前的"--new-consumer"参数取消了,所以可以直接去掉
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo01  --from-beginning

4.关闭所有程序的顺序

原则上是:按照启动顺序倒着关!!!

  • 1.关闭 Consumer
  • 2.关闭 Producer
  • 3.关闭 Kafka
  • 4.关闭 zookeeper

5.附加操作

(1).后台常驻的方式启动kafka,带上参数 -daemon

(2).停止kafka

/usr/local/kafka/bin/kafka-server-stop.sh   /usr/local/kafka/bin/config/server.properties

(3),查询Topic

1).查看指定的Topic的详细信息

--describe 配合 --topic 参数查看指定Topic的详细信息,同时要指定 --zookeeper 的连接信息

/usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test 

2).查看 topic 指定分区 offset 的最大值或最小值

time 为 -1 时表示最大值,为 -2 时表示最小值:

/usr/local/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic test --time -1 --broker-list 127.0.0.1:9092 --partitions 0 

(4),增加Topic的Partition数量

/usr/local/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic test --partitions 5 

(5),删除Topic

--delete 配合--topic参数指定要删除的Topic,同时要指定 --zookeeper 的连接信息

如果要实现真正意义上的删除,必选将配置文件中 delete.topic.enable=true 这个配置项进行更改;因为会提示以下的信息

Topic demo02 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

# 1.需要事先将config/server.properties
delete.topic.enable=true
# 2.执行删除操作
/usr/local/kafka/bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test

(6).消费信息

1).指定从头开始

--from-beginning关键字的使用!

/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

2),从尾部获取数据,此时必须指定分区

/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --offset latest --partition 0

3).指定获取的个数的信息

/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --offset latest --partition 0 --max-messages 1 

相关文章

网友评论

      本文标题:Kafka组件部署-单节点-详细文档

      本文链接:https://www.haomeiwen.com/subject/zjjywrtx.html