美文网首页
Q for Kafka

Q for Kafka

作者: Secret_Sun | 来源:发表于2020-04-09 19:25 被阅读0次

    总览

    Apache Kafka发源于LinkedIn,于2011年成为Apache的孵化项目,随后于2012年成为Apache的主要项目之一。Kafka使用Scala和Java进行编写。Apache Kafka是一个快速、可扩展的、高吞吐、可容错的分布式发布订阅消息系统。Kafka具有高吞吐量、内置分区、支持数据副本和容错的特性,适合在大规模消息处理场景中使用。

    基础信息分享

    需要详细阅读基础 SDK (按 Go Sarama 为例)

    基础知识点

    首先,我们必须理解,partiton 是 kafka 的并行单元。从 producer 和 broker 的视角看,向不同的 partition 写入是完全并行的;而对于 consumer,并发数完全取决于 partition 的数量,即,如果 consumer 数量大于 partition 数量,则必有 consumer 闲置。所以,我们可以认为 kafka 的吞吐与 partition 时线性关系。partition 的数量要根据吞吐来推断,假定 p 代表生产者写入单个 partition 的最大吞吐,c 代表消费者从单个 partition 消费的最大吞吐,我们的目标吞吐是 t,那么 partition 的数量应该是 t/p 和 t/c 中较大的那一个。实际情况中,p的影响因素有批处理的规模,压缩算法,确认机制和副本数等,通常建议 partition 的数量一定要大于等于消费者的数量来实现最大并发。

    • 一个 partition 就是一个存储 kafka-log 的目录。
    • 一个 partition 只能寄宿在一个 broker 上。
    • 单个 partition 是可以实现消息的顺序写入的。
    • 单个 partition 只能被单个消费者进程消费,与该消费者所属于的消费组无关。这样做,有助于实现顺序消费。
    • 单个消费者进程可同时消费多个 partition,即 partition 限制了消费端的并发能力。
    • partition 越多则 file 和 memory 消耗越大,要在服务器承受服务器设置。
    • 每个 partition 信息都存在所有的zk节点中。
    • partition 越多则失败选举耗时越长。
    • offset 是对每个 partition 而言的,partition 越多,查询 offset 就越耗时。
    • partition 的数量是可以动态增加的(只能加不能减)。

    常见客户端问题

    • 客户端版本必须与 kafka 版本一致,多半发生在非 Java 客户端情况下
    • 客户端 Rebalance 情况发生,版本为 0.10 有一定概率触发频繁 Rebalance;消息队列 Kafka 版的 Consumer 没有独立线程维持心跳,而是把心跳维持与 poll 接口耦合在一起。其结果就是,如果用户消费出现卡顿,就会导致 Consumer 心跳超时,引发 rebalance

    客户端入门

    注意点: 客户端一定要记住客户端版本与kafka 版本对应关系

    Producer Demo

    package main
     
    import (
        "fmt"
        "time"
     
        "github.com/Shopify/sarama"
    )
     
    func main() {
        kafkaVersion, err := sarama.ParseKafkaVersion("0.10.2.0") // 返回对应版本号
        address := []string{"127.0.0.1:9092","127.0.0.2:9092","127.0.0.3:9092"}
     
        config := sarama.NewConfig()
        config.Version = kafkaVersion
     
        /*
        const (
            // NoResponse doesn't send any response, the TCP ACK is all you get.
            NoResponse RequiredAcks = 0
            // WaitForLocal waits for only the local commit to succeed before responding.
            WaitForLocal RequiredAcks = 1
            // WaitForAll waits for all in-sync replicas to commit before responding.
            // The minimum number of in-sync replicas is configured on the broker via
            // the `min.insync.replicas` configuration key.
            WaitForAll RequiredAcks = -1
        )
         */
     
        config.Producer.RequiredAcks = sarama.WaitForAll          // 发送完数据需要 leader 和 follow 都确认
        config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition
        config.Producer.Return.Successes = true                   // 成功交付的消息将在 success channel 返回
     
        // 构造一个消息
        msg := &sarama.ProducerMessage{}
        msg.Topic = "cloudinfra_test_topic"
        now := time.Now().Format("2006-01-02 15:04:05")
        msg.Value = sarama.StringEncoder(now +": This is a test log ...")
     
        // 连接 kafka
        client, err := sarama.NewSyncProducer(address, config)
        if err != nil {
            fmt.Println("producer closed, err:", err)
            return
        }
        defer client.Close()
        // 发送消息
        pid, offset, err := client.SendMessage(msg)
        if err != nil {
            fmt.Println("send msg failed, err:", err)
            return
        }
        fmt.Printf("pid:%v offset:%v\n", pid, offset)
    }
    

    Consumer Demo

    package main
     
    import (
        "context"
        "fmt"
        "os"
        "os/signal"
        "strings"
        "sync"
        "syscall"
     
        "github.com/Shopify/sarama"
    )
     
    var (
        wg sync.WaitGroup
    )
     
    // Consumer represents a Sarama consumer group consumer
    type Consumer struct {
        ready chan bool
    }
     
    // Setup is run at the beginning of a new session, before ConsumeClaim
    func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
        // Mark the consumer as ready
        close(consumer.ready)
        return nil
    }
     
    // Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
    func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
        return nil
    }
     
    // ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
    func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
        // NOTE:
        // Do not move the code below to a goroutine.
        // The `ConsumeClaim` itself is called within a goroutine, see:
        // https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
        for message := range claim.Messages() {
            fmt.Println("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
            session.MarkMessage(message, "")
        }
        return nil
    }
     
    func main() {
        kafkaVersion, err := sarama.ParseKafkaVersion("0.10.2.0")
        address := []string{"127.0.0.1:9092","127.0.0.2:9092","127.0.0.3:9092"}
        config := sarama.NewConfig()
        config.Version = kafkaVersion
     
        consumer := Consumer{
            ready: make(chan bool),
        }
     
        //创建消费者
        ctx, cancel := context.WithCancel(context.Background())
        consumerGroup, err := sarama.NewConsumerGroup(address, "Cloudinfra-TestConsumerGroup", config)
     
        if err != nil {
            fmt.Println("Failed to start consumer group: %s", err)
            return
        }
     
        wg := &sync.WaitGroup{}
        wg.Add(1)
        go func() {
            defer wg.Done()
            for {
                // `Consume` should be called inside an infinite loop, when a
                // server-side rebalance happens, the consumer session will need to be
                // recreated to get the new claims
                if err := consumerGroup.Consume(ctx, strings.Split("cloudinfra_test_topic", ","), &consumer); err != nil {
                    fmt.Println("Error from consumer: %v", err)
                }
                // check if context was cancelled, signaling that the consumer should stop
                if ctx.Err() != nil {
                    return
                }
                consumer.ready = make(chan bool)
            }
        }()
        <-consumer.ready
        fmt.Println("Sarama consumer up and running!...")
     
        sigterm := make(chan os.Signal, 1)
        signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
        select {
        case <-ctx.Done():
            fmt.Println("terminating: context cancelled")
        case <-sigterm:
            fmt.Println("terminating: via signal")
        }
        cancel()
        wg.Wait()
        if err = consumerGroup.Close(); err != nil {
            fmt.Println("Error closing client: ", err)
        }
    }
    

    生产者最佳实践

    失败重试

    分布式环境下,由于网络等原因,偶尔发送失败是常见的。导致这种失败的原因可能是消息已经发送成功,但是 Ack 失败,也有可能是确实没发送成功。

    云上很多全托管消息队列 Kafka 是 VIP 网络架构,会主动断开空闲连接(30 秒没活动),因此,不是一直活跃的客户端会经常收到 "connection rest by peer" 错误,建议重试消息发送。

    重试参数,您可以根据业务需求,设置以下重试参数:

    • retries,重试次数,建议设置为 3。
    • retry.backoff.ms,重试间隔,建议设置为 1000。

    异步发送

    发送接口是异步的,如果你想得到发送的结果,可以调用metadataFuture.get(timeout, TimeUnit.MILLISECONDS)。

    线程安全

    Producer 是线程安全的,且可以往任何 Topic 发送消息。通常情况下,一个应用对应一个 Producer 就足够了。

    Acks

    • acks=0,表示无需服务端的 Response,性能较高,丢数据风险较大;
    • acks=1,服务端主节点写成功即返回 Response,性能中等,丢数据风险中等,主节点宕机可能导致数据丢失;
    • acks=all,服务端主节点写成功,且备节点同步成功,才返回 Response,性能较差,数据较为安全,主节点和备节点都宕机才会导致数据丢失。
    • 一般建议选择 acks=1,重要的服务可以设置 acks=all。

    Batch

    Batch 的基本思路是:把消息缓存在内存中,并进行打包发送。Kafka 通过 Batch 来提高吞吐,但同时也会增加延迟,生产时应该对两者予以权衡。 在构建 Producer 时,需要考虑以下两个参数:

    • batch.size : 发往每个分区(Partition)的消息缓存量(消息内容的字节数之和,不是条数)达到这个数值时,就会触发一次网络请求,然后客户端把消息真正发往服务器;
    • linger.ms : 每条消息待在缓存中的最长时间。若超过这个时间,就会忽略 batch.size 的限制,然后客户端立即把消息发往服务器。
      由此可见,Kafka 客户端什么时候把消息真正发往服务器,是由上面两个参数共同决定的: batch.size 有助于提高吞吐,linger.ms 有助于控制延迟。您可以根据具体业务需求进行调整。

    Rebalance 如何避免

    消费者最佳实践

    消费消息基本流程

    消息队列 Kafka 版订阅者在订阅消息时的基本流程是: 1. 2. 3.

    1. Poll 数据
    2. 执行消费逻辑
    3. 再次 poll 数据

    负载均衡

    每个 Consumer Group 可以包含多个消费实例,即可以启动多个消息队列 Kafka 版 Consumer,并把参数 group.id 设置成相同的值。属于同一个 Consumer Group 的消费实例会负载消费订阅的 Topic。

    举例:Consumer Group A 订阅了 Topic A,并开启三个消费实例 C1、C2、C3,则发送到 Topic A 的每条消息最终只会传给 C1、C2、C3 的某一个。Kafka 默认会均匀地把消息传给各个消息实例,以做到消费负载均衡。

    Kafka 负载消费的内部原理是,把订阅的 Topic 的分区,平均分配给各个消费实例。因此,消费实例的个数不要大于分区的数量,否则会有实例分配不到任何分区而处于空跑状态。这个负载均衡发生的时间,除了第一次启动上线之外,后续消费实例发生重启、增加、减少等变更时,都会触发一次负载均衡。

    消息队列 Kafka 版的每个 Topic 的分区数量默认是 16 个,已经足够满足大部分场景的需求,且云上服务会根据容量调整分区数。

    多个订阅

    一个 Consumer Group 可以订阅多个 Topic。 一个 Topic 也可以被多个 Consumer Group 订阅,且各个 Consumer Group 独立消费 Topic 下的所有消息。

    举例:Consumer Group A 订阅了 Topic A,Consumer Group B 也订阅了 Topic A,则发送到 Topic A 的每条消息,不仅会传一份给 Consumer Group A 的消费实例,也会传一份给 Consumer Group B 的消费实例,且这两个过程相互独立,相互没有任何影响。

    消费位点

    每个 Topic 会有多个分区,每个分区会统计当前消息的总条数,这个称为最大位点 MaxOffset。

    消息队列 Kafka 版 Consumer 会按顺序依次消费分区内的每条消息,记录已经消费了的消息条数,称为 ConsumerOffset。

    剩余的未消费的条数(也称为消息堆积量) = MaxOffset - ConsumerOffset

    消费位点提交

    消息队列 Kafka 版消费者有两个相关参数:

    • enable.auto.commit:默认值为 true。
    • auto.commit.interval.ms: 默认值为 1000,也即 1s。

    这两个参数组合的结果就是,每次 poll 数据前会先检查上次提交位点的时间,如果距离当前时间已经超过参数 auto.commit.interval.ms 规定的时长,则客户端会启动位点提交动作。

    因此,如果将enable.auto.commit设置为 true,则需要在每次 poll 数据时,确保前一次 poll 出来的数据已经消费完毕,否则可能导致位点跳跃。

    如果想自己控制位点提交,请把 enable.auto.commit 设为 false,并调用 commit(offsets)函数自行控制位点提交。

    消费位点重置

    以下两种情况,会发生消费位点重置:

    • 当服务端不存在曾经提交过的位点时(比如客户端第一次上线)。
    • 当从非法位点拉取消息时(比如某个分区最大位点是10,但客户端却从11开始拉取消息)。

    Java 客户端可以通过 auto.offset.reset 来配置重置策略,主要有三种策略:

    • "latest",从最大位点开始消费。
    • "earliest",从最小位点开始消费。
    • "none",不做任何操作,也即不重置。
    • 推荐业务自带幂等能力。

    Kafka 故障转移

    • Broker 出现故障,由于 Topic Partition 是单 Master 提供读写的设计,若该 Broker 上负责有 Master 级别的 Topic Partition,需要触发该 Broker 负责的 Partition 的重新选主
    • Controller 负责监控 Broker 并对其进行故障转移,Controller 在 Kafka 集群中也是单 Master,出现故障时需要从剩余 Broker 中从新选举

    Broker 故障转移

    在 Topic 创建时,Kafka 集群根据 Partition 配置创建多个 Topic Partition。每个 Topic Partition 有且仅有一个 leader,有一或多个 follower(数量由 replicate 因子而定)。Kafka 使用以下方法将 Topic Partition 分散到多个Broker,使得 Topic Partition 尽可能的分散:

    • 假设有 n 个 broker
    • 第 i 个 partition 的 leader 副本将被分配到第 (i mod n) 个 broker
    • 第 i 个 partition 的第 j 个 follower 副本将被分配到第 ((i+j) mod n) 个 broker

    往Kafka集群生产数据时,若将 ack 配置为 all,Broker 将确保在所有 follower 拉取到该消息时才返回给producer确认信号,如下图所示


    Broker

    leader 收到新消息会有落盘动作,follower 的 IO 线程拉取到新消息后,在落盘之前会回复给 leader 以 ACK 信号,此时新消息只在 follower 的内存中。这样设计是在可靠性和性能之间做权衡,因为 leader 和所有 follower 全部挂掉的概率是极低的,只要有 follower 在内存中保有新消息,就会在未来被落盘。关于 leader 和 follower 之间的数据同步


    Leader
    • Controller 在 Zookeeper 注册 watcher,当 Broker 宕机时 Zookeeper 会 fire 事件
    • Controller 从 Zookeeper 读取可用的Broker列表
    • Controller 构造宕机 Broker 需要故障转移的 Topic Partition,对于每个 Topic Partition:
      1. 从 Zookeeper 获取该 TopicPartition的ISR集合
      2. 决定新的 leader 所在Broker
      3. 将新 leader 所在 Broker、ISR、leader_epoch 等信息写回 Zookeeper
      4. 通过 RPC 直接向新 leader 所在 Broker 发送 leaderAndISRRequest 命令
      5. 原 follower 监听到 leader 变化,改为从 leader 拉取最新消息

    Kafka 针对每个 Topic Partition 维护了 ISR 集合,只要 follower 存在于该集合中就意味着 follower 与 leader 的消息延迟在可接受范围内,这个可接受范围是通过消息落后条数、最近一次同步时间来配置的。极端情况下,ISR 集合可能为空,这意味着已存活的但不在 ISR 集合中的 follower 落后于 leader 太多。此时若运行提升 IRS 集合以外的 follower 为 leader 则有丢消息的风险,用户需要在可用性和一致性之间做出选择。

    这里引申出另一个问题,Kafka 集群为何不使用 Zookeeper 进行 leader 选举?

    • Broker 宕机时,其管理的 N 个 leader 角色的 Topic Partition 都要触发重新选举,如果 N 太大将给 Zookeeper 带来非常大的压力,即剩余的 Broker 同时进行 N 次 leader 选举
    • 引入 Controller 模块可以减轻 Zookeeper 压力,通过自定义的重新选举逻辑可使得 leader 尽可能分散到各 Broker 中

    Controller 故障转移

    • Controller模块,是所有 Broker 借助 Zookeeper 临时节点选举出来的领导者 Broker。它是无状态的,不像 Broker 需要管理 Topic Partition 的数据。当 Controller 宕机时,通过 Zookeeper 触发重新选举即可

    相关文章

      网友评论

          本文标题:Q for Kafka

          本文链接:https://www.haomeiwen.com/subject/qkdhmhtx.html