Dependencies
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
Key code
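package samlen.tsoi.showcase.web.controller;

import java.util.Optional;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
// Result is the project's own response wrapper; import it from its package in your project.

/**
 * @author samlen_tsoi
 * @date 2018/3/23
 */
@Slf4j
@RestController
@RequestMapping("kafka")
public class KafkaController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Send a message to Kafka.
     *
     * @param topic the topic to send to
     * @param mes   the message content
     * @return a success result
     */
    @GetMapping("syncKafka")
    public Result syncKafka(@RequestParam("topic") String topic,
                            @RequestParam("mes") String mes) {
        // send() is asynchronous: this returns without waiting for the broker to acknowledge.
        kafkaTemplate.send(topic, mes);
        return Result.ok();
    }

    /**
     * Match topics with a regular expression. The "*" must be preceded by "."
     * for the pattern to match the intended topics.
     *
     * @param record the consumed record
     */
    @KafkaListener(topicPattern = "showcase.*")
    public void listenPattern(ConsumerRecord<String, String> record) {
        // The record value may be null, so wrap it before logging.
        Optional<String> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            log.info("topic : {}, mes : {}", record.topic(), kafkaMessage.get());
        }
    }

    /**
     * Listen to a single, explicitly named topic.
     *
     * @param record the consumed record
     */
    // A single topic needs no regex, so subscribe by name with "topics".
    @KafkaListener(topics = "show-web")
    public void listenOne(ConsumerRecord<String, String> record) {
        Optional<String> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            log.info("topic : {}, mes : {}", record.topic(), kafkaMessage.get());
        }
    }
}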
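The "." in "showcase.*" is required; without it the pattern does not match the target topics. topicPattern is a regular expression, so ".*" means "any sequence of characters", whereas "showcase*" only allows the final "e" to repeat zero or more times.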
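The difference between the two patterns can be checked without a broker. The following is a minimal sketch using plain java.util.regex, assuming the topic name must match the pattern as a whole string (which is how pattern subscription is understood to behave here). The class name TopicPatternDemo, the helper matching, and the topic names "showcase" and "showcas" are made up for illustration; they are not part of the original project.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class TopicPatternDemo {

    /** Returns the topics whose full name matches the regex (whole-string match). */
    public static List<String> matching(String regex, List<String> topics) {
        Pattern pattern = Pattern.compile(regex);
        return topics.stream()
                .filter(topic -> pattern.matcher(topic).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical topic list; "showcase" and "showcas" exist only to show the edge cases.
        List<String> topics = Arrays.asList(
                "showcase-a", "showcase-b", "showcase", "showcas", "show-web");

        // ".*" accepts any suffix after "showcase".
        System.out.println(matching("showcase.*", topics)); // [showcase-a, showcase-b, showcase]

        // "e*" only repeats the letter "e", so "showcase-a" is rejected.
        System.out.println(matching("showcase*", topics));  // [showcase, showcas]

        // A literal name used as a pattern matches only itself.
        System.out.println(matching("show-web", topics));   // [show-web]
    }
}
```

Running a candidate pattern through a check like this before putting it into topicPattern is a cheap way to confirm it selects the intended topics.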
Console log
2018-03-23 15:16:42.923 INFO 10297 --- [nio-9003-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka version : 0.10.2.0
2018-03-23 15:16:42.923 INFO 10297 --- [nio-9003-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : 576d93a8dc0cf421
2018-03-23 15:16:42.967 INFO 10297 --- [ntainer#0-0-L-1] s.t.s.web.controller.KafkaController : topic : showcase-a, mes : a
2018-03-23 15:16:50.693 INFO 10297 --- [ntainer#0-0-L-1] s.t.s.web.controller.KafkaController : topic : showcase-b, mes : b
2018-03-23 15:16:54.915 INFO 10297 --- [ntainer#0-0-L-1] s.t.s.web.controller.KafkaController : topic : showcase-c, mes : c
GitHub repository
showcase; class path: samlen.tsoi.showcase.web.controller.KafkaController