Kafka的自我介绍
很多人都认为Kafka是一个消息队列,实际上并不完全对,在官网的标题中使用了一句话来描述Kafka:
A distributed streaming platform(一个分布式的流平台)
所以在本质上来讲,Kafka并不是一个消息队列,它实际上是用来做流处理的一个平台。那么什么是分布式的流平台呢?这就需要简单说明下Kafka诞生的背景:
Kafka早期设计的目的是用作于 LinkedIn 的活动流(Activity Stream)和运营数据处理管道(Pipeline)的一个分布式消息发布和订阅系统,起初基于Scala编写(现在是Scala + Java),之后成为 Apache 基金会的一个顶级项目。
活动流数据是所有的网站对用户的使用情况做分析的时候要用到的最常规的部分,活动数据包括页面的访问量(PV)、被查看内容方面的信息以及搜索内容。这种数据通常的处理方式是先把各种活动以日志的形式写入某种文件,然后周期性的对这些文件进行统计分析。运营数据指的是服务器的性能数据(CPU、IO 使用率、请求时间、服务日志等)。
实际上在这种大数据的流处理里面,Kafka经常会对接Spark、Flink等实时流计算引擎,这些场景要求流处理平台具有高性能、高吞吐量、低延迟等特点,而 Kafka 具有很好的吞吐量、内置分区、冗余及容错性的优点(Kafka 每秒可以处理几十万消息),因此让 Kafka 成为了一个很好的大规模消息处理应用的解决方案。
由于 Kafka 提供了类似 JMS 的特性,所以很多人对它的第一印象就是消息队列。但是在设计和实现上是完全不同的,而且它也不是 JMS 规范的实现,因此我们需要纠正对Kafka的错误认知。
Zookeeper安装
接下来演示一下Kafka的安装与配置。Kafka是基于Zookeeper来实现分布式协调的,所以在安装Kafka之前需要先安装Zookeeper。Zookeeper和Kafka都依赖于JDK,我这里已经事先安装好了JDK:
[root@txy-server2 ~]# java --version
java 11.0.5 2019-10-15 LTS
Java(TM) SE Runtime Environment 18.9 (build 11.0.5+10-LTS)
Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.5+10-LTS, mixed mode)
[root@txy-server2 ~]#
准备好JDK后,到Zookeeper的官网下载地址,复制下载链接:
然后到Linux中使用wget
命令进行下载,如下:
[root@txy-server2 ~]# cd /usr/local/src
[root@txy-server2 /usr/local/src]# wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.6.1/apache-zookeeper-3.6.1-bin.tar.gz
解压下载好的压缩包,并将解压后的目录移动和重命名:
[root@txy-server2 /usr/local/src]# tar -zxvf apache-zookeeper-3.6.1-bin.tar.gz
[root@txy-server2 /usr/local/src]# mv apache-zookeeper-3.6.1-bin ../zookeeper
进入到Zookeeper的配置文件目录,将zoo_sample.cfg
这个示例配置文件拷贝一份并命名为zoo.cfg
,这是Zookeeper默认的配置文件名称:
[root@txy-server2 /usr/local/src]# cd ../zookeeper/conf/
[root@txy-server2 /usr/local/zookeeper/conf]# ls
configuration.xsl log4j.properties zoo_sample.cfg
[root@txy-server2 /usr/local/zookeeper/conf]# cp zoo_sample.cfg zoo.cfg
修改一下配置文件中的dataDir
配置项,指定一个磁盘空间较大的目录:
[root@txy-server2 /usr/local/zookeeper/conf]# vim zoo.cfg
# 指定Zookeeper的数据存储目录,类比于MySQL的dataDir
dataDir=/data/zookeeper
[root@txy-server2 /usr/local/zookeeper/conf]# mkdir -p /data/zookeeper
- 如果只是学习使用的话,这一步其实可以忽略,采用默认配置即可
接下来就可以进入bin
目录,使用启动脚本来启动Zookeeper了,如下示例:
[root@txy-server2 /usr/local/zookeeper/conf]# cd ../bin/
[root@txy-server2 /usr/local/zookeeper/bin]# ./zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@txy-server2 /usr/local/zookeeper/bin]#
启动完成后,可以通过查看是否正常监听端口号来判断是否启动成功,如下则是启动成功了:
[root@txy-server2 ~]# netstat -lntp |grep 2181
tcp6 0 0 :::2181 :::* LISTEN 7825/java
[root@txy-server2 ~]#
Kafka安装
安装完Zookeeper后,接下来就可以安装Kafka了,同样的套路首先去Kafka的官网下载地址,复制下载链接:
然后到Linux中使用wget
命令进行下载,如下:
[root@txy-server2 ~]# cd /usr/local/src
[root@txy-server2 /usr/local/src]# wget https://mirror.bit.edu.cn/apache/kafka/2.5.0/kafka_2.13-2.5.0.tgz
解压下载好的压缩包,并将解压后的目录移动和重命名:
[root@txy-server2 /usr/local/src]# tar -xvf kafka_2.13-2.5.0.tgz
[root@txy-server2 /usr/local/src]# mv kafka_2.13-2.5.0 ../kafka
进入Kafka的配置文件目录,修改配置文件:
[root@txy-server2 /usr/local/src]# cd ../kafka/config/
[root@txy-server2 /usr/local/kafka/config]# vim server.properties
# 指定监听的地址及端口号,该配置项是指定内网ip
listeners=PLAINTEXT://127.0.0.1:9092
# 如果需要开放外网访问,则在该配置项指定外网ip
advertised.listeners=PLAINTEXT://127.0.0.1:9092
# 指定kafka日志文件的存储目录
log.dirs=/usr/local/kafka/kafka-logs
# 指定zookeeper的连接地址,多个地址用逗号分隔
zookeeper.connect=localhost:2181
[root@txy-server2 /usr/local/kafka/config]# mkdir /usr/local/kafka/kafka-logs
Kafka基本概念及使用演示
基本概念
- Topic:一个虚拟的概念,由一个到多个Partition组成,Topic作为生产者和消费者交互数据的媒介
- Partition:实际的消息存储单位,也就是真正存储消息的地方
- Producer:消息生产者
- Consumer:消息消费者
在完成配置文件的修改后,为了方便使用Kafka的命令脚本,我们可以将Kafka的bin
目录配置到环境变量中:
[root@txy-server2 ~]# vim /etc/profile
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin
[root@txy-server2 ~]# source /etc/profile # 让配置生效
这样就可以使用如下命令启动Kafka了:
[root@txy-server2 ~]# kafka-server-start.sh /usr/local/kafka/config/server.properties &
执行以上命令后,启动日志会输出到控制台,可以通过日志判断是否启动成功,也可以通过查看是否监听了9092
端口来判断是否启动成功:
[root@txy-server2 ~]# netstat -lntp |grep 9092
tcp6 0 0 172.21.0.10:9092 :::* LISTEN 31943/java
[root@txy-server2 ~]#
我们可以使用如下命令创建一个Topic:
[root@txy-server2 ~]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-topic
Created topic test-topic. # 控制台输出这句表示创建完成
...
[root@txy-server2 ~]#
参数说明:
-
--create
:表示创建一个Topic -
--zookeeper
:指定zookeeper的ip及端口号 -
--replication-factor
:指定复制因子 -
--partitions
:指定partition的数量 -
--topic
:指定创建的topic的名称
查看已经创建的Topic信息:
[root@txy-server2 ~]# kafka-topics.sh --list --zookeeper localhost:2181
test-topic
[root@txy-server2 ~]#
向test-topic
生产一些消息:
[root@txy-server2 ~]# kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test-topic
>helloworld
>第二条消息
>第三条消息
从test-topic
中消费消息:
[root@txy-server2 ~]# kafka-console-consumer.sh --bootstrap-server 127.0.0.1:2181 --topic test-topic --from-beginning
helloworld
第二条消息
消息
第四条消息
第五条消息
最后,停止Kafka的命令如下:
[root@txy-server2 ~]# kafka-server-stop.sh
网友评论