美文网首页
使用非注解形式的javaConfig配置进行kafka消息监听

使用非注解形式的javaConfig配置进行kafka消息监听

作者: 神蛋_狄仁杰 | 来源:发表于2018-09-20 15:12 被阅读0次

    最近在做平台的kafka消息监听的改造,以前用的是平台自己封装jar,现在统一改用spring-kafka.jar,这样的好处是减少特殊处理,便于统一维护。
    以下是配置:

    import java.util.*;
    
    /**
     * @Description: kafka配置类
     * @Author: LiuBing
     * @Date: 13:42 2018/9/12
     */
    @Configuration
    public class KafkaConfig {
    
        @Value("${kafka.bootstrap.servers}")
        private String bootstrapServers;
    
        @Value("${kafka.topic.name}")
        private String topicName;
    
        @Value("${kafka.consumer.group}")
        private String consumerGroup;
    
        @Bean
        public DefaultKafkaConsumerFactory defaultKafkaConsumerFactory(){
            Map<String, Object> consumerProperties = new HashMap<>();
            consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
            DefaultKafkaConsumerFactory factory = new DefaultKafkaConsumerFactory(consumerProperties,new StringDeserializer(),new StringDeserializer());
            return factory;
        }
    
        @Bean
        public ContainerProperties containerProperties(MessageListener<String, String> consumerMessageListener){
            ContainerProperties properties = new ContainerProperties(topicName);
            properties.setGroupId(consumerGroup);
            properties.setAckMode(AbstractMessageListenerContainer.AckMode.RECORD);
            properties.setMessageListener(consumerMessageListener);
            return properties;
        }
    
        @Bean
        public ConcurrentMessageListenerContainer kafkaMessageListenerContainer(ConsumerFactory defaultKafkaConsumerFactory,
                                                                                ContainerProperties containerProperties){
            ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer(defaultKafkaConsumerFactory,containerProperties);
            container.setConcurrency(3);
            return container;
        }
    
        @Bean
        public MessageListener<String, String> consumerMessageListener(){
            return new KafkaConsumeMessageListenerTwo();
        }
    
    }
    

    然后是一个监听类,注意在上边的配置中我们需要将这个监听类设置到我们的配置里边

    import java.util.concurrent.ThreadPoolExecutor;
    
    /**
     * @Description: 消费者监听
     * @Author: LiuBing
     * @Date: 14:46 2018/9/19
     */
    @Slf4j
    public class kafkaConsumeMessageListenerTwo implements MessageListener<String, String> {
    
        @Autowired
        private TInsuranceCommonArgumentDao insuranceCommonArgumentDao;
    
        @Autowired
        private RaiseFundServiceImpl raiseFundServiceImpl;
    
        @Autowired
        private TradeCenterServiceImpl tradeCenterService;
    
        @Autowired
        private ThreadPoolExecutor threadPoolExecutor;
    
        @Override
        public void onMessage(ConsumerRecord<String, String> consumerRecord) {
    
            log.info("kafkaConsumeMessageListenerTwo.threadPoolExecutor...hashcode:{}", threadPoolExecutor.hashCode());
            threadPoolExecutor.execute(() -> {
                try {
                    String value = insuranceCommonArgumentDao.getValue(MarketActivityEnum.QFBF001_HB_SWITCH.getTemplateCoe());
                    if ("0".equals(value)){
                        return;
                    }
                    try {
                        //do business
                        doBusiness(consumerRecord);
                    } catch (Exception e) {
                        log.error("消息接收异常:{}", e.getMessage());
                    }
                } catch (Exception e) {
                    log.info("错误信息:{}", e);
                }
            });
        }
    
    
        public void doBusiness(ConsumerRecord<String, String> consumerRecord) throws Exception{
            // 将json格式的消息转换为bean对象
            log.info("消息体串:{}", consumerRecord);
            //  topic匹配
            switch (consumerRecord.topic()){
                case CommonConstant.FUND_ORDER_TOPIC:
                    log.info("偏移量:{},topic:{}", consumerRecord.offset(),consumerRecord.topic());
                    TradeCenterFundMessageDTO fundMessageDTO = JsonUtil.fromJson(consumerRecord.value(),TradeCenterFundMessageDTO.class);
                    raiseFundServiceImpl.capitalDomain(fundMessageDTO, CommonConstant.PREMIUM_ACCOUNT_NO,CommonConstant.QFBF_ACTIVITY_NO);
                    break;
                case CommonConstant.ACQ_ORDER_PAY_TOPIC:
                    log.info("偏移量:{},topic:{}", consumerRecord.offset(),consumerRecord.topic());
                    TradeCenterAcqMessageDTO acqMessageDTO = JsonUtil.fromJson(consumerRecord.value(),TradeCenterAcqMessageDTO.class);
                    tradeCenterService.tradeCenterRaise(acqMessageDTO, CommonConstant.PREMIUM_ACCOUNT_NO,CommonConstant.QFBF_ACTIVITY_NO);
                    String button = insuranceCommonArgumentDao.getValue("SHQFBF_BUTTON");
                    if("true".equals(button)) {
                        tradeCenterService.tradeCenterRaise(acqMessageDTO, CommonConstant.SHPREMIUM_ACCOUNT_NO,CommonConstant.SHQFBF_ACTIVITY_NO);
                    }
                    break;
                default:
                    break;
            }
    
        }
    }
    
    

    至此便可以监听到消息进行业务处理了。

    相关文章

      网友评论

          本文标题:使用非注解形式的javaConfig配置进行kafka消息监听

          本文链接:https://www.haomeiwen.com/subject/vvcnnftx.html