美文网首页
springboot 集成kafka

springboot 集成kafka

作者: 楚长铭 | 来源:发表于2020-03-15 11:33 被阅读0次

    docker搭建kafka

    • 由于是自己电脑本地搭建环境,为了简单点使用docker
    docker run -d --name zookeeper  -p 2181:2181 zookeeper
    
    • kafka需要zookeeper环境
    docker run -d --name kafka -p 9092:9092 \
    --link zookeeper:zookeeper \
    --env KAFKA_BROKER_ID=1 \
    --env HOST_IP=本机ip地址 \
    --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
    --env KAFKA_ADVERTISED_HOST_NAME=本机ip地址 \
    --env KAFKA_ADVERTISED_PORT=9092 \
    -t wurstmeister/kafka
    
    • 搭建时填入本机ip地址即可
    $ docker exec -it kafka bash
    
    • 进入dokcer容器的中kafaka
    $ cd /opt/kafka_2.12-2.3.0/bin/
    
    • 进入bin文件夹
    $ ./kafka-console-producer.sh --broker-list localhost:9092 --topic 主题名
    
    • 创建topic,如果没有创建主题,springboot启动的时候会报错

    依赖

    dependencies {
        implementation 'org.springframework.boot:spring-boot-starter-web'
        implementation 'org.springframework.kafka:spring-kafka'
        compileOnly 'org.projectlombok:lombok'
        annotationProcessor 'org.projectlombok:lombok'
        testImplementation('org.springframework.boot:spring-boot-starter-test') {
            exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
        }
        testImplementation 'org.springframework.kafka:spring-kafka-test'
    }
    

    配置文件

    server:
      port: 4000
    spring:
      kafka:
        bootstrap-servers: localhost:9092
        producer:
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
        consumer:
          group-id: default_consumer_group
          enable-auto-commit: true
          auto-commit-interval: 1000
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserialize
    

    消费者

    @Slf4j
    @Component
    public class Consumer {
    
        @KafkaListener(topics = "sun")
        public void listen(ConsumerRecord<?, ?> record) {
            log.info("主题是{}",record.topic());
            log.info("offset:{}",record.offset());
            log.info("收到的信息:{}", record.value());
        }
    
    }
    

    生产者

    @RestController
    public class ProducerController {
    
    
        @Resource
        private KafkaTemplate<String,Object> kafkaTemplate;
    
        @GetMapping("/")
        public String producer(){
            kafkaTemplate.send("sun", "这是发送的信息"); //使用kafka模板发送信息
            return "生产者接口";
        }
    
    }
    
    
    • 使用 @Autowired会爆红线但也不影响使用

    相关文章

      网友评论

          本文标题:springboot 集成kafka

          本文链接:https://www.haomeiwen.com/subject/jxtnshtx.html