This is original work; please credit the source when reposting.
Continuing from the previous post, let's keep looking at the Kafka producer.
RecordAccumulator.ready()
In the last section we saw that at the end of doSend(), if the conditions are met, the Sender thread is woken up. In Sender's run() method, RecordAccumulator.ready() is called to find the set of nodes in the cluster that are eligible to receive messages.
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
    // nodes that have data ready to be sent
    Set<Node> readyNodes = new HashSet<Node>();
    // delay until ready() needs to be called again
    long nextReadyCheckDelayMs = Long.MAX_VALUE;
    boolean unknownLeadersExist = false;
    // whether any thread is blocked waiting for the BufferPool to free space
    boolean exhausted = this.free.queued() > 0;
    for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
        TopicPartition part = entry.getKey();
        Deque<RecordBatch> deque = entry.getValue();
        // find the node hosting the leader replica of this partition
        Node leader = cluster.leaderFor(part);
        if (leader == null) {
            // no leader known: flag it so a metadata update is requested
            unknownLeadersExist = true;
        } else if (!readyNodes.contains(leader) && !muted.contains(part)) {
            synchronized (deque) {
                RecordBatch batch = deque.peekFirst();
                if (batch != null) {
                    boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
                    long waitedTimeMs = nowMs - batch.lastAttemptMs;
                    long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                    long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                    // the deque holds more than one RecordBatch, or the first batch is full
                    boolean full = deque.size() > 1 || batch.records.isFull();
                    // the batch has waited long enough (linger.ms, or the retry backoff)
                    boolean expired = waitedTimeMs >= timeToWaitMs;
                    // closed: the Sender thread is shutting down
                    boolean sendable = full || expired || exhausted || closed || flushInProgress(); // or a thread is waiting for a flush to complete
                    if (sendable && !backingOff) {
                        readyNodes.add(leader);
                    } else {
                        // Note that this results in a conservative estimate since an un-sendable partition may have
                        // a leader that will later be found to have sendable data. However, this is good enough
                        // since we'll just wake up and then sleep again for the remaining time.
                        // record the delay until ready() should be checked again
                        nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                    }
                }
            }
        }
    }
    return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeadersExist);
}
Calling ready() returns a ReadyCheckResult, which carries the readyNodes set, a flag signalling that the metadata needs updating, and the delay before ready() should be called again.
RecordAccumulator.drain()
drain() then takes the messages destined for the readyNodes computed earlier and regroups them into a Map<Integer, List<RecordBatch>> keyed by broker id, because the Sender only cares about which node it is sending to, not which partition. A sketch of the regrouping follows.
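To make that shape concrete, here is a small self-contained sketch of the regrouping. The string "batches" and the leaderFor map are simplified stand-ins for RecordBatch and Cluster.leaderFor(); this is not the drain() code itself:

import java.util.*;

// Stand-in for drain(): regroup per-partition data into a per-broker map.
public class DrainSketch {
    public static void main(String[] args) {
        // partition -> leader broker id (simplified Cluster.leaderFor())
        Map<String, Integer> leaderFor = new LinkedHashMap<>();
        leaderFor.put("topicA-0", 1);
        leaderFor.put("topicA-1", 2);
        leaderFor.put("topicB-0", 1);

        // broker id -> batches headed to that broker: the Map<Integer, List<RecordBatch>> shape
        Map<Integer, List<String>> perNode = new TreeMap<>();
        for (Map.Entry<String, Integer> e : leaderFor.entrySet())
            perNode.computeIfAbsent(e.getValue(), k -> new ArrayList<>())
                   .add("batch(" + e.getKey() + ")");

        // prints {1=[batch(topicA-0), batch(topicB-0)], 2=[batch(topicA-1)]}
        System.out.println(perNode);
    }
}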
Sender
KafkaProducer.send() mainly just appends the message to the RecordAccumulator cache; the actual network I/O is carried out by the Sender thread.
Sender implements the Runnable interface, so let's look at its run() method.
void run(long now) {
    // fetch the cluster metadata
    Cluster cluster = metadata.fetch();
    // get the list of partitions with data ready to send
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
    // if there are any partitions whose leaders are not known yet, force metadata update
    if (result.unknownLeadersExist)
        this.metadata.requestUpdate();
    // remove any nodes we aren't ready to send to
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        if (!this.client.ready(node, now)) {
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
        }
    }
    // create produce requests
    Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
                                                                     result.readyNodes,
                                                                     this.maxRequestSize,
                                                                     now);
    if (guaranteeMessageOrder) {
        // Mute all the partitions drained
        for (List<RecordBatch> batchList : batches.values()) {
            for (RecordBatch batch : batchList)
                this.accumulator.mutePartition(batch.topicPartition);
        }
    }
    List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
    // update sensors
    for (RecordBatch expiredBatch : expiredBatches)
        this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
    sensors.updateProduceRequestMetrics(batches);
    List<ClientRequest> requests = createProduceRequests(batches, now);
    // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
    // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
    // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
    // with sendable data that aren't ready to send since they would cause busy looping.
    long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
    if (result.readyNodes.size() > 0) {
        log.trace("Nodes with data ready to send: {}", result.readyNodes);
        log.trace("Created {} produce requests: {}", requests.size(), requests);
        pollTimeout = 0;
    }
    for (ClientRequest request : requests)
        client.send(request, now);
    // if some partitions are already ready to be sent, the select time would be 0;
    // otherwise if some partition already has some data accumulated but not ready yet,
    // the select time will be the time difference between now and its linger expiry time;
    // otherwise the select time will be the time difference between now and the metadata expiry time;
    this.client.poll(pollTimeout, now);
}
1. Note that the readyNodes produced by accumulator.ready() are filtered once more through NetworkClient.ready(), which confirms that each node is connected and able to accept requests.
2. After that, accumulator.drain() is called; when message ordering is guaranteed (max.in.flight.requests.per.connection=1, see the config sketch after this list), every drained partition is muted so it accepts no further send requests until its in-flight batch completes.
3. Batches that have timed out in the accumulator are handled by abortExpiredBatches(), which walks all batches, confirms expiry via maybeExpire(), and then calls batch.done(), triggering each thunk's callback and releasing resources.
4. createProduceRequests() wraps the batches into ClientRequests, and NetworkClient.send() writes each ClientRequest into the KafkaChannel's send field.
5. Finally, NetworkClient.poll() actually sends the requests and processes the responses and the user-supplied callbacks.
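A side note on step 2: as far as I can tell, guaranteeMessageOrder is simply derived from the producer config (it is true when at most one request may be in flight per connection), so strict per-partition ordering is opt-in. A minimal config sketch, with a placeholder broker address:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // placeholder address
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// one in-flight request per connection => Sender runs with guaranteeMessageOrder = true,
// so drained partitions are muted and retries cannot reorder messages within a partition
props.put("max.in.flight.requests.per.connection", "1");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);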
Selector
Running on a single thread, the Selector manages connect, read, and write operations across multiple network connections, implementing network I/O with NIO in asynchronous, non-blocking mode.
@Override
public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
    if (this.channels.containsKey(id))
        throw new IllegalStateException("There is already a connection for id " + id);
    // create the SocketChannel
    SocketChannel socketChannel = SocketChannel.open();
    // configure it in non-blocking mode
    socketChannel.configureBlocking(false);
    Socket socket = socketChannel.socket();
    // keep the connection alive
    socket.setKeepAlive(true);
    if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
        socket.setSendBufferSize(sendBufferSize);
    if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
        socket.setReceiveBufferSize(receiveBufferSize);
    socket.setTcpNoDelay(true);
    boolean connected;
    try {
        connected = socketChannel.connect(address);
    } catch (UnresolvedAddressException e) {
        socketChannel.close();
        throw new IOException("Can't resolve address: " + address, e);
    } catch (IOException e) {
        socketChannel.close();
        throw e;
    }
    // register the SocketChannel with the nioSelector, interested in OP_CONNECT
    SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);
    // create the KafkaChannel
    KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
    // attach the KafkaChannel to the key
    key.attach(channel);
    this.channels.put(id, channel);
    if (connected) {
        // OP_CONNECT won't trigger for immediately connected channels
        log.debug("Immediately connected to node {}", channel.id());
        immediatelyConnectedKeys.add(key);
        key.interestOps(0);
    }
}
poll()
The method that actually performs the network I/O is Selector.poll().
@Override
public void poll(long timeout) throws IOException {
    if (timeout < 0)
        throw new IllegalArgumentException("timeout should be >= 0");
    // clear the results of the previous poll
    clear();
    if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty())
        timeout = 0;
    /* check ready keys */
    long startSelect = time.nanoseconds();
    // nioSelector.select(): wait for I/O events to occur
    int readyKeys = select(timeout);
    long endSelect = time.nanoseconds();
    currentTimeNanos = endSelect;
    this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
    if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
        // handle the I/O
        pollSelectionKeys(this.nioSelector.selectedKeys(), false);
        pollSelectionKeys(immediatelyConnectedKeys, true);
    }
    // move stagedReceives into the completedReceives collection
    addToCompletedReceives();
    long endIo = time.nanoseconds();
    this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
    // close connections that have been idle for too long
    maybeCloseOldestConnection();
}
The core of this is plainly the pollSelectionKeys() method, which handles the connect, read, and write events.
private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected) {
    Iterator<SelectionKey> iterator = selectionKeys.iterator();
    while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        iterator.remove();
        // get the KafkaChannel attached to this key
        KafkaChannel channel = channel(key);
        // register all per-connection metrics at once
        sensors.maybeRegisterConnectionMetrics(channel.id());
        lruConnections.put(channel.id(), currentTimeNanos);
        try {
            /* complete any connections that have finished their handshake (either normally or immediately) */
            if (isImmediatelyConnected || key.isConnectable()) {
                // finishConnect() checks the CONNECT event; once complete, it drops interest in CONNECT and registers interest in READ
                if (channel.finishConnect()) {
                    // add to the set of established connections
                    this.connected.add(channel.id());
                    this.sensors.connectionCreated.record();
                } else
                    // connection not finished yet, skip this key
                    continue;
            }
            /* if channel is not ready finish prepare */
            if (channel.isConnected() && !channel.ready())
                channel.prepare();
            /* if channel is ready read from any connections that have readable data */
            if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                NetworkReceive networkReceive;
                // read() returns a complete NetworkReceive, which is added to stagedReceives;
                // it returns null until a complete one has been read
                while ((networkReceive = channel.read()) != null)
                    addToStagedReceives(channel, networkReceive);
            }
            /* if channel is ready write to any sockets that have space in their buffer and for which we have data */
            if (channel.ready() && key.isWritable()) {
                // write() transmits the channel's send field; it returns null while the send is incomplete,
                // and returns the Send once fully written, which is then added to completedSends
                Send send = channel.write();
                if (send != null) {
                    this.completedSends.add(send);
                    this.sensors.recordBytesSent(channel.id(), send.size());
                }
            }
            /* cancel any defunct sockets */
            if (!key.isValid()) {
                close(channel);
                this.disconnected.add(channel.id());
            }
        } catch (Exception e) {
            String desc = channel.socketDescription();
            if (e instanceof IOException)
                log.debug("Connection with {} disconnected", desc, e);
            else
                log.warn("Unexpected error from {}; closing connection", desc, e);
            close(channel);
            this.disconnected.add(channel.id());
        }
    }
}
The heart of it all is the read, write, and send operations, which all happen inside KafkaChannel; the framing sketch below shows why read() only ever returns complete messages.
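Kafka frames every request and response with a 4-byte size prefix, which is why channel.read() returns null until the whole frame has arrived. Here is a minimal, self-contained sketch of that framing; FrameReader is a simplified analog of NetworkReceive, not the real class:

import java.nio.ByteBuffer;

// Simplified analog of NetworkReceive: accumulate bytes until one size-prefixed frame is complete.
public class FrameReader {
    private final ByteBuffer sizeBuf = ByteBuffer.allocate(4); // 4-byte length prefix
    private ByteBuffer payload;

    /** Feed bytes as they arrive; returns the payload once a full frame is read, else null. */
    public ByteBuffer feed(ByteBuffer incoming) {
        if (payload == null) {
            while (sizeBuf.hasRemaining() && incoming.hasRemaining())
                sizeBuf.put(incoming.get());
            if (sizeBuf.hasRemaining())
                return null;                        // size prefix still incomplete
            sizeBuf.flip();
            payload = ByteBuffer.allocate(sizeBuf.getInt());
        }
        while (payload.hasRemaining() && incoming.hasRemaining())
            payload.put(incoming.get());
        if (payload.hasRemaining())
            return null;                            // body still incomplete
        ByteBuffer complete = payload;
        complete.flip();
        payload = null;
        sizeBuf.clear();                            // ready for the next frame
        return complete;
    }
}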
InFlightRequests
Its main role is to cache the ClientRequests that have been sent but for which no response has been received yet.
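A minimal sketch of the idea (a simplified stand-in using strings instead of ClientRequest, and omitting the check the real class also makes that the oldest request has finished sending): requests are queued per node, and a node may only take more requests while its queue is below the max.in.flight.requests.per.connection limit:

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for InFlightRequests: one deque of unacknowledged requests per node,
// newest at the head, completed from the tail.
public class InFlight {
    private final int maxInFlightPerConnection;
    private final Map<String, Deque<String>> requests = new HashMap<>();

    public InFlight(int maxInFlightPerConnection) {
        this.maxInFlightPerConnection = maxInFlightPerConnection;
    }

    public boolean canSendMore(String nodeId) {
        Deque<String> queue = requests.get(nodeId);
        return queue == null || queue.size() < maxInFlightPerConnection;
    }

    public void add(String nodeId, String request) {
        requests.computeIfAbsent(nodeId, k -> new ArrayDeque<>()).addFirst(request);
    }

    /** Called when a response arrives: complete the oldest in-flight request for the node. */
    public String completeNext(String nodeId) {
        return requests.get(nodeId).pollLast();
    }
}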
DefaultMetadataUpdater
The first step of sending in poll() is to check whether the metadata needs to be updated.
@Override
public long maybeUpdate(long now) {
    // should we update our metadata?
    // checks the Metadata.needUpdate flag and computes when the cluster metadata should next be refreshed
    long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
    long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);
    long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0;
    // if there is no node available to connect, back off refreshing metadata
    long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt),
                                    waitForMetadataFetch);
    if (metadataTimeout == 0) {
        // Beware that the behavior of this method and the computation of timeouts for poll() are
        // highly dependent on the behavior of leastLoadedNode.
        Node node = leastLoadedNode(now);
        maybeUpdate(now, node);
    }
    return metadataTimeout;
}
It first checks the Metadata.needUpdate field; by default the metadata is refreshed every 5 minutes. When an update is needed, a MetadataRequest is sent to the node found by leastLoadedNode(), i.e. the node with the smallest load, where load is measured by the length of that node's InFlightRequests queue (when no node is available at all, the current timestamp is recorded in lastNoNodeAvailableMs). A sketch of the selection idea follows.
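A minimal sketch of the leastLoadedNode idea; it is simplified in that the real code also starts scanning from a random offset and checks connection state, and inFlightCount here stands in for the InFlightRequests queue length:

import java.util.List;
import java.util.function.ToIntFunction;

// Simplified sketch: pick the node with the fewest in-flight requests as the metadata target.
public class LeastLoaded {
    public static <N> N leastLoadedNode(List<N> nodes, ToIntFunction<N> inFlightCount) {
        N best = null;
        int bestLoad = Integer.MAX_VALUE;
        for (N node : nodes) {
            int load = inFlightCount.applyAsInt(node);
            if (load < bestLoad) { // fewer unacknowledged requests = less loaded
                bestLoad = load;
                best = node;
            }
        }
        return best; // null if no node is available
    }
}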
After a response is received, MetadataUpdater.maybeHandleCompletedReceive() is called to check its validity; a valid response is processed by handleResponse(), which builds a new Cluster and replaces the existing Cluster data.
NetworkClient
A general-purpose network client implementation, used not only by the producer to send messages but also by the consumer to fetch messages and for communication between brokers on the server side.
ready()
The conditions for a node to be ready:
1. The node's host and port are valid.
2. The metadata is not currently being updated and is not due for an immediate update, and the node can accept requests.
3. The connection state for the node, tracked by connectionStates, allows connecting.
When the node is not yet connected but is allowed to connect, initiateConnect() is called to establish a new connection (see the sketch below).
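Putting the three conditions together, the decision looks roughly like this; this is a paraphrase of NetworkClient.ready() from this code base, not a verbatim copy:

public boolean ready(Node node, long now) {
    if (node.isEmpty()) // condition 1: the node must have a valid host and port
        throw new IllegalArgumentException("Cannot connect to empty node " + node);
    // conditions 2 and 3 hold and the connection is established with room in InFlightRequests
    if (isReady(node, now))
        return true;
    // not ready yet, but the connection state allows connecting: open a new connection,
    // which becomes usable on a later poll()
    if (connectionStates.canConnect(node.idString(), now))
        initiateConnect(node, now);
    return false;
}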
send()
All NetworkClient.send() does is add the request to inFlightRequests and set the request on the KafkaChannel.send field.
poll()
poll() delegates to Selector.poll() for the network I/O and then uses the handle*() methods to process the resulting data and queues.
All the ClientResponses produced are collected into a responses list; the list is then traversed, the callback recorded in each ClientRequest is invoked, and failed requests are re-sent where possible.
/**
 * Handle a produce response
 */
private void handleProduceResponse(ClientResponse response, Map<TopicPartition, RecordBatch> batches, long now) {
    int correlationId = response.request().request().header().correlationId();
    if (response.wasDisconnected()) {
        log.trace("Cancelled request {} due to node {} being disconnected", response, response.request()
                                                                                              .request()
                                                                                              .destination());
        for (RecordBatch batch : batches.values())
            completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, Record.NO_TIMESTAMP, correlationId, now);
    } else {
        log.trace("Received produce response from node {} with correlation id {}",
                  response.request().request().destination(),
                  correlationId);
        // if we have a response, parse it
        if (response.hasResponse()) {
            ProduceResponse produceResponse = new ProduceResponse(response.responseBody());
            for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {
                TopicPartition tp = entry.getKey();
                ProduceResponse.PartitionResponse partResp = entry.getValue();
                Errors error = Errors.forCode(partResp.errorCode);
                RecordBatch batch = batches.get(tp);
                completeBatch(batch, error, partResp.baseOffset, partResp.timestamp, correlationId, now);
            }
            this.sensors.recordLatency(response.request().request().destination(), response.requestLatencyMs());
            this.sensors.recordThrottleTime(response.request().request().destination(),
                                            produceResponse.getThrottleTime());
        } else {
            // this is the acks = 0 case, just complete all requests
            for (RecordBatch batch : batches.values())
                completeBatch(batch, Errors.NONE, -1L, Record.NO_TIMESTAMP, correlationId, now);
        }
    }
}
For ClientResponses produced by a disconnection, canRetry() decides whether the batch can be retried and re-enqueued; if it cannot, the Callback of every message in the RecordBatch is invoked, the batch is marked as completed with an exception, and its ByteBuffer is released.
That more or less wraps up the client-side producer.
A few points worth noting
1. getConfiguredInstance() in the KafkaProducer initialization path: its main job is to instantiate, through a uniform reflection mechanism, the classes specified in the originals config map.
public <T> T getConfiguredInstance(String key, Class<T> t) {
    Class<?> c = getClass(key);
    if (c == null)
        return null;
    // instantiate via reflection and verify the configured class has the expected type
    Object o = Utils.newInstance(c);
    if (!t.isInstance(o))
        throw new KafkaException(c.getName() + " is not an instance of " + t.getName());
    // if the instance is Configurable, hand it the original config map
    if (o instanceof Configurable)
        ((Configurable) o).configure(this.originals);
    return t.cast(o);
}
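As a usage example, the producer obtains its partitioner this way (a representative call for this code base; the exact line may differ):

Partitioner partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);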
2. In the Compressor class, the GZIP stream is constructed with a direct new, while the Snappy stream is constructed via reflection:
private static MemoizingConstructorSupplier snappyOutputStreamSupplier = new MemoizingConstructorSupplier(new ConstructorSupplier() {
    @Override
    public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
        return Class.forName("org.xerial.snappy.SnappyOutputStream")
                    .getConstructor(OutputStream.class, Integer.TYPE);
    }
});
This is mainly so that the extra dependency jar is not pulled in when Snappy compression is not used.
3. In RecordAccumulator's append path, tryAppend() is attempted under a synchronized lock, the lock is released, and the append is then retried under the lock again, instead of wrapping everything in one complete synchronized block.
This is mainly because allocating a new ByteBuffer from the BufferPool may block: if message 1 is large and is waiting for space, holding the lock the whole time would force a small message 2 to wait along with it, causing unnecessary blocking. That is why the synchronized block is split in two (see the illustration below).
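A minimal, self-contained illustration of the split-lock pattern; StringBuilder and a BlockingQueue stand in for RecordBatch and BufferPool, so this is not the Kafka code itself:

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Illustrative only: mimics the shape of RecordAccumulator.append().
public class SplitLockAppend {
    private final Deque<StringBuilder> deque = new ArrayDeque<>();
    private final BlockingQueue<StringBuilder> pool = new ArrayBlockingQueue<>(4);

    public SplitLockAppend() {
        for (int i = 0; i < 4; i++) pool.add(new StringBuilder());
    }

    public void append(String msg) throws InterruptedException {
        synchronized (deque) {                  // first attempt under the lock
            StringBuilder last = deque.peekLast();
            if (last != null && last.length() + msg.length() <= 32) {
                last.append(msg);
                return;
            }
        }                                       // release the lock before blocking
        StringBuilder buf = pool.take();        // may block, like BufferPool.allocate()
        synchronized (deque) {                  // retry: another thread may have added a batch meanwhile
            StringBuilder last = deque.peekLast();
            if (last != null && last.length() + msg.length() <= 32) {
                last.append(msg);
                pool.add(buf);                  // return the unused buffer, like free.deallocate()
                return;
            }
            buf.append(msg);
            deque.addLast(buf);
        }
    }
}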