在springboot中对kafka进行读写

作者: 冬天里的懒喵 | 来源:发表于2017-09-06 11:44 被阅读617次

    springboot对kafka的client很好地实现了集成,使用非常方便。本文实现了一个在springboot中操作kafka的demo。

    1.POM配置

    只需要在dependencies中增加 spring-kafka的配置即可。完整效果如下:

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.4.RELEASE</version>
    </parent>

    <properties>
        <java.version>1.8</java.version>
        <!-- Overrides the spring-kafka version used by the dependencies below. -->
        <spring-kafka.version>1.2.2.RELEASE</spring-kafka.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>
        <!-- spring-kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>${spring-kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>${spring-kafka.version}</version>
            <scope>test</scope>
        </dependency>
        <!--
          Lombok: required because the project's classes use
          lombok.extern.slf4j.Slf4j. Only needed at compile time, hence "provided".
          NOTE(review): no version is given on the assumption that the Spring Boot
          parent manages it; add an explicit <version> if the build cannot resolve it.
        -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    

    2.生产者

    参数配置类,其参数写在yml文件中,通过@Value注入

    package com.dhb.kafka.producer;
    
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.core.ProducerFactory;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * Spring configuration for the producer side of the demo.
     *
     * <p>Declares the producer property map, the {@link ProducerFactory}, the
     * {@link KafkaTemplate} built on top of it, and the {@code Sender} bean.
     */
    @Configuration
    public class SenderConfig {

        /** Broker address list, injected from the {@code kafka.bootstrap-servers} property. */
        @Value("${kafka.bootstrap-servers}")
        private String bootstrapServers;

        /**
         * Assembles the Kafka producer properties.
         *
         * <p>Keys and values are both serialized with {@link StringSerializer}, and
         * {@code acks} is set to {@code "0"}.
         *
         * @return a new mutable map holding the producer settings
         */
        @Bean
        public Map<String,Object> producerConfigs() {
            final Map<String, Object> settings = new HashMap<>();
            settings.put(ProducerConfig.ACKS_CONFIG, "0");
            settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            return settings;
        }

        /**
         * Creates the producer factory from {@link #producerConfigs()}.
         *
         * @return a {@link DefaultKafkaProducerFactory} for string keys and values
         */
        @Bean
        public ProducerFactory<String,String> producerFactory() {
            final Map<String, Object> settings = producerConfigs();
            return new DefaultKafkaProducerFactory<>(settings);
        }

        /**
         * Creates the template used to publish messages.
         *
         * @return a {@link KafkaTemplate} backed by {@link #producerFactory()}
         */
        @Bean
        public KafkaTemplate<String,String> kafkaTemplate() {
            final ProducerFactory<String, String> factory = producerFactory();
            return new KafkaTemplate<>(factory);
        }

        /**
         * Registers the {@code Sender} as a bean.
         *
         * @return a new {@code Sender} instance
         */
        @Bean
        public Sender sender() {
            return new Sender();
        }
    }
    
    

    消息发送类

    package com.dhb.kafka.producer;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    
    @Slf4j
    public class Sender {
    
        @Autowired
        private KafkaTemplate<String,String> kafkaTemplate;
    
        public void send(String topic,String payload) {
            log.info("sending payload='{}' to topic='{}'",payload,topic);
            this.kafkaTemplate.send(topic,payload);
        }
    }
    
    

    3.消费者

    参数配置类

    package com.dhb.kafka.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * Spring configuration for the consumer side of the demo.
     *
     * <p>Declares the {@link ConsumerFactory}, the listener container factory
     * built on top of it, and the {@code Receiver} bean.
     */
    @Configuration
    @EnableKafka
    public class ReceiverConfig {

        /** Broker address list, injected from the {@code kafka.bootstrap-servers} property. */
        @Value("${kafka.bootstrap-servers}")
        private String bootstrapServers;

        /**
         * Assembles the Kafka consumer properties.
         *
         * <p>Keys and values are both deserialized with {@link StringDeserializer},
         * and the consumer group id is fixed to {@code "helloword"}.
         *
         * @return a new mutable map holding the consumer settings
         */
        public Map<String,Object> consumerConfigs() {
            final Map<String, Object> settings = new HashMap<>();
            settings.put(ConsumerConfig.GROUP_ID_CONFIG, "helloword");
            settings.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            settings.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            settings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
            return settings;
        }

        /**
         * Creates the consumer factory from {@link #consumerConfigs()}.
         *
         * @return a {@link DefaultKafkaConsumerFactory} for string keys and values
         */
        @Bean
        public ConsumerFactory<String,String> consumerFactory() {
            final Map<String, Object> settings = consumerConfigs();
            return new DefaultKafkaConsumerFactory<>(settings);
        }

        /**
         * Creates the listener container factory and points it at
         * {@link #consumerFactory()}.
         *
         * @return the configured {@link ConcurrentKafkaListenerContainerFactory}
         */
        @Bean
        public ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory() {
            final ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            final ConsumerFactory<String, String> consumers = consumerFactory();
            containerFactory.setConsumerFactory(consumers);
            return containerFactory;
        }

        /**
         * Registers the {@code Receiver} as a bean.
         *
         * @return a new {@code Receiver} instance
         */
        @Bean
        public Receiver receiver() {
            return new Receiver();
        }

    }
    

    消息接收类

    package com.dhb.kafka.consumer;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.kafka.annotation.KafkaListener;
    
    import java.util.concurrent.CountDownLatch;
    
    /**
     * Kafka message listener that logs every payload it receives.
     *
     * <p>A {@link CountDownLatch} with an initial count of one is counted down on
     * each received message and exposed through {@link #getLatch()}.
     */
    @Slf4j
    public class Receiver {

        /** Counted down once per received message; starts at 1. */
        private CountDownLatch messageLatch = new CountDownLatch(1);

        /**
         * Returns the latch that is counted down when a message arrives.
         *
         * @return the latch held by this receiver
         */
        public CountDownLatch getLatch() {
            return this.messageLatch;
        }

        /**
         * Handles one message from the topic named by the
         * {@code kafka.topic.helloworld} property.
         *
         * @param payload the message body
         */
        @KafkaListener(topics = "${kafka.topic.helloworld}")
        public void receive(String payload) {
            log.info("received payload='{}'",payload);
            this.messageLatch.countDown();
        }
    }
    

    3.web测试类

    定义了一个基于http的web测试接口

    package com.dhb.kafka.web;
    
    import com.dhb.kafka.producer.Sender;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestMethod;
    import org.springframework.web.bind.annotation.RestController;
    
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;
    import java.io.IOException;
    import java.io.PrintWriter;
    
    /**
     * HTTP endpoint for manually testing the Kafka producer.
     *
     * <p>{@code POST /sender.action} forwards the {@code data} request parameter
     * to Kafka through the {@link Sender}.
     */
    @RestController
    @Slf4j
    public class KafkaProducer {

        @Autowired
        Sender sender;

        /**
         * Destination topic, read from the {@code kafka.topic.helloworld} property
         * instead of being hard-coded, so it follows the application configuration.
         */
        @Value("${kafka.topic.helloworld}")
        private String topic;

        /**
         * Publishes the {@code data} parameter to the configured topic and writes
         * {@code success} to the response.
         *
         * <p>If {@code data} is absent the request is rejected with HTTP 400 and
         * nothing is sent to Kafka.
         *
         * @param request  the current HTTP request (not read directly)
         * @param response the HTTP response the result is written to
         * @param data     the message payload, bound from the request
         * @throws IOException if writing the response fails
         */
        @RequestMapping(value = "/sender.action", method = RequestMethod.POST)
        public void exec(HttpServletRequest request, HttpServletResponse response,String data) throws IOException{
            // A missing parameter would otherwise be published as a null-valued record.
            if (data == null) {
                response.sendError(HttpServletResponse.SC_BAD_REQUEST, "missing required parameter 'data'");
                return;
            }
            this.sender.send(this.topic,data);
            response.setCharacterEncoding("UTF-8");
            // NOTE(review): the body is plain text, yet the content type says JSON;
            // left unchanged so existing clients see the same header.
            response.setContentType("text/json");
            // Fetch the writer once and let try-with-resources close it.
            try (PrintWriter writer = response.getWriter()) {
                writer.write("success");
                writer.flush();
            }
        }

    }
    

    4.启动类及配置

    package com.dhb.kafka;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    /**
     * Entry point of the Spring Boot Kafka demo application.
     */
    @SpringBootApplication
    public class KafkaApplication {

        /**
         * Starts the Spring application.
         *
         * @param args command-line arguments, passed through to Spring Boot
         */
        public static void main(String[] args) {
            SpringApplication.run(KafkaApplication.class, args);
        }
    }
    

    application.yml

    # Kafka settings resolved through ${...} placeholders in the Java classes.
    kafka:
      # Broker address injected into SenderConfig and ReceiverConfig via @Value.
      bootstrap-servers: 192.168.162.239:9092
      topic:
        # Topic name referenced by the @KafkaListener on Receiver.receive.
        helloworld: testtopic

    程序结构:

    包结构

    5.读写测试

    通过执行KafkaApplication的main方法启动程序。然后打开postman进行测试:

    Paste_Image.png

    运行后返回success

    Paste_Image.png

    生产者日志:

    Paste_Image.png

    消费者日志:

    Paste_Image.png

    相关文章

      网友评论

      本文标题:在springboot中对kafka进行读写

      本文链接:https://www.haomeiwen.com/subject/qhbmjxtx.html