美文网首页
Kafka系列之(4)——Kafka Producer流程解析

Kafka系列之(4)——Kafka Producer流程解析

作者: 康康不遛猫 | 来源:发表于2017-05-15 00:01 被阅读0次

    Kafka 0.9版本正式使用Java版本的producer替换了原Scala版本的producer。

    1、Kafka Producer工作流程

    Paste_Image.png
    Paste_Image.png

    注:ProducerRecord允许用户在创建消息对象的时候就直接指定要发送的分区,这样producer后续发送该消息时可以直接发送到指定分区,而不用先通过Partitioner计算目标分区了。另外,我们还可以直接指定消息的时间戳——但一定要慎重使用这个功能,因为它有可能会令时间戳索引机制失效。


    Paste_Image.png

    流程描述:
    用户首先构建待发送的消息对象ProducerRecord,然后调用KafkaProducer#send方法进行发送。KafkaProducer接收到消息后首先对其进行序列化,然后结合本地缓存的元数据信息一起发送给partitioner去确定目标分区,最后追加写入到内存中的消息缓冲池(accumulator)。此时KafkaProducer#send方法成功返回。同时,KafkaProducer中还有一个专门的Sender IO线程负责将缓冲池中的消息分批次发送给对应的broker,完成真正的消息发送逻辑。
    新版本的producer从设计上来说具有以下几个特点:
    总共创建两个线程:执行KafkaPrducer#send逻辑的线程——我们称之为“用户主线程”;执行发送逻辑的IO线程——我们称之为“Sender线程”。
    不同于Scala老版本的producer,新版本producer完全异步发送消息,并提供了回调机制(callback)供用户判断消息是否成功发送。
    batching机制——“分批发送“机制。每个批次(batch)中包含了若干个PRODUCE请求,因此具有更高的吞吐量。
    更加合理的默认分区策略:对于无key消息而言,Scala版本分区策略是一段时间内(默认是10分钟)将消息发往固定的目标分区,这容易造成消息分布的不均匀,而新版本的producer采用轮询的方式均匀地将消息分发到不同的分区。
    底层统一使用基于Selector的网络客户端实现,结合Java提供的Future实现完整地提供了更加健壮和优雅的生命周期管理。
    关键参数
    batch.size 我把它列在了首位,因为该参数对于调优producer至关重要。之前提到过新版producer采用分批发送机制,该参数即控制一个batch的大小。默认是16KB
    acks关乎到消息持久性(durability)的一个参数。高吞吐量和高持久性很多时候是相矛盾的,需要先明确我们的目标是什么? 高吞吐量?高持久性?亦或是中等?因此该参数也有对应的三个取值:0, -1和1
    linger.ms减少网络IO,节省带宽之用。原理就是把原本需要多次发送的小batch,通过引入延时的方式合并成大batch发送,减少了网络传输的压力,从而提升吞吐量。当然,也会引入延时
    compression.type producer所使用的压缩器,目前支持gzip, snappy和lz4。压缩是在用户主线程完成的,通常都需要花费大量的CPU时间,但对于减少网络IO来说确实利器。生产环境中可以结合压力测试进行适当配置
    max.in.flight.requests.per.connection 关乎消息乱序的一个配置参数。它指定了Sender线程在单个Socket连接上能够发送未应答PRODUCE请求的最大请求数。适当增加此值通常会增大吞吐量,从而整体上提升producer的性能。不过笔者始终觉得其效果不如调节batch.size来得明显,所以请谨慎使用。另外如果开启了重试机制,配置该参数大于1可能造成消息发送的乱序(先发送A,然后发送B,但B却先行被broker接收)
    retries 重试机制,对于瞬时失败的消息发送,开启重试后KafkaProducer会尝试再次发送消息。对于有强烈无消息丢失需求的用户来说,开启重试机制是必选项。

    2、内部流程

    当用户调用KafkaProducer.send(ProducerRecord, Callback)时Kafka内部流程分析:

    (1)、Step 1: 序列化+计算目标分区

    这是KafkaProducer#send逻辑的第一步,即为待发送消息进行序列化并计算目标分区,如下图所示:


    Paste_Image.png

    如上图所示,一条所属topic是"test",消息体是"message"的消息被序列化之后结合KafkaProducer缓存的元数据(比如该topic分区数信息等)共同传给后面的Partitioner实现类进行目标分区的计算。

    (2)、 Step 2: 追加写入消息缓冲区(accumulator)

    producer创建时会创建一个默认32MB(由buffer.memory参数指定)的accumulator缓冲区,专门保存待发送的消息。除了之前在“关键参数”段落中提到的linger.ms和batch.size等参数之外,该数据结构中还包含了一个特别重要的集合信息:消息批次信息(batches)。该集合本质上是一个HashMap,里面分别保存了每个topic分区下的batch队列,即前面说的批次是按照topic分区进行分组的。这样发往不同分区的消息保存在对应分区下的batch队列中。举个简单的例子,假设消息M1, M2被发送到test的0分区但属于不同的batch,M3分送到test的1分区,那么batches中包含的信息就是:{"test-0" -> [batch1, batch2], "test-1" -> [batch3]}。
    单个topic分区下的batch队列中保存的是若干个消息批次。每个batch中最重要的3个组件包括:
    compressor: 负责执行追加写入操作
    batch缓冲区:由batch.size参数控制,消息被真正追加写入到的地方
    thunks:保存消息回调逻辑的集合
    这一步的目的就是将待发送的消息写入消息缓冲池中,具体流程如下图所示:


    Paste_Image.png

    这一步执行完毕之后理论上讲KafkaProducer.send方法就执行完毕了,用户主线程所做的事情就是等待Sender线程发送消息并执行返回结果了。

    (3)、Step 3: Sender线程预处理及消息发送

    此时,该Sender线程登场了。严格来说,Sender线程自KafkaProducer创建后就一直都在运行着 。它的工作流程基本上是这样的:
    不断轮询缓冲区寻找已做好发送准备的分区
    将轮询获得的各个batch按照目标分区所在的leader broker进行分组;
    将分组后的batch通过底层创建的Socket连接发送给各个broker;
    等待服务器端发送response回来。
    为了说明上的方便,我还是基于图的方式来解释Sender线程的工作原理:

    Paste_Image.png

    (4)、Step 4: Sender线程处理response

    上图中Sender线程会发送PRODUCE请求给对应的broker,broker处理完毕之后发送对应的PRODUCE response。一旦Sender线程接收到response将依次(按照消息发送顺序)调用batch中的回调方法,如下图所示:

    Paste_Image.png
    做完这一步,producer发送消息就可以算作是100%完成了。通过这4步我们可以看到新版本producer发送事件完全是异步过程。因此在调优producer前我们就需要搞清楚性能瓶颈到底是在用户主线程还是在Sender线程
    由于KafkaProducer是线程安全的,因此在使用上有两种基本的使用方法:
    Paste_Image.png

    refer:
    http://www.cnblogs.com/huxi2b/p/6364613.html
    http://www.cnblogs.com/davidwang456/p/4182001.html

    相关文章

      网友评论

          本文标题:Kafka系列之(4)——Kafka Producer流程解析

          本文链接:https://www.haomeiwen.com/subject/kqnztxtx.html