Kafka Basics
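- Broker: A single Kafka instance is a broker. Typically one server runs one Kafka instance, so a server can be regarded as a broker. A cluster consists of multiple brokers, and in ZooKeeper-based deployments ZooKeeper manages the cluster metadata.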
- Topic: A topic can be thought of as a message queue. One broker can host multiple topics.
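- Partition: Each topic can have multiple partitions, and the partitions of one topic can be spread across different brokers, which lets the server side scale well. Each partition is an ordered queue, and the partition is Kafka's unit of ordering. Every message in a partition is assigned a sequential id (the offset). Kafka only guarantees that messages are delivered to consumers in order within a single partition; it does not guarantee ordering across the whole topic (between partitions).
  Therefore, global ordering within a topic requires a single partition. For per-key ordering, use a hash strategy so that messages with the same key land on the same partition. Be aware that adding partitions later changes the key-to-partition mapping, so the ordering of a key can break across the change.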
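- Replica: Each partition can have multiple replicas, placed on different machines to make storage reliable. One of the replicas is elected partition leader. Replicas exist for redundancy and nothing else: by default, all read and write requests are handled by the leader replica. A follower replica has a single job, which is to fetch messages from the leader and keep its own content as close to the leader's as possible. With N replicas, at most N-1 brokers may fail without losing the partition.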
- Message: Each partition stores many messages.
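- Offset: Kafka names its storage (segment) files after the offset of the first message they contain, written here as `<offset>.kafka` (current Kafka versions use the `.log` suffix). Naming files by offset makes lookups easy. To find offset 2049, for example, you only need the segment whose starting offset is the largest one not exceeding 2049, such as 2048.kafka. The first segment is 00000000000.kafka.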
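- Producer: The message producer, a client that sends messages to a Kafka broker. When pushing a message, the producer specifies the topic and a partition selection strategy (typically round-robin, random, or hash), which determines the target partition.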
- Consumer: The message consumer, a client that fetches messages from a Kafka broker.
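- Consumer Group: Several consumer instances can be created with the same `group-id` to form one consumer group, and a group can subscribe to one or more topics. A consumer group provides:
  - Rebalancing: Kafka automatically distributes partitions based on the number of instances in the group and the number of partitions.
  - No duplicate consumption within the group: Kafka ensures that a partition is delivered to only one instance in the same group.
  - High availability: when a consumer instance dies, Kafka automatically adjusts the mapping between instances and partitions.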
Using Kafka from Go with Sarama
Basic concepts
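- Config: `Config` is a general-purpose configuration struct used to pass options to Sarama's various constructors, which return the corresponding instances.
  For consumers, set `Config.Consumer` and pass the `Config` to the consumer constructor.
  For producers, set `Config.Producer`.
  For consumer groups, set `Config.Consumer.Group`.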
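- Client: Client is a generic Kafka client. `Client` is an interface that describes a general-purpose client for connecting to Kafka. Producers and consumers are higher-level concepts that push messages to and pull messages from Kafka through a `Client`.
  It is safe to share a client amongst many users (users means a group of producers or a group of consumers), however Kafka will process requests from a single client strictly in serial, so it is generally more efficient to use the default one client per producer/consumer.
  Several producers can share one `Client` to push messages, and several consumers can share one `Client` to pull messages, but giving each producer or consumer its own `Client` is recommended.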
- To produce messages, use either the `AsyncProducer` or the `SyncProducer`. They are separate types with their own constructors. `sarama.Config.Producer.RequiredAcks` does not choose between them; it sets the acknowledgement level the broker must reach before a send counts as successful.
  The `AsyncProducer` accepts messages on a channel and produces them asynchronously in the background as efficiently as possible; it is preferred in most cases.
  The `SyncProducer` provides a method which will block until Kafka acknowledges the message as produced. This can be useful but comes with two caveats: it will generally be less efficient, and the actual durability guarantees depend on the configured value of `Producer.RequiredAcks`. For implementation reasons, the `SyncProducer` also requires `Producer.Return.Errors` and `Producer.Return.Successes` to be set to true in its configuration.
  So when using a `SyncProducer`, set `Producer.Return.Errors=true` and `Producer.Return.Successes=true` in `sarama.Config`. `Producer.RequiredAcks=1` (wait for the leader only, which is Sarama's default) is a common choice rather than a requirement.
- To consume messages, use `Consumer` or `ConsumerGroup`.
// consumer.go
// consumer implements the Consumer interface
type consumer struct {
	conf *Config
	// A PartitionConsumer processes Kafka messages from a given topic and partition.
	children        map[string]map[int32]*partitionConsumer
	brokerConsumers map[*Broker]*brokerConsumer
	client          Client
	lock            sync.Mutex
}
// consumer's Close(), Topics(), and Partitions() simply call the client's methods of the same name. For example:
func (c *consumer) Topics() ([]string, error) {
	return c.client.Topics()
}
// Sarama provides two constructors that return a consumer instance:
// NewConsumer and NewConsumerFromClient
// consumer_group.go
// The ConsumerGroup interface
type ConsumerGroup interface {
	Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error
	Errors() <-chan error
	Close() error
}
// Consume is the method a consumer group uses to consume messages.
// Consume joins a cluster of consumers for a given list of topics and
// starts a blocking ConsumerGroupSession through the ConsumerGroupHandler.
// consumerGroup is the concrete implementation of the ConsumerGroup interface
type consumerGroup struct {
	// A consumerGroup holds a client and a consumer so that it can reuse their methods
	// (plain composition: the fields are named, not embedded).
	client    Client
	config    *Config
	consumer  Consumer
	groupID   string
	memberID  string
	errors    chan error
	lock      sync.Mutex
	closed    chan none // type none struct{}
	closeOnce sync.Once
	userData  []byte
}
// ConsumerGroupHandler instances are used to handle individual topic/partition claims.
// It also provides hooks for your consumer group session life-cycle and allows you to
// trigger logic before or after the consume loop(s).
// In other words, you implement Setup(ConsumerGroupSession) and Cleanup(ConsumerGroupSession) yourself.
//
// PLEASE NOTE that handlers are likely to be called from several goroutines concurrently,
// ensure that all state is safely protected against race conditions.
type ConsumerGroupHandler interface {
	// Setup is run at the beginning of a new session, before ConsumeClaim.
	Setup(ConsumerGroupSession) error
	// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
	// but before the offsets are committed for the very last time.
	Cleanup(ConsumerGroupSession) error
	// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
	// Once the Messages() channel is closed, the Handler must finish its processing
	// loop and exit.
	ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
}
A consumerGroup consumes messages inside a consumerGroupSession:
type consumerGroupSession struct {
	parent          *consumerGroup
	memberID        string
	generationID    int32
	handler         ConsumerGroupHandler // the user-supplied handler, i.e. the actual consumer logic
	claims          map[string][]int32
	offsets         *offsetManager
	ctx             context.Context
	cancel          func()
	waitGroup       sync.WaitGroup
	releaseOnce     sync.Once
	hbDying, hbDead chan none
}
The call chain for consumption in a consumerGroup is:
*consumerGroup.Consume()
-> *consumerGroup.newSession()
-> *consumerGroupSession.consume(topic, partition)
-> ConsumerGroupHandler.ConsumeClaim(*consumerGroupSession, claim)
consumerGroup usage example
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/Shopify/sarama"
)

const topic = "test_topic"

// #### Implement sarama.ConsumerGroupHandler ####

// Consumer represents a Sarama consumer group consumer.
type Consumer struct{}

// Setup is run at the beginning of a new session, before ConsumeClaim.
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
	fmt.Println("consumer Setup")
	return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited.
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
	fmt.Println("consumer Cleanup")
	return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	// NOTE:
	// Do not move the code below to a goroutine.
	// The `ConsumeClaim` itself is called within a goroutine, see:
	// https://github.com/Shopify/sarama/blob/main/consumer_group.go#L27-L29
	for message := range claim.Messages() {
		if message.Topic == topic {
			// Process the message. Value is a []byte, so convert it before printing.
			fmt.Println(string(message.Value))
		}
		// Mark the message as consumed so that its offset can be committed.
		session.MarkMessage(message, "")
	}
	return nil
}

// #### End of the sarama.ConsumerGroupHandler implementation ####

func main() {
	// Get a Kafka config.
	config := sarama.NewConfig()

	// Create the consumer group.
	brokers := []string{"127.0.0.1:9092"}
	groupname := "my_group"
	consumerG, err := sarama.NewConsumerGroup(brokers, groupname, config)
	if err != nil {
		log.Fatalf("creating consumer group: %v", err)
	}
	defer consumerG.Close()

	// Iterate over consumer sessions.
	ctx := context.Background()
	topics := []string{topic}
	for {
		handler := Consumer{}
		// `Consume` should be called inside an infinite loop, when a
		// server-side rebalance happens, the consumer session will need to be
		// recreated to get the new claims.
		// Consume starts a consumerGroupSession internally. Calling it in a for loop
		// gives a fresh session each time, which makes it easy to react to a rebalance.
		if err := consumerG.Consume(ctx, topics, &handler); err != nil {
			log.Printf("consume: %v", err)
			return
		}
	}
}