I. Overall Flow
[Figure: Kafka producer overall architecture]
Steps:
1. ProducerInterceptors intercept the message.
2. The Serializer serializes the message key and value.
3. The Partitioner chooses a suitable partition for the message.
4. The RecordAccumulator collects messages so they can be sent in batches.
5. The Sender fetches messages from the RecordAccumulator.
6. A ClientRequest is constructed.
7. The ClientRequest is handed to the NetworkClient, ready to be sent.
8. The NetworkClient puts the request into the KafkaChannel's send buffer.
9. Network I/O is performed and the request is sent.
10. When the response arrives, the ClientRequest's callback is invoked.
11. The RecordBatch's callback is invoked, which in turn invokes the callback registered on each message.
Two threads cooperate during message sending. The main thread first wraps the business data in a ProducerRecord object, then calls send() to place the message into the RecordAccumulator (the message collector, which is also the buffer shared between the main thread and the Sender thread). The Sender thread assembles the collected messages into requests and is the thread that ultimately performs the network I/O: it takes messages out of the RecordAccumulator and sends them in batches. A minimal usage sketch follows.
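For orientation, here is a minimal usage sketch of the two-thread model; the broker address and topic name are placeholders, not from the original text. send() returns as soon as the record lands in the RecordAccumulator; the callback fires later, on the Sender (I/O) thread, once the broker's response arrives.
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);
        // Main thread: send() only appends the record to the RecordAccumulator and returns.
        producer.send(new ProducerRecord<>("demo-topic", "key", "value"), new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                // Runs later on the Sender (I/O) thread, once the broker response arrives.
                if (exception != null)
                    exception.printStackTrace();
                else
                    System.out.printf("partition=%d offset=%d%n", metadata.partition(), metadata.offset());
            }
        });
        producer.close();
    }
}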
II. Main Thread:
1. Metadata update flow:
[Figure: Metadata update flow]
Stage 1
This happens in the KafkaProducer constructor. Its main purpose is to initialize the Metadata object and pass it into the Sender.
1. Initialize Metadata:
this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG));
retryBackoffMs: the backoff time between retries.
ProducerConfig.METADATA_MAX_AGE_CONFIG: even if no request has asked for a metadata update, refresh after this interval anyway; the default is 5 minutes.
2. First call to this.metadata.update():
this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
This hands the bootstrap server nodes to the Cluster field inside the Metadata object.
3. Instantiate NetworkClient and Sender, passing metadata into their constructors.
4. Start the thread that runs the Sender task:
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
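Condensed, the constructor wiring of stage 1 looks roughly like the sketch below (based on the 0.10.x-era client; most configuration plumbing is elided, so treat it as a sketch rather than the exact source):
// Sketch of the KafkaProducer constructor wiring; many parameters elided.
this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG));
this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds()); // seed with bootstrap nodes
// The same Metadata instance is shared by the network layer and the Sender:
NetworkClient client = new NetworkClient(new Selector(/* ... */), this.metadata, clientId, /* ... */);
this.sender = new Sender(client, this.metadata, this.accumulator, /* ... */);
this.ioThread = new KafkaThread(ioThreadName, this.sender, true); // daemon I/O thread
this.ioThread.start();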
Stage 2
The main thread triggers a Metadata update from send().
1. KafkaProducer.send() calls waitOnMetadata(record.topic(), this.maxBlockTimeMs):
/**
 * Wait for cluster metadata including partitions for the given topic to be available.
 * @param topic The topic we want metadata for
 * @param maxWaitMs The maximum time in ms for waiting on the metadata
 * @return The amount of time we waited in ms
 */
private long waitOnMetadata(String topic, long maxWaitMs) throws InterruptedException {
    // add topic to metadata topic list if it is not there already.
    if (!this.metadata.containsTopic(topic))
        this.metadata.add(topic);
    if (metadata.fetch().partitionsForTopic(topic) != null)
        return 0;
    long begin = time.milliseconds();
    long remainingWaitMs = maxWaitMs;
    while (metadata.fetch().partitionsForTopic(topic) == null) {
        log.trace("Requesting metadata update for topic {}.", topic);
        int version = metadata.requestUpdate();
        sender.wakeup();
        metadata.awaitUpdate(version, remainingWaitMs);
        long elapsed = time.milliseconds() - begin;
        if (elapsed >= maxWaitMs)
            throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
        if (metadata.fetch().unauthorizedTopics().contains(topic))
            throw new TopicAuthorizationException(topic);
        remainingWaitMs = maxWaitMs - elapsed;
    }
    return time.milliseconds() - begin;
}
2. It first calls metadata.requestUpdate(), which sets the metadata's needUpdate flag to true and returns the current metadata version.
3. It then calls metadata.awaitUpdate(version, remainingWaitMs), blocking until the Sender thread calls metadata.update() and releases the main thread:
/**
 * Wait for metadata update until the current version is larger than the last version we know of
 */
public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
    if (maxWaitMs < 0) {
        throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds");
    }
    long begin = System.currentTimeMillis();
    long remainingWaitMs = maxWaitMs;
    while (this.version <= lastVersion) {
        if (remainingWaitMs != 0)
            wait(remainingWaitMs);
        long elapsed = System.currentTimeMillis() - begin;
        if (elapsed >= maxWaitMs)
            throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
        remainingWaitMs = maxWaitMs - elapsed;
    }
}
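requestUpdate() itself is tiny; in the 0.10.x client it is essentially:
public synchronized int requestUpdate() {
    this.needUpdate = true; // ask the Sender thread to refresh metadata on its next pass
    return this.version;    // awaitUpdate() blocks until version grows past this value
}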
Stage 3
The Sender thread sends the metadata update request to the server.
1. The Sender thread's run() loop calls NetworkClient.poll().
2. poll() calls maybeUpdate(long now), which checks whether the refresh deadline has arrived; only then is an update issued, and it targets the least-loaded node, as sketched below.
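The "least-loaded node" is essentially the usable node with the fewest in-flight requests; a simplified sketch of the idea behind NetworkClient.leastLoadedNode() (the connection-state and blackout checks of the real method are omitted):
// Sketch only: pick the node with the fewest unacknowledged requests.
Node found = null;
int fewestInFlight = Integer.MAX_VALUE;
for (Node node : this.metadataUpdater.fetchNodes()) {
    int inFlight = this.inFlightRequests.inFlightRequestCount(node.idString());
    if (inFlight < fewestInFlight) {
        fewestInFlight = inFlight;
        found = node;
    }
}
return found;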
3. maybeUpdate(now, node) constructs a ClientRequest and hands it to doSend(clientRequest, now):
...
ClientRequest clientRequest = request(now, nodeConnectionId, metadataRequest);
log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
doSend(clientRequest, now); // buffer the request; the next poll() will send it out
...
4. The request is added to this.inFlightRequests (requests that have been sent but not yet acknowledged):
private void doSend(ClientRequest request, long now) {
    request.setSendTimeMs(now);
    this.inFlightRequests.add(request);
    selector.send(request.request());
}
5. doSend() calls selector.send(request.request()), which queues the send on the corresponding KafkaChannel:
/**
 * Queue the given request for sending in the subsequent {@link #poll(long)} calls
 * @param send The request to send
 */
public void send(Send send) {
    KafkaChannel channel = channelOrFail(send.destination());
    try {
        channel.setSend(send);
    } catch (CancelledKeyException e) {
        this.failedSends.add(send.destination());
        close(channel);
    }
}
6. KafkaChannel.setSend(send) stores the send in the channel's send field and registers interest in the OP_WRITE event:
public void setSend(Send send) {
    if (this.send != null)
        throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
    this.send = send; // store the send field
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE); // watch for OP_WRITE
}
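The buffered send actually leaves on the next poll(): when the OP_WRITE event fires, the channel writes the Send and, once it is fully flushed, stops watching OP_WRITE. Simplified from KafkaChannel:
private boolean send(Send send) throws IOException {
    send.writeTo(transportLayer); // write as many bytes as the socket will take
    if (send.completed())         // fully flushed?
        transportLayer.removeInterestOps(SelectionKey.OP_WRITE); // stop watching OP_WRITE
    return send.completed();
}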
Stage 4
Process the data the server returns for updating the metadata.
1. NetworkClient.poll() calls selector.poll(), which picks up the read event:
/* if channel is ready read from any connections that have readable data */
if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
    NetworkReceive networkReceive; // OP_READ event handling
    /*
     * If channel.read() returns a complete NetworkReceive, it is added to
     * stagedReceives. If a complete NetworkReceive cannot yet be assembled,
     * read() returns null and the remaining bytes are read the next time an
     * OP_READ event fires, until a complete NetworkReceive is obtained.
     */
    while ((networkReceive = channel.read()) != null)
        addToStagedReceives(channel, networkReceive);
}
2. addToStagedReceives() puts the received message into the ArrayDeque for that channel inside stagedReceives:
/**
 * adds a receive to staged receives
 */
private void addToStagedReceives(KafkaChannel channel, NetworkReceive receive) {
    if (!stagedReceives.containsKey(channel))
        stagedReceives.put(channel, new ArrayDeque<NetworkReceive>());
    Deque<NetworkReceive> deque = stagedReceives.get(channel);
    deque.add(receive);
}
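At the end of Selector.poll(), staged receives are promoted into completedReceives, at most one per channel per poll, so responses are handled in order. A simplified sketch (the real method also skips muted channels):
private void addToCompletedReceives() {
    // Move one staged receive per channel into completedReceives, which
    // handleCompletedReceives() consumes after poll() returns.
    Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> it = this.stagedReceives.entrySet().iterator();
    while (it.hasNext()) {
        Deque<NetworkReceive> deque = it.next().getValue();
        this.completedReceives.add(deque.poll());
        if (deque.isEmpty())
            it.remove();
    }
}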
3. After selector.poll() returns, NetworkClient runs handleCompletedReceives():
/**
 * Takes each NetworkReceive (a broker response) from completedReceives, finds the
 * matching ClientRequest in inFlightRequests, builds a ClientResponse from the two,
 * and adds it to the responses list.
 * Handle any completed receives and update the response list with the responses received.
 *
 * @param responses The list of responses to update
 * @param now The current time
 */
private void handleCompletedReceives(List<ClientResponse> responses, long now) {
    for (NetworkReceive receive : this.selector.completedReceives()) {
        String source = receive.source(); // the node id that sent the response
        // take the matching ClientRequest out of inFlightRequests
        ClientRequest req = inFlightRequests.completeNext(source);
        // parse the response
        Struct body = parseResponse(receive.payload(), req.request().header());
        // let MetadataUpdater.maybeHandleCompletedReceive() handle a MetadataResponse:
        // it updates the cluster metadata recorded in Metadata and wakes up all
        // threads waiting for the metadata update to finish
        if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
            // not a MetadataResponse: build a ClientResponse and add it to responses
            responses.add(new ClientResponse(req, now, false, body));
    }
}
4. metadataUpdater.maybeHandleCompletedReceive(req, now, body) checks whether this is a metadata response:
public boolean maybeHandleCompletedReceive(ClientRequest req, long now, Struct body) {
    short apiKey = req.request().header().apiKey();
    // check whether this is a MetadataRequest issued by the NetworkClient itself
    if (apiKey == ApiKeys.METADATA.id && req.isInitiatedByNetworkClient()) {
        handleResponse(req.request().header(), body, now);
        return true;
    }
    return false;
}
5. If it is a metadata response, handleResponse() processes the return value:
it constructs a MetadataResponse, extracts response.cluster(), and calls metadata.update(cluster, now) to update the metadata.
private void handleResponse(RequestHeader header, Struct body, long now) {
    this.metadataFetchInProgress = false; // the MetadataResponse has arrived, so clear the in-progress flag
    // parse the MetadataResponse
    MetadataResponse response = new MetadataResponse(body);
    // create the Cluster object
    Cluster cluster = response.cluster();
    // check if any topics metadata failed to get updated; inspect the error codes in the MetadataResponse
    Map<String, Errors> errors = response.errors();
    if (!errors.isEmpty())
        log.warn("Error while fetching metadata with correlation id {} : {}", header.correlationId(), errors);
    // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
    // created which means we will get errors and no nodes until it exists
    if (cluster.nodes().size() > 0) {
        this.metadata.update(cluster, now);
    } else {
        log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());
        this.metadata.failedUpdate(now);
    }
}
6. metadata.update(Cluster cluster, long now):
- Sets this.needUpdate to false.
- Increments the version by 1, so metadata.awaitUpdate() above sees the new version number and exits its while loop.
- Calls notifyAll(): awaitUpdate() calls wait() inside its while loop precisely to give the Sender thread time to refresh the metadata; now that the metadata has been updated, notifyAll() releases the main thread from its wait().
/**
 * Update the cluster metadata
 */
public synchronized void update(Cluster cluster, long now) {
    this.needUpdate = false;
    this.lastRefreshMs = now;
    this.lastSuccessfulRefreshMs = now;
    this.version += 1;
    // 1. notify the listeners registered on Metadata
    for (Listener listener: listeners)
        listener.onMetadataUpdate(cluster);
    // 2. update the cluster field.
    // Do this after notifying listeners as subscribed topics' list can be changed by listeners
    this.cluster = this.needMetadataForAllTopics ? getClusterForCurrentTopics(cluster) : cluster;
    notifyAll();
    log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
}
In the end the main thread holds the latest Metadata object, can look up the topic's partitions, and can continue with the send, as in the partitioner sketch below.
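With fresh metadata in hand, partition selection (step 3 of the overall flow) can proceed. The DefaultPartitioner hashes the serialized key, or round-robins when the key is null; a simplified sketch (the real code restricts the null-key case to currently available partitions):
// Sketch of DefaultPartitioner.partition(); availability handling trimmed.
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
    int nextValue = counter.getAndIncrement(); // per-producer round-robin counter
    return Utils.toPositive(nextValue) % numPartitions;
} else {
    // murmur2 hash of the serialized key, mapped onto the partition count
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}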
2. RecordAccumulator flow:
The flow of the RecordAccumulator.append() method.
[Figure: RecordAccumulator append workflow]
/**
* Add a record to the accumulator, return the append result
* <p>
* The append result will contain the future metadata, and flag for whether the appended batch is full or a new batch is created
* <p>
*
* @param tp The topic/partition to which this record is being sent
* @param timestamp The timestamp of the record
* @param key The key for the record
* @param value The value for the record
* @param callback The user-supplied callback to execute when the request is complete
* @param maxTimeToBlock The maximum time in milliseconds to block for buffer memory to be available
*/
public RecordAppendResult append(TopicPartition tp,
                                 long timestamp,
                                 byte[] key,
                                 byte[] value,
                                 Callback callback,
                                 long maxTimeToBlock) throws InterruptedException {
    // We keep track of the number of appending thread to make sure we do not miss batches in
    // abortIncompleteBatches().
    appendsInProgress.incrementAndGet();
    try {
        // check if we have an in-progress batch
        // 1. look up the Deque for this TopicPartition
        Deque<RecordBatch> dq = getOrCreateDeque(tp);
        synchronized (dq) { // 2. lock the Deque
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");
            // 3. try to append the Record to the last RecordBatch in the Deque
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
            if (appendResult != null)
                return appendResult; // 4. append succeeded, return
        } // 5. unlock
        // we don't have an in-progress record batch try to allocate a new batch
        int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
        log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
        // 6. append failed, so allocate new space from the BufferPool
        ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
        synchronized (dq) {
            // Need to check if producer is closed again after grabbing the dequeue lock.
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");
            // 7. with the Deque locked again, retry tryAppend()
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
            // 8. if this append succeeds, return and release the space allocated in step 6
            if (appendResult != null) {
                // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                free.deallocate(buffer);
                return appendResult;
            }
            MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
            // create a new RecordBatch
            RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
            // 9. append the Record to the newly created RecordBatch and add the batch to the batches collection
            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));
            dq.addLast(batch);
            // 10. add the new RecordBatch to the incomplete set
            incomplete.add(batch);
            // 11. return the RecordAppendResult
            return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
        } // 12. unlock
    } finally {
        appendsInProgress.decrementAndGet();
    }
}
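For completeness, the tryAppend() helper used in steps 3 and 7 only ever appends to the newest RecordBatch at the tail of the Deque; if that batch is full, it is sealed and null is returned, telling the caller to allocate a new batch. Roughly:
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value,
                                     Callback callback, Deque<RecordBatch> deque) {
    RecordBatch last = deque.peekLast(); // only the tail batch accepts appends
    if (last != null) {
        FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
        if (future == null)
            last.records.close(); // batch is full: seal it so the Sender can ship it
        else
            return new RecordAppendResult(future, deque.size() > 1 || last.records.isFull(), false);
    }
    return null; // caller must allocate a new batch
}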