美文网首页kafkaspringboot
Kafka Java Client基本使用及整合SpringBoot

Kafka Java Client基本使用及整合SpringBoot

作者: QuoVadis_k | 来源:发表于2020-08-02 13:31 被阅读0次

    本文所有代码均在Github上:

    https://github.com/kevinwang0224/kafka-java-demo

    https://github.com/kevinwang0224/kafka-springboot-demo


    kafka-clients

    添加依赖

     <!-- Kafka Java client library, pinned to version 2.5.0. -->
     <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.5.0</version>
    </dependency>
    

    消费者 Consumer

    代码上总体可以分为三部分:

    1. 消费者的配置
      1. 消费者的配置项在 org.apache.kafka.clients.consumer.ConsumerConfig 类中都有列举,并且每个配置项都附有文档说明
    2. 创建消费者实例并订阅topic
    3. 消费消息

    代码如下:

    // 1. Configuration. Each key used here is a constant declared on ConsumerConfig.
    Properties properties = new Properties();
    // bootstrap.servers: Kafka cluster address list, host1:port1,host2:port2,...
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    // key.deserializer: class used to deserialize the record key
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    // value.deserializer: class used to deserialize the record value
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    // group.id: consumer group this consumer joins
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
    // enable.auto.commit: let the client commit offsets automatically
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    // auto.offset.reset: where to start when the group has no committed offset
    // ("earliest" = from the beginning of the partition)
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    // 2. Create the consumer and subscribe to the topic.
    // try-with-resources closes the consumer if the poll loop is left by an
    // exception; the loop itself never terminates normally.
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
      consumer.subscribe(Arrays.asList("demo-topic"));

      // 3. Consume: poll in a loop and print every record received.
      while (true) {
        // Wait at most 100 ms for new records before returning a (possibly empty) batch.
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
          System.out.println(record);
        }
      }
    }
    

    生产者 Producer

    生产者的代码结构与消费者基本一致,不同之处在于 producer 发送消息的部分:

    1. 生产者的配置
      1. org.apache.kafka.clients.producer.ProducerConfig 类中也都有列举
    2. 创建生产者实例
    3. 发送消息到 topic
      1. 异步发送消息 producer.send(new ProducerRecord<>("demo-topic", data))
      2. 同步发送消息,使用 Future.get() 阻塞等待发送结果
      3. 异步发送消息,回调的方式

    整体代码如下

    // 1. Configuration. Each key used here is a constant declared on ProducerConfig.
    Properties properties = new Properties();
    // bootstrap.servers: Kafka cluster address list, host1:port1,host2:port2,...
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    // key.serializer: class used to serialize the record key
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // value.serializer: class used to serialize the record value
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    // 2. Create the producer.
    // try-with-resources guarantees close() runs even when a send throws,
    // so the producer is never leaked.
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {

      // 3. Send messages in three different ways.

      // 3.0 Asynchronous, fire-and-forget: the value returned by send() is ignored.
      for (int i = 0; i < 10; i++) {
        String data = "async :" + i;
        producer.send(new ProducerRecord<>("demo-topic", data));
      }

      // 3.1 Synchronous: block on Future.get() until the send result is available.
      for (int i = 0; i < 10; i++) {
        String data = "sync : " + i;
        try {
          Future<RecordMetadata> send = producer.send(new ProducerRecord<>("demo-topic", data));
          RecordMetadata recordMetadata = send.get();
          System.out.println(recordMetadata);
        } catch (InterruptedException e) {
          // Restore the interrupt flag and stop sending: every further get()
          // would fail immediately while the thread is interrupted.
          Thread.currentThread().interrupt();
          e.printStackTrace();
          break;
        } catch (Exception e) {
          e.printStackTrace();
        }
      }

      // 3.2 Asynchronous with a completion callback.
      for (int i = 0; i < 10; i++) {
        String data = "callback : " + i;
        producer.send(new ProducerRecord<>("demo-topic", data), (metadata, exception) -> {
          // Print the failure if there is one, otherwise the record metadata.
          if (exception != null) {
            exception.printStackTrace();
          } else {
            System.out.println(metadata);
          }
        });
      }
    }
    

    整合SpringBoot

    添加依赖

    <parent>
      <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-parent -->
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-parent</artifactId>
      <version>2.3.2.RELEASE</version>
      <relativePath/> <!-- lookup parent from repository -->
    </parent>
    
    ....
    ....
    
    
    <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <!-- Spring for Apache Kafka; no version is declared on this dependency -->
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
    </dependency>
    
    <!-- Test-scoped dependencies, included to make testing easier -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka-test</artifactId>
      <scope>test</scope>
    </dependency>
    

    代码

    # application.yml
    # Spring Kafka settings: a single local broker address, String
    # (de)serializers for keys and values, and the consumer group id.
    spring:
      kafka:
        bootstrap-servers: 127.0.0.1:9092
        producer:
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
        consumer:
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          group-id: test-group
    
    /**
     * Application entry point: the Spring Boot launcher class of the demo.
     */
    @SpringBootApplication
    public class DemoApplication {

        /**
         * Starts the Spring application with this class as its primary source.
         *
         * @param args command-line arguments, passed through to Spring Boot
         */
        public static void main(String[] args) {
            SpringApplication application = new SpringApplication(DemoApplication.class);
            application.run(args);
        }
    }
    
    /**
     * Consumer component: listens on {@code test-topic} and prints what it receives.
     */
    @Component
    public class Consumer {

        /**
         * Writes the received message to standard output.
         *
         * @param message the message handed to this listener method
         */
        @KafkaListener(topics = "test-topic")
        public void receiveMessage(String message) {
            System.out.println(message);
        }
    }
    
    /**
     * Producer component: a thin wrapper around the injected {@link KafkaTemplate}.
     */
    @Component
    public class Producer {

        /** Template used for sending; injected by the container. */
        @Resource
        KafkaTemplate<String, String> kafkaTemplate;

        /**
         * Sends {@code message} to {@code topic} through the template.
         * Whatever {@code send} returns is discarded, so this method does not
         * report the outcome of the send to its caller.
         *
         * @param topic   name of the destination topic
         * @param message message text to send
         */
        public void sendMessage(String topic, String message) {
            this.kafkaTemplate.send(topic, message);
        }
    }
    
    /**
     * Test class: sends one message through {@link Producer} inside a Spring Boot
     * test context.
     */
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class DemoApplicationTests {

        /** How long the test thread sleeps after sending, in milliseconds. */
        private static final long WAIT_AFTER_SEND_MILLIS = 10_000L;

        @Autowired
        private Producer producer;

        /**
         * Sends {@code test-message} to {@code test-topic}, then sleeps before returning.
         */
        @Test
        public void send() {
            producer.sendMessage("test-topic", "test-message");
            try {
                // Presumably keeps the context alive long enough for the listener
                // to receive and print the message; confirm against the demo setup.
                Thread.sleep(WAIT_AFTER_SEND_MILLIS);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the test runner can still observe it.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
    }
    

    总结

    整合SpringBoot之后的代码还是非常简洁的,不过还是要熟悉原生API,这样才能在实际项目中遇到问题时游刃有余。

    相关文章

      网友评论

        本文标题:Kafka Java Client基本使用及整合SpringBoot

        本文链接:https://www.haomeiwen.com/subject/czcjrktx.html