一、pom引入依赖
<!-- Kafka 依赖 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>
二、创建拦截器
package com.yang.kafka.interceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
/**
* Kafka拦截器
*/
public class MyInterceptor implements ProducerInterceptor<String,String> {
//统计成功、失败发送的消息数
static int success = 0 ;
static int error = 0;
/**
* 拦截处理发送的消息等
* @param producerRecord
* @return
*/
@Override
public ProducerRecord onSend(ProducerRecord producerRecord) {
String oldValue = (String)producerRecord.value();
String newValue = System.currentTimeMillis() + "_" + oldValue;
ProducerRecord record = new ProducerRecord(producerRecord.topic(),newValue);
return record;
}
@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
if(e== null){
success++;
}else{
error++;
}
}
@Override
public void close() {
System.out.println("成功发送消息数:"+success);
System.out.println("失败发送消息数:"+error);
}
@Override
public void configure(Map<String, ?> map) {
}
}
二、创建生产者
/**
* 简单发送消息
*/
public static void producer() throws ExecutionException, InterruptedException {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "8.8.8.101:9092");
//应答机制
properties.put(ProducerConfig.ACKS_CONFIG, "1");
//重试次数
properties.put(ProducerConfig.RETRIES_CONFIG, "1");
//批次大小
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
//等待时间
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
//RecordAccumulator 缓冲区大小
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
//key value 序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
//配置拦截器,消息新增时间戳,及统计成功、失败发送数
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.yang.kafka.interceptor.MyInterceptor");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
for (int i = 0; i < 100; i++){
//异步操作
//producer.send(new ProducerRecord<String, String>("t01", "Hello Kafka " + i));
//异步操作 调用get方法会等待该线程执行完成继续操作
//producer.send(new ProducerRecord<String, String>("t01", "Hello Kafka " + i)).get();
//回调函数
producer.send(new ProducerRecord<String, String>("t01", "Hello Kafka " + i ),new Callback(){
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(e == null){
System.out.println("success:offset:"+recordMetadata.offset());
}else{
e.printStackTrace();
}
}
});
}
producer.close();
}
三、创建消费者
public static void consumer() {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "8.8.8.101:9092");
//设置消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
//key value 反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//设置拿取大小
properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 100 * 1024 * 1024);
//设置自动提交offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
//设置自动提交延时
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
//订阅消息主题
consumer.subscribe(Arrays.asList("t01"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(10);
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.topic() + "====" + record.partition() + "====" + record.value());
}
}
}
四、测试
正常生产、正常消费、正常拦截


网友评论