美文网首页程序员从入门到放弃
Spring Boot整kafka的极简操作

Spring Boot整kafka的极简操作

作者: 虾游于海 | 来源:发表于2020-07-03 13:58 被阅读0次

    kafka应用最少需要两部分,一部分是producer,另外一部分是consumer,这两部分可以在一个应用中,也可以不在一个应用中。在通常情况下,为了消费性能,可能需要多个消费者,也可能需要多个生产者,而消费者和生产者可能处于不同的位置或者环境,所以本示例将生产者和消费者放在不同的应用中。

    生产者端

    引入依赖

    在spring boot中使用kafka生产者端,需要引入如下依赖

     <dependencies>
        ......
        <!--Spring 的kafka依赖-->
        <dependency>
          <groupId>org.springframework.kafka</groupId>
          <artifactId>spring-kafka</artifactId>
        </dependency>
        ......
      </dependencies>
    

    启用kafka支持

    在项目中的配置类,或者启动类上增加@EnableKafka,它会帮助我们创建一些必要的Bean,包括KafkaTemplate,KafkaMessageListenerContainer等

    @EnableKafka
    public class KafkaConfig{
    }
    

    修改连接配置

    修改producer的配置文件,配置连接地址

    spring:
      kafka:
        # 指定kafka集群地址,多为地址用逗号分割
        bootstrap-servers: dm105:9092,dm106:9092,dm107:9092
    

    创建Topic

    如果你的topic还未在kafka中创建,则可以使用spring-boot自动创建主题,只需创建一个类型为NewTopic的bean,并指定topic相关的信息即可

        /**
         * 新建一个主题
         *
         * @return
         */
        @Bean
        public NewTopic testTopic() {
            return TopicBuilder.name("test")// 指定主题名称
                .partitions(30) // 指定分区数量,这个数量通常要大于消费者的数量,按消费者线程数计算
                .replicas(2) // 指定副本数量
                .compact()
                .build();
        }
    

    基本的配置已经完成,接下来就是发送消息了。

    使用KafkaTemplate发送消息

    接下来使用kafkaTempalte发送消息到服务端,以下是一个极简示例

    /**
     * 使用Spring boot test测试消息发送
     */
    @SpringBootTest
    class KafkaDemoApplicationTests {
    
        /**
         * 注入KafkaTemplate,用于发送消息
         */
        @Autowired
        private KafkaTemplate template;
    
        @Test
        public void newMessage() {
            System.out.println("start at " + ZonedDateTime.now() + "");
    
            for (int i = 0; i < 1000000; i++) {
                long now = System.currentTimeMillis();
                // 调用template,将消息发送到kafka
                // 第一个参数是topic名称,第二个参数是要发送的消息内容
                template.send("test", "adg" + now);
            }
        }
    }
    

    消费者端

    根据上面的生产者,需要一个消费者来消费生产者生产的数据。spring boot整合kafka的消费者也非常方便

    引入依赖

    生产者和消费者的依赖是一致的。在此不再赘述

    启用kafka支持

    该操作和生产者应用一致,不再赘述。

    修改连接配置

    消费者的配置需要除了需要指定连之外,最好指定一些额外的配置参数,以便提高消费者性能

    spring:
      kafka:
        # 指定kafka集群地址
        bootstrap-servers: dm105:9092,dm106:9092,dm107:9092
        consumer:
          # 如果两个应用程序为并行消费某个topic的消息,需要将两个应用的group-id指定一致
          group-id: "message-group"
        listener:
          # 指定消息消费的模式,type=batch代表可以批量消费
          type: batch
          # 指定消费者的并发数,也就是可以同时有多少个消费者线程在监听数据,默认为1,
          # 更具情况设置并行数据,通常建议最小为Cpu的核心数
          concurrency: 16
    

    创建消费者

    消费者的就是一个普通的Spring bean.在对应的方法上添加@KafkaListener注解,并指定需要消费的topic即可开始消费者监听。

    @Component
    public class Consumer {
        
        /**
         * 注入repository,用户数据持久化(略)
         */
        @Autowired
        private MessageRepository repository;
    
        /**
         * 使用@KafkaListener注解标记消费方法,指定topics属性指定监听的待消费topic
         *
         * @param messages 待消费的数据,由于启用了批量消费模式,所以监听获取到的是一个集合
         */
        @KafkaListener(topics = {"test"})
        @Transactional
        public void test(List<String> messages) {
            List<Message> result = messages.stream().map(Message::new).collect(Collectors.toList());
            repository.saveAll(result);
            System.out.println("save message [" + messages.size() + "] 条 at" + ZonedDateTime.now().toString());
        }
    }
    

    测试项目

    当项目构建完成之后,可以按照如下步骤来测试项目

    1. 启动消费者程序
    2. 执行生产者测试代码,观察生产者执行结果

    项目仓库地址

    完整项目参考https://github.com/ldwqh0/hadoop-demo

    相关文章

      网友评论

        本文标题:Spring Boot整kafka的极简操作

        本文链接:https://www.haomeiwen.com/subject/zgdsqktx.html