Kafka version: 0.10.1.0
Overall flow
1: Initialize: read the configuration, set up metrics, etc.
2: Create the RecordAccumulator buffer
3: Obtain the cluster metadata (Metadata)
4: Create the underlying network client (NetworkClient)
5: Create the data-sending thread (Sender)
6: Finish starting the Producer
The constructor code:
private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
    try {
        log.trace("Starting the Kafka producer");
        Map<String, Object> userProvidedConfigs = config.originals();
        this.producerConfig = config;
        this.time = new SystemTime();
        // ...
        // (basic initialization work omitted)
        // ...
        // the records buffer
        this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                this.totalMemorySize,
                this.compressionType,
                config.getLong(ProducerConfig.LINGER_MS_CONFIG),
                retryBackoffMs,
                metrics,
                time);
        // bootstrap the metadata
        List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
        this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
        // create the network client
        ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
        NetworkClient client = new NetworkClient(
                new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder),
                this.metadata,
                clientId,
                config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
                config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
                this.requestTimeoutMs, time);
        // create the sender thread (runs as a daemon);
        // MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION == 1 means guaranteeMessageOrder
        this.sender = new Sender(client,
                this.metadata,
                this.accumulator,
                config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1,
                config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
                config.getInt(ProducerConfig.RETRIES_CONFIG),
                this.metrics,
                new SystemTime(),
                clientId,
                this.requestTimeoutMs);
        String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
        this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
        this.ioThread.start();
        this.errors = this.metrics.sensor("errors");
        config.logUnused();
        AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
        log.debug("Kafka producer started");
    } catch (Throwable t) {
        // call close methods if internal objects are already constructed;
        // this is to prevent a resource leak. See KAFKA-2121
        close(0, TimeUnit.MILLISECONDS, true);
        // now propagate the exception
        throw new KafkaException("Failed to construct kafka producer", t);
    }
}
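The last step above wraps the Sender in a KafkaThread and marks it as a daemon, so the I/O loop never keeps the JVM alive on its own. A minimal, stdlib-only sketch of that pattern (the name startIoThread is illustrative, not part of the Kafka API):

```java
import java.util.concurrent.CountDownLatch;

public class DaemonThreadSketch {

    // Start a named daemon thread running the given loop, as KafkaThread does
    // for the Sender. Daemon threads do not prevent JVM shutdown.
    public static Thread startIoThread(String name, Runnable loop) {
        Thread t = new Thread(loop, name);
        t.setDaemon(true);
        t.setUncaughtExceptionHandler((thr, e) ->
                System.err.println("Uncaught exception in " + thr.getName() + ": " + e));
        t.start();
        return t;
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch ran = new CountDownLatch(1);
        Thread io = startIoThread("kafka-producer-network-thread | demo", ran::countDown);
        ran.await(); // the "sender loop" executed at least once
        System.out.println(io.isDaemon() ? "daemon" : "non-daemon");
    }
}
```

Because the thread is a daemon, a misbehaving producer cannot hang application shutdown; the constructor relies on close() (in the catch block) rather than thread lifetime for cleanup.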
The send method in detail
1: Get the current cluster information from the metadata.
2: Get the list of nodes whose batches are ready to send (see the RecordAccumulator ready function below).
3: If any topic's leader is unknown, force a Metadata update.
4: Iterate over the nodes obtained in step 2 and remove any whose connection is not yet ready (checked via NetworkClient.ready).
5: Walk the batches already held in the RecordAccumulator. For each node that survived step 4, iterate over every topic partition whose leader lives on that node, take the first batch in that partition's queue, and pull it out if it qualifies. The final result is a set of RecordBatches per node (see the RecordAccumulator drain function below).
   If the configuration requires ordered delivery (i.e. MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = 1), every topic partition drained in this step is added to the RecordAccumulator's muted set. On the next call to drain, no batches are taken from a muted topic partition, so at most one request per topic partition is unacknowledged at any time, which keeps the messages arriving at each topic partition in order (see the RecordAccumulator drain function below).
6: Iterate over batches that have exceeded the request timeout and expire them.
7: Update metrics.
8: Wrap the data from step 5 into one ClientRequest per node.
9: Send the requests via the NetworkClient.
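The muting behavior in step 5 can be modeled with a small, self-contained sketch (these are illustrative classes, not Kafka's actual RecordAccumulator): draining takes at most the head batch of each non-muted partition, and when ordering is required the partition stays muted until its in-flight request completes.

```java
import java.util.*;

public class DrainSketch {
    private final Map<String, Deque<String>> batchesByPartition = new LinkedHashMap<>();
    private final Set<String> muted = new HashSet<>();
    private final boolean guaranteeOrder; // models MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION == 1

    public DrainSketch(boolean guaranteeOrder) {
        this.guaranteeOrder = guaranteeOrder;
    }

    public void append(String partition, String batch) {
        batchesByPartition.computeIfAbsent(partition, p -> new ArrayDeque<>()).add(batch);
    }

    // Take at most the head batch of each non-muted partition.
    public List<String> drain() {
        List<String> drained = new ArrayList<>();
        for (Map.Entry<String, Deque<String>> e : batchesByPartition.entrySet()) {
            if (muted.contains(e.getKey()) || e.getValue().isEmpty())
                continue;
            drained.add(e.getValue().poll());
            if (guaranteeOrder)
                muted.add(e.getKey()); // mute until the request is acknowledged
        }
        return drained;
    }

    // Called when the in-flight request for a partition completes.
    public void completed(String partition) {
        muted.remove(partition);
    }
}
```

With ordering on, a second drain returns nothing for a muted partition until completed() is called, so each partition has at most one outstanding request.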
Source - the ready function in the RecordAccumulator class:
This function returns the list of nodes that are ready to send, plus the set of topics whose leader is unknown.
A broker is considered ready when:
1. it is the leader of at least one topic partition that is not backing off (i.e. not waiting out the retry backoff);
2. and those topic partitions are not muted, i.e. no request for them is currently in flight (which would risk reordering);
3. and at least one of the following holds:
   the batch is full,
   or the batch has been sitting in the accumulator for at least lingerMs,
   or the buffer pool is exhausted and a writer is blocked waiting for memory,
   or the accumulator is being closed.
To be continued...