美文网首页Java 杂谈
如何动态加载@KafkaListener的topics

如何动态加载@KafkaListener的topics

作者: 逍遥jc | 来源:发表于2019-06-27 14:06 被阅读0次

    问题来源

    我司最近刚重构完,终于有时间去偿还欠下的技术债了。

    最先准备改造的就是将一些原本应该异步执行而因重构时间紧而被迫同步执行的方法,通过消息队列异步化。

    在原来的老项目中,我们使用的是kafka。因此我们对kafka的熟悉程度远胜于其他消息队列,也正因此,我们在新系统中依然采用了kafka作为我们的消息队列中间件。

    而摆在我们面前的问题是,该如何在Spring Boot环境下集成kafka呢?

    spring-kafka作为spring的全家桶成员,是目前最简单的集成kafka的方式,自然也成为了我们的首选。

    这个集成kafka的重任,最终落到我的组员头上。

    在前期的编码阶段,他一帆风顺,并没有遇到什么问题。但在后期测试的时候,他发现了一个问题。因为topics肯定会随着业务的发展而不断增加,所以@KafkaListener注解下的topics字段肯定不能像demo一样写死固定的topic。

    他最初询问我的时候,我也没有很好的办法,因为我之前搭建kafka的时候,topics是我通过写代码的方式加载到消费者线程上去的。但是在注解中,这个方法显然连编译都通不过。

    然后他也尝试过将topics写在常量类中,而不再写在枚举中。这样当然能解决问题,但是这么做,对后续的调用者而言,简直是个灾难。

    调用者不光需要在常量类中添加topic和维护topic对应的业务消费方法,还需要额外将添加的topic加入到@KafkaListener注解中。

    最后一步至关重要,如果不将topic加到注解中,listener将不会监听该topic,也自然无法消费对应的topic了,但这最后一步显然是最容易被大家遗忘的。

    解决方案

    那么还有什么更好的方案么?

    那自然是有的。

    既然topics字段需要一个字符串数组,那我们可以通过spel语言,例如topics = {"#{'${topics}'.split(',')}"}这么写,然后通过字符串分割来动态生成一个字符串数组。

    spel语言在spring中应用广泛,尤其是在注解中。我们之前也通过spel解决了分布式锁中动态参数的问题。

    但是这么写带来了另外一个问题。 如果Spring在加载@KafkaListener对应的消费者bean时(下称ConsumerListener),找不到topics,就会报『Could not resolve placeholder 'topics' in value "#{'${topics}'.split(',')}"』的错误。

    那么${topics}该如何被替换呢?

    同学们应该很快就能想到可以通过配置文件来处理这个问题。

    最简单的,自然是在application.properties中配置用逗号隔开的多个topic了。

    但是这个解决方案,还是不太友好。虽然我们可以采用apollo或者disconf等分布式配置中心来管理配置文件,但依然没有解决很容易遗忘配置topic的问题。

    这时候,我们应该回顾Spring Boot配置属性加载的相关内容了。

    既然配置文件能解决${topics}被替换的问题,那么加载优先级更高的配置自然也可以解决该问题。而优先级比配置文件更高的配置中,我们可以发现Java系统参数(System.getProperties())是个可以利用的点。

    我们可以新创建一个KafkaTopicConfig的配置类,加上@Configuration注解,然后在该配置类初始化后通过System.setProperty("topics", topics)把topics加到系统参数中。

    具体在bean初始化后执行指定方法做法有四种,实现 InitializingBean接口定制初始化后的方法,通过 <bean> 元素的 init-method属性指定初始化后调用的操作,在指定方法上加上@PostConstruct来指定该方法在初始化后调用,以及最简单的构造器。

    本文选择了实现 InitializingBean接口来完成该操作。具体代码如下:

    @Configuration
    public class KafkaTopicConfig implements InitializingBean {
    
        @Override
        public void afterPropertiesSet() {
            String topics = Sets.newHashSet(KafkaTopicEnum.values()).stream()
                    .map(KafkaTopicEnum::getTopic).collect(Collectors.joining(","));
            System.setProperty("topics", topics);
        }
    }
    

    然后还需要注意的一点就是,KafkaTopicConfig配置类必须在ConsumerListener类之前加载到Spring的容器内,否则依然会在加载ConsumerListener类时报『Could not resolve placeholder 'topics' in value "#{'${topics}'.split(',')}"』的错误。

    而Spring Boot的bean装配规则是根据Application类所在的包位置从近到远进行扫描的,所以如果KafkaTopicConfig所在目录离Application的距离比ConsumerListener所在目录更远,就会导致ConsumerListener在KafkaTopicConfig之前加载。

    为了避免出现这种情况,我们必须调整这两个bean的加载顺序。具体修改加载bean加载顺序的方式有很多种,我们采用了在ConsumerListener类上加DependsOn注解的方式来解决,如下所示:

    @DependsOn(value = "kafkaTopicConfig")
    

    至此,动态加载@KafkaListener的topics的问题就被完美解决了。

    好了,我们下一期再见,欢迎大家一起留言讨论。同时也欢迎点赞和送小星星~

    相关文章

      网友评论

        本文标题:如何动态加载@KafkaListener的topics

        本文链接:https://www.haomeiwen.com/subject/texrcctx.html