Kafka Producer 发送流程

// Two bodiless declarations of send(); the lines beginning with "*" are Javadoc text whose
// opening and closing delimiters are missing from this excerpt.
* Send the given record asynchronously and return a future which will eventually contain the response information.
* @param record The record to send
* @return A future which will eventually contain the response information
public Future<RecordMetadata> send(ProducerRecord<K, V> record);
* Send a record and invoke the given callback when the record has been acknowledged by the server
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
// Entry point of the send path: passes the record through the interceptors when any are
// configured, then hands the resulting record and the callback to doSend.
// NOTE(review): the closing brace of this method is missing from the excerpt.
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
// intercept the record, which can be potentially modified; this method does not throw exceptions
ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
return doSend(interceptedRecord, callback);
// Demo interceptor: onSend returns the record unchanged when its Integer key is even and
// returns null otherwise.
// NOTE(review): only onSend is shown; any other interceptor methods and the closing braces are
// missing from this excerpt.
public class ProducerInterceptorDemo implements ProducerInterceptor<Integer,String>{
public ProducerRecord<Integer,String> onSend(ProducerRecord<Integer,String> record){
// NOTE(review): record.key() is an Integer that is unboxed by the % operator, so a record with
// a null key would throw NullPointerException here - confirm keys are never null.
if(record.key() % 2 == 0) return record;
// NOTE(review): the send excerpt in this file passes the interceptor result straight to doSend,
// so returning null hands a null record to doSend - confirm that is tolerated.
return null;
// Fields of the producer-side metadata holder: refresh timing, a version counter, the current
// Cluster snapshot and the bookkeeping used to decide when an update is needed.
public final class Metadata {
private static final Logger log = LoggerFactory.getLogger(Metadata.class);
private final long refreshBackoffMs;// minimum interval between refresh attempts after a failed metadata update, to avoid refreshing too often; default 100 ms
// NOTE(review): the original note gave a default of 60*60*1000 ms; the documented producer
// default for metadata.max.age.ms is 300000 ms (5 minutes) - confirm which setting feeds this field.
private final long metadataExpireMs;// metadata expiry time
private int version;// incremented by 1 on every successful update; mainly used to tell whether the metadata has been updated
private long lastRefreshMs;// time of the most recent update attempt (failed attempts included)
private long lastSuccessfulRefreshMs;// time of the most recent successful update (equal to lastRefreshMs if every update succeeds, otherwise lastSuccessfulRefreshMs < lastRefreshMs)
private Cluster cluster;// information about some of the topics in the cluster
private boolean needUpdate;// whether a metadata update is needed
// presumably the topics whose metadata is being tracked - confirm
private final Set<String> topics;
private final List<Listener> listeners;// event listeners
private boolean needMetadataForAllTopics;// whether metadata for all topics must be refreshed
// Fields of the cluster snapshot: the node list plus several lookup maps over the same
// partition information, keyed by topic-partition, topic and node.
public final class Cluster {
private final boolean isBootstrapConfigured;
private final List<Node> nodes;// list of nodes
private final Set<String> unauthorizedTopics;// topics the client is not authorized for
private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;// mapping from TopicPartition to PartitionInfo
private final Map<String, List<PartitionInfo>> partitionsByTopic;// mapping from topic to its partitions
private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;// mapping from topic to its available partitions (those whose leader is not null)
private final Map<Integer, List<PartitionInfo>> partitionsByNode;// mapping from node to its partitions
private final Map<Integer, Node> nodesById;// mapping from broker id to node
* Wait for cluster metadata including partitions for the given topic to be available.
* @param topic The topic we want metadata for
* @param maxWaitMs The maximum time in ms for waiting on the metadata
* @return The amount of time we waited in ms
// Blocks until the cluster metadata contains partition information for the topic, throwing
// TimeoutException once maxWaitMs has elapsed; returns the milliseconds spent waiting.
private long waitOnMetadata(String topic, long maxWaitMs) throws InterruptedException {
// add topic to metadata topic list if it is not there already.
// NOTE(review): no statement that adds the topic follows this check, so as excerpted the
// condition guards the fast-path test below instead; the add call looks dropped - confirm
// against the upstream source.
if (!this.metadata.containsTopic(topic))
// fast path: partitions are already known, so no time was spent waiting
if (metadata.fetch().partitionsForTopic(topic) != null)
return 0;
long begin = time.milliseconds();
long remainingWaitMs = maxWaitMs;
// keep requesting updates until partitions for the topic show up
while (metadata.fetch().partitionsForTopic(topic) == null) {
log.trace("Requesting metadata update for topic {}.", topic);
int version = metadata.requestUpdate();
// wait for an update newer than the version returned above, for at most the remaining budget
metadata.awaitUpdate(version, remainingWaitMs);
long elapsed = time.milliseconds() - begin;
// the budget is measured from begin, so it spans all iterations of the loop
if (elapsed >= maxWaitMs)
throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
if (metadata.fetch().unauthorizedTopics().contains(topic))
throw new TopicAuthorizationException(topic);
remainingWaitMs = maxWaitMs - elapsed;
return time.milliseconds() - begin;
如果 topic 还不在 metadata 的 topic 列表中,先把它添加进去;然后从 metadata 中 fetch cluster 信息,查询该 topic 对应的 partition 信息。如果能查到,就认为 metadata 已经就绪,直接返回;否则请求更新 metadata 并阻塞等待,直到拿到 partition 信息或者超时。
// Blocks the calling thread until the metadata version exceeds lastVersion, throwing
// TimeoutException once maxWaitMs milliseconds have passed without that happening.
// A negative maxWaitMs is rejected with IllegalArgumentException.
public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
if (maxWaitMs < 0) {
throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds");
long begin = System.currentTimeMillis();
long remainingWaitMs = maxWaitMs;
while (this.version <= lastVersion) {// version has not moved past lastVersion, so no newer update has been seen
// wait(0) would block with no timeout, so the wait is skipped when no time remains
if (remainingWaitMs != 0)
wait(remainingWaitMs);// block the thread, waiting for the metadata update
long elapsed = System.currentTimeMillis() - begin;
if (elapsed >= maxWaitMs)// timeout
throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
remainingWaitMs = maxWaitMs - elapsed;
之后调用 partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) 计算 partition 值:如果 record 指定了 partition,就直接返回指定值;如果没有指定,则按如下逻辑选择:
// Chooses a partition when the record does not specify one.
// NOTE(review): the first condition uses availablePartitions before the line that declares it,
// and numPartitions is not declared anywhere in this excerpt; the opening lines appear to have
// been replaced by a stray copy of the inner check - confirm against the upstream partitioner.
if (availablePartitions.size() > 0) {
if (keyBytes == null) {
// no key: advance the counter and use its value to spread records over the partitions
int nextValue = counter.getAndIncrement();
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
// pick among the currently available partitions
int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return DefaultPartitioner.toPositive(nextValue) % numPartitions;
} else {
// hash the keyBytes to choose a partition
return DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
// Appends a record to the last batch queued for the given partition; when there is no batch or
// the last one has no room, allocates a buffer from the pool and starts a new batch.
// Throws IllegalStateException if the accumulator has been closed.
public RecordAppendResult append(TopicPartition tp,
long timestamp,
byte[] key,
byte[] value,
Callback callback,
long maxTimeToBlock) throws InterruptedException {
// We keep track of the number of appending thread to make sure we do not miss batches in
// abortIncompleteBatches().
// NOTE(review): no counter is updated before the try or inside the finally in this excerpt;
// the corresponding statements appear to have been dropped - confirm against upstream.
try {
// check if we have an in-progress batch
Deque<RecordBatch> dq = getOrCreateDeque(tp);
// first attempt: try the existing last batch while holding the deque lock
synchronized (dq) {
if (closed)
throw new IllegalStateException("Cannot send after the producer is closed.");
RecordBatch last = dq.peekLast();
if (last != null) {
FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
if (future != null)
return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
// we don't have an in-progress record batch try to allocate a new batch
// the buffer is sized to hold at least this one record, even if it exceeds batchSize
int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
// allocation happens between the two synchronized blocks, i.e. without holding the deque lock
// (presumably because it may block for up to maxTimeToBlock - confirm)
ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
synchronized (dq) {
// Need to check if producer is closed again after grabbing the dequeue lock.
if (closed)
throw new IllegalStateException("Cannot send after the producer is closed.");
RecordBatch last = dq.peekLast();
if (last != null) {
FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
if (future != null) {
// Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
// NOTE(review): the buffer allocated above is not handed back to the pool on this path in the
// excerpt - confirm against upstream.
return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
// no usable batch: wrap the new buffer in a fresh batch and append the record to it
MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));
// NOTE(review): nothing in this excerpt adds the new batch to dq - confirm against upstream.
return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
} finally {
// Fields of the in-memory record set: a compressor, size limits, the backing ByteBuffer and a
// writable flag.
public class MemoryRecords implements Records {
private final Compressor compressor;
// maximum number of bytes that may be written to the buffer
private final int writeLimit;
// total capacity of the buffer in bytes
private final int initialCapacity;
// Java NIO ByteBuffer that holds the message data
private ByteBuffer buffer;
// indicate if the memory records is writable or not
// switched to read-only mode before sending
private boolean writable;
* Append the record to the current record set and return the relative offset within that record set
* @return The RecordSend corresponding to this record or null if there isn't sufficient room.
// Tries to append one record to this batch's record set; returns null when the record set has
// no room for it, otherwise a future for the appended record.
// NOTE(review): the body uses this.records and the append excerpt in this file calls tryAppend
// on a RecordBatch, so this looks like a RecordBatch method even though it follows the
// MemoryRecords fields - confirm.
public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {
if (!this.records.hasRoomFor(key, value)) {
return null;
} else {
long checksum = this.records.append(offsetCounter++, timestamp, key, value);
// track the largest record appended so far and the time of the latest append
this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
this.lastAppendTime = now;
// NOTE(review): recordCount is passed to the future but never incremented in this excerpt -
// confirm against upstream.
FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
timestamp, checksum,
// -1 stands in for the length of a null key or value
key == null ? -1 : key.length,
value == null ? -1 : value.length);
// keep the callback paired with its future (presumably so it can be invoked on completion)
if (callback != null)
thunks.add(new Thunk(callback, future));
return future;
// Fields of the buffer pool: the total memory budget, a lock, a deque of free ByteBuffers, a
// deque of waiting Conditions and the amount of memory currently available.
public final class BufferPool {
private final long totalMemory;
private final ReentrantLock lock;
private final Deque<ByteBuffer> free;
private final Deque<Condition> waiters;
// available space: totalMemory minus the size of all ByteBuffers held in free
// NOTE(review): the allocation loop excerpt in this file also subtracts memory it hands out
// (this.availableMemory -= got), so this value excludes allocated memory as well - the
// description above looks incomplete.
private long availableMemory;
// Allocation loop: runs until `accumulated` covers the requested `size`, waiting for memory
// and taking whatever is available on each pass.
// NOTE(review): the enclosing method, the declarations of accumulated, buffer, moreMemory,
// remainingTimeToBlockNs and maxTimeToBlockMs, and several closing braces are not part of this
// excerpt.
while (accumulated < size) {
long startWaitNs = time.nanoseconds();
long timeNs;
boolean waitingTimeElapsed;
try {
// presumably moreMemory is a Condition, whose timed await returns false when the waiting time
// elapsed - confirm
waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
throw e;
} finally {
// the time spent waiting is recorded whether the wait completed or was interrupted
long endWaitNs = time.nanoseconds();
timeNs = Math.max(0L, endWaitNs - startWaitNs);
this.waitTime.record(timeNs, time.milliseconds());
if (waitingTimeElapsed) {
throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
// charge the time just spent against the remaining blocking budget
remainingTimeToBlockNs -= timeNs;
// check if we can satisfy this request from the free list,
// otherwise allocate memory
if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
// just grab a buffer from the free list
buffer = this.free.pollFirst();
accumulated = size;
} else {// take part of the needed space now and keep waiting for more to be freed
// we'll need to allocate memory, but we may only get
// part of what we need on this iteration
freeUp(size - accumulated);
int got = (int) Math.min(size - accumulated, this.availableMemory);
this.availableMemory -= got;
accumulated += got;
// Fields of the record accumulator: a closed flag, in-progress counters, batching settings, the
// buffer pool and one deque of batches per topic-partition.
// NOTE(review): the append excerpt in this file reads closed, batchSize, compression, free and
// time, so it appears to belong to this class - confirm.
public final class RecordAccumulator {
private volatile boolean closed;
private final AtomicInteger flushesInProgress;
private final AtomicInteger appendsInProgress;
private final int batchSize;
private final CompressionType compression;
private final long lingerMs;
private final long retryBackoffMs;
// pool from which batch buffers are obtained
private final BufferPool free;
private final Time time;
// one deque of record batches per topic-partition
private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
private final IncompleteRecordBatches incomplete;
// The following variables are only accessed by the sender thread, so we don't need to protect them.
private final Set<TopicPartition> muted;
private int drainIndex;
就先看到这里,下次继续分析 Kafka Producer,讲讲 Sender 线程等内容。