Producer负责向kafka发送消息。Producer客户端如下:
public class KafkaProducer{
public static final String brokerList = "localhost:9092";
public static final String topic = "topic-demo";
public static void main(String[] args) {
Properties properties = new Properties();
//key.serializer和value.serializer指定key和value序列化操作的序列化器。
properties.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
//bootstrap.servers: kafka集群的broker地址
properties.put("bootstrap.servers", brokerList);
KafkaProducer<String, String> producer =
new KafkaProducer<>(properties);
ProducerRecord<String, String> record =
new ProducerRecord<>(topic, "hello, Kafka!");
try {
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}
producer.close();
}
}
kafka producer使用方法
1.发送消息
发送消息有三种模式:
- 1.发后即忘(fire-and-forget):性能高,可靠性差
- 2.同步(sync):producer.send(record).get();
- 3.异步(Async):指定callback回调函数
public class KafkaProducerAnalysis {
public static final String brokerList = "localhost:9092";
public static final String topic = "topic-demo";
public static Properties initConfig() {
Properties props = new Properties();
//bootstrap.servers: kafka集群的broker地址
props.put("bootstrap.servers", brokerList);
//key.serializer和value.serializer指定key和value序列化操作的序列化器。
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("client.id", "producer.client.id.demo");
return props;
}
public static void main(String[] args) throws InterruptedException {
Properties props = initConfig();
//创建生产者实例
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
//构建消息
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "hello, Kafka!");
//1.发后即忘(fire-and-forget)
try {
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}
//2.同步(sync)
try {
producer.send(record).get();
} catch (Exception e) {
e.printStackTrace();
}
//3.异步(Async)
try {
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println(metadata.partition() + ":" + metadata.offset());
}
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
2.序列化
生产者需要使用序列化器(Serializer)把对象转换成字节数组才能通过网络发送至kafka,而在对侧,消费者需要使用反序列化器(Deserializer)把kafka中收到的字节数组转换成相应的对象。
3.分区器
分区器的作用是消息分配分区。消息经过序列化之后,需要确定它发往的分区,如果指定了分区partition字段,就不需要分区器的作用了,如果没有指定,就需要根据key字段计算partition值。
kafka提供默认的分区器是org.apache.kafka.clients.producer.internals.DefaultPartitioner
实现了接口org.apache.kafka.clients.producer.Partitioner
public interface Partitioner extends Configurable, Closeable {
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
public void close();
}
public class DefaultPartitioner implements Partitioner {
/**
* 计算分区
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
private int nextValue(String topic) {
AtomicInteger counter = topicCounterMap.get(topic);
if (null == counter) {
counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
if (currentCounter != null) {
counter = currentCounter;
}
}
return counter.getAndIncrement();
}
public void close() {}
}
partition的计算方式:
- 如果key为null,则按照一种轮询的方式来计算分区分配
- 如果key不为null则使用称之为murmur的Hash算法(非加密型Hash函数,具备高运算性能及低碰撞率)来计算分区分配。
4.生产者拦截器
kafka在消息序列化和计算分区之前调用生产者拦截器onSend()方法对消息进行定制化操作。 主要实现org.apache.kafka.clients.producer.ProducerInterceptor接口。
public interface ProducerInterceptor<K, V> extends Configurable {
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
public void onAcknowledgement(RecordMetadata metadata, Exception exception);
public void close();
}
Producer客户端原理分析
整个生产者客户端由两个线程协调,这两个线程是主线程和Sender线程(发送线程)。
- 主线程:由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后,缓存到消息累加器(RecordAccumulator)中。以便Sender线程可以批量发送,减少网络资源消耗以提升性能。
- Sender线程:负责从消息累加器中获取消息并将其发送到kafka中。
kafka_producer.png
- RecordAccumulator消息收集器:其内部为每个分区都维护一个双端队列,队列的内容是消息批次ProducerBatch(包含一至多个ProducerRecord),即Deque<ProducerBatch>,消息写入缓存时,追加到双端队列尾部,Sender线程从双端队列头部读取消息。
消息在网络上都是以字节(Byte)形式传输的,发送之前需从创建内存区域保存消息,在RecordAccumulator内部有BufferPool,主要实现ByteBuffer复用,以实现缓存的高效利用。其只针对特定大小的ByteBuffer进行管理。ProducerBatch 的大小和batch . size 参数也有着密切的关系。当一条消息( ProducerRecord )流入RecordAccumulator 时,会先寻找与消息分区所对应的双端队列(如果没有则新建),再从这个双端队列的尾部获取一个ProducerBatch (如果没有则新建),查看ProducerBatch 中是否还可以写入这个ProducerRecord ,如果可以则写入,如果不可以则需要创建一个新的Producer Batch 。在新建ProducerBatch 时评估这条消息的大小是否超过batch . size 参数的大小,如果不超过,那么就以batch . size 参数的大小来创建ProducerBatch,这样在使用完这段内存区域之后,可以通过BufferPool 的管理来进行复用;如果超过,那么就以评估的大小来创建ProducerBatch , 这段内存区域不会被复用。
Sender线程从RecordAccumulator获取缓存消息之后,原本<分区,Deque<ProducerBatch>>的形式转变成<Node,List<ProducerBatch>>的形式,其中Node 表示Kafka集群的broker 节点。在转换成<Node, List<ProducerB atch>>的形式之后, Sender 还会进一步封装成
<Node,Request>的形式,这样就可以将Request 请求发往各个Node了, 这里的Request 是指Kafka 的各种协议请求,对于消息发送而言就是指具体的ProduceRequest.
请求在从Sender 线程发往Kafka 之前还会保存到InF lightRequests 中, InFlightRequests 保存对象的具体形式为Map<Nodeld, Deque<R巳quest>>,它的主要作用是缓存了已经发出去但还没有收到响应的请求.
生产者参数
1.必选属性有3个:
- bootstrap.servers:该属性指定broker的地址清单,地址的格式为host:port。清单里不需要包含所有的broker地址,生产者会从给定的broker里查询其他broker的信息。不过最少提供2个broker的信息,一旦其中一个宕机,生产者仍能连接到集群上。
- key.serializer:生产者接口允许使用参数化类型,可以把Java对象作为键和值传broker,但是broker希望收到的消息的键和值都是字节数组,所以,必须提供将对象序列化成字节数组的序列化器。key.serializer必须设置为实现org.apache.kafka.common.serialization.Serializer的接口类,默认为org.apache.kafka.common.serialization.StringSerializer,也可以实现自定义的序列化器。
- value.serializer:同上。
2.可选参数:
-
acks:指定了必须要有多少个分区副本收到消息,生产者才会认为写入消息是成功的,这个参数对消息丢失的可能性有重大影响。
- acks=0:生产者在写入消息之前不会等待任何来自服务器的响应,容易丢消息,但是吞吐量高。
- acks=1:默认值即为1 。生产者发送消息之后,只要分区的leader 副本成功写入消息,那么它就会收到来自服务端的成功响应。如果消息无法写入leader 副本,比如在leader 副本崩溃、重新选举新的leader 副本的过程中,那么生产者就会收到一个错误的响应,为了避免消息丢失,生产者可以选择重发消息。如果消息写入leader 副本并返回成功响应给生产者,且在被其他follower 副本拉取之前leader 副本崩溃,那么此时消息还是会丢失,因为新选举的leader 副本中并没有这条对应的消息。acks 设置为1 ,是消息可靠性和吞吐量之间的折中方案。
- acks=-1或all:生产者在消息发送之后,需要等待ISR中的所有副本都成功写入消息之后才能够收到来自服务端的成功响应。在其他配置环境相同的情况下,acks 设置为-1 (all)可以达到最强的可靠性。但这并不意味着消息就一定可靠,因为ISR 中可能只有leader 副本,这样就退化成了acks=1的情况。
-
buffer.memory:设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。
-
max.block.ms:指定了在调用send()方法或者使用partitionsFor()方法获取元数据时生产者的阻塞时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就会阻塞。在阻塞时间达到max.block.ms时,生产者会抛出超时异常。
-
batch.size:当多个消息被发送同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。当批次内存被填满后,批次里的所有消息会被发送出去。
-
retries:指定生产者可以重发消息的次数。
-
receive.buffer.bytes和send.buffer.bytes:指定TCP socket接受和发送数据包的缓存区大小。如果它们被设置为-1,则使用操作系统的默认值。如果生产者或消费者处在不同的数据中心,那么可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。
-
linger.ms:指定了生产者在发送批次前等待更多消息加入批次的时间。
网友评论