最近在做平台的kafka消息监听的改造,以前用的是平台自己封装的jar,现在统一改用spring-kafka.jar,这样的好处是减少特殊处理,便于统一维护。
以下是配置:
import java.util.*;
/**
 * Kafka configuration class.
 *
 * <p>Declares the beans needed to consume String/String records with spring-kafka:
 * a consumer factory, the container properties (topic, group id, ack mode and
 * listener), a concurrent listener container and the listener itself.
 *
 * <p>Created: 13:42 2018/9/12
 *
 * @author LiuBing
 */
@Configuration
public class KafkaConfig {

    /** Broker list, read from {@code kafka.bootstrap.servers}. */
    @Value("${kafka.bootstrap.servers}")
    private String bootstrapServers;

    /** Topic to subscribe to, read from {@code kafka.topic.name}. */
    @Value("${kafka.topic.name}")
    private String topicName;

    /** Consumer group id, read from {@code kafka.consumer.group}. */
    @Value("${kafka.consumer.group}")
    private String consumerGroup;

    /**
     * Builds the consumer factory with String key/value deserializers.
     *
     * <p>Auto-commit is disabled in the consumer properties, so offset commits are
     * governed by the ack mode set on the container properties.
     *
     * @return consumer factory for String keys and String values
     */
    @Bean
    public DefaultKafkaConsumerFactory<String, String> defaultKafkaConsumerFactory() {
        Map<String, Object> consumerProperties = new HashMap<>();
        consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return new DefaultKafkaConsumerFactory<>(
                consumerProperties, new StringDeserializer(), new StringDeserializer());
    }

    /**
     * Builds the container properties: subscribed topic, consumer group,
     * per-record ack mode and the message listener.
     *
     * @param consumerMessageListener listener that receives each record
     * @return configured container properties
     */
    @Bean
    public ContainerProperties containerProperties(MessageListener<String, String> consumerMessageListener) {
        ContainerProperties properties = new ContainerProperties(topicName);
        properties.setGroupId(consumerGroup);
        properties.setAckMode(AbstractMessageListenerContainer.AckMode.RECORD);
        properties.setMessageListener(consumerMessageListener);
        return properties;
    }

    /**
     * Builds the listener container with a concurrency of 3.
     *
     * @param defaultKafkaConsumerFactory factory used to create the consumers
     * @param containerProperties         topic, group, ack mode and listener settings
     * @return the concurrent listener container
     */
    @Bean
    public ConcurrentMessageListenerContainer<String, String> kafkaMessageListenerContainer(
            ConsumerFactory<String, String> defaultKafkaConsumerFactory,
            ContainerProperties containerProperties) {
        ConcurrentMessageListenerContainer<String, String> container =
                new ConcurrentMessageListenerContainer<>(defaultKafkaConsumerFactory, containerProperties);
        container.setConcurrency(3);
        return container;
    }

    /**
     * Creates the listener instance that is registered in the container properties.
     *
     * @return the message listener bean
     */
    @Bean
    public MessageListener<String, String> consumerMessageListener() {
        return new KafkaConsumeMessageListenerTwo();
    }
}
然后是一个监听类,注意在上边的配置中我们需要将这个监听类设置到我们的配置里边
import java.util.concurrent.ThreadPoolExecutor;
/**
* @Description: 消费者监听
* @Author: LiuBing
* @Date: 14:46 2018/9/19
*/
@Slf4j
public class kafkaConsumeMessageListenerTwo implements MessageListener<String, String> {
@Autowired
private TInsuranceCommonArgumentDao insuranceCommonArgumentDao;
@Autowired
private RaiseFundServiceImpl raiseFundServiceImpl;
@Autowired
private TradeCenterServiceImpl tradeCenterService;
@Autowired
private ThreadPoolExecutor threadPoolExecutor;
@Override
public void onMessage(ConsumerRecord<String, String> consumerRecord) {
log.info("kafkaConsumeMessageListenerTwo.threadPoolExecutor...hashcode:{}", threadPoolExecutor.hashCode());
threadPoolExecutor.execute(() -> {
try {
String value = insuranceCommonArgumentDao.getValue(MarketActivityEnum.QFBF001_HB_SWITCH.getTemplateCoe());
if ("0".equals(value)){
return;
}
try {
//do business
doBusiness(consumerRecord);
} catch (Exception e) {
log.error("消息接收异常:{}", e.getMessage());
}
} catch (Exception e) {
log.info("错误信息:{}", e);
}
});
}
public void doBusiness(ConsumerRecord<String, String> consumerRecord) throws Exception{
// 将json格式的消息转换为bean对象
log.info("消息体串:{}", consumerRecord);
// topic匹配
switch (consumerRecord.topic()){
case CommonConstant.FUND_ORDER_TOPIC:
log.info("偏移量:{},topic:{}", consumerRecord.offset(),consumerRecord.topic());
TradeCenterFundMessageDTO fundMessageDTO = JsonUtil.fromJson(consumerRecord.value(),TradeCenterFundMessageDTO.class);
raiseFundServiceImpl.capitalDomain(fundMessageDTO, CommonConstant.PREMIUM_ACCOUNT_NO,CommonConstant.QFBF_ACTIVITY_NO);
break;
case CommonConstant.ACQ_ORDER_PAY_TOPIC:
log.info("偏移量:{},topic:{}", consumerRecord.offset(),consumerRecord.topic());
TradeCenterAcqMessageDTO acqMessageDTO = JsonUtil.fromJson(consumerRecord.value(),TradeCenterAcqMessageDTO.class);
tradeCenterService.tradeCenterRaise(acqMessageDTO, CommonConstant.PREMIUM_ACCOUNT_NO,CommonConstant.QFBF_ACTIVITY_NO);
String button = insuranceCommonArgumentDao.getValue("SHQFBF_BUTTON");
if("true".equals(button)) {
tradeCenterService.tradeCenterRaise(acqMessageDTO, CommonConstant.SHPREMIUM_ACCOUNT_NO,CommonConstant.SHQFBF_ACTIVITY_NO);
}
break;
default:
break;
}
}
}
至此便可以监听到消息进行业务处理了。
网友评论