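Kafka is a publish/subscribe messaging system. It receives messages, stores them in order on local disk, and lets downstream applications read them back. This article walks through the message-sending path of the Kafka 0.10.0 producer.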
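Choosing a partition: if the record carries a partition that is non-null and valid (0 <= partition <= number of partitions - 1), that partition is used directly. An out-of-range value is rejected with an `IllegalArgumentException`. Otherwise the partition is chosen by the configured Partitioner, which is `DefaultPartitioner` by default: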
```java
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {
        // no key: round-robin over the partitions that currently have a leader
        int nextValue = counter.getAndIncrement();
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return DefaultPartitioner.toPositive(nextValue) % numPartitions;
        }
    } else {
        // hash the keyBytes to choose a partition
        return DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}
```
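The selection rules above can be tried out without a cluster. The following is a minimal, self-contained sketch: the class name and its plain-integer inputs are hypothetical stand-ins for `Cluster` and `PartitionInfo`, and `java.util.Arrays.hashCode` is substituted for Kafka's `Utils.murmur2`, so keyed records will land on different partitions than they would with a real producer. Only the structure is meant to match: explicit partition first, round-robin over available partitions for null keys, and a stable hash for keyed records.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the producer's partition choice; not Kafka's classes.
final class PartitionChooserSketch {
    private final AtomicInteger counter = new AtomicInteger(0);

    // Same trick as DefaultPartitioner.toPositive: clear the sign bit.
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    /**
     * @param explicitPartition partition set on the record, or null
     * @param keyBytes          serialized key, or null
     * @param numPartitions     total partitions of the topic
     * @param available         ids of partitions that currently have a leader
     */
    int partition(Integer explicitPartition, byte[] keyBytes, int numPartitions, List<Integer> available) {
        if (explicitPartition != null) {
            if (explicitPartition < 0 || explicitPartition > numPartitions - 1)
                throw new IllegalArgumentException("Invalid partition: " + explicitPartition);
            return explicitPartition;
        }
        if (keyBytes == null) {
            // No key: round-robin, preferring partitions that have a leader.
            int next = counter.getAndIncrement();
            if (!available.isEmpty())
                return available.get(toPositive(next) % available.size());
            return toPositive(next) % numPartitions;
        }
        // Keyed record: stable hash of the key (stand-in for Utils.murmur2).
        return toPositive(Arrays.hashCode(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        PartitionChooserSketch chooser = new PartitionChooserSketch();
        List<Integer> available = Arrays.asList(0, 2); // partition 1 has no leader
        for (int i = 0; i < 3; i++)
            System.out.println("null key -> " + chooser.partition(null, null, 3, available));
        byte[] key = {1, 2, 3};
        System.out.println("key -> " + chooser.partition(null, key, 3, available));
    }
}
```

The sketch starts its counter at 0 for predictable output. Note that in `DefaultPartitioner` the counter is a single field shared by all topics, so null-key records of different topics advance the same sequence.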
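RecordAccumulator acts as a queue: records are accumulated into `MemoryRecords` buffers (one per `RecordBatch`, with the batches queued per partition) so that they can be sent to the broker in batches. Appending a record to the current batch is done by `RecordBatch.tryAppend()`: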
```java
public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {
    if (!this.records.hasRoomFor(key, value)) {
        // no room left in this batch: the caller will open a new batch
        return null;
    } else {
        long checksum = this.records.append(offsetCounter++, timestamp, key, value);
        this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
        this.lastAppendTime = now;
        FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                               timestamp, checksum,
                                                               key == null ? -1 : key.length,
                                                               value == null ? -1 : value.length);
        if (callback != null)
            thunks.add(new Thunk(callback, future));
        this.recordCount++;
        return future;
    }
}
```
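To make the batching behavior concrete, here is a hedged, simplified model: each partition has a deque of batches, a record is first offered to the last batch via `tryAppend`, and when that reports "no room" a new batch is opened with capacity `max(batch.size, record size)`. The class and method names are hypothetical, sizes are raw value lengths (Kafka also counts per-record overhead), and there is no memory pool, compression, or locking.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of RecordAccumulator batching (no BufferPool, no compression).
final class BatchingSketch {
    static final class Batch {
        final int capacity;
        int sizeInBytes;
        int recordCount;

        Batch(int capacity) { this.capacity = capacity; }

        // Mirrors tryAppend: returning false plays the role of returning null when there is no room.
        boolean tryAppend(byte[] value) {
            if (sizeInBytes + value.length > capacity)
                return false;
            sizeInBytes += value.length;
            recordCount++;
            return true;
        }
    }

    private final int batchSize;
    private final Map<String, Deque<Batch>> batches = new HashMap<>();

    BatchingSketch(int batchSize) { this.batchSize = batchSize; }

    void append(String topicPartition, byte[] value) {
        Deque<Batch> dq = batches.computeIfAbsent(topicPartition, tp -> new ArrayDeque<>());
        Batch last = dq.peekLast();
        if (last != null && last.tryAppend(value))
            return;
        // No batch yet, or the last one has no room: open a new one large enough for this record.
        Batch fresh = new Batch(Math.max(batchSize, value.length));
        fresh.tryAppend(value);
        dq.addLast(fresh);
    }

    Deque<Batch> batchesFor(String topicPartition) {
        return batches.getOrDefault(topicPartition, new ArrayDeque<>());
    }

    public static void main(String[] args) {
        BatchingSketch acc = new BatchingSketch(10);
        for (int i = 0; i < 3; i++)
            acc.append("demo-0", new byte[4]);
        // two 4-byte records fit in a 10-byte batch, the third opens a second batch
        System.out.println("batches: " + acc.batchesFor("demo-0").size());
    }
}
```

Once a batch cannot take a record, the partition's deque holds more than one batch, which is exactly the `deque.size() > 1` "full" condition that `ready()` checks.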
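When a KafkaProducer is constructed, it creates and starts a thread named `kafka-producer-network-thread` (suffixed with the client id, if one is set) that runs the `Sender`. The thread's `run()` method repeatedly calls the following `run(long now)` method: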
```java
void run(long now) {
    Cluster cluster = metadata.fetch();
    // get the list of partitions with data ready to send
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

    // if there are any partitions whose leaders are not known yet, force metadata update
    if (result.unknownLeadersExist)
        this.metadata.requestUpdate();

    // remove any nodes we aren't ready to send to,
    // based on the connection state NetworkClient keeps for each broker
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        if (!this.client.ready(node, now)) {
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
        }
    }

    // assign the batches to send to each ready node
    Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
                                                                     result.readyNodes,
                                                                     this.maxRequestSize,
                                                                     now);
    if (guaranteeMessageOrder) {
        // Mute all the partitions drained
        for (List<RecordBatch> batchList : batches.values()) {
            for (RecordBatch batch : batchList)
                this.accumulator.mutePartition(batch.topicPartition);
        }
    }

    // expire batches that have stayed unsent for too long
    List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
    // update sensors
    for (RecordBatch expiredBatch : expiredBatches)
        this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);

    // create one produce request per node
    List<ClientRequest> requests = createProduceRequests(batches, now);
    // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
    // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
    // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
    // with sendable data that aren't ready to send since they would cause busy looping.
    long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
    if (result.readyNodes.size() > 0) {
        log.trace("Nodes with data ready to send: {}", result.readyNodes);
        log.trace("Created {} produce requests: {}", requests.size(), requests);
        pollTimeout = 0;
    }
    for (ClientRequest request : requests)
        client.send(request, now);

    // if some partitions are already ready to be sent, the select time would be 0;
    // otherwise if some partition already has some data accumulated but not ready yet,
    // the select time will be the time difference between now and its linger expiry time;
    // otherwise the select time will be the time difference between now and the metadata expiry time;
    // also sends a metadata request if needed and handles sends, responses and connection state changes
    this.client.poll(pollTimeout, now);
}
```
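Each iteration therefore:

- Fetches the current `Cluster`, which describes the Kafka cluster: its topics, partitions and brokers.
- Finds the nodes that have data ready to send (`readyNodes`). If some partition has no known leader, the metadata is flagged for an update; nodes whose connection is not ready are removed from `readyNodes`.
- Decides which `RecordBatch`es to send in this round and to which node.
- Expires `RecordBatch`es that have waited "too long" without being sent.
- Creates and sends the produce requests.
- Sends a metadata request if needed, processes completed sends and received responses, and handles connection state changes.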
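The node-filtering and poll-timeout steps can be modeled without any Kafka classes. In the hedged sketch below, `NodeState` is a hypothetical stand-in for what `NetworkClient.ready()` and `connectionDelay()` report. The sketch drops nodes whose connection is not ready, remembers the smallest reconnect delay, and then picks the poll timeout the same way `run(long now)` does: 0 when some node can be sent to, otherwise the smaller of `nextReadyCheckDelayMs` and the reconnect delay.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of two steps of Sender.run(now): dropping unconnected nodes and choosing the poll timeout.
final class SenderStepSketch {
    static final class NodeState {
        final String id;
        final boolean connectionReady; // stands in for client.ready(node, now)
        final long connectionDelayMs;  // stands in for client.connectionDelay(node, now)

        NodeState(String id, boolean connectionReady, long connectionDelayMs) {
            this.id = id;
            this.connectionReady = connectionReady;
            this.connectionDelayMs = connectionDelayMs;
        }
    }

    static final class Result {
        final List<String> sendableNodes;
        final long pollTimeoutMs;

        Result(List<String> sendableNodes, long pollTimeoutMs) {
            this.sendableNodes = sendableNodes;
            this.pollTimeoutMs = pollTimeoutMs;
        }
    }

    static Result filterAndTimeout(List<NodeState> readyNodes, long nextReadyCheckDelayMs) {
        List<String> sendable = new ArrayList<>();
        long notReadyTimeout = Long.MAX_VALUE;
        for (NodeState node : readyNodes) {
            if (node.connectionReady) {
                sendable.add(node.id);
            } else {
                // Like iter.remove() in run(): skip the node but remember when it may be retried.
                notReadyTimeout = Math.min(notReadyTimeout, node.connectionDelayMs);
            }
        }
        long pollTimeout = Math.min(nextReadyCheckDelayMs, notReadyTimeout);
        if (!sendable.isEmpty())
            pollTimeout = 0; // something can be sent right away: do not block in poll()
        return new Result(sendable, pollTimeout);
    }

    public static void main(String[] args) {
        Result r = filterAndTimeout(Arrays.asList(
                new NodeState("broker-1", true, 0), new NodeState("broker-2", false, 50)), 100);
        System.out.println(r.sendableNodes + " pollTimeout=" + r.pollTimeoutMs);
    }
}
```

Excluding the not-yet-connected nodes from the "poll with 0" decision is what prevents the busy loop mentioned in the Sender comments: a node with data but no connection only shortens the timeout to its reconnect delay.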
Cluster information (fields of `org.apache.kafka.common.Cluster`):

```java
private final boolean isBootstrapConfigured;
private final List<Node> nodes;
private final Set<String> unauthorizedTopics;
private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
private final Map<String, List<PartitionInfo>> partitionsByTopic;
private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
private final Map<Integer, List<PartitionInfo>> partitionsByNode;
private final Map<Integer, Node> nodesById;
```
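The `partitionsBy...` maps are lookup indexes over a single list of partitions. The hedged sketch below reproduces the idea with a hypothetical `Part` class (topic, partition, leader node id or null) in place of `PartitionInfo`. A partition counts as "available" only when it has a leader, which is why `DefaultPartitioner` can round-robin over `availablePartitionsForTopic` and `drain()` can look up partitions by node.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of how Cluster-style lookup maps can be built; not Kafka's Cluster class.
final class ClusterIndexSketch {
    static final class Part {
        final String topic;
        final int partition;
        final Integer leader; // leader node id, or null when the partition has no leader

        Part(String topic, int partition, Integer leader) {
            this.topic = topic;
            this.partition = partition;
            this.leader = leader;
        }
    }

    final Map<String, List<Part>> partitionsByTopic = new HashMap<>();
    final Map<String, List<Part>> availablePartitionsByTopic = new HashMap<>();
    final Map<Integer, List<Part>> partitionsByNode = new HashMap<>();

    ClusterIndexSketch(List<Part> partitions) {
        for (Part p : partitions) {
            partitionsByTopic.computeIfAbsent(p.topic, t -> new ArrayList<>()).add(p);
            // Only partitions with a leader are "available" and can be reached through a node.
            if (p.leader != null) {
                availablePartitionsByTopic.computeIfAbsent(p.topic, t -> new ArrayList<>()).add(p);
                partitionsByNode.computeIfAbsent(p.leader, n -> new ArrayList<>()).add(p);
            }
        }
    }

    public static void main(String[] args) {
        ClusterIndexSketch cluster = new ClusterIndexSketch(Arrays.asList(
                new Part("demo", 0, 1), new Part("demo", 1, null), new Part("demo", 2, 1)));
        System.out.println(cluster.partitionsByTopic.get("demo").size());          // 3
        System.out.println(cluster.availablePartitionsByTopic.get("demo").size()); // 2
        System.out.println(cluster.partitionsByNode.get(1).size());                // 2
    }
}
```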
`RecordAccumulator.ready()` collects the leader nodes that have data ready to send:

```java
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
    Set<Node> readyNodes = new HashSet<>();
    long nextReadyCheckDelayMs = Long.MAX_VALUE;
    boolean unknownLeadersExist = false;

    boolean exhausted = this.free.queued() > 0;
    for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
        TopicPartition part = entry.getKey();
        Deque<RecordBatch> deque = entry.getValue();

        Node leader = cluster.leaderFor(part);
        if (leader == null) {
            unknownLeadersExist = true;
        } else if (!readyNodes.contains(leader) && !muted.contains(part)) {
            synchronized (deque) {
                RecordBatch batch = deque.peekFirst();
                if (batch != null) {
                    // backoff check: lastAttemptMs is reset when a failed batch is re-enqueued for retry
                    boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
                    long waitedTimeMs = nowMs - batch.lastAttemptMs;
                    long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                    long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                    boolean full = deque.size() > 1 || batch.records.isFull();
                    boolean expired = waitedTimeMs >= timeToWaitMs;
                    // the leader has sendable data if any of these conditions holds
                    boolean sendable = full || expired || exhausted || closed || flushInProgress();
                    if (sendable && !backingOff) {
                        readyNodes.add(leader);
                    } else {
                        // Note that this results in a conservative estimate since an un-sendable partition may have
                        // a leader that will later be found to have sendable data. However, this is good enough
                        // since we'll just wake up and then sleep again for the remaining time.
                        nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                    }
                }
            }
        }
    }

    return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeadersExist);
}
```
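A partition's leader is marked ready when:

`( full || expired || exhausted || closed || flushInProgress() ) && !backingOff`

- full: the partition has more than one batch queued, or its first batch is full
- expired: the time already waited (now - lastAttemptMs) is at least the time it has to wait (retry.backoff.ms if the batch is backing off, otherwise linger.ms)
- exhausted: some thread is blocked waiting for memory from the buffer pool, i.e. buffer.memory is used up
- closed: this RecordAccumulator has been closed
- flushInProgress: the application has called `flush()`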
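The readiness rule can be checked in isolation. This hedged sketch evaluates the head batch of one partition with the same arithmetic as `ready()`. The class, method and parameter names are hypothetical; the method returns -1 for "ready" or else the remaining wait in milliseconds, which `ready()` folds into `nextReadyCheckDelayMs`.

```java
// Hypothetical sketch of the per-partition check inside RecordAccumulator.ready().
final class ReadyCheckSketch {
    /** Returns -1 if the partition's leader should be marked ready, else the ms left to wait. */
    static long check(long nowMs, int attempts, long lastAttemptMs, long retryBackoffMs, long lingerMs,
                      int dequeSize, boolean headBatchFull,
                      boolean exhausted, boolean closed, boolean flushInProgress) {
        boolean backingOff = attempts > 0 && lastAttemptMs + retryBackoffMs > nowMs;
        long waitedTimeMs = nowMs - lastAttemptMs;
        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
        long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
        boolean full = dequeSize > 1 || headBatchFull;
        boolean expired = waitedTimeMs >= timeToWaitMs;
        boolean sendable = full || expired || exhausted || closed || flushInProgress;
        return (sendable && !backingOff) ? -1 : timeLeftMs;
    }

    public static void main(String[] args) {
        // linger.ms = 5, batch created at t=100, now t=102, not full: wait 3 more ms
        System.out.println(check(102, 0, 100, 100, 5, 1, false, false, false, false)); // 3
        // same batch, but a second batch is queued behind it: ready now
        System.out.println(check(102, 0, 100, 100, 5, 2, false, false, false, false)); // -1
        // full batch that failed at t=100 with retry.backoff.ms = 100: still backing off
        System.out.println(check(102, 1, 100, 100, 5, 1, true, false, false, false));  // 98
    }
}
```

The third case shows why `!backingOff` is checked separately: the batch is full, so it counts as sendable, yet its leader is not marked ready until the backoff has passed.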
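Process (`RecordAccumulator.drain()`): iterate over the ready nodes. For each node, walk the partitions it leads, starting from a rotating `drainIndex` so that no partition starves. For each partition that is not muted and whose first queued RecordBatch is not in retry backoff, take that batch, unless it would push the request past the size limit (max.request.size) while the node already has at least one batch; in that case stop collecting for this node. The collected batches are returned as a `List<RecordBatch>` per node id.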
```java
public Map<Integer, List<RecordBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
    if (nodes.isEmpty())
        return Collections.emptyMap();

    Map<Integer, List<RecordBatch>> batches = new HashMap<>();
    for (Node node : nodes) {
        int size = 0;
        List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
        List<RecordBatch> ready = new ArrayList<>();
        /* to make starvation less likely this loop doesn't start at 0 */
        int start = drainIndex = drainIndex % parts.size();
        do {
            PartitionInfo part = parts.get(drainIndex);
            TopicPartition tp = new TopicPartition(part.topic(), part.partition());
            // Only proceed if the partition has no in-flight batches.
            if (!muted.contains(tp)) {
                Deque<RecordBatch> deque = getDeque(new TopicPartition(part.topic(), part.partition()));
                if (deque != null) {
                    synchronized (deque) {
                        RecordBatch first = deque.peekFirst();
                        if (first != null) {
                            boolean backoff = first.attempts > 0 && first.lastAttemptMs + retryBackoffMs > now;
                            // Only drain the batch if it is not during backoff period.
                            if (!backoff) {
                                // stop if this batch would push the request past maxSize
                                // and the node already has at least one batch
                                if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) {
                                    // there is a rare case that a single batch size is larger than the request size due
                                    // to compression; in this case we will still eventually send this batch in a single
                                    // request
                                    break;
                                } else {
                                    RecordBatch batch = deque.pollFirst();
                                    batch.records.close();
                                    size += batch.records.sizeInBytes();
                                    ready.add(batch);
                                    batch.drainedMs = now;
                                }
                            }
                        }
                    }
                }
            }
            this.drainIndex = (this.drainIndex + 1) % parts.size();
        } while (start != drainIndex);
        batches.put(node.id(), ready);
    }
    return batches;
}
```
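A hedged sketch of the per-node drain loop, reduced to batch sizes in bytes. Each element of `queues` is a hypothetical stand-in for the deque of one partition led by the node, and muting and backoff are left out. It shows two properties of the loop above: at most one batch per partition per pass, and the rotating `drainIndex`, so the next call starts where the previous one stopped.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of RecordAccumulator.drain() for a single node (no muting, no backoff).
final class DrainSketch {
    private int drainIndex = 0;

    /** queues.get(i) holds the batch sizes (bytes) queued for the node's i-th partition. */
    List<Integer> drainNode(List<Deque<Integer>> queues, int maxSize) {
        List<Integer> ready = new ArrayList<>();
        if (queues.isEmpty())
            return ready;
        int size = 0;
        // Start where the previous drain stopped so no partition starves.
        int start = drainIndex = drainIndex % queues.size();
        do {
            Deque<Integer> deque = queues.get(drainIndex);
            Integer first = deque.peekFirst();
            if (first != null) {
                // Stop filling this request once maxSize would be exceeded,
                // but always take at least one batch.
                if (size + first > maxSize && !ready.isEmpty())
                    break;
                deque.pollFirst();
                size += first;
                ready.add(first);
            }
            drainIndex = (drainIndex + 1) % queues.size();
        } while (start != drainIndex);
        return ready;
    }

    public static void main(String[] args) {
        List<Deque<Integer>> queues = Arrays.asList(
                new ArrayDeque<Integer>(Arrays.asList(100, 100)),
                new ArrayDeque<Integer>(Arrays.asList(100)),
                new ArrayDeque<Integer>(Arrays.asList(100)));
        DrainSketch drainer = new DrainSketch();
        System.out.println(drainer.drainNode(queues, 250)); // [100, 100]: partitions 0 and 1
        System.out.println(drainer.drainNode(queues, 250)); // [100, 100]: partitions 2 and 0
    }
}
```

Because `break` leaves `drainIndex` on the partition that did not fit, that partition is the first one served on the next pass.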
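Process (`RecordAccumulator.abortExpiredBatches()`): iterate over the queued batches of every partition that is not muted, and expire batches from the head of each queue, stopping at the first batch that has not expired.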
```java
public List<RecordBatch> abortExpiredBatches(int requestTimeout, long now) {
    List<RecordBatch> expiredBatches = new ArrayList<>();
    for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
        Deque<RecordBatch> dq = entry.getValue();
        TopicPartition tp = entry.getKey();
        if (!muted.contains(tp)) {
            synchronized (dq) {
                // iterate over the batches and expire them if they have been in the accumulator for more than requestTimeOut
                RecordBatch lastBatch = dq.peekLast();
                Iterator<RecordBatch> batchIterator = dq.iterator();
                while (batchIterator.hasNext()) {
                    RecordBatch batch = batchIterator.next();
                    // every batch except the last one is treated as full
                    boolean isFull = batch != lastBatch || batch.records.isFull();
                    // check if the batch is expired
                    if (batch.maybeExpire(requestTimeout, retryBackoffMs, now, this.lingerMs, isFull)) {
                        expiredBatches.add(batch);
                        batchIterator.remove();
                        deallocate(batch);
                    } else {
                        // Stop at the first batch that has not expired.
                        break;
                    }
                }
            }
        }
    }
    return expiredBatches;
}
```
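The "stop at the first batch that has not expired" rule relies on batches being queued oldest first. A hedged sketch of the per-partition scan, with the expiry test passed in as a `Predicate` (a hypothetical simplification; Kafka calls `RecordBatch.maybeExpire`) and each batch represented only by its creation time:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch of the per-partition scan in abortExpiredBatches(); not Kafka code.
final class ExpiryScanSketch {
    /** Removes expired batches from the head of the deque and returns them in order. */
    static List<Long> abortExpired(Deque<Long> deque, Predicate<Long> isExpired) {
        List<Long> expired = new ArrayList<>();
        Iterator<Long> it = deque.iterator();
        while (it.hasNext()) {
            Long batchCreatedMs = it.next();
            if (isExpired.test(batchCreatedMs)) {
                expired.add(batchCreatedMs);
                it.remove(); // drop the expired batch from the partition's queue
            } else {
                break; // batches are queued oldest first, so stop at the first survivor
            }
        }
        return expired;
    }

    public static void main(String[] args) {
        Deque<Long> deque = new ArrayDeque<>(Arrays.asList(0L, 10L, 40L));
        long now = 50, timeoutMs = 30;
        System.out.println(abortExpired(deque, created -> now - created > timeoutMs)); // [0, 10]
        System.out.println(deque);                                                     // [40]
    }
}
```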
`RecordBatch.maybeExpire()` decides whether a batch has timed out:

```java
// lastAppendTime: set when the batch is created, updated on every append
// createdMs:      set when the batch is created
// lastAttemptMs:  set when the batch is created; reset when a failed batch is re-enqueued for retry
public boolean maybeExpire(int requestTimeoutMs, long retryBackoffMs, long now, long lingerMs, boolean isFull) {
    boolean expire = false;

    // not yet retried, batch is full: now > lastAppendTime + requestTimeoutMs
    if (!this.inRetry() && isFull && requestTimeoutMs < (now - this.lastAppendTime))
        expire = true;
    // not yet retried: now > createdMs + lingerMs + requestTimeoutMs
    else if (!this.inRetry() && requestTimeoutMs < (now - (this.createdMs + lingerMs)))
        expire = true;
    // in retry: now > lastAttemptMs + retryBackoffMs + requestTimeoutMs
    else if (this.inRetry() && requestTimeoutMs < (now - (this.lastAttemptMs + retryBackoffMs)))
        expire = true;

    if (expire) {
        this.done(-1L, Record.NO_TIMESTAMP, new TimeoutException("Batch containing " + recordCount + " record(s) expired due to timeout while requesting metadata from brokers for " + topicPartition));
    }

    return expire;
}
```
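The three expiry rules translate directly into arithmetic. Below is a hedged, standalone version (hypothetical class; the batch fields become parameters) with a worked example using request.timeout.ms = 30000, linger.ms = 5 and retry.backoff.ms = 100, for a batch created and last appended to at t = 0:

```java
// Hypothetical standalone copy of the three expiry rules in RecordBatch.maybeExpire().
final class ExpireRuleSketch {
    static boolean maybeExpire(int requestTimeoutMs, long retryBackoffMs, long now, long lingerMs,
                               boolean isFull, boolean inRetry,
                               long lastAppendTime, long createdMs, long lastAttemptMs) {
        if (!inRetry && isFull && requestTimeoutMs < (now - lastAppendTime))
            return true;  // full batch that has not been sent for request.timeout.ms
        if (!inRetry && requestTimeoutMs < (now - (createdMs + lingerMs)))
            return true;  // any unsent batch older than linger.ms + request.timeout.ms
        if (inRetry && requestTimeoutMs < (now - (lastAttemptMs + retryBackoffMs)))
            return true;  // retried batch waiting longer than retry.backoff.ms + request.timeout.ms
        return false;
    }

    public static void main(String[] args) {
        // Full and never sent: expires after more than 30000 ms.
        System.out.println(maybeExpire(30_000, 100, 30_001, 5, true, false, 0, 0, 0));  // true
        // Not full: needs more than 30005 ms.
        System.out.println(maybeExpire(30_000, 100, 30_001, 5, false, false, 0, 0, 0)); // false
        // In retry with last attempt at t=0: needs more than 30100 ms.
        System.out.println(maybeExpire(30_000, 100, 30_101, 5, false, true, 0, 0, 0));  // true
    }
}
```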
`Sender.createProduceRequests()` builds one produce request per node:

```java
private List<ClientRequest> createProduceRequests(Map<Integer, List<RecordBatch>> collated, long now) {
    List<ClientRequest> requests = new ArrayList<ClientRequest>(collated.size());
    for (Map.Entry<Integer, List<RecordBatch>> entry : collated.entrySet())
        requests.add(produceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue()));
    return requests;
}
```
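The requests are then passed to `NetworkClient.send()`; the actual socket writes happen in the following `poll()`:

```java
for (ClientRequest request : requests)
    client.send(request, now);
```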
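Since `drain()` has already grouped the batches by node id, `createProduceRequests()` produces exactly one request per node, each carrying that node's batches keyed by partition. A hedged sketch of that grouping with hypothetical types (a batch is just a topic-partition name plus its size; a "request" is a destination node id plus a map from partition to bytes):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of createProduceRequests(): one request per destination node.
final class ProduceRequestSketch {
    static final class Batch {
        final String topicPartition;
        final int sizeInBytes;

        Batch(String topicPartition, int sizeInBytes) {
            this.topicPartition = topicPartition;
            this.sizeInBytes = sizeInBytes;
        }
    }

    static final class Request {
        final int destination;
        final Map<String, Integer> bytesByPartition;

        Request(int destination, Map<String, Integer> bytesByPartition) {
            this.destination = destination;
            this.bytesByPartition = bytesByPartition;
        }

        @Override
        public String toString() {
            return "node " + destination + " " + bytesByPartition;
        }
    }

    static List<Request> createProduceRequests(Map<Integer, List<Batch>> collated) {
        List<Request> requests = new ArrayList<>(collated.size());
        for (Map.Entry<Integer, List<Batch>> entry : collated.entrySet()) {
            // one entry per partition: drain() takes at most one batch per partition per pass
            Map<String, Integer> byPartition = new LinkedHashMap<>();
            for (Batch batch : entry.getValue())
                byPartition.put(batch.topicPartition, batch.sizeInBytes);
            requests.add(new Request(entry.getKey(), byPartition));
        }
        return requests;
    }

    public static void main(String[] args) {
        Map<Integer, List<Batch>> collated = new LinkedHashMap<>();
        collated.put(1, Arrays.asList(new Batch("demo-0", 100), new Batch("demo-2", 80)));
        collated.put(2, Collections.singletonList(new Batch("demo-1", 60)));
        createProduceRequests(collated).forEach(System.out::println);
    }
}
```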
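Metadata refresh: `Metadata.timeToNextUpdate()` returns how long to wait before the metadata may be refreshed:

```java
// lastSuccessfulRefreshMs: starts at 0, set after every successful update
// lastRefreshMs:           starts at 0, set after every update attempt, successful or not
// metadataExpireMs:        set when Metadata is created, from metadata.max.age.ms
// refreshBackoffMs:        set when Metadata is created, from retry.backoff.ms
public synchronized long timeToNextUpdate(long nowMs) {
    long timeToExpire = needUpdate ? 0 : Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs - nowMs, 0);
    long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - nowMs;
    return Math.max(timeToExpire, timeToAllowUpdate);
}
```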
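A worked example helps with `timeToNextUpdate()`: the result is the later of "when the metadata goes stale" and "when the refresh backoff has passed". Below is a hedged standalone copy with the fields turned into parameters (hypothetical names), using illustrative values metadata.max.age.ms = 300000 and retry.backoff.ms = 100:

```java
// Hypothetical standalone copy of Metadata.timeToNextUpdate(); fields become parameters.
final class MetadataTimingSketch {
    static long timeToNextUpdate(long nowMs, boolean needUpdate, long lastSuccessfulRefreshMs,
                                 long lastRefreshMs, long metadataExpireMs, long refreshBackoffMs) {
        long timeToExpire = needUpdate ? 0 : Math.max(lastSuccessfulRefreshMs + metadataExpireMs - nowMs, 0);
        long timeToAllowUpdate = lastRefreshMs + refreshBackoffMs - nowMs;
        return Math.max(timeToExpire, timeToAllowUpdate);
    }

    public static void main(String[] args) {
        // Refreshed successfully at t=1000, no update requested: wait until t=301000.
        System.out.println(timeToNextUpdate(2_000, false, 1_000, 1_000, 300_000, 100)); // 299000
        // needUpdate set (e.g. by requestUpdate()) at t=1050, last attempt at t=1000: wait out the backoff.
        System.out.println(timeToNextUpdate(1_050, true, 1_000, 1_000, 300_000, 100));  // 50
    }
}
```

So even a forced update (for example when the Sender finds a partition without a leader) is rate-limited by retry.backoff.ms.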
`DefaultMetadataUpdater.maybeUpdate()` (inside `NetworkClient`) decides whether to send a metadata request now:

```java
// lastNoNodeAvailableMs:     last time maybeUpdate wanted to update metadata but found no node to send to
// metadataFetchInProgress:   false by default; true while a metadata request is in flight
// metadata.refreshBackoff(): the refreshBackoffMs value, from retry.backoff.ms
public long maybeUpdate(long now) {
    // should we update our metadata?
    long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
    long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);
    long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0;
    // if there is no node available to connect, back off refreshing metadata
    long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt),
            waitForMetadataFetch);

    if (metadataTimeout == 0) {
        // Beware that the behavior of this method and the computation of timeouts for poll() are
        // highly dependent on the behavior of leastLoadedNode.
        Node node = leastLoadedNode(now);
        maybeUpdate(now, node);
    }

    return metadataTimeout;
}
```
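The metadata timeout returned here is later combined in `NetworkClient.poll()` as `Utils.min(timeout, metadataTimeout, requestTimeoutMs)`. A hedged sketch of both computations, with hypothetical names and plain parameters in place of the updater's fields:

```java
// Hypothetical sketch of the timeout in DefaultMetadataUpdater.maybeUpdate() and the select timeout in poll().
final class MetadataTimeoutSketch {
    static long metadataTimeout(long now, long timeToNextMetadataUpdate, long lastNoNodeAvailableMs,
                                long refreshBackoffMs, boolean metadataFetchInProgress) {
        long timeToNextReconnectAttempt = Math.max(lastNoNodeAvailableMs + refreshBackoffMs - now, 0);
        long waitForMetadataFetch = metadataFetchInProgress ? Integer.MAX_VALUE : 0;
        // 0 means "send a metadata request now" (to the least loaded node)
        return Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt), waitForMetadataFetch);
    }

    static long selectTimeout(long pollTimeout, long metadataTimeout, long requestTimeoutMs) {
        return Math.min(pollTimeout, Math.min(metadataTimeout, requestTimeoutMs));
    }

    public static void main(String[] args) {
        System.out.println(metadataTimeout(1_000, 0, 0, 100, false));   // 0: refresh now
        System.out.println(metadataTimeout(1_000, 0, 0, 100, true));    // 2147483647: a fetch is already in flight
        System.out.println(metadataTimeout(1_000, 0, 950, 100, false)); // 50: no node was available at t=950
        System.out.println(selectTimeout(5, 299_000, 30_000));          // 5
    }
}
```

Using `Integer.MAX_VALUE` while a fetch is in flight keeps `poll()` from sending a second metadata request; the select timeout is then bounded by the Sender's own timeout and request.timeout.ms instead.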
`NetworkClient.poll()` does the actual socket I/O:

```java
// Do actual reads and writes to sockets
public List<ClientResponse> poll(long timeout, long now) {
    // check whether metadata needs to be updated (and send the request if so)
    long metadataTimeout = metadataUpdater.maybeUpdate(now);
    try {
        this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
    } catch (IOException e) {
        log.error("Unexpected error during I/O", e);
    }

    // process completed actions
    long updatedNow = this.time.milliseconds();
    List<ClientResponse> responses = new ArrayList<>();
    // handle completed sends, received responses, disconnections, new connections and timed-out requests
    handleCompletedSends(responses, updatedNow);
    handleCompletedReceives(responses, updatedNow);
    handleDisconnections(responses, updatedNow);
    handleConnections();
    handleTimedOutRequests(responses, updatedNow);

    // invoke callbacks
    for (ClientResponse response : responses) {
        if (response.request().hasCallback()) {
            try {
                response.request().callback().onComplete(response);
            } catch (Exception e) {
                log.error("Uncaught error in request completion:", e);
            }
        }
    }

    return responses;
}
```
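RecordAccumulator: the message "accumulator". The producer does not send one request per message; instead, messages are appended to the accumulator and sent as RecordBatches. The following parameters control this behavior:

Parameter | Purpose |
--- | --- |
batch.size | Size in bytes of a RecordBatch buffer; a new batch is allocated max(batch.size, size of the record that opens it) |
buffer.memory | Total memory in bytes the producer may use to buffer unsent records |
linger.ms | How long a batch that is not yet full waits for more records before it becomes sendable; also added to request.timeout.ms when deciding whether an unsent batch has expired |
compression.type | Compression codec applied to batches |
retry.backoff.ms | Wait before retrying a failed batch (also the minimum interval between metadata refreshes) |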
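These settings are passed to the producer as ordinary configuration properties. A hedged sketch that only builds a `java.util.Properties` object, so it runs without kafka-clients on the classpath. The values are illustrative rather than recommendations, a real producer additionally needs bootstrap.servers and key/value serializers, and the helper shows the `max(batch.size, record size)` allocation rule in simplified form (Kafka adds per-record overhead).

```java
import java.util.Properties;

// Illustrative batching settings; in an application these Properties would be passed to new KafkaProducer<>(props).
final class BatchingConfigSketch {
    static Properties batchingProps() {
        Properties props = new Properties();
        props.put("batch.size", "16384");        // bytes per RecordBatch buffer
        props.put("buffer.memory", "33554432");  // total bytes the accumulator may use
        props.put("linger.ms", "5");             // let a batch wait up to 5 ms to fill
        props.put("compression.type", "lz4");
        props.put("retry.backoff.ms", "100");
        return props;
    }

    // Simplified allocation rule: a batch is at least batch.size, larger only for a bigger record.
    static int batchAllocationSize(int batchSize, int recordSizeInBytes) {
        return Math.max(batchSize, recordSizeInBytes);
    }

    public static void main(String[] args) {
        Properties props = batchingProps();
        int batchSize = Integer.parseInt(props.getProperty("batch.size"));
        System.out.println(batchAllocationSize(batchSize, 200));     // 16384
        System.out.println(batchAllocationSize(batchSize, 100_000)); // 100000
    }
}
```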
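NetworkClient: wraps the client's connections to the Kafka cluster. It manages network I/O and hides the details of sending and receiving; its behavior is controlled by these parameters:

Parameter | Purpose |
--- | --- |
connections.max.idle.ms | Connections idle for longer than this are closed |
max.in.flight.requests.per.connection | Maximum number of unacknowledged requests sent on one connection. With 1, the Sender mutes drained partitions, so each partition's batches are sent in order; with a larger value and retries enabled, a retried batch can land after a later one |
reconnect.backoff.ms | Wait before reconnecting to a broker after a connection drops |
send.buffer.bytes | TCP send buffer size (SO_SNDBUF) |
receive.buffer.bytes | TCP receive buffer size (SO_RCVBUF) |
request.timeout.ms | How long to wait for the response to a request |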
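Sender: runs in its own thread and performs the actual sending. It is controlled by these parameters:

Parameter | Purpose |
--- | --- |
max.request.size | Maximum size in bytes of a produce request; `drain()` uses it to cap the batches collected for one node |
acks | Number of acknowledgments the leader must receive before responding (0, 1, or all/-1); controls delivery "reliability" |
retries | Maximum number of times a failed batch is retried |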
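Putting the last two tables together: the Sender mutes partitions only when max.in.flight.requests.per.connection is 1 (`guaranteeMessageOrder`), so a configuration that retries while keeping per-partition order might look like the hedged sketch below. The values are illustrative, a real producer also needs bootstrap.servers and serializers, and `mayReorder` is a hypothetical helper that encodes the rule from the max.in.flight row, not a Kafka API. It assumes the 0.10.0 defaults of retries = 0 and max.in.flight.requests.per.connection = 5 when a key is absent.

```java
import java.util.Properties;

// Illustrative reliability/ordering settings; not a complete producer configuration.
final class OrderingConfigSketch {
    static Properties orderedProps() {
        Properties props = new Properties();
        props.put("acks", "all");                                // leader waits for all in-sync replicas
        props.put("retries", "3");                               // retry failed batches
        props.put("max.in.flight.requests.per.connection", "1"); // Sender mutes drained partitions
        props.put("max.request.size", "1048576");
        return props;
    }

    // Hypothetical helper: retries can reorder a partition's batches unless only one request is in flight.
    static boolean mayReorder(Properties props) {
        int retries = Integer.parseInt(props.getProperty("retries", "0"));
        int inFlight = Integer.parseInt(props.getProperty("max.in.flight.requests.per.connection", "5"));
        return retries > 0 && inFlight > 1;
    }

    public static void main(String[] args) {
        Properties props = orderedProps();
        System.out.println(mayReorder(props)); // false
        props.put("max.in.flight.requests.per.connection", "5");
        System.out.println(mayReorder(props)); // true
    }
}
```

The trade-off is throughput: with one request in flight per connection, each node waits for a response before the next produce request is sent.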