[TOC]
基础概念
主题(Topic)与分区(Partition)
Kafka中的消息以主题为单位进行归类,主题是一个逻辑上的概念,生产者负责将消息发送到特定的主题(发送到Kafka集群中的每一条消息都要指定一个主题),而消费者负责订阅主题并进行消费。
主题以细分为多个分区,一个分区只属于单个主题,很多时候也会把分区称为主题分区(Topic-Partition)。
偏移量(offset)是消息在分区中的唯一标识,Kafka通过它来保证消息在分区内的顺序性
Kafka保证的是分区有序而不是主题有序。
Kafka中的分区可以分布在不同的服务器(broker)上,也就是说,一个主题可以横跨多个broker
多副本(Replica)机制
增加副本数量可以提升容灾能力
同一分区的不同副本中保存的是相同的消息
副本之间是“一主多从”的关系,其中leader副本负责处理读写请求,follower副本只负责与leader副本的消息同步
leader副本负责处理读写请求,follower副本只负责与leader副本的消息同步。副本处于不同的broker中,当leader副本出现故障时,从follower副本中重新选举新的leader副本对外提供服务
分区中的所有副本统称为AR(Assigned Replicas)。
所有与leader副本保持一定程度同步的副本(包括leader副本在内)组成ISR(In-Sync Replicas)
与leader副本同步滞后过多的副本(不包括leader副本)组成OSR(Out-of-Sync Replicas),
由此可见,AR=ISR+OSR。
在正常情况下, AR=ISR,OSR集合为空。
只有在ISR集合中的副本才有资格被选举为新的leader
HW是High Watermark的缩写,俗称高水位,它标识了一个特定的消息偏移量(offset),消费者只能拉取到这个offset之前的消息。
LEO是Log End Offset的缩写,它标识当前日志文件中下一条待写入消息的offset
分区ISR集合中的每个副本都会维护自身的LEO,而ISR集合中最小的LEO即为分区的HW,对消费者而言只能消费HW之前的消息。
安装Kafka
一个典型的 Kafka 体系架构包括若干 Producer、若干Broker、若干 Consumer
ZooKeeper中共有3个角色:leader、follower和observer
安装 Zookeeper
-
下载Zookeepr
注意不要下载源码版本
https://zookeeper.apache.org/releases.html
图片.png
-
配置环境变量
export ZOOKEEPER_HOME=/Users/gy/MyMacDocuments/apache-zookeeper-3.6.2(zookeeper路径) export PATH="$PATH:$ZOOKEEPER_HOME/bin"
-
修改配置文件
cd $ZOOKEEPER_HOME/conf cp zoo_sample.cfg zoo.cfg
修改 zoo.cfg 如下:
# The number of milliseconds of each tick # Zookeeper服务器心跳时间,单位为ms tickTime=2000 # The number of ticks that the initial # synchronization phase can take # 投票选举新leader的初始化时间 initLimit=10 # The number of ticks that can pass between # sending a request and getting an acknowledgement # leader与follower心跳检测最大容忍时间,响应超过 syncLimit * tickTime, # leader认为follower死掉,从服务器列表中删除follower syncLimit=5 # the directory where the snapshot is stored. # do not use /tmp for storage, /tmp here is just # example sakes. # 数据目录 dataDir=/tmp/zookeeper/data # 日志目录 dataLogDir=/tmp/zookeeper/log # the port at which the clients will connect # Zookeeper对外服务端口 clientPort=2181
-
创建数据和日志目录
mkdir -p /tmp/zookeeper/data mkdir -p /tmp/zookeeper/log
-
在${dataDir}目录(也就是/tmp/zookeeper/data)下创建一个myid文件,并写入一个数值,比如0。myid文件里存放的是服务器的编号。
-
启动Zookeepr
zkServer.sh start // 启动 zkServer.sh status // 状态 zkServer.sh stop //停止
-
集群模式配置(由于我没有三台机器就算了)
在这3台机器的/etc/hosts文件中添加3台集群的IP地址与机器域名的映射,如
192.168.0.2 node1 192.168.0.3 node2 192.168.0.4 node3
然后在这3台机器的zoo.cfg文件中添加以下配置:
```sh
server.0=192.168.0.2:2888:3888
server.1=192.168.0.3:2888:3888
server.2=192.168.0.4:2888:3888
```
server.A=B:C:D
A: 服务器的编号,myid里面的值
B: 服务器ip地址
C: 服务器与集群中的leader服务器交换信息的端口
D: 表示选举时服务器相互通信的端口
在这3台机器上各自执行zkServer.sh start命令来启动服务
安装Kafka
-
下载Kafka
archive.apache.org/dist/kafka
图片.png
-
解压并修改配置文件
$KAFKA_HOME/conf/server.properties
# The id of the broker. This must be set to a unique integer for each broker. # broker 编号,如果 集群 中有多个broker,则每个broker编号要设置不同 broker.id=0 # broker 对外提供的服务入口地址 listeners=PLAINTEXT://:9092 # A comma separated list of directories under which to store log files # 存放消息日志文件地址 log.dirs=/tmp/kafka-logs # Kafka 所需的 Zookeeper 集群地址 zookeeper.connect=localhost:2181/kafka
-
启动Kafka
在$KAFKA_HOME目录下执行
./bin/kafka-server-start.sh ./config/server.properties //启动 ./bin/kafka-server-start.sh -daemon ./config/server.properties //后台启动 /kafka-server-start.sh ./config/server.properties & //后台启动
-
查看Kafka服务进程是否已经启动
jps -l
![](https://img.haomeiwen.com/i23753262/ab3fd0478eb9ffd9.png)
Kafka中一些重要的服务端参数
参数 | 介绍 |
---|---|
zookeeper.connect | 该参数指明broker要连接的ZooKeeper集群的服务地址(包含端口号),必填,如果集群中有多个节点,则用逗号分开 多个节点:这种情况可以使用chroot 路径???(啥意思,之后再了解) 如果不指定chroot,那么默认使用ZooKeeper的根路径 |
listeners | 该参数指明broker监听客户端连接的地址列表,逗号分隔,默认值为null 格式为 protocol://hostname:port ,Kafka当前支持的协议类型有PLAINTEXT、SSL、SASL_SSL等,如果未开启安全认证,则使用简单的PLAINTEXT即可 不指定主机名,则表示绑定默认网卡 如果主机名是0.0.0.0,则表示绑定所有的网卡 |
advertised.listeners | 作用和listeners类似,默认值也为 null 主要用于IaaS(Infrastructure as a Service)环境 使用场景:多块网卡,有公网,有私网,可以设置advertised.listeners参数绑定公网IP供外部客户端使用,而配置listeners参数来绑定私网IP地址供broker间通信使用。 |
broker.id | 用来指定Kafka集群中broker的唯一标识,默认值为-1。如果没有设置,那么Kafka会自动生成一个。 |
log.dir和log.dirs | Kafka 日志文件存放的根目录 log.dirs用来配置多个根目录(以逗号分隔) log.dirs 的优先级比 log.dir 高 |
message.max.bytes | 指定broker所能接收消息的最大值,默认值为1000012(B) Producer 发送的消息大于这个参数所设置的值,那么(Producer)就会报出RecordTooLargeException的异常 |
生产者与消费者
生产者:将消息发布到Kafka主题的分区中
消费者:订阅主题从而消费消息
脚本使用
主题相关脚本 : $KAFKA_HOME/bin/kafka-topics.sh
// 创建一个主题
// --zookeeper 指定了Kafka所连接的Zookeeper服务地址
// --create 创建主题的动作
// --topic 创建的主题的名称
// --replication-factor 副本因子 【用来设置主题的副本数。每个主题可以有多个副本,副本位于集群中不同的broker上,也就是说副本的数量不能超过broker的数量,否则创建主题时会失败。】
// --partitions 分区个数
./bin/kafka-topics.sh --zookeeper localhost:2181/kafka --create --topic topic-demo --replication-factor 3 --partitions 4
// --describe 展示更多主题的具体信息
./bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic topic-demo
生产者相关脚本: $KAFKA_HOME/bin/kafka-console-producer.sh
// 发布消息
// --broker-list 连接Kafka集群的地址
// --topic 主题
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-demo
消费者相关脚本: $KAFKA_HOME/bin/kafka-console-consumer.sh
// 订阅相关topic
// --bootstrap-server 连接的Kafka集群的地址
// --topic 指定主题
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic-demo
Java中使用
-
依赖
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.0.0</version> </dependency>
-
生产者
/** * 生产者 * * @author Jenson */ public class ProducerFastStart { private static final String BROKER_LIST = "localhost:9092"; private static final String TOPIC = "topic-demo"; public static void main(String[] args) { Properties properties = new Properties(); properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("bootstrap.servers", BROKER_LIST); // 配置生产者客户端参数,并创建生产者实例 KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties); // 构造所需要发送的消息 ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC, "hello kafka!"); // 发送消息 producer.send(record); // 关闭生产者客户端 producer.close(); } }
-
消费者
/** * 消费者 * * @author Jenson */ public class ConsumerFastStart { private static final String BROKER_LIST = "localhost:9092"; private static final String TOPIC = "topic-demo"; private static final String GROUP_ID = "group.demo"; public static void main(String[] args) { Properties properties = new Properties(); properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("bootstrap.servers", BROKER_LIST); // 设置消费组名称 properties.put("group.id", GROUP_ID); // 创建一个消费者实例 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties); // 订阅主题 consumer.subscribe(Collections.singletonList(TOPIC)); // 循环消费消息 while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.out.println(record.value()); } } } }
遇到的报错
java 使用kafka-clients时报错
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
解决
<!--加入依赖-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-nop</artifactId>
<version>1.7.2</version>
</dependency>
网友评论