美文网首页程序员
spring kafka之如何批量给topic加前缀

spring kafka之如何批量给topic加前缀

作者: linyb极客之路 | 来源:发表于2020-12-05 00:02 被阅读0次

前言

最近业务开发部门给我们部门提了一个需求,因为他们开发环境和测试环境共用一套kafka,他们希望我们部门能帮他们实现自动给kafka的topic加上环境前缀,比如开发环境,则topic为dev_topic,测试环境,则topic为test_topic,他们kafka客户端是使用spring-kafka。一开始接到这个需求的时候,我心里是拒绝的,为啥开发环境和测试环境不分别部署一套kafka,还要那么麻烦。但老大都答应接这个需求了,作为小罗罗也只能接了

实现思路

1、生产者端

可以通过生产者拦截器,来给topic加前缀

2、实现步骤

a、编写一个生产者拦截器

@Slf4j
public class KafkaProducerInterceptor implements ProducerInterceptor<String, MessageDTO> {



    /**
     * 运行在用户主线程中,在消息被序列化之前调用
     * @param record
     * @return
     */
    @Override
    public ProducerRecord<String, MessageDTO> onSend(ProducerRecord<String, MessageDTO> record) {
        log.info("原始topic:{}",record.topic());
        return new ProducerRecord<String, MessageDTO>(TOPIC_KEY_PREFIX + record.topic(),
                record.partition(),record.timestamp(),record.key(), record.value());
    }




    /**
     * 在消息被应答之前或者消息发送失败时调用,通常在producer回调逻辑触发之前,运行在produer的io线程中
     * @param metadata
     * @param exception
     */
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
      log.info("实际topic:{}",metadata.topic());
    }


    /**
     *  清理工作
     */
    @Override
    public void close() {
    }


    /**
     * 初始化工作
     * @param configs
     */
    @Override
    public void configure(Map<String, ?> configs) {

    }

b、配置拦截器

kafka:
    producer:
      # 生产者拦截器配置
      properties:
        interceptor.classes: com.github.lybgeek.kafka.producer.interceptor.KafkaProducerInterceptor

c、测试

image.png

2、消费者端

这个就稍微有点难搞了,因为业务开发部门他们是直接用@KafkaListener的注解,形如下

 @KafkaListener(id = "msgId",topics = {Constant.TOPIC})

像这种也没啥好的办法,就只能通过源码了,通过源码可以发现在如下地方

KafkaListenerAnnotationBeanPostProcessor#postProcessAfterInitialization

会把@KafkaListener的值赋值给消费者,如果对spring有了解的朋友,可能会知道postProcessAfterInitialization是spring后置处理器的方法,主要用来bean初始化后的一些操作,既然我们知道@KafkaListener会在bean初始化后再进行赋值,那我们就可以在bean初始化前,修改掉@KafkaListener的值。具体实现如下

@Component
public class KafkaListenerFactoryBeanPostProcesser implements BeanFactoryPostProcessor {

    @SneakyThrows
    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {

        List<String> packageNames = AutoConfigurationPackages.get(beanFactory);

        for (String packageName : packageNames) {
            Reflections reflections = new Reflections(new ConfigurationBuilder()
                    .forPackages(packageName) // 指定路径URL
                    .addScanners(new SubTypesScanner()) // 添加子类扫描工具
                    .addScanners(new FieldAnnotationsScanner()) // 添加 属性注解扫描工具
                    .addScanners(new MethodAnnotationsScanner() ) // 添加 方法注解扫描工具
                    .addScanners(new MethodParameterScanner() ) // 添加方法参数扫描工具
            );

            Set<Method> methodSet = reflections.getMethodsAnnotatedWith(KafkaListener.class);
            if(!CollectionUtils.isEmpty(methodSet)){
                for (Method method : methodSet) {
                    KafkaListener kafkaListener = method.getAnnotation(KafkaListener.class);
                    changeTopics(kafkaListener);
                }
            }
        }

    }


    private void changeTopics(KafkaListener kafkaListener) throws Exception{
        InvocationHandler invocationHandler = Proxy.getInvocationHandler(kafkaListener);
        Field memberValuesField = invocationHandler.getClass().getDeclaredField("memberValues");
        memberValuesField.setAccessible(true);
        Map<String,Object> memberValues = (Map<String,Object>)memberValuesField.get(invocationHandler);
        String[] topics = (String[])memberValues.get("topics");
        System.out.println("修改前topics:" + Lists.newArrayList(topics));
        for (int i = 0; i < topics.length; i++) {
            topics[i] = Constant.TOPIC_KEY_PREFIX + topics[i];
        }
        memberValues.put("topics", topics);
        System.out.println("修改后topics:" + Lists.newArrayList(kafkaListener.topics()));

    }
}

测试

image.png
image.png

总结

虽然实现了动态修改topic,但我还是觉得topic不要随便改变,有条件的话,kafka还是得基于物理环境隔离,其次真的客观条件不允许,要动态变更topic,则需做好topic动态变更宣导以及相关wiki的编写,不然很容易掉坑

demo链接

https://github.com/lyb-geek/springboot-learning/tree/master/springboot-mq-idempotent-consume

相关文章

  • spring kafka之如何批量给topic加前缀

    前言 最近业务开发部门给我们部门提了一个需求,因为他们开发环境和测试环境共用一套kafka,他们希望我们部门能帮他...

  • 如何在kafka中创建topic

    [Toc] 如何在kafka中创建topic 在使用kafka发送消息和消费消息之前,必须先要创建topic,在k...

  • kafka笔记

    docker运行Kafka kafka操作命令 进入容器 创建topic 查看topic 删除topic 启动生产...

  • zookeeper删除kafka元数据

    问题:卸载kafka前未删除kafka topic,重新安装kafka后,生成跟之前topic名字相同的topic...

  • 2018-11-14

    Kafka如何彻底删除topic 环境描述 Kafka集群环境如下,三台broker,三台zookeeper,搭建...

  • Kafka操作指令

    启动kafka 安全关闭kafka 创建topic 删除topic 查询topic 启动控制台Producer,向...

  • kafka2.x 基本命令

    export kafka_zk=export kafka_brokers= 创建 topic列表 topic详细 ...

  • 修改kafka分区数

    这里记录下如何修改kafka的topic分区 查看当前topic分区,假设主题是testtopic 修改分区个数 ...

  • kafka极简入门(四)--常用配置

    回顾:kafka极简入门(三)--创建topic 前言 kafka针对broker, topic, produce...

  • flume使用kafka作为sink

    启动kafka 先启动zookeeper: 然后启动kafka: 创建topic 查看创建的topic: 启动消费...

网友评论

    本文标题:spring kafka之如何批量给topic加前缀

    本文链接:https://www.haomeiwen.com/subject/eqewwktx.html