美文网首页学习
SpringBoot 中使用Kafka的简单入门

SpringBoot 中使用Kafka的简单入门

作者: marksman_e902 | 来源:发表于2018-12-12 11:00 被阅读59次

    0.在使用Spiringboot集成Kafka之前需要将kafka的服务端程序(包括jre、zookeeper、kafka三部分)安装好。

    1.创建Springboot项目,在Spring Initializer中选择上kafka和kafka Stream的依赖项(不管有没有用先都选上)。

    2.Kafka Producer的配置和使用:

    首先时Kafka的配置文件:

    @Configuration

    @EnableKafka

    public class KafkaConfigure {

    @Bean

        public ProducerFactory producerFactory() {

    return new DefaultKafkaProducerFactory<>(producerConfigs());

    }

    @Bean

        public Map producerConfigs() {

    Map props =new HashMap<>();

    //Kafka生产者配置 

    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");  

    props.put(ProducerConfig.RETRIES_CONFIG,0);

    props.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);

    props.put(ProducerConfig.LINGER_MS_CONFIG,1);

    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);

    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);

    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    return props;

    }

    @Bean

        public KafkaTemplate kafkaTemplate() {

    return new KafkaTemplate(producerFactory());

    }

    }

    然后是生产者发送数据的部分,这里将发送数据的部分放在控制器中,以方便测试,其中发送数据的核心代码是这一句:kafkaTemplate.send("topic_name", msg);

    @RestController

    @Api(tags={"kafka生产者"})

    @RequestMapping("kafkaProducer")

    public class KafkaProducer {

    @Autowired

        private KafkaTemplatekafkaTemplate;

    @GetMapping("send/{msg}")

    public String send(@PathVariable("msg") String msg){

    kafkaTemplate.send("topic_name", msg);//topic_name是指发送信息的目标主题

    return "success";

    }

    }

    3.Kafka Consumer的配置和使用:

    首先是消费者的配置文件:

    @Configuration

    @EnableKafka

    public class KafkaConfigure {

    @Bean

        ConcurrentKafkaListenerContainerFactory

    kafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory factory =

    new ConcurrentKafkaListenerContainerFactory<>();

    factory.setConsumerFactory(consumerFactory());

    return factory;

    }

    @Bean

        public ConsumerFactory consumerFactory() {

    return new DefaultKafkaConsumerFactory<>(consumerConfigs());

    }

    @Bean

        public Map consumerConfigs() {

    Map props =new HashMap<>();

    //消费者参数设置

    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");

    props.put(ConsumerConfig.GROUP_ID_CONFIG,"test");

    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);

    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");

    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"15000");

    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);

    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    return props;

    }

    @Bean

        public KafkaProperties.Listener listener() {

    return new KafkaProperties.Listener();

    }

    }

    然后是消费者接收数据的部分:

    @Component

    public class KafkaConsumer {

    @KafkaListener(topics ="topic_name")

    public void listen (ConsumerRecord record)throws Exception {

    System.out.printf("topic = %s, offset = %d, value = %s \n", record.topic(), record.offset(), record.value());

    }

    }

    4.让程序跑起来:在使用java进行kafka操作之前需要依次将zookeeper和kafka启动,并且新建消息推送的topic(本文topic是“topic_name”),启动kafka消费者端和生产者端,注意消费者和生产者指定的topic要一致,在生产者的控制器发送一条数据,消费者的listen方法就能够接收到了。

    参考文章:https://docs.spring.io/spring-kafka/reference/htmlsingle/#_with_java_configuration

    相关文章

      网友评论

        本文标题:SpringBoot 中使用Kafka的简单入门

        本文链接:https://www.haomeiwen.com/subject/siruhqtx.html