美文网首页
可修改对接地址的kafka客户端

可修改对接地址的kafka客户端

作者: ShootHzj | 来源:发表于2021-09-09 11:34 被阅读0次

原生的 Kafka 客户端并不支持修改客户端所连接的服务器地址，而我们需要支持动态地更新 Kafka 客户端的地址。最简单的做法是通过组合的方式，对原生的 Kafka 客户端做一层 wrap（封装），以支持修改客户端地址。这层 wrap 尽量做到无锁化，不影响性能。

核心代码

生产者

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Properties;
import java.util.concurrent.Future;

/**
 * @author hezhangjian
 */
@Slf4j
public class EnhanceKafkaProducer<K, V> {

    private volatile KafkaProducer<K, V> producer;

    public EnhanceKafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        this.producer = new KafkaProducer<K, V>(properties, keySerializer, valueSerializer);
    }

    /**
     * the caller should ensure thread safe call to this obj
     *
     * @param properties
     * @param keySerializer
     * @param valueSerializer
     */
    public void changeParam(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        KafkaProducer<K, V> oldProducer = this.producer;
        this.producer = null;
        try {
            oldProducer.close();
        } catch (Exception e) {
            log.error("ignore the old client close error");
        }
        this.producer = new KafkaProducer<K, V>(properties, keySerializer, valueSerializer);
    }

    public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
        if (producer == null) {
            throw new IllegalStateException("kafka producer is switching");
        }
        return producer.send(record);
    }

}

消费者

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.Deserializer;

import java.time.Duration;
import java.util.Properties;

/**
 * @author hezhangjian
 */
@Slf4j
public class EnhanceKafkaConsumer<K, V> {

    private volatile KafkaConsumer<K, V> consumer;

    public EnhanceKafkaConsumer(Properties properties, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        this.consumer = new KafkaConsumer<K, V>(properties, keyDeserializer, valueDeserializer);
    }

    /**
     * the caller should ensure thread safe call to this obj
     *
     * @param properties
     * @param keyDeserializer
     * @param valueDeserializer
     */
    public void changeParam(Properties properties, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        KafkaConsumer<K, V> oldConsumer = this.consumer;
        this.consumer = null;
        try {
            oldConsumer.close();
        } catch (Exception e) {
            log.error("ignore the old client close error");
        }
        this.consumer = new KafkaConsumer<>(properties, keyDeserializer, valueDeserializer);
    }

    public ConsumerRecords<K, V> poll(final Duration timeout) {
        if (consumer == null) {
            throw new IllegalStateException("kafka producer is switching");
        }
        return consumer.poll(timeout);
    }

}

相关文章

网友评论

      本文标题:可修改对接地址的kafka客户端

      本文链接:https://www.haomeiwen.com/subject/ovyiwltx.html