美文网首页
kafka手动监听主题

kafka手动监听主题

作者: 爱余星痕 | 来源:发表于2020-05-11 14:45 被阅读0次

很多人作kafka消费时,都快速的使用注解@KafkaListener进行监听。
但我现在有个需求,是要动态的手动监听。
实现代码如下:
1.手动编写监听类

public class MessageQueueKafkaConsumerListener implements MessageListener<String,String> {

    private final IMessageQueueConsumerService messageQueueConsumerService;

    public MessageQueueKafkaConsumerListener(IMessageQueueConsumerService messageQueueConsumerService) {
        this.messageQueueConsumerService = messageQueueConsumerService;
    }


    @Override
    public void onMessage(ConsumerRecord<String, String> data) {
        messageQueueConsumerService.receiveMessage(data.value());
    }
}

2.配置监听

@Component
public class MessageQueueKafkaConsumerServiceFactory  implements InitializingBean {

    @Autowired
    KafkaProperties kafkaProperties;

    private final List<IMessageQueueConsumerService> messageQueueConsumerServices;

    @Autowired
    public MessageQueueKafkaConsumerServiceFactory(List<IMessageQueueConsumerService> messageQueueConsumerServiceList) {
        messageQueueConsumerServices = messageQueueConsumerServiceList.stream().filter(messageQueueConsumerService ->
                messageQueueConsumerService.support("kafka")).collect(Collectors.toList());
    }




    private KafkaMessageListenerContainer<Integer, String> createContainer(
            ContainerProperties containerProps) {
        Map<String, Object> props = kafkaProperties.buildConsumerProperties();
        DefaultKafkaConsumerFactory<Integer, String> cf =
                new DefaultKafkaConsumerFactory<>(props);
        return new KafkaMessageListenerContainer<>(cf, containerProps);
    }


    @Override
    public void afterPropertiesSet() throws Exception {
        messageQueueConsumerServices.forEach(messageQueueConsumerService -> {
            ContainerProperties containerProps = new ContainerProperties(messageQueueConsumerService.topic());

            containerProps.setMessageListener(new MessageQueueKafkaConsumerListener(messageQueueConsumerService)
            );
            KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps);
            container.setBeanName(messageQueueConsumerService.topic() + "kafkaListener");

            container.start();

        });

    }
}

上面 的IMessageQueueConsumerService是我自定义处理监听的真实类
至此,手动监听代码已完成.

相关文章

网友评论

      本文标题:kafka手动监听主题

      本文链接:https://www.haomeiwen.com/subject/vsnynhtx.html