[TOC]
前言
应用在不同的场景下会发送不同主题(Topic)的消息到Kafka服务上(Broker),多种场景也就意味着对消息的发送有不同的要求,比如业界常见的那些灵魂拷问: 消息是否一定不能丢失、消息是否支持重复消费、消息是否有严格的延迟要求等等。而这篇文章主要先学习生产者是如何将一条消息投递到Broker上这一流程。
1. Kafka Producer的创建
1.1 Producer 必填参数
要发送一条消息,首先我们需要一个Kafka的客户端,通过Kafka客户端api进行消息的发送,这个客户端就是 org.apache.kafka.clients.producer.KafkaProducer
,要创建生产者对象,有3个属性是必填的
- bootstrap.servers 这个属性设置的是Broker的地址列表,其实不需要把所有的Broker地址都填写上去,因为kafka的broker在集群内是互相感知的,但是一般为了避免宕机的风险都至少设置2个broker的地址。
- key.serializer 见3的解释
- value.serializer 这个属性见名知意,生产者发送消息其实发送的都是字节,将对象按照指定的序列化协议进行序列化之后进行网络传输发送到broker上面。
例子
// Properties 对象构建
private Properties kafkaProps = new Properties();
// 设置Broker地址
kafkaProps.put("bootstrap.servers","broker1:9092,broker2:9092");
// 设置kv的序列化类型为 StringSerializer
kafkaProps.put("key.serializer","org.apache.kafka.coMMon.serialization.StringSerializer");
kafkaProps.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
// 创建 producer 客户端并且将config属性赋值进去
Producer= new KafkaProducer <Strting,String>(kafkaProps);
1.2 Producer 非必填参数
对于一些非必填但重要的参数,在kafka中都有着合理默认值,使用的时候如果有对性能、可靠性、等等有额外的要求,则可以根据自己的需求去灵活的调整它们,这里介绍几个关键的参数
- acks : acks表示有多少分区副本收到消息才认为本次消息是发送成功的(实际上是持久化到分区中成功),这个参数对于消息的丢失的可能性有影响的,如果设置为0,表示不需要知道是否投递成功,也就是生产者不需要等待服务器的回应这个有好处也有坏处,好处是最大限度的提升了吞吐量,坏处是可能会丢失消息。
如果acks设置为all,这种配置是最安全的,能保证消息一定不会丢失,就算服务器发生故障,但是消息其实还在的。坏处是生产者要等待所有的副本写入完成之后才认为是发生成功了的。
如果acks设置为1,表示只要集群中topic下的主分区成功写入了就认为消息发送成功了,如果主分区节点有问题(主分区节点崩溃,新的主分区节点还在选举的时候)导致消息没有成功递达主分区节点,生产者就会收到一个错误响应,为了避免消息丢失,生产者会进行重试操作。这个时候吞吐量取决于消息的发送方式是同步的还是异步的,如果是同步,显然会增加延迟。如果是异步使用回调的话延迟问题会得以减轻。 - buffer.memory: 用来设置生产者内存缓冲区的大小,生产者用它来缓冲即将投递到 broker上的消息,下面会具体介绍这个生产者内存缓冲区
- compression.type: 消息的压缩方式,为了减少网络IO和磁盘存储的开销,通常在一些会涉及网络传输的中间件中会对传输报文进行压缩,虽然这会带来cpu的负担。这个属性就是设置压缩算法的选择的。通常在生产者压缩、broker也会进行解压再压缩(满足不同版本不同场景的需求,也可以设置为和生产者保持一致)、消费者解压消息进行消费。
- retries 发送消息的重试次数,当消息发送失败的时候,在某些情况下重试可能会解决的,比如连接问题这种。当重试还是失败的话就会抛出异常代表消息未投递成功。
- batch.size 批次,当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里(划分了的缓冲区)。 该参数指定了一个批次可以使用的内存大小。
- max.request.size:代表消息能发送的内容的最大值,,假设这个值为IMB,那么可以发送的单个最大消息为1MB。另外,broker 对可接收的消息最大值也有自己的限制(nessage.nax.bytes),所以两边的配置最好可以匹配,避免生产者发送的消息被broker拒绝。
- linger.ms : 消息发送线程发现批次中的消息达到了这个值会立刻将该批次中的消息投递,如果设置为0代表每次都会直接投递到broker上
1.3 附: Producer 的主要API
public interface Producer<K, V> extends Closeable {
/**
* 初始化事务的,如果需要使用事务方法,这个方法会先被调用
*/
void initTransactions();
/**
* 开启一个事务
*/
void beginTransaction() throws ProducerFencedException;
/**
* 向消费组提交当前事务中的消息偏移量
*/
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId) throws ProducerFencedException;
/**
* 提交一个事务
*/
void commitTransaction() throws ProducerFencedException;
/**
* 回滚事务
*/
void abortTransaction() throws ProducerFencedException;
/**
* 同步发送消息,生产者使用get()获取发送的结果
*/
public Future<RecordMetadata> send(ProducerRecord<K, V> record);
/**
* 异步发送消息,通过回调得到发送的结果
*/
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
/**
* 忽略所有条件将缓冲区中批次里的消息发送到broker
*/
public void flush();
/**
* 获取topic的分区信息
*/
public List<PartitionInfo> partitionsFor(String topic);
/**
* 获取由生产者收集到的统计的相关信息
*/
public Map<MetricName, ? extends Metric> metrics();
/**
* 关闭生产者
*/
public void close();
/**
* 带定时的关闭生产者
*/
public void close(long timeout, TimeUnit unit);
}
2. 生产者发送消息的方式
Producer的send() 方法用于发送消息,参数 ProducerRecord 封装了消息的内容、topic、分区、key、value等信息。如果发生成功 返回的RecordMetadata 中记录了消息的偏移量,如果发送失败就会重试或者抛出异常。
1. 同步发送
如下代码:我们使用send()方法发送消息, 它会返回一个Future 对象,调用 get() 方法进行等待,就可以知道悄息是否发送成功。这种方式适合我们不需要关注发送的结果的情况,比如上报日志这种,如果服务器返回错误,get()方法会抛出异常。如果服务器返回正常,我们会得到一个 RecordMetadata 对象,这里记录了消息信息中重要的角色:偏移量。Kafka发送消息有2类错误,有一种会重试,比如连接错误、未找到主分区错误,多重试下就可能会被解决的。还有一种是不会重试的,比如消息体过大等,这种重试也不会得到解决的错误在第一次错误的时候就异常返回了。
Future<RecordMetadata> send(ProducerRecord<K, V> record);
2. 异步发送
假设发送消息的网络开销时间是 30ms,如果每次发送都需要等待回应的话,那么发送100条消息在同步发送里,get() 等待的时间至少是3s。这对很多场景下其实是不能接受的,比如上报日志这种,我并不需要立刻知道我本次上报的结果,而且我应用程序可能每秒要上报 1W+ 日志,这会导致应用线程大量的都在等待,浪费资源。所以就需要异步的方式
我们调用send()方法,并指定一个回调方法, 服务器在返回响应时回调该方法。这个回调方法需要一个实现了 org.apache.kafka.clients.producer.Callback
接口的类,这个接口只有一个 onCompletion
方法。如果Kafka返回一个错误,onCompletion方法会拋出一一个非空(non null)异常。我们可以在收到这个回调的时候进行一些业务或者系统上的处理。
Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
3. 发送消息的过程
在发送消息时,ProducerRecord 包含了消息的topic,partition,等属性,消息首先会被序列化,继而根据前面负载均衡所提到的,分区器的负载均衡机制会最终决定消息存放到哪个broker上面去。当消息确定了要放在哪台Broker上的分区之后,他们会被放在一个个的批次里(批次可以理解为缓冲区),然后当批次中的消息数达到了 linger.ms的值则会由独立的线程按照批次向指定的Broker进行投递。如果投递成功,则会返回 RecordMetadata 对象,如果投递失败可根据是否重试以及重试次数继续投递重试,如果重试一定次数之后还是失败就返回异常表示发送消息失败。如下图所示的整个流程
producer发送消息
总结
本文首先介绍了初始化 Kafka Producer的参数配置,创建了一个Producer生产者的对象,继而对消息的发送方式及消息发送的流程进行大致的介绍。
期待后续精彩~
网友评论