一. Producer API
消息发送流程
Kafka 的 Producer 发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了
两个线程——main 线程和 Sender 线程,以及一个线程共享变量——RecordAccumulator。 main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取 消息发送到 Kafka broker。
pom文件配置
2.2.1 为kafka的版本
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.2.1</version>
</dependency>
1.2 异步发送普通生产者
代码:
BATCH_SIZE_CONFIG = "batch.size":消息为batch.size大小,生产者才发送消息
LINGER_MS_CONFIG = "linger.ms":如果消息大小迟迟不为batch.size大小,则等待linger.ms时间后直接发送
package com.bigdata.study.kafka;
/**
* @author 只是甲
* @date 2021-10-29
* @remark kafka生产者 - 异步 - 不带回调函数的API
*/
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.*;
public class CustomProducer {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
// 设置集群配置
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hp2:9092,hp3:9092,hp4:9092");
// ack机制
props.put(ProducerConfig.ACKS_CONFIG, "all");
// 重试次数
props.put(ProducerConfig.RETRIES_CONFIG, 1);
// 批次大小:消息大小为16384才发送消息
props.put("batch.size", 16384);
// 等待时间:如果消息大小迟迟不为batch.size大小,则等待linger.ms时间后直接发送
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
// ReadAccumulator缓冲区大小
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
// 序列化
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// 构造Producer
Producer<String,String> producer=new KafkaProducer<>(props);
// 生产消费
for (int i = 0; i < 100; i++){
producer.send(new ProducerRecord<String, String>("kafka_test1", "test_" + Integer.toString(i), "test_" + Integer.toString(i)));
}
producer.close();
}
}
shell中查看消费者:
/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/kafka/bin/kafka-console-consumer.sh --from-beginning --bootstrap-server hp2:9092 --topic kafka_test1
image.png
1.2 异步发送带回调函数的生产者
回调函数会在producer收到ack时调用,为异步调用,该方法有两个参数,分别为RecordMetaData和Exception,如果Exception为null,说明消息发送成功,如果Exception不为null,说明消息发送失败。
消息发送失败会启动重试机制,但需要在回调函数中手动重试
代码:
package com.bigdata.study.kafka;
/**
* @author 只是甲
* @date 2021-10-29
* @remark kafka生产者 - 异步 - 带回调函数的API
*/
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.*;
public class CallBackProducer {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
// 设置集群配置
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hp2:9092,hp3:9092,hp4:9092");
// ack机制
props.put(ProducerConfig.ACKS_CONFIG, "all");
// 重试次数
props.put(ProducerConfig.RETRIES_CONFIG, 1);
// 批次大小:消息大小为16384才发送消息
props.put("batch.size", 16384);
// 等待时间:如果消息大小迟迟不为batch.size大小,则等待linger.ms时间后直接发送
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
// ReadAccumulator缓冲区大小
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
// 序列化
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// 构造Producer
Producer<String,String> producer=new KafkaProducer<>(props);
for (int i = 0; i < 100; i++){
producer.send(new ProducerRecord<String, String>("kafka_test1", "test-" + Integer.toString(i),"test-" + Integer.toString(i)), new Callback() {
//回调函数,该方法会在 Producer 收到 ack 时调用,为异步调用
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if ( e == null ) {
System.out.println("success->" + recordMetadata.offset());
} else {
e.printStackTrace();
}
}
});
}
producer.close();
}
}
1.3 生产者分区策略测试
image.pngpublic ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {};
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {};
public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {};
public ProducerRecord(String topic, Integer partition, K key, V value) {};
public ProducerRecord(String topic, K key, V value) {};
public ProducerRecord(String topic, V value) {};
上面ProducerRecord中的partition参数即为指定的分区(分区是有编号的,这是指定分区中的某一个,实际应该为一个分区编号)。
这里要注意,如果指定特定分区的话,消息是会发送到这个编号的特定分区,但是注意如果你的Topic分区只有默认的1个,而你却要发送到分区1号,此时发送会失败!因为你只有1个分区,即0号分区。所以在构建的topic的时候需要注意。
默认分区构造
// 构造消息体,这里加上具体的分区,其中的2是特定的分区编号
producer.send(new ProducerRecord<>("aaroncao",2, "test-" + i, "test-" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null) {
System.out.println(recordMetadata.partition() + "-" + recordMetadata.offset());
} else {
e.printStackTrace();
}
}
});
二. API消费者
2.1 简单消费者
Kafka提供了自动提交offset的功能enable.auto.commit=true;
代码:
package com.bigdata.study.kafka;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
/**
* @author 只是甲
* @date 2021-10-29
* @remark kafka消费者
*/
public class CustomConsumer {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
// 设置集群配置
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hp2:9092,hp3:9092,hp4:9092");
// 设置消费者组
props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer_group1");
// 设置offset的自动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// 设置offset自动化提交的间隔时间
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
// 生产者是序列化,消费者则为反序列化
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// offset重置,需要设置自动重置为earliest
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 这里需要订阅具体的topic
consumer.subscribe(Collections.singletonList("kafka_test1"));
// 一直处于监听状态中
while (true) {
// 因为消费者是通过pull获取消息消费的,这里设置间隔100ms
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(100));
// 对获取到的结果遍历
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.printf("offset=%d, key=%s, value=%s\n", consumerRecord.offset(),consumerRecord.key(),consumerRecord.value());
}
// 同步提交,会一直阻塞直到提交成功,这里可以设置超时时间,如果阻塞超过超时时间则释放
//consumer.commitSync();
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception != null) {
System.out.println("Commit failed, offset = " + offsets);
}
}
});
}
}
}
测试记录:
2.2 消费者重置offset
Consumer消费数据时的可靠性很容易保证,因为数据在Kafka中是持久化的,不用担心数据丢失问题。但由于Consumer在消费过程中可能遭遇断电或者宕机等故障,Consumer恢复之后,需要从故障前的位置继续消费,所以Consumer需要实时记录自己消费的offset位置,以便故障恢复后可以继续消费。
offset的维护是Consumer消费数据必须考虑的问题。
// offset重置,需要设置自动重置为earliest
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
将消费者组的id变换一下即可,否则由于一条消息只能够被一个消费者组中的消费者消费一次,此时不会重新消费之前的消息,即使设置了offset重置也没有作用。
注意
这里的auto.offset.reset="earliest"的作用等同于在linux控制台,消费者监听的时候添加的--from-beginning命令。
auto.offset.reset取值
- earliest:重置offset到最早的位置
- latest:重置offset到最新的位置,默认值
- none:如果在消费者组中找不到前一个offset则抛出异常
- anything else:抛出异常给消费者
2.3 消费者保存offset读取问题
enable.auto.commit=true即自动提交offset。默认是自动提交的。
2.4 消费者手动提交offset
自动提交offset十分便利,但是由于其实基于时间提交的,开发人员难以把握offset提交的时机,因此kafka提供了手动提交offset的API。
手动提交offset的方法主要有两种:
- commitSync:同步提交
- commitAsync:异步提交
相同点: 两种方式的提交都会将本次poll拉取的一批数据的最高的偏移量提交。
不同点: commitSync阻塞当前线程,持续到提交成功,失败会自动重试(由于不可控因素导致,也会出现提交失败);而commitAsync则没有失败重试机制,有可能提交失败。
代码:
同步提交
package com.bigdata.study.kafka;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
* @author 只是甲
* @date 2021-10-29
* @remark kafka消费者 - 同步提交
*/
public class SyncCommitOffset {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
// 设置集群配置
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hp2:9092,hp3:9092,hp4:9092");
// 设置消费者组
props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer_group2");
// 设置offset的自动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// 设置offset自动化提交的间隔时间
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
// 生产者是序列化,消费者则为反序列化
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// offset重置,需要设置自动重置为earliest
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 这里需要订阅具体的topic
consumer.subscribe(Collections.singletonList("kafka_test1"));
// 一直处于监听状态中
while (true) {
// 因为消费者是通过pull获取消息消费的,这里设置间隔100ms
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(100));
// 对获取到的结果遍历
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.printf("offset=%d, key=%s, value=%s\n", consumerRecord.offset(),consumerRecord.key(),consumerRecord.value());
}
// 同步提交,会一直阻塞直到提交成功,这里可以设置超时时间,如果阻塞超过超时时间则释放
consumer.commitSync();
}
}
}
异步提交
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception != null) {
System.out.println("Commit failed, offset = " + offsets);
}
}
});
2.5 数据漏消费和重复消费分析
无论是同步提交还是异步提交offset,都可能会造成数据的漏消费或者重复消费,先提交offset后消费,有可能造成数据的漏消费,而先消费再提交offset,有可能会造成数据的重复消费。
2.6 自定义存储offset
Kafka0.9版本之前,offset存储在zookeeper中,0.9版本及之后的版本,默认将offset存储在Kafka的一个内置的topic中,除此之外,Kafka还可以选择自定义存储offset数据。offse的维护相当繁琐,因为需要考虑到消费者的rebalance过程:
当有新的消费者加入消费者组、已有的消费者退出消费者组或者订阅的主体分区发生了变化,会触发分区的重新分配操作,重新分配的过程称为Rebalance。
消费者发生Rebalace之后,每个消费者消费的分区就会发生变化,因此消费者需要先获取到重新分配到的分区,并且定位到每个分区最近提交的offset位置继续消费。(High Water高水位)
代码:
package com.bigdata.study.kafka;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;
/**
* @author 只是甲
* @date 2021-10-29
* @remark Kafka自定义offset提交
*/
public class CustomOffsetCommit {
private static Map<TopicPartition, Long> currentOffset = new HashMap<>();
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hp2:9092,hp3:9092,hp4:9092");
// 设置消费者组
props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer_group4");
// 设置offset的自动提交为false
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 这里的意思是订阅的时候同时定义Consumer重分配的监听器接口
consumer.subscribe(Collections.singletonList("kafka_test1"), new ConsumerRebalanceListener() {
// rebalance发生之前调用
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
commitOffset(currentOffset);
}
// rebalance发生之后调用
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
currentOffset.clear();
for (TopicPartition partition : partitions) {
// 定位到最新的offset位置
consumer.seek(partition, getOffset(partition));
}
}
});
while (true) {
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.printf("offset=%d, key=%s, value=%s\n", consumerRecord.offset(), consumerRecord.key(), consumerRecord.value());
// 记录下当前的offset
currentOffset.put(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), consumerRecord.offset());
}
}
}
// 获取某分区最新的offset
private static long getOffset(TopicPartition topicPartition) {
return 0;
}
// 提交该消费者所有分区的offset
private static void commitOffset(Map<TopicPartition, Long> currentOffset) {
}
}
即自己记录下需要提交的offset,利用Rebalance分区监听器监听rebalance事件,一旦发生rebalance,先将offset提交,分区之后则找到最新的offset位置继续消费即可
三. 自定义拦截器
拦截器原理
Producer拦截器interceptor是在Kafka0.10版本引入的,主要用于Clients端的定制化控制逻辑。对于Producer而言,interceptor使得用户在消息发送之前以及Producer回调逻辑之前有机会对消息做一些定制化需求,比如修改消息的展示样式等,同时Producer允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链interceptor chain,Interceptor实现的接口为ProducerInterceptor,主要有四个方法:
-
configure(Map<String, ?> configs):获取配置信息和初始化数据时调用
-
onSend(ProducerRecord record):该方法封装在KafkaProducer.send()方法中,运行在用户主线程中,Producer确保在消息被序列化之前及计算分区前调用该方法,并且通常都是在Producer回调逻辑出发之前。
-
onAcknowledgement(RecordMetadata metadata, Exception exception):onAcknowledgement运行在Producer的IO线程中,因此不要再该方法中放入很重的逻辑,否则会拖慢Producer的消息发送效率。
-
close():关闭inteceptor,主要用于执行资源清理工作。
Inteceptor可能被运行到多个线程中,在具体使用时需要自行确保线程安全,另外倘若指定了多个interceptor,则producer将按照指定顺序调用它们,并紧紧是捕获每个interceptor可能抛出的异常记录到错误日志中而非向上传递。
自定义加入时间戳拦截器
TimeInterceptor
package com.bigdata.study.kafka;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
/**
* @author 只是甲
* @date 2021-10-29
* @remark Kafka 消息拦截器
*/
public class TimeInterceptor implements ProducerInterceptor<String, String> {
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),
"TimeInterceptor:" + System.currentTimeMillis() + "," + record.value());
}
@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
CounterInterceptor
package com.bigdata.study.kafka;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
/**
* @author 只是甲
* @date 2021-10-29
* @remark Kafka 消息拦截器
*/
public class CounterInterceptor implements ProducerInterceptor<String, String> {
private int errorCounter = 0;
private int successCounter = 0;
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
if (exception == null) {
successCounter++;
} else {
errorCounter++;
}
}
@Override
public void close() {
// 输出结果,结束输出
System.out.println("Sent successful:" + successCounter);
System.out.println("Sent failed:" + errorCounter);
}
@Override
public void configure(Map<String, ?> map) {
}
}
在CustomProducer中加入拦截器
// 加入拦截器
List<Object> interceptors = new ArrayList<>();
interceptors.add(TimeInterceptor.class);
interceptors.add(CounterInterceptor.class);
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
测试记录:
/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/kafka/bin/kafka-console-consumer.sh --from-beginning --bootstrap-server hp2:9092 --topic kafka_test1
image.png
网友评论