1. Allocate a memory block for the partition
buffer = free.allocate(size, maxTimeToBlock);
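The blocking behaviour behind `free.allocate(size, maxTimeToBlock)` can be sketched roughly as follows. This is a minimal model and not Kafka's actual pool: `SimpleBufferPool`, `availableBytes`, and the `IllegalStateException` thrown on timeout are hypothetical names and choices made for illustration only.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified stand-in for the producer's buffer pool.
class SimpleBufferPool {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition moreMemory = lock.newCondition();
    private long availableBytes;

    SimpleBufferPool(long totalBytes) {
        this.availableBytes = totalBytes;
    }

    // Waits up to maxTimeToBlockMs for `size` free bytes, then hands out a buffer.
    ByteBuffer allocate(int size, long maxTimeToBlockMs) {
        long remainingNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
        lock.lock();
        try {
            while (availableBytes < size) {
                if (remainingNs <= 0) {
                    // Assumption: the sketch signals a timeout with an unchecked exception.
                    throw new IllegalStateException("no memory within " + maxTimeToBlockMs + " ms");
                }
                try {
                    remainingNs = moreMemory.awaitNanos(remainingNs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting for memory", e);
                }
            }
            availableBytes -= size;
            return ByteBuffer.allocate(size);
        } finally {
            lock.unlock();
        }
    }

    // Returns the buffer's capacity to the pool and wakes any waiting allocators.
    void deallocate(ByteBuffer buffer) {
        lock.lock();
        try {
            availableBytes += buffer.capacity();
            moreMemory.signalAll();
        } finally {
            lock.unlock();
        }
    }

    long availableBytes() {
        lock.lock();
        try {
            return availableBytes;
        } finally {
            lock.unlock();
        }
    }
}
```

The point of the time limit is that a producer whose memory is exhausted blocks the calling thread only for a bounded period instead of hanging forever.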
2. Construct the batch object and add it to the Deque
MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, nowMs);
3. Write the key and value into the buffer
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
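// Back in the caller (KafkaProducer), result is the RecordAppendResult returned by the accumulator's append
if (result.batchIsFull || result.newBatchCreated) {
    log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
    this.sender.wakeup(); // once the condition is met, wake the sender thread so it sends the batch
}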
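Steps 1 to 3 can be tied together in a small, self-contained model. This is only a sketch under simplifying assumptions: `MiniAccumulator`, `MiniBatch`, and `AppendResult` are hypothetical classes, records are stored as length-prefixed key and value bytes rather than in Kafka's record format, and the buffer comes straight from `ByteBuffer.allocate` instead of a pool.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the append path; not Kafka's real classes.
class MiniAccumulator {
    // Carries the two flags the producer checks before waking the sender.
    static final class AppendResult {
        final boolean batchIsFull;
        final boolean newBatchCreated;

        AppendResult(boolean batchIsFull, boolean newBatchCreated) {
            this.batchIsFull = batchIsFull;
            this.newBatchCreated = newBatchCreated;
        }
    }

    static final class MiniBatch {
        final ByteBuffer buffer;

        MiniBatch(int capacity) {
            this.buffer = ByteBuffer.allocate(capacity);
        }

        // Writes a length-prefixed key and value; returns false if they do not fit.
        boolean tryAppend(byte[] key, byte[] value) {
            if (buffer.remaining() < 8 + key.length + value.length) {
                return false;
            }
            buffer.putInt(key.length).put(key).putInt(value.length).put(value);
            return true;
        }

        boolean isFull() {
            return !buffer.hasRemaining();
        }
    }

    private final Map<String, Deque<MiniBatch>> batches = new HashMap<>();
    private final int batchSize;

    MiniAccumulator(int batchSize) {
        this.batchSize = batchSize;
    }

    synchronized AppendResult append(String topicPartition, String key, String value) {
        byte[] k = key.getBytes(StandardCharsets.UTF_8);
        byte[] v = value.getBytes(StandardCharsets.UTF_8);
        Deque<MiniBatch> dq = batches.computeIfAbsent(topicPartition, tp -> new ArrayDeque<>());

        // First try the newest batch of this partition.
        MiniBatch last = dq.peekLast();
        if (last != null && last.tryAppend(k, v)) {
            return new AppendResult(dq.size() > 1 || last.isFull(), false);
        }

        // Otherwise create a batch large enough for the record and queue it.
        MiniBatch batch = new MiniBatch(Math.max(batchSize, 8 + k.length + v.length));
        batch.tryAppend(k, v);
        dq.addLast(batch);
        return new AppendResult(dq.size() > 1 || batch.isFull(), true);
    }

    synchronized int queuedBatches(String topicPartition) {
        Deque<MiniBatch> dq = batches.get(topicPartition);
        return dq == null ? 0 : dq.size();
    }
}
```

In this model a caller would wake its sender whenever `batchIsFull || newBatchCreated` is true, which matches the condition in the excerpt above: either a batch is ready to go, or a fresh batch exists whose send deadline the sender needs to start tracking.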
4. The sender thread sends the batches
sendProduceRequests(batches, now);
MemoryRecords records = batch.records(); // the records held in the buffer are read out here
client.send(clientRequest, now);
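What "reading the records out of the buffer" amounts to can be illustrated with a plain `ByteBuffer`. The sketch below assumes a hypothetical length-prefixed layout (4-byte key length, key bytes, 4-byte value length, value bytes) and a hypothetical `BatchReader` helper; Kafka's real `MemoryRecords` format is more involved.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper showing how a written buffer is turned into a readable view.
class BatchReader {
    // Producer side: appends length-prefixed key/value pairs to a fresh buffer.
    static ByteBuffer write(String[][] records, int capacity) {
        ByteBuffer buffer = ByteBuffer.allocate(capacity);
        for (String[] kv : records) {
            byte[] k = kv[0].getBytes(StandardCharsets.UTF_8);
            byte[] v = kv[1].getBytes(StandardCharsets.UTF_8);
            buffer.putInt(k.length).put(k).putInt(v.length).put(v);
        }
        return buffer;
    }

    // Sender side: reads everything written so far without disturbing the writer's position.
    static List<String> read(ByteBuffer written) {
        ByteBuffer view = written.duplicate(); // shares the bytes, has its own position and limit
        view.flip();                           // limit = bytes written, position = 0
        List<String> out = new ArrayList<>();
        while (view.hasRemaining()) {
            byte[] k = new byte[view.getInt()];
            view.get(k);
            byte[] v = new byte[view.getInt()];
            view.get(v);
            out.add(new String(k, StandardCharsets.UTF_8) + "=" + new String(v, StandardCharsets.UTF_8));
        }
        return out;
    }
}
```

Reading through a duplicate keeps the original buffer's write position intact, so in this sketch the same bytes could be read again, for example on a retry.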