原生的 Kafka 客户端并不支持修改客户端连接的服务器地址,而我们需要支持动态地更新 Kafka 客户端的地址。最简单的做法就是通过组合的方式,对原生的 Kafka 客户端做一层 wrap,来支持修改 Kafka 客户端地址。这层 wrap 尽量做到无锁化,不影响性能。
核心代码
生产者
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.Serializer;
import java.util.Properties;
import java.util.concurrent.Future;
/**
* @author hezhangjian
*/
@Slf4j
public class EnhanceKafkaProducer<K, V> {
private volatile KafkaProducer<K, V> producer;
public EnhanceKafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
this.producer = new KafkaProducer<K, V>(properties, keySerializer, valueSerializer);
}
/**
* the caller should ensure thread safe call to this obj
*
* @param properties
* @param keySerializer
* @param valueSerializer
*/
public void changeParam(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
KafkaProducer<K, V> oldProducer = this.producer;
this.producer = null;
try {
oldProducer.close();
} catch (Exception e) {
log.error("ignore the old client close error");
}
this.producer = new KafkaProducer<K, V>(properties, keySerializer, valueSerializer);
}
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
if (producer == null) {
throw new IllegalStateException("kafka producer is switching");
}
return producer.send(record);
}
}
消费者
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.Deserializer;
import java.time.Duration;
import java.util.Properties;
/**
* @author hezhangjian
*/
@Slf4j
public class EnhanceKafkaConsumer<K, V> {
private volatile KafkaConsumer<K, V> consumer;
public EnhanceKafkaConsumer(Properties properties, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
this.consumer = new KafkaConsumer<K, V>(properties, keyDeserializer, valueDeserializer);
}
/**
* the caller should ensure thread safe call to this obj
*
* @param properties
* @param keyDeserializer
* @param valueDeserializer
*/
public void changeParam(Properties properties, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
KafkaConsumer<K, V> oldConsumer = this.consumer;
this.consumer = null;
try {
oldConsumer.close();
} catch (Exception e) {
log.error("ignore the old client close error");
}
this.consumer = new KafkaConsumer<>(properties, keyDeserializer, valueDeserializer);
}
public ConsumerRecords<K, V> poll(final Duration timeout) {
if (consumer == null) {
throw new IllegalStateException("kafka producer is switching");
}
return consumer.poll(timeout);
}
}
网友评论