docker搭建kafka
- 由于是在自己电脑上搭建本地环境,为了简单起见,这里使用 Docker
# Start ZooKeeper in a detached container named "zookeeper", publishing port 2181 on the host.
docker run -d --name zookeeper -p 2181:2181 zookeeper
# Start the Kafka broker in a detached container named "kafka", publishing port 9092 and
# linking it to the zookeeper container so it can be reached as zookeeper:2181.
# Replace 本机ip地址 (both occurrences) with the IP address of the host machine.
docker run -d --name kafka -p 9092:9092 \
--link zookeeper:zookeeper \
--env KAFKA_BROKER_ID=1 \
--env HOST_IP=本机ip地址 \
--env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
--env KAFKA_ADVERTISED_HOST_NAME=本机ip地址 \
--env KAFKA_ADVERTISED_PORT=9092 \
-t wurstmeister/kafka
# Open an interactive bash shell inside the running kafka container.
$ docker exec -it kafka bash
# Change into the Kafka scripts directory (this path matches the kafka_2.12-2.3.0 layout).
$ cd /opt/kafka_2.12-2.3.0/bin/
# Start the console producer against localhost:9092; replace 主题名 with the topic name.
$ ./kafka-console-producer.sh --broker-list localhost:9092 --topic 主题名
- 上面的命令用于创建 topic(broker 默认开启 auto.create.topics.enable 时,向不存在的主题发送第一条消息会自动创建该主题);如果主题不存在,Spring Boot 启动的时候会报错
依赖
// Build dependencies for the Spring Boot + Kafka demo application.
dependencies {
// Web starter: provides the REST controller support used by the producer endpoint.
implementation 'org.springframework.boot:spring-boot-starter-web'
// Spring for Apache Kafka: provides KafkaTemplate and @KafkaListener.
implementation 'org.springframework.kafka:spring-kafka'
// Lombok is only needed at compile time and as an annotation processor (e.g. for @Slf4j).
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
// Test starter with the JUnit vintage engine excluded.
testImplementation('org.springframework.boot:spring-boot-starter-test') {
exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
}
// Test support for Spring Kafka.
testImplementation 'org.springframework.kafka:spring-kafka-test'
}
配置文件
# HTTP port the application listens on.
server:
  port: 4000

spring:
  kafka:
    # Address of the Kafka broker to connect to.
    bootstrap-servers: localhost:9092
    producer:
      # Keys and values are sent as plain strings.
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      # Consumer group used by listeners that do not declare their own group.
      group-id: default_consumer_group
      # Commit offsets automatically at the interval below.
      enable-auto-commit: true
      auto-commit-interval: 1000
      # Keys and values are read back as plain strings; the class names must match
      # the fully qualified names in org.apache.kafka.common.serialization exactly.
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
消费者
/**
 * Kafka listener component that logs every record received from the "sun" topic.
 */
@Slf4j
@Component
public class Consumer {

    /**
     * Logs the topic, the offset and the payload of a single consumed record.
     *
     * @param record the consumed record passed to this listener
     */
    @KafkaListener(topics = "sun")
    public void listen(ConsumerRecord<?, ?> record) {
        // Read the three fields up front so the log statements stay uniform.
        final String topic = record.topic();
        final long offset = record.offset();
        final Object payload = record.value();

        log.info("主题是{}", topic);
        log.info("offset:{}", offset);
        log.info("收到的信息:{}", payload);
    }
}
生产者
/**
 * REST controller exposing a single endpoint that publishes a fixed demo message to Kafka.
 */
@RestController
public class ProducerController {

    /** Name of the topic the demo message is published to. */
    private static final String TOPIC = "sun";

    /**
     * Template used to publish records.
     *
     * <p>The value type is {@code String} rather than {@code Object} so that the compiler rejects
     * payloads a String-only value serializer could not handle. This assumes the producer is
     * configured with {@code StringSerializer} for values, as in this project's configuration.
     */
    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Publishes the demo message to {@link #TOPIC}.
     *
     * <p>The result returned by {@code send(...)} is not inspected, so this method does not report
     * whether delivery succeeded.
     *
     * @return a fixed confirmation string
     */
    @GetMapping("/")
    public String producer() {
        // Send the message through the Kafka template.
        kafkaTemplate.send(TOPIC, "这是发送的信息");
        return "生产者接口";
    }
}
网友评论