Producer的发送模式
首先,我们先定义好一些配置属性来获取producer.
package com.tea.modules.producer;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.jetbrains.annotations.NotNull;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
* kafka生产者
*
* @author jaymin
* @since 2022/1/3 20:53
*/
@Slf4j
public class ProducerDemo {
/**
* Name of the Kafka topic that every send demo in this class writes to.
*/
public static final String TOPIC_NAME = "my_topic";
@NotNull
private static KafkaProducer<String, String> getKafkaProducer() {
    // Producer settings shared by every demo method in this class.
    Properties props = new Properties();
    // Broker used to bootstrap the connection to the cluster.
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9094");
    // Durability: require acknowledgement from all in-sync replicas and retry failed sends up to 3 times.
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    props.put(ProducerConfig.RETRIES_CONFIG, "3");
    // Batching: batches of up to 16384 bytes, waiting at most 1 ms for a batch to fill.
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
    props.put(ProducerConfig.LINGER_MS_CONFIG, "1");
    // Total memory (bytes) the producer may use to buffer records not yet sent.
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
    // Both keys and values are plain strings.
    String stringSerializerClass = "org.apache.kafka.common.serialization.StringSerializer";
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, stringSerializerClass);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, stringSerializerClass);
    return new KafkaProducer<>(props);
}
阻塞式发送
/**
 * Synchronous (blocking) send: sends 10 records to {@link #TOPIC_NAME} and waits for the
 * broker's acknowledgement of each record before sending the next one.
 *
 * <p>Prints the partition and offset of every acknowledged record. Stops at the first failure
 * and logs it; if the thread is interrupted while waiting, the interrupt flag is restored so
 * callers can still observe the interruption.
 */
public static void syncSend() {
    // try-with-resources guarantees the producer is closed on every path
    try (KafkaProducer<String, String> kafkaProducer = getKafkaProducer()) {
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, "jay-" + i, "demo-" + i);
            // send() returns a Future immediately; get() blocks until the result is available
            RecordMetadata metadata = kafkaProducer.send(producerRecord).get();
            System.out.println("partition:" + metadata.partition() + "|offset:" + metadata.offset());
        }
    } catch (ExecutionException e) {
        // the send itself failed; the ExecutionException wraps the real cause
        log.error("sync send failed", e);
    } catch (InterruptedException e) {
        // re-assert the interrupt instead of silently swallowing it
        Thread.currentThread().interrupt();
        log.error("sync send interrupted", e);
    }
}
异步发送
/**
 * Fire-and-forget send: hands 10 records for {@link #TOPIC_NAME} to the producer without
 * waiting for, or checking, the broker's acknowledgement.
 *
 * <p>The producer is closed in a {@code finally} block so it is released even if a send throws.
 */
public static void asyncSend() {
    KafkaProducer<String, String> producer = getKafkaProducer();
    try {
        int n = 0;
        while (n < 10) {
            String key = "jay-" + n;
            String value = "demo-" + n;
            // The returned Future is intentionally ignored: this mode never observes send failures.
            producer.send(new ProducerRecord<>(TOPIC_NAME, key, value));
            n++;
        }
    } finally {
        // Close the Kafka channel.
        producer.close();
    }
}
异步回调发送
/**
 * Asynchronous send with a callback: sends 10 records to {@link #TOPIC_NAME} without blocking
 * and reports each outcome from the producer callback.
 *
 * <p>Successful sends are logged at INFO with their partition and offset. Failed sends are
 * logged at ERROR together with the exception's stack trace.
 */
public static void sendAndCallback() {
    // try-with-resources guarantees the producer is closed on every path
    try (KafkaProducer<String, String> kafkaProducer = getKafkaProducer()) {
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, "jay-" + i, "demo-" + i);
            // The callback receives a null exception on success and a non-null one on failure.
            kafkaProducer.send(producerRecord, (recordMetadata, exception) -> {
                if (exception != null) {
                    // Throwable as the trailing argument (no placeholder) => SLF4J prints the stack trace.
                    log.error("send failed for key {}", producerRecord.key(), exception);
                } else {
                    log.info("partition:{}|offset:{}", recordMetadata.partition(), recordMetadata.offset());
                }
            });
        }
    }
}
源码分析Producer
// Decompiled KafkaProducer constructor, quoted from the Kafka client library for analysis.
// It builds every component the producer needs and starts the background I/O thread.
// If any step throws, the producer is closed and the failure is rethrown as a KafkaException.
private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
try {
Map<String, Object> userProvidedConfigs = config.originals();
this.producerConfig = config;
this.time = Time.SYSTEM;
// Use the configured client.id, or generate "producer-N" when it is empty.
String clientId = config.getString("client.id");
if (clientId.length() <= 0) {
clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
}
this.clientId = clientId;
// transactional.id is read only if the user supplied it; it is included in the log prefix.
String transactionalId = userProvidedConfigs.containsKey("transactional.id") ? (String)userProvidedConfigs.get("transactional.id") : null;
LogContext logContext;
if (transactionalId == null) {
logContext = new LogContext(String.format("[Producer clientId=%s] ", clientId));
} else {
logContext = new LogContext(String.format("[Producer clientId=%s, transactionalId=%s] ", clientId, transactionalId));
}
this.log = logContext.logger(KafkaProducer.class);
this.log.trace("Starting the Kafka producer");
// Metrics: client-id tag, sampling settings, user-configured reporters plus a JMX reporter.
Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
MetricConfig metricConfig = (new MetricConfig()).samples(config.getInt("metrics.num.samples")).timeWindow(config.getLong("metrics.sample.window.ms"), TimeUnit.MILLISECONDS).recordLevel(RecordingLevel.forName(config.getString("metrics.recording.level"))).tags(metricTags);
List<MetricsReporter> reporters = config.getConfiguredInstances("metric.reporters", MetricsReporter.class);
reporters.add(new JmxReporter("kafka.producer"));
this.metrics = new Metrics(metricConfig, reporters, this.time);
ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);
// Partitioner instance from partitioner.class (used later when choosing a record's partition).
this.partitioner = (Partitioner)config.getConfiguredInstance("partitioner.class", Partitioner.class);
long retryBackoffMs = config.getLong("retry.backoff.ms");
// Key/value serializers: use the instances passed in; otherwise instantiate and configure
// them from key.serializer / value.serializer (the boolean marks key=true, value=false).
if (keySerializer == null) {
this.keySerializer = Wrapper.ensureExtended((Serializer)config.getConfiguredInstance("key.serializer", Serializer.class));
this.keySerializer.configure(config.originals(), true);
} else {
config.ignore("key.serializer");
this.keySerializer = Wrapper.ensureExtended(keySerializer);
}
if (valueSerializer == null) {
this.valueSerializer = Wrapper.ensureExtended((Serializer)config.getConfiguredInstance("value.serializer", Serializer.class));
this.valueSerializer.configure(config.originals(), false);
} else {
config.ignore("value.serializer");
this.valueSerializer = Wrapper.ensureExtended(valueSerializer);
}
// Interceptors from interceptor.classes; the field stays null when none are configured.
userProvidedConfigs.put("client.id", clientId);
List<ProducerInterceptor<K, V>> interceptorList = (new ProducerConfig(userProvidedConfigs, false)).getConfiguredInstances("interceptor.classes", ProducerInterceptor.class);
this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors(interceptorList);
// Cluster metadata cache, plus size/memory/compression/timeout limits read from config.
ClusterResourceListeners clusterResourceListeners = this.configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters);
this.metadata = new Metadata(retryBackoffMs, config.getLong("metadata.max.age.ms"), true, true, clusterResourceListeners);
this.maxRequestSize = config.getInt("max.request.size");
this.totalMemorySize = config.getLong("buffer.memory");
this.compressionType = CompressionType.forName(config.getString("compression.type"));
this.maxBlockTimeMs = config.getLong("max.block.ms");
this.requestTimeoutMs = config.getInt("request.timeout.ms");
// Whether a transaction manager exists influences how retries, in-flight requests and acks are configured.
this.transactionManager = configureTransactionState(config, logContext, this.log);
int retries = configureRetries(config, this.transactionManager != null, this.log);
int maxInflightRequests = configureInflightRequests(config, this.transactionManager != null);
short acks = configureAcks(config, this.transactionManager != null, this.log);
this.apiVersions = new ApiVersions();
// RecordAccumulator: collects records into batches, sized by batch.size and buffer.memory and lingering up to linger.ms.
this.accumulator = new RecordAccumulator(logContext, config.getInt("batch.size"), this.totalMemorySize, this.compressionType, config.getLong("linger.ms"), retryBackoffMs, this.metrics, this.time, this.apiVersions, this.transactionManager);
// Seed the metadata with the addresses listed in bootstrap.servers.
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList("bootstrap.servers"));
this.metadata.update(Cluster.bootstrap(addresses), Collections.emptySet(), this.time.milliseconds());
// Network layer: Selector-based NetworkClient, limited to maxInflightRequests in-flight requests.
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics);
NetworkClient client = new NetworkClient(new Selector(config.getLong("connections.max.idle.ms"), this.metrics, this.time, "producer", channelBuilder, logContext), this.metadata, clientId, maxInflightRequests, config.getLong("reconnect.backoff.ms"), config.getLong("reconnect.backoff.max.ms"), config.getInt("send.buffer.bytes"), config.getInt("receive.buffer.bytes"), this.requestTimeoutMs, this.time, true, this.apiVersions, throttleTimeSensor, logContext);
// Sender is given the accumulator, the network client, acks and retries; it runs on the I/O thread below.
this.sender = new Sender(logContext, client, this.metadata, this.accumulator, maxInflightRequests == 1, config.getInt("max.request.size"), acks, retries, metricsRegistry.senderMetrics, Time.SYSTEM, this.requestTimeoutMs, config.getLong("retry.backoff.ms"), this.transactionManager, this.apiVersions);
// Start the Sender on a KafkaThread named "kafka-producer-network-thread | <clientId>";
// the third argument (true) makes it a daemon thread.
String ioThreadName = "kafka-producer-network-thread | " + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
this.errors = this.metrics.sensor("errors");
config.logUnused();
AppInfoParser.registerAppInfo("kafka.producer", clientId, this.metrics);
this.log.debug("Kafka producer started");
} catch (Throwable var24) {
// On any construction failure, close immediately (zero timeout) and wrap the cause.
this.close(0L, TimeUnit.MILLISECONDS, true);
throw new KafkaException("Failed to construct kafka producer", var24);
}
}
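- 初始化metricConfig等监控指标(metrics),用于监控producer的运行状态.
- 初始化partitioner,即分区器,它决定每条消息被发送到topic的哪个分区(起到负载均衡的作用).
- 初始化key/value序列化器.
- 初始化`RecordAccumulator`(消息累加器),按分区把消息缓存成批次,等待批量发送.
- 初始化`Sender`对象,并通过`new KafkaThread(ioThreadName, this.sender, true)`启动名为`kafka-producer-network-thread | clientId`的后台I/O线程,由它负责真正的网络发送. 注意:第三个参数true表示该线程是守护线程(daemon),这只意味着该线程不会阻止JVM退出,与线程安全无关. KafkaProducer本身确实是线程安全的(官方文档说明多个线程共享同一个producer实例通常比每个线程各建一个更快),但这一点并不能从"守护线程"推导出来.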
源码分析send
- org.apache.kafka.clients.producer.KafkaProducer#send(org.apache.kafka.clients.producer.ProducerRecord<K,V>, org.apache.kafka.clients.producer.Callback)
// Public send entry point (quoted Kafka source): lets the configured interceptors (if any)
// transform the record first, then delegates to doSend and returns its Future.
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
return this.doSend(interceptedRecord, callback);
}
- org.apache.kafka.clients.producer.KafkaProducer#doSend
// Excerpt of KafkaProducer#doSend (quoted Kafka source; the method header and catch blocks are omitted here).
TopicPartition tp = null;
try {
// Wait (bounded by maxBlockTimeMs) for topic metadata; the time spent is deducted from the remaining budget.
KafkaProducer.ClusterAndWaitTime clusterAndWaitTime = this.waitOnMetadata(record.topic(), record.partition(), this.maxBlockTimeMs);
long remainingWaitMs = Math.max(0L, this.maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
Cluster cluster = clusterAndWaitTime.cluster;
byte[] serializedKey;
try {
// Serialize the key with the configured key serializer.
serializedKey = this.keySerializer.serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException var18) {
throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + " to class " + this.producerConfig.getClass("key.serializer").getName() + " specified in key.serializer", var18);
}
byte[] serializedValue;
try {
// Serialize the value with the configured value serializer.
serializedValue = this.valueSerializer.serialize(record.topic(), record.headers(), record.value());
} catch (ClassCastException var17) {
throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + this.producerConfig.getClass("value.serializer").getName() + " specified in value.serializer", var17);
}
// Compute the partition: decides which partition of the topic this record finally goes to.
int partition = this.partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
// Freeze the headers, then check the estimated upper-bound serialized size of the record.
this.setReadOnly(record.headers());
Header[] headers = record.headers().toArray();
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(this.apiVersions.maxUsableProduceMagic(), this.compressionType, serializedKey, serializedValue, headers);
this.ensureValidRecordSize(serializedSize);
// Use the record's own timestamp, or the current time when none was set.
long timestamp = record.timestamp() == null ? this.time.milliseconds() : record.timestamp();
this.log.trace("Sending record {} with callback {} to topic {} partition {}", new Object[]{record, callback, record.topic(), partition});
// Callback: when interceptors are configured, wrap the user callback so they are notified too.
Callback interceptCallback = this.interceptors == null ? callback : new KafkaProducer.InterceptorCallback(callback, this.interceptors, tp);
// In a transaction, register this partition with the transaction manager.
if (this.transactionManager != null && this.transactionManager.isTransactional()) {
this.transactionManager.maybeAddPartitionToTransaction(tp);
}
// Append the record to the accumulator's batch for this partition.
RecordAppendResult result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, (Callback)interceptCallback, remainingWaitMs);
// If the batch is now full or a new batch was created, wake the sender to send batches.
if (result.batchIsFull || result.newBatchCreated) {
this.log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}
return result.future;
- 使用序列化器序列化消息的key和value.
- 计算消息所在的分区.
- 将消息追加到该分区对应的批次中,具体调用为 `this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, (Callback)interceptCallback, remainingWaitMs);`.
- 如果当前批次已满(`batchIsFull`)或新创建了批次(`newBatchCreated`),就唤醒Sender线程进行批量发送.
Kafka为什么快的原因之一
kafka并不是逐一发送消息,它支持消息的批量发送,通过`linger.ms`(发送前最多等待多久,以便凑满一个批次)和`batch.size`(每个分区的一个批次最多累积多少字节)去配置. 这样可以大幅减少网络请求和I/O操作的次数(类似于数据库中用一次批量插入代替for循环逐条调用insert). 同时,由于Kafka的日志是追加写(append-only)模式,批量到达的消息在分区内按顺序追加写入磁盘,用顺序I/O代替了随机I/O.
查阅官网,你也可以看到这样一段解释:
Batching is one of the big drivers of efficiency, and to enable batching the Kafka producer will attempt to accumulate data in memory and to send out larger batches in a single request. The batching can be configured to accumulate no more than a fixed number of messages and to wait no longer than some fixed latency bound (say 64k or 10 ms). This allows the accumulation of more bytes to send, and few larger I/O operations on the servers. This buffering is configurable and gives a mechanism to trade off a small amount of additional latency for better throughput.
批处理是效率的重要驱动因素之一,为了启用批处理,Kafka 生产者会尝试在内存中积累数据,并在单个请求中发送更大的批次。批处理可以配置为累积不超过固定数量的消息,并且等待时间不超过某个固定的延迟上限(比如 64k 或 10 毫秒)。这样可以累积更多字节再发送,使服务器上的 I/O 操作次数更少、单次更大。这种缓冲是可配置的,它提供了一种机制:以少量额外的延迟换取更好的吞吐量。
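如何保障生产者发送的消息不丢失?
- 配置acks=all(等同于acks=-1),即leader要等待ISR(同步副本集合)中的所有副本都写入日志后,producer才认为消息发送成功.
- 使用带有回调的发送模式,并在回调中检查exception,对发送失败的消息进行处理(记录日志、重发或告警).
- 配置重试次数,也就是retries.
- 设置`unclean.leader.election.enable=false`,不允许不在ISR中的(数据落后的)副本被选举为分区的leader.
- 冗余:将topic的副本数`replication.factor`设置为>=3(自动创建的topic由broker参数`default.replication.factor`决定),多备份数据.
- 设置`min.insync.replicas`>1,并与acks=all配合使用:当ISR中的副本数少于该值时,写入会被拒绝,而不是只写到少数副本上.
- 通常建议`replication.factor = min.insync.replicas + 1`(例如3和2),这样即使有一个副本不可用,写入仍然可以继续.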
网友评论