1、消息处理的类
定义一个类,继承MessageListener接口来处理消息
public class KakaMessageListener implements MessageListener<String, String> {
@Override
public void onMessage(ConsumerRecord<String, String> record) {
String topic = record.topic();
String content = record.value();
}
}
2、配置Kafka的Consumer
@Data
@Component
@ConfigurationProperties(prefix = "kafkaconfig")
@EnableKafka
public class KafkaConfig {
private String servers;
private String groupid;
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(6);
factory.getContainerProperties().setPollTimeout(1500);
return factory;
}
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupid);
return propsMap;
}
}
3、动态的进行订阅和取消订阅
@Autowired
KakaMessageListener kakaMessageListener;
@Autowired
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> factory;
public static ConcurrentHashMap<String, ConcurrentMessageListenerContainer<String, String>> cache = new ConcurrentHashMap<>();
@Override
public void subscribe(String topic) {
ConcurrentMessageListenerContainer<String, String> container = null;
if(cache.containsKey(topic)){
container = cache.get(topic);
}
if(container == null){
container = factory.createContainer(topic);
cache.put(topic, container);
}
container.setupMessageListener(kakaMessageListener);
container.start();
log.info("订阅kafka消息:" + topic);
}
@Override
public void unSubscribe(String topic) {
if(cache.containsKey(topic)){
ConcurrentMessageListenerContainer<String, String> container = cache.get(topic);
if(container != null){
container.stop();
log.info("取消订阅kafka消息:" + topic);
}
}
}
网友评论