1、Producer API
1.1 消息发送流程
kafka的Producer发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程——main线程和Sender线程,以及一个线程共享变量——RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到kafka broker。
KafkaProducer发送消息流程.png
相关参数:
batch.size:只有数据积累到batch.size之后,sender才会发送数据。
linger.ms:如果数据迟迟未i达到batch.size,sender等待linger.time之后就会发送数据。
1.2 异步发送API
1)导入依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>
2)编写代码
需要用到的类:
KafkaProducer:需要创建一个生产者对象,用来发送数据;
ProducerConfig:获取所需要的一系列配置参数;
ProducerRecord:每条数据都要封装成一个ProducerRecord对象;
1、不带回调函数的API
package com.justops.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class MyProducer {
public static void main(String[] args) {
// 创建kafka生产者配置信息
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.4.7.11:9092,10.4.7.12:9092,10.4.7.21:9092");
// properties.put("bootstrap.servers", "10.4.7.11:9092,10.4.7.12:9092,10.4.7.21:9092");
properties.put(ProducerConfig.ACKS_CONFIG, "all");
// properties.put("acks", "all");
properties.put("retries", 3);
properties.put("batch.size", 16384);
properties.put("linger.ms", 1);
properties.put("buffer.memory", 33554432);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 创建生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
// 发送消息
for (int i = 0; i < 100; i++) {
producer.send(new ProducerRecord<String, String>("first", "justops--" + i));
}
// 关闭资源(会做资源回收)
producer.close();
}
}
2、带回调函数的API
package com.justops.producer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class CallbackProducer {
public static void main(String[] args) {
// 创建kafka生产者配置信息
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.4.7.11:9092,10.4.7.12:9092,10.4.7.21:9092");
// properties.put("bootstrap.servers", "10.4.7.11:9092,10.4.7.12:9092,10.4.7.21:9092");
properties.put(ProducerConfig.ACKS_CONFIG, "all");
// properties.put("acks", "all");
properties.put("retries", 3);
properties.put("batch.size", 16384);
properties.put("linger.ms", 1);
properties.put("buffer.memory", 33554432);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 创建生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
// 发送消息
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<String, String>("first", 0, "justops", "justops--" + i), new Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println(metadata.partition() + "--" + metadata.offset());
}
}
});
}
// 关闭资源(会做资源回收)
producer.close();
}
}
3、自定义分区API
a、实现Partitioner
接口
package com.justops.partitioner;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
// 自定义分区
public class MyPartitioner implements Partitioner {
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
int ret;
if (key == null) {
ret = 1;
} else {
Integer integer = cluster.partitionCountForTopic(topic);
ret = key.toString().hashCode() % integer;
}
return ret;
}
public void close() {
}
public void configure(Map<String, ?> configs) {
}
}
b、在properties中配置
// 使用自定义的MyPartitioner
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.justops.partitioner.MyPartitioner");
4、同步发送API(比较少用)
同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回ack。
由于send方法返回的是一个Future对象,根据Future对象的特点,我们也可以实现同步发送的效果,只需要调用Future的get方法即可。
// 发送消息
for (int i = 0; i < 100*10000; i++) {
Future<RecordMetadata> future= producer.send(new ProducerRecord<String, String>("first", "justops","justops--" + i*10));
try {
RecordMetadata recordMetadata = future.get();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
// 或者
// 发送消息
for (int i = 0; i < 100*10000; i++) {
producer.send(new ProducerRecord<String, String>("first", "justops","justops--" + i*10)).get();
}
Consumer API
自动提交offset
1)导入依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>
2)编写代码
需要用到的类:
KafkaConsumer:需要创建一个消费者对象,来消费数据
ConsumerConfig:获取所需亚奥的一系列参数配置
ConsumerRecord:每条数据都要封装成一个ConsumerRecord对象
1、简单消费者自动提交
package com.justops.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class MyConsumer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.4.7.11:9092,10.4.7.12:9092,10.4.7.21:9092");
// 开启自动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// 自动提交延时
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "bigdata");
// 创建消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
// 订阅主题
consumer.subscribe(Arrays.asList("first", "second"));
// 获取数据
while (true) {
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.key() + "--" + consumerRecord.value());
}
}
}
}
如果需要重新消费(from beginning)
要修改两个参数,否则只能从最新开始消费
// 修改消费者组 properties.put(ConsumerConfig.GROUP_ID_CONFIG, "bigdata2"); // 重置消费者的offset properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
为了使使用者能够专注于自己的业务逻辑,kafka提供了自动提交offset的功能。如果不开启自动提交,手动没有提交的话,消费者进程只有在自己的内存中存储offset,重启之后会重新消费进程初始的消息,提交之后能将offset保存至kafka。
自动提交相关参数:
enable.auto.commit:是否开启自动提交offset功能
auto.commit.interval.ms:自动提交offset的时间间隔
手动提交offset
虽然自动提交offset十分简单便利,但是由于是基于时间提交的,开发人员难以把握offset提交的时机。因此kafka还提供了手动提交offset的API。
手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)。两者的相同点是,都将本次poll的一批数据最高的offset提交;不同点是,commitSync会阻塞当前线程,一直到提交成功,并且会失败重试;而commitAsync则没有失败重试机制,故有可能提交失败。
1、同步提交offset
由于同步提交offset有失败重试机制,故更加可靠。
package com.justops.consumer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
public class CustomConsumer {
private static Map<TopicPartition, Long> currentOffset = new HashMap<TopicPartition, Long>();
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.4.7.11:9092,10.4.7.12:9092,10.4.7.21:9092");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
// 创建消费者
final KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
consumer.subscribe(Arrays.asList("first", "second"));
while (true) {
// 消费者拉取数据
ConsumerRecords<String, String> consumerRecords = consumer.poll(100);
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.key() + "--" + consumerRecord.value());
}
// 同步提交,当前线程会阻塞直到offset提交成功
consumer.commitSync();
}
}
}
2、异步提交offset
虽然同步提交offset更可靠些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会受到很大的影响。因此更多的情况下,会选用异步提交offset的方式。其代码差异如下:
// 异步提交,可以选有回调函数
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception != null) {
System.out.printf("Commit failed for" + offsets);
}
}
});
自定义存储offset
kafka0.9版本之前,offset存储在zookeeper,0.9版本之后,默认将offset存储在kafka的一个内置topic中。除此之外,kafka还可以选择自定义存储offset。
offset的维护是相当繁琐的,因为需要考虑到消费者的Rebalance。
当有新的消费者加入消费者组,已有的消费者退出消费者组或者所订阅的主题的分区发生变化,就会触发到分区的重新分配,重新分配的过程叫做Rebalance。
消费者发生Rebalance之后,每个消费者的分区就会发生变化。因为消费者要首先获取到自己被重新分配到的分区,并且定位到每个分区最近提交的offset位置继续消费。
要实现自定义存储offset,需要借助ConsumerRebalanceListener。其中提交和获取offset的方法,需要根据所选的offset存储自己实现。
package com.justops.consumer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
public class CustomConsumer {
private static Map<TopicPartition, Long> currentOffset = new HashMap<TopicPartition, Long>();
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.4.7.11:9092,10.4.7.12:9092,10.4.7.21:9092");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
// 创建消费者
final KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
consumer.subscribe(Arrays.asList("first", "second"), new ConsumerRebalanceListener() {
// Rebalance之前调用
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
commitOffset(currentOffset);
}
// Rebalance之后调用
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
currentOffset.clear();
for (TopicPartition partition : partitions) {
// 定位到最近提交的offset继续消费
consumer.seek(partition, getOffset(partition));
}
}
});
while (true) {
// 消费者拉取数据
ConsumerRecords<String, String> consumerRecords = consumer.poll(100);
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.key() + "--" + consumerRecord.value());
currentOffset.put(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), consumerRecord.offset());
}
commitOffset(currentOffset);
}
}
// 获取某分区的最新offset
private static long getOffset(TopicPartition partition) {
return 0;
}
// 提交该消费者所有分区的offset
private static void commitOffset(Map<TopicPartition, Long> currentOffset){
}
}
其中getOffset(TopicPartition partition)方法和commitOffset(Map<TopicPartition, Long> currentOffset)需要按照时机需求实现。
自定义Interceptor(拦截器)
拦截器原理
Producer拦截器(interceptor)实在kafka0.10版本被引入的,主要用于实现clients端的定制化控制逻辑。
对于producer而言,interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时producer允许用户指定多个interceptor按序作用于同一条消息,从而形成一个拦截链(interceptor chain)。Interceptor的实现接口时org.apache.kafka.clients.producer.ProducerInteceptor,其定义的方法包括:
(1)configure(configs)
获取配置信息和初始化数据时调用。
(2)onSend(ProducerRecord)
该方法封装进KafkaProducer.send方法中,即运行在用户主线程中。Producer确保在消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和partition,否则会影响目标分区的计算。
(3)onAcknowledgement(RecordMetadata, Exception)
该方法会在消息从RecordAccumulator成功发送到kafka broker之后,或者在发送过程中失败时调用。并且通常都是在producer回调逻辑触发之前。onAckknowlegement运行在producer的IO线程,因此不要再该方法中放入很重的逻辑,否则会拖慢producer的消息发送效率。
(4)close
关闭interceptor,主要用于执行一些资源清理工作。
如前所述,interceptor可能被运行在多个线程中,因此再具体实现时用户需要自行确保线程安全。另外倘若制定了多个interceptor,则producer将按照指定顺序调用它们,并仅仅时捕获每个interceptor可能抛出的一场记录到错误日志中而非在向上传递。这在使用过程中要特别留意。
拦截器案例
1)需求:
实现一个简单的双interceptor组成的拦截器。第一个interceptor会在消息发送前将时间戳信息加到消息value的最前部;第二个inteceptor会在消息发送后更新成功发送消息数或失败发送消息数。
发送的数据 | TimeInterceptor | CounterInterceptor | ProducerInterceptor |
---|---|---|---|
message0 message1 ...... message9 message10 |
1)实现InterceptorProducer 2)获取record数据,并在value前加时间戳 |
1)返回record 2)统计发送成功和失败次数 3)关闭producer时,打印统计次数 |
1)构建拦截器链 2)发送数据 |
2)案例操作
a、新建TimeInteceptor实现ProducerInterceptor
package com.justops.interceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
public class TimeInterceptor implements ProducerInterceptor<String, String> {
public void configure(Map<String, ?> configs) {
}
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
// 创建一个ProducerRecord对象,并返回
return new ProducerRecord<String, String>(record.topic(), record.partition(), record.key(), System.currentTimeMillis() + "," + record.value());
}
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
public void close() {
}
}
b、新建CounterInterceptor实现ProducerInterceptor
package com.justops.interceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
public class CounterInterceptor implements ProducerInterceptor<String, String> {
int success;
int error;
@Override
public void configure(Map<String, ?> map) {
}
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
return producerRecord;
}
@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
if (recordMetadata != null) {
success++;
} else {
error++;
}
}
@Override
public void close() {
System.out.println("success: " + success);
System.out.println("error: " + error);
}
}
c、在properties中添加配置
// 添加拦截器
ArrayList<String> interceptors = new ArrayList<String>();
interceptors.add("com.justops.interceptor.TimeInterceptor");
interceptors.add("com.justops.interceptor.CounterInterceptor");
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
网友评论