总览
Apache Kafka发源于LinkedIn,于2011年成为Apache的孵化项目,随后于2012年成为Apache的主要项目之一。Kafka使用Scala和Java进行编写。Apache Kafka是一个快速、可扩展的、高吞吐、可容错的分布式发布订阅消息系统。Kafka具有高吞吐量、内置分区、支持数据副本和容错的特性,适合在大规模消息处理场景中使用。
基础信息分享
需要详细阅读基础 SDK (按 Go Sarama 为例)
- sarama/config.go 其中有各种基础参数的配置,以及控制生产 & 消费者的一些基础参数,还需要都了解下的,特别是 NewConfig() 中的参数
- sarams/utils.go 各种当前支持的 Kafka 版本
- sarams/sarams.go 监控指标相关
- sarams/sync_producer.go Producer
- sarams/consumer_group.go Consumer Group
- examples
基础知识点
首先,我们必须理解,partiton 是 kafka 的并行单元。从 producer 和 broker 的视角看,向不同的 partition 写入是完全并行的;而对于 consumer,并发数完全取决于 partition 的数量,即,如果 consumer 数量大于 partition 数量,则必有 consumer 闲置。所以,我们可以认为 kafka 的吞吐与 partition 时线性关系。partition 的数量要根据吞吐来推断,假定 p 代表生产者写入单个 partition 的最大吞吐,c 代表消费者从单个 partition 消费的最大吞吐,我们的目标吞吐是 t,那么 partition 的数量应该是 t/p 和 t/c 中较大的那一个。实际情况中,p的影响因素有批处理的规模,压缩算法,确认机制和副本数等,通常建议 partition 的数量一定要大于等于消费者的数量来实现最大并发。
- 一个 partition 就是一个存储 kafka-log 的目录。
- 一个 partition 只能寄宿在一个 broker 上。
- 单个 partition 是可以实现消息的顺序写入的。
- 单个 partition 只能被单个消费者进程消费,与该消费者所属于的消费组无关。这样做,有助于实现顺序消费。
- 单个消费者进程可同时消费多个 partition,即 partition 限制了消费端的并发能力。
- partition 越多则 file 和 memory 消耗越大,要在服务器承受服务器设置。
- 每个 partition 信息都存在所有的zk节点中。
- partition 越多则失败选举耗时越长。
- offset 是对每个 partition 而言的,partition 越多,查询 offset 就越耗时。
- partition 的数量是可以动态增加的(只能加不能减)。
常见客户端问题
- 客户端版本必须与 kafka 版本一致,多半发生在非 Java 客户端情况下
- 客户端 Rebalance 情况发生,版本为 0.10 有一定概率触发频繁 Rebalance;消息队列 Kafka 版的 Consumer 没有独立线程维持心跳,而是把心跳维持与 poll 接口耦合在一起。其结果就是,如果用户消费出现卡顿,就会导致 Consumer 心跳超时,引发 rebalance
客户端入门
注意点: 客户端一定要记住客户端版本与kafka 版本对应关系
- Go
- Ruby
- Java
Producer Demo
package main
import (
"fmt"
"time"
"github.com/Shopify/sarama"
)
func main() {
kafkaVersion, err := sarama.ParseKafkaVersion("0.10.2.0") // 返回对应版本号
address := []string{"127.0.0.1:9092","127.0.0.2:9092","127.0.0.3:9092"}
config := sarama.NewConfig()
config.Version = kafkaVersion
/*
const (
// NoResponse doesn't send any response, the TCP ACK is all you get.
NoResponse RequiredAcks = 0
// WaitForLocal waits for only the local commit to succeed before responding.
WaitForLocal RequiredAcks = 1
// WaitForAll waits for all in-sync replicas to commit before responding.
// The minimum number of in-sync replicas is configured on the broker via
// the `min.insync.replicas` configuration key.
WaitForAll RequiredAcks = -1
)
*/
config.Producer.RequiredAcks = sarama.WaitForAll // 发送完数据需要 leader 和 follow 都确认
config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition
config.Producer.Return.Successes = true // 成功交付的消息将在 success channel 返回
// 构造一个消息
msg := &sarama.ProducerMessage{}
msg.Topic = "cloudinfra_test_topic"
now := time.Now().Format("2006-01-02 15:04:05")
msg.Value = sarama.StringEncoder(now +": This is a test log ...")
// 连接 kafka
client, err := sarama.NewSyncProducer(address, config)
if err != nil {
fmt.Println("producer closed, err:", err)
return
}
defer client.Close()
// 发送消息
pid, offset, err := client.SendMessage(msg)
if err != nil {
fmt.Println("send msg failed, err:", err)
return
}
fmt.Printf("pid:%v offset:%v\n", pid, offset)
}
Consumer Demo
package main
import (
"context"
"fmt"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"github.com/Shopify/sarama"
)
var (
wg sync.WaitGroup
)
// Consumer represents a Sarama consumer group consumer
type Consumer struct {
ready chan bool
}
// Setup is run at the beginning of a new session, before ConsumeClaim
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
// Mark the consumer as ready
close(consumer.ready)
return nil
}
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}
// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
// NOTE:
// Do not move the code below to a goroutine.
// The `ConsumeClaim` itself is called within a goroutine, see:
// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
for message := range claim.Messages() {
fmt.Println("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
session.MarkMessage(message, "")
}
return nil
}
func main() {
kafkaVersion, err := sarama.ParseKafkaVersion("0.10.2.0")
address := []string{"127.0.0.1:9092","127.0.0.2:9092","127.0.0.3:9092"}
config := sarama.NewConfig()
config.Version = kafkaVersion
consumer := Consumer{
ready: make(chan bool),
}
//创建消费者
ctx, cancel := context.WithCancel(context.Background())
consumerGroup, err := sarama.NewConsumerGroup(address, "Cloudinfra-TestConsumerGroup", config)
if err != nil {
fmt.Println("Failed to start consumer group: %s", err)
return
}
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for {
// `Consume` should be called inside an infinite loop, when a
// server-side rebalance happens, the consumer session will need to be
// recreated to get the new claims
if err := consumerGroup.Consume(ctx, strings.Split("cloudinfra_test_topic", ","), &consumer); err != nil {
fmt.Println("Error from consumer: %v", err)
}
// check if context was cancelled, signaling that the consumer should stop
if ctx.Err() != nil {
return
}
consumer.ready = make(chan bool)
}
}()
<-consumer.ready
fmt.Println("Sarama consumer up and running!...")
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
select {
case <-ctx.Done():
fmt.Println("terminating: context cancelled")
case <-sigterm:
fmt.Println("terminating: via signal")
}
cancel()
wg.Wait()
if err = consumerGroup.Close(); err != nil {
fmt.Println("Error closing client: ", err)
}
}
生产者最佳实践
失败重试
分布式环境下,由于网络等原因,偶尔发送失败是常见的。导致这种失败的原因可能是消息已经发送成功,但是 Ack 失败,也有可能是确实没发送成功。
云上很多全托管消息队列 Kafka 是 VIP 网络架构,会主动断开空闲连接(30 秒没活动),因此,不是一直活跃的客户端会经常收到 "connection rest by peer" 错误,建议重试消息发送。
重试参数,您可以根据业务需求,设置以下重试参数:
- retries,重试次数,建议设置为 3。
- retry.backoff.ms,重试间隔,建议设置为 1000。
异步发送
发送接口是异步的,如果你想得到发送的结果,可以调用metadataFuture.get(timeout, TimeUnit.MILLISECONDS)。
线程安全
Producer 是线程安全的,且可以往任何 Topic 发送消息。通常情况下,一个应用对应一个 Producer 就足够了。
Acks
- acks=0,表示无需服务端的 Response,性能较高,丢数据风险较大;
- acks=1,服务端主节点写成功即返回 Response,性能中等,丢数据风险中等,主节点宕机可能导致数据丢失;
- acks=all,服务端主节点写成功,且备节点同步成功,才返回 Response,性能较差,数据较为安全,主节点和备节点都宕机才会导致数据丢失。
- 一般建议选择 acks=1,重要的服务可以设置 acks=all。
Batch
Batch 的基本思路是:把消息缓存在内存中,并进行打包发送。Kafka 通过 Batch 来提高吞吐,但同时也会增加延迟,生产时应该对两者予以权衡。 在构建 Producer 时,需要考虑以下两个参数:
- batch.size : 发往每个分区(Partition)的消息缓存量(消息内容的字节数之和,不是条数)达到这个数值时,就会触发一次网络请求,然后客户端把消息真正发往服务器;
- linger.ms : 每条消息待在缓存中的最长时间。若超过这个时间,就会忽略 batch.size 的限制,然后客户端立即把消息发往服务器。
由此可见,Kafka 客户端什么时候把消息真正发往服务器,是由上面两个参数共同决定的: batch.size 有助于提高吞吐,linger.ms 有助于控制延迟。您可以根据具体业务需求进行调整。
Rebalance 如何避免
- session.timeout.ms:建议设置成 25s,不超过 30s。
- max.poll.records:要小于(最好是远小于)“单个线程每秒消费的条数”x“消费线程的个数”x“session.timeout 的秒数”。
消费者最佳实践
消费消息基本流程
消息队列 Kafka 版订阅者在订阅消息时的基本流程是: 1. 2. 3.
- Poll 数据
- 执行消费逻辑
- 再次 poll 数据
负载均衡
每个 Consumer Group 可以包含多个消费实例,即可以启动多个消息队列 Kafka 版 Consumer,并把参数 group.id 设置成相同的值。属于同一个 Consumer Group 的消费实例会负载消费订阅的 Topic。
举例:Consumer Group A 订阅了 Topic A,并开启三个消费实例 C1、C2、C3,则发送到 Topic A 的每条消息最终只会传给 C1、C2、C3 的某一个。Kafka 默认会均匀地把消息传给各个消息实例,以做到消费负载均衡。
Kafka 负载消费的内部原理是,把订阅的 Topic 的分区,平均分配给各个消费实例。因此,消费实例的个数不要大于分区的数量,否则会有实例分配不到任何分区而处于空跑状态。这个负载均衡发生的时间,除了第一次启动上线之外,后续消费实例发生重启、增加、减少等变更时,都会触发一次负载均衡。
消息队列 Kafka 版的每个 Topic 的分区数量默认是 16 个,已经足够满足大部分场景的需求,且云上服务会根据容量调整分区数。
多个订阅
一个 Consumer Group 可以订阅多个 Topic。 一个 Topic 也可以被多个 Consumer Group 订阅,且各个 Consumer Group 独立消费 Topic 下的所有消息。
举例:Consumer Group A 订阅了 Topic A,Consumer Group B 也订阅了 Topic A,则发送到 Topic A 的每条消息,不仅会传一份给 Consumer Group A 的消费实例,也会传一份给 Consumer Group B 的消费实例,且这两个过程相互独立,相互没有任何影响。
消费位点
每个 Topic 会有多个分区,每个分区会统计当前消息的总条数,这个称为最大位点 MaxOffset。
消息队列 Kafka 版 Consumer 会按顺序依次消费分区内的每条消息,记录已经消费了的消息条数,称为 ConsumerOffset。
剩余的未消费的条数(也称为消息堆积量) = MaxOffset - ConsumerOffset
消费位点提交
消息队列 Kafka 版消费者有两个相关参数:
- enable.auto.commit:默认值为 true。
- auto.commit.interval.ms: 默认值为 1000,也即 1s。
这两个参数组合的结果就是,每次 poll 数据前会先检查上次提交位点的时间,如果距离当前时间已经超过参数 auto.commit.interval.ms 规定的时长,则客户端会启动位点提交动作。
因此,如果将enable.auto.commit设置为 true,则需要在每次 poll 数据时,确保前一次 poll 出来的数据已经消费完毕,否则可能导致位点跳跃。
如果想自己控制位点提交,请把 enable.auto.commit 设为 false,并调用 commit(offsets)函数自行控制位点提交。
消费位点重置
以下两种情况,会发生消费位点重置:
- 当服务端不存在曾经提交过的位点时(比如客户端第一次上线)。
- 当从非法位点拉取消息时(比如某个分区最大位点是10,但客户端却从11开始拉取消息)。
Java 客户端可以通过 auto.offset.reset 来配置重置策略,主要有三种策略:
- "latest",从最大位点开始消费。
- "earliest",从最小位点开始消费。
- "none",不做任何操作,也即不重置。
- 推荐业务自带幂等能力。
Kafka 故障转移
- Broker 出现故障,由于 Topic Partition 是单 Master 提供读写的设计,若该 Broker 上负责有 Master 级别的 Topic Partition,需要触发该 Broker 负责的 Partition 的重新选主
- Controller 负责监控 Broker 并对其进行故障转移,Controller 在 Kafka 集群中也是单 Master,出现故障时需要从剩余 Broker 中从新选举
Broker 故障转移
在 Topic 创建时,Kafka 集群根据 Partition 配置创建多个 Topic Partition。每个 Topic Partition 有且仅有一个 leader,有一或多个 follower(数量由 replicate 因子而定)。Kafka 使用以下方法将 Topic Partition 分散到多个Broker,使得 Topic Partition 尽可能的分散:
- 假设有 n 个 broker
- 第 i 个 partition 的 leader 副本将被分配到第 (i mod n) 个 broker
- 第 i 个 partition 的第 j 个 follower 副本将被分配到第 ((i+j) mod n) 个 broker
往Kafka集群生产数据时,若将 ack 配置为 all,Broker 将确保在所有 follower 拉取到该消息时才返回给producer确认信号,如下图所示
Broker
leader 收到新消息会有落盘动作,follower 的 IO 线程拉取到新消息后,在落盘之前会回复给 leader 以 ACK 信号,此时新消息只在 follower 的内存中。这样设计是在可靠性和性能之间做权衡,因为 leader 和所有 follower 全部挂掉的概率是极低的,只要有 follower 在内存中保有新消息,就会在未来被落盘。关于 leader 和 follower 之间的数据同步
Leader
- Controller 在 Zookeeper 注册 watcher,当 Broker 宕机时 Zookeeper 会 fire 事件
- Controller 从 Zookeeper 读取可用的Broker列表
- Controller 构造宕机 Broker 需要故障转移的 Topic Partition,对于每个 Topic Partition:
- 从 Zookeeper 获取该 TopicPartition的ISR集合
- 决定新的 leader 所在Broker
- 将新 leader 所在 Broker、ISR、leader_epoch 等信息写回 Zookeeper
- 通过 RPC 直接向新 leader 所在 Broker 发送 leaderAndISRRequest 命令
- 原 follower 监听到 leader 变化,改为从 leader 拉取最新消息
Kafka 针对每个 Topic Partition 维护了 ISR 集合,只要 follower 存在于该集合中就意味着 follower 与 leader 的消息延迟在可接受范围内,这个可接受范围是通过消息落后条数、最近一次同步时间来配置的。极端情况下,ISR 集合可能为空,这意味着已存活的但不在 ISR 集合中的 follower 落后于 leader 太多。此时若运行提升 IRS 集合以外的 follower 为 leader 则有丢消息的风险,用户需要在可用性和一致性之间做出选择。
这里引申出另一个问题,Kafka 集群为何不使用 Zookeeper 进行 leader 选举?
- Broker 宕机时,其管理的 N 个 leader 角色的 Topic Partition 都要触发重新选举,如果 N 太大将给 Zookeeper 带来非常大的压力,即剩余的 Broker 同时进行 N 次 leader 选举
- 引入 Controller 模块可以减轻 Zookeeper 压力,通过自定义的重新选举逻辑可使得 leader 尽可能分散到各 Broker 中
Controller 故障转移
- Controller模块,是所有 Broker 借助 Zookeeper 临时节点选举出来的领导者 Broker。它是无状态的,不像 Broker 需要管理 Topic Partition 的数据。当 Controller 宕机时,通过 Zookeeper 触发重新选举即可
网友评论