具体流程如下图:
image.pngSarama有两种类型的生产者,同步生产者和异步生产者。
To produce messages, use either the AsyncProducer or the SyncProducer. The AsyncProducer accepts messages on a channel and produces them asynchronously in the background as efficiently as possible; it is preferred in most cases. The SyncProducer provides a method which will block until Kafka acknowledges the message as produced. This can be useful but comes with two caveats: it will generally be less efficient, and the actual durability guarantees depend on the configured value of Producer.RequiredAcks. There are configurations where a message acknowledged by the SyncProducer can still sometimes be lost.
大致意思是异步生产者使用channel接收(生产成功或失败)的消息,并且也通过channel来发送消息,这样做通常是性能最高的。而同步生产者需要阻塞,直到收到了acks。但是这也带来了两个问题,一是性能变得更差了,而是可靠性是依靠参数acks来保证的。
异步生产者 Demo 如下:
func Producer(topic string, limit int) {
config := sarama.NewConfig()
// 异步生产者不建议把 Errors 和 Successes 都开启,一般开启 Errors 就行
// 同步生产者就必须都开启,因为会同步返回发送成功或者失败
config.Producer.Return.Errors = true // 设定是否需要返回错误信息
config.Producer.Return.Successes = true // 设定是否需要返回成功信息
producer, err := sarama.NewAsyncProducer([]string{conf.HOST}, config)
if err != nil {
log.Fatal("NewSyncProducer err:", err)
}
var (
wg sync.WaitGroup
enqueued, timeout, successes, errors int
)
// [!important] 异步生产者发送后必须把返回值从 Errors 或者 Successes 中读出来 不然会阻塞 sarama 内部处理逻辑 导致只能发出去一条消息
wg.Add(1)
go func() {
defer wg.Done()
for range producer.Successes() {
// log.Printf("[Producer] Success: key:%v msg:%+v \n", s.Key, s.Value)
successes++
}
}()
wg.Add(1)
go func() {
defer wg.Done()
for e := range producer.Errors() {
log.Printf("[Producer] Errors:err:%v msg:%+v \n", e.Msg, e.Err)
errors++
}
}()
// 异步发送
for i := 0; i < limit; i++ {
str := strconv.Itoa(int(time.Now().UnixNano()))
msg := &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(str)}
// 异步发送只是写入内存了就返回了,并没有真正发送出去
// sarama 库中用的是一个 channel 来接收,后台 goroutine 异步从该 channel 中取出消息并真正发送
// select + ctx 做超时控制,防止阻塞 producer.Input() <- msg 也可能会阻塞
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
select {
case producer.Input() <- msg:
enqueued++
case <-ctx.Done():
timeout++
}
cancel()
if i%10000 == 0 && i != 0 {
log.Printf("已发送消息数:%d 超时数:%d\n", i, timeout)
}
}
// We are done
producer.AsyncClose()
wg.Wait()
log.Printf("发送完毕 总发送条数:%d enqueued:%d timeout:%d successes: %d errors: %d\n", limit, enqueued, timeout, successes, errors)
}
1)NewAsyncProducer() :创建 一个 producer 对象
2)producer.Input() <- msg :发送消息
3)s = <-producer.Successes(),e := <-producer.Errors() :异步获取成功或失败信息
2. 发送流程源码分析
另外:由于同步生产者和异步生产者逻辑是一致的,只是在异步生产者基础上封装了一层,所以本文主要分析了异步生产者。
// 可以看到 同步生产者其实就是把异步生产者封装了一层
type syncProducer struct {
producer *asyncProducer
wg sync.WaitGroup
}
NewAsyncProducer
首先是构建一个异步生产者对象
func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) {
client, err := NewClient(addrs, conf)
if err != nil {
return nil, err
}
return newAsyncProducer(client)
}
func newAsyncProducer(client Client) (AsyncProducer, error) {
// ...
p := &asyncProducer{
client: client,
conf: client.Config(),
errors: make(chan *ProducerError),
input: make(chan *ProducerMessage),
successes: make(chan *ProducerMessage),
retries: make(chan *ProducerMessage),
brokers: make(map[*Broker]*brokerProducer),
brokerRefs: make(map[*brokerProducer]int),
txnmgr: txnmgr,
}
go withRecover(p.dispatcher)
go withRecover(p.retryHandler)
}
可以看到在 newAsyncProducer 最后开启了两个 goroutine,一个为 dispatcher,一个为 retryHandler 。
retryHandler 主要是处理重试逻辑,暂时先忽略。
dispatcher
主要根据 topic 将消息分发到对应的 channel。
func (p *asyncProducer) dispatcher() {
handlers := make(map[string]chan<- *ProducerMessage)
// ...
for msg := range p.input {
// 拦截器逻辑
for _, interceptor := range p.conf.Producer.Interceptors {
msg.safelyApplyInterceptor(interceptor)
}
// 找到这个Topic对应的Handler
handler := handlers[msg.Topic]
if handler == nil {
// 如果没有这个Topic对应的Handler,那么创建一个
handler = p.newTopicProducer(msg.Topic)
handlers[msg.Topic] = handler
}
// 然后把这条消息写进这个Handler中
handler <- msg
}
}
具体逻辑:从 p.input 中取出消息并写入到 handler 中,如果 topic 对应的 handler 不存在,则调用 newTopicProducer() 创建。
这里的 handler 是一个 buffered channel
然后让我们来看下handler = p.newTopicProducer(msg.Topic)这一行的代码。
func (p *asyncProducer) newTopicProducer(topic string) chan<- *ProducerMessage {
input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
tp := &topicProducer{
parent: p,
topic: topic,
input: input,
breaker: breaker.New(3, 1, 10*time.Second),
handlers: make(map[int32]chan<- *ProducerMessage),
partitioner: p.conf.Producer.Partitioner(topic),
}
go withRecover(tp.dispatch)
return input
}
在这里创建了一个缓冲大小为ChannelBufferSize的channel,用于存放发送到这个主题的消息,然后创建了一个 topicProducer。
在这个时候你可以认为消息已经交付给各个 topic 对应的 topicProducer 了。
还有一个需要注意的是newTopicProducer 的这种写法,内部创建一个 chan 返回到外层,然后通过在内部新开一个 goroutine 来处理该 chan 里的消息,这种写法在后面还会遇到好几次。
topicDispatch
newTopicProducer的最后一行go withRecover(tp.dispatch)又启动了一个 goroutine 用于处理消息。也就是说,到了这一步,对于每一个Topic,都有一个协程来处理消息。
dispatch 具体如下:
func (tp *topicProducer) dispatch() {
for msg := range tp.input {
handler := tp.handlers[msg.Partition]
if handler == nil {
handler = tp.parent.newPartitionProducer(msg.Topic, msg.Partition)
tp.handlers[msg.Partition] = handler
}
handler <- msg
}
}
可以看到又是同样的套路:
1)找到这条消息所在的分区对应的 channel,然后把消息丢进去
2)如果不存在则新建 chan
PartitionDispatch
新建的 chan 是通过 newPartitionProducer 返回的,和之前的newTopicProducer又是同样的套路,点进去看一下:
func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan<- *ProducerMessage {
input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
pp := &partitionProducer{
parent: p,
topic: topic,
partition: partition,
input: input,
breaker: breaker.New(3, 1, 10*time.Second),
retryState: make([]partitionRetryState, p.conf.Producer.Retry.Max+1),
}
go withRecover(pp.dispatch)
return input
}
果然是这样,有没有一种似曾相识的感觉。
TopicProducer是按照 Topic 进行分发,这里的 PartitionProducer 则是按照 partition 进行分发。
到这里可以认为消息已经交付给对应 topic 下的对应 partition 了。
每个 partition 都会有一个 goroutine 来处理分发给自己的消息。
PartitionProducer
到了这一步,我们再来看看消息到了每个 partition 所在的 channel 之后,是如何处理的。
其实在这一步中,主要是做一些错误处理之类的,然后把消息丢进brokerProducer。
可以理解为这一步是业务逻辑层到网络IO层的转变,在这之前我们只关心消息去到了哪个分区,而在这之后,我们需要找到这个分区所在的 broker 的地址,并使用之前已经建立好的 TCP 连接,发送这条消息。
具体 pp.dispatch 代码如下
func (pp *partitionProducer) dispatch() {
// 找到这个主题和分区的leader所在的broker
pp.leader, _ = pp.parent.client.Leader(pp.topic, pp.partition)
if pp.leader != nil {
// 根据 leader 信息创建一个 BrokerProducer 对象
pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader)
pp.parent.inFlight.Add(1)
pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
}
// 然后把消息丢进brokerProducer中
for msg := range pp.input {
pp.brokerProducer.input <- msg
}
}
根据之前的套路我们知道,真正的逻辑肯定在pp.parent.getBrokerProducer(pp.leader) 这个方法里面。
BrokerProducer
到了这里,大概算是整个发送流程最后的一个步骤了。
让我们继续跟进pp.parent.getBrokerProducer(pp.leader)这行代码里面的内容。其实就是找到asyncProducer中的brokerProducer,如果不存在,则创建一个。
func (p *asyncProducer) getBrokerProducer(broker *Broker) *brokerProducer {
p.brokerLock.Lock()
defer p.brokerLock.Unlock()
bp := p.brokers[broker]
if bp == nil {
bp = p.newBrokerProducer(broker)
p.brokers[broker] = bp
p.brokerRefs[bp] = 0
}
p.brokerRefs[bp]++
return bp
}
又调用了newBrokerProducer(),继续追踪下去:
func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
var (
input = make(chan *ProducerMessage)
bridge = make(chan *produceSet)
responses = make(chan *brokerProducerResponse)
)
bp := &brokerProducer{
parent: p,
broker: broker,
input: input,
output: bridge,
responses: responses,
stopchan: make(chan struct{}),
buffer: newProduceSet(p),
currentRetries: make(map[string]map[int32]error),
}
go withRecover(bp.run)
// minimal bridge to make the network response `select`able
go withRecover(func() {
for set := range bridge {
request := set.buildRequest()
response, err := broker.Produce(request)
responses <- &brokerProducerResponse{
set: set,
err: err,
res: response,
}
}
close(responses)
})
if p.conf.Producer.Retry.Max <= 0 {
bp.abandoned = make(chan struct{})
}
return bp
}
这里又启动了两个 goroutine,一个为 run,一个是匿名函数姑且称为 bridge。
bridge 看起来是真正的发送逻辑,那么 batch handle 逻辑应该是在 run 方法里了。
这里先分析 bridge 函数,run 在下一章分析。
buildRequest
buildRequest 方法主要是构建一个标准的 Kafka Request 消息。
根据不同版本、是否配置压缩信息做了额外处理,这里先忽略,只看核心代码:
func (ps *produceSet) buildRequest() *ProduceRequest {
req := &ProduceRequest{
RequiredAcks: ps.parent.conf.Producer.RequiredAcks,
Timeout: int32(ps.parent.conf.Producer.Timeout / time.Millisecond),
}
for topic, partitionSets := range ps.msgs {
for partition, set := range partitionSets {
rb := set.recordsToSend.RecordBatch
if len(rb.Records) > 0 {
rb.LastOffsetDelta = int32(len(rb.Records) - 1)
for i, record := range rb.Records {
record.OffsetDelta = int64(i)
}
}
req.AddBatch(topic, partition, rb)
continue
}
}
}
首先是构建一个 req 对象,然后遍历 ps.msg 中的消息,根据 topic 和 partition 分别写入到 req 中。
Produce
func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
var (
response *ProduceResponse
err error
)
if request.RequiredAcks == NoResponse {
err = b.sendAndReceive(request, nil)
} else {
response = new(ProduceResponse)
err = b.sendAndReceive(request, response)
}
if err != nil {
return nil, err
}
return response, nil
}
最终调用了sendAndReceive()方法将消息发送出去。
如果我们设置了需要 Acks,就会传一个 response 进去接收返回值;如果没设置,那么消息发出去之后,就不管了。
func (b *Broker) sendAndReceive(req protocolBody, res protocolBody) error {
promise, err := b.send(req, res != nil, responseHeaderVersion)
if err != nil {
return err
}
select {
case buf := <-promise.packets:
return versionedDecode(buf, res, req.version())
case err = <-promise.errors:
return err
}
}
func (b *Broker) send(rb protocolBody, promiseResponse bool, responseHeaderVersion int16) (*responsePromise, error) {
req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
buf, err := encode(req, b.conf.MetricRegistry)
if err != nil {
return nil, err
}
bytes, err := b.write(buf)
}
最终通过bytes, err := b.write(buf) 发送出去。
至此,Sarama生产者相关的内容就介绍完毕了。
还有一个比较重要的,消息打包批量发送的逻辑,比较多再下一章讲。
3. 消息打包源码分析
在之前 BrokerProducer 逻辑中启动了两个 goroutine,其中 bridge 从 chan 中取消息并真正发送出去。
那么这个 chan 里的消息是哪里来的呢?
其实这就是另一个 goroutine 的工作了。
func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
var (
input = make(chan *ProducerMessage)
bridge = make(chan *produceSet)
responses = make(chan *brokerProducerResponse)
)
bp := &brokerProducer{
parent: p,
broker: broker,
input: input,
output: bridge,
responses: responses,
stopchan: make(chan struct{}),
buffer: newProduceSet(p),
currentRetries: make(map[string]map[int32]error),
}
go withRecover(bp.run)
// minimal bridge to make the network response `select`able
go withRecover(func() {
for set := range bridge {
request := set.buildRequest()
response, err := broker.Produce(request)
responses <- &brokerProducerResponse{
set: set,
err: err,
res: response,
}
}
close(responses)
})
if p.conf.Producer.Retry.Max <= 0 {
bp.abandoned = make(chan struct{})
}
return bp
}
run
func (bp *brokerProducer) run() {
var output chan<- *produceSet
for {
select {
case msg, ok := <-bp.input:
// 1. 检查 buffer 空间是否足够存放当前 msg
if bp.buffer.wouldOverflow(msg) {
if err := bp.waitForSpace(msg, false); err != nil {
bp.parent.retryMessage(msg, err)
continue
}
}
// 2. 将 msg 存入 buffer
if err := bp.buffer.add(msg); err != nil {
bp.parent.returnError(msg, err)
continue
}
// 3. 如果间隔时间到了,也会将消息发出去
case <-bp.timer:
bp.timerFired = true
// 4. 将 buffer 里的数据发送到 局部变量 output chan 里
case output <- bp.buffer:
bp.rollOver()
case response, ok := <-bp.responses:
if ok {
bp.handleResponse(response)
}
}
// 5.如果发送时间到了 或者消息大小或者条数达到阈值 则表示可以发送了 将 bp.output chan 赋值给局部变量 output
if bp.timerFired || bp.buffer.readyToFlush() {
output = bp.output
} else {
output = nil
}
}
}
1)首先检测 buffer 空间
2)将 msg 写入 buffer
3)后面的 3 4 5 步都是在发送消息,或者为发送消息做准备
wouldOverflow
if bp.buffer.wouldOverflow(msg) {
if err := bp.waitForSpace(msg, false); err != nil {
bp.parent.retryMessage(msg, err)
continue
}
}
在 add 之前先调用bp.buffer.wouldOverflow(msg) 方法检查 buffer 是否存在足够空间以存放当前消息。
wouldOverflow 比较简单,就是判断当前消息大小或者消息数量是否超过设定值:
func (ps *produceSet) wouldOverflow(msg *ProducerMessage) bool {
switch {
case ps.bufferBytes+msg.byteSize(version) >= int(MaxRequestSize-(10*1024)):
return true
case ps.msgs[msg.Topic] != nil && ps.msgs[msg.Topic][msg.Partition] != nil &&
ps.msgs[msg.Topic][msg.Partition].bufferBytes+msg.byteSize(version) >= ps.parent.conf.Producer.MaxMessageBytes:
return true
case ps.parent.conf.Producer.Flush.MaxMessages > 0 && ps.bufferCount >= ps.parent.conf.Producer.Flush.MaxMessages:
return true
default:
return false
}
}
如果不够就要调用bp.waitForSpace() 等待 buffer 腾出空间,其实就是把 buffer 里的消息发到 output chan。
这个 output chan 就是前面匿名函数里的 bridge。
func (bp *brokerProducer) waitForSpace(msg *ProducerMessage, forceRollover bool) error {
for {
select {
case response := <-bp.responses:
bp.handleResponse(response)
if reason := bp.needsRetry(msg); reason != nil {
return reason
} else if !bp.buffer.wouldOverflow(msg) && !forceRollover {
return nil
}
case bp.output <- bp.buffer:
bp.rollOver()
return nil
}
}
}
add
接下来是调用bp.buffer.add()把消息添加到 buffer,功能比较简单,把待发送的消息添加到 buffer 中。
func (ps *produceSet) add(msg *ProducerMessage) error {
// 1.消息编码
key, err = msg.Key.Encode()
val, err = msg.Value.Encode()
// 2.添加消息到 set.msgs 数组
set.msgs = append(set.msgs, msg)
// 3.添加到set.recordsToSend
msgToSend := &Message{Codec: CompressionNone, Key: key, Value: val}
if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
msgToSend.Timestamp = timestamp
msgToSend.Version = 1
}
set.recordsToSend.MsgSet.addMessage(msgToSend)
// 4. 增加 buffer 大小和 buffer 中的消息条数
ps.bufferBytes += size
ps.bufferCount++
}
set.recordsToSend.MsgSet.addMessage也很简单:
func (ms *MessageSet) addMessage(msg *Message) {
block := new(MessageBlock)
block.Msg = msg
ms.Messages = append(ms.Messages, block)
}
定时发送
因为异步发送者除了消息数或者消息大小达到阈值会触发一次发送之外,到了一定时间也会触发一次发送,具体逻辑也在这个 run 方法里,这个地方比较有意思。
func (bp *brokerProducer) run() {
var output chan<- *produceSet
for {
select {
case msg, ok := <-bp.input:
// 1.时间到了就将 bp.timerFired 设置为 true
case <-bp.timer:
bp.timerFired = true
// 3.直接把 buffer 里的消息往局部变量 output 里发
case output <- bp.buffer:
bp.rollOver()
}
// 2.如果时间到了,或者 buffer 里的消息达到阈值后都会触发真正的发送逻辑,这里实现比较有意思,需要发送的时候就把 bp.output 也就是存放真正需要发送的批量消息的 chan 赋值给局部变量 output,如果不需要发送就把局部变量 output 清空
if bp.timerFired || bp.buffer.readyToFlush() {
output = bp.output
} else {
output = nil
}
}
}
根据注释中的 1、2、3步骤看来,如果第二步需要发送就会给 output 赋值,这样下一轮 select 的时候case output <- bp.buffer: 这个 case 就可能会执行到,就会把消息发给 output,实际上就是发送给了 bp.output.
如果第二步时不需要发消息,output 就被置空,select 时对应的 case 就不会被执行。
正常写法一般是在启动一个 goroutine 来处理定时发送的功能,但是这样两个 goroutine 之间就会存在竞争,会影响性能。这样处理省去了加解锁过程,性能会高一些,但是随之而来的是代码复杂度的提升。
网友评论