File>New>Spring Starter Project
Next
搜索Kafka,选择依赖
完成
Receiver.java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class Receiver {
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
@KafkaListener(topics = { "sce_gather_job", "test1" })
//可以写多个topic
public void receiveMessage(ConsumerRecord<String, String> record) {
logger.info("kafka的key: " + record.key());
logger.info("kafka的value: " + record.value().toString());
System.out.println(record.topic());
System.out.println("【*** 接收消息 ***】key = " + record.key() + "、value = " + record.value());
}
}
SpringBootKafkaApplication.java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class SpringBootKafkaApplication {
//身份认证时需要
static {
System.setProperty("java.security.auth.login.config", "/home/wucan/conf/client.cf");
}
public static void main(String[] args) {
SpringApplication.run(SpringBootKafkaApplication.class, args);
}
}
网友评论