美文网首页
Kafka生产和消费

Kafka生产和消费

作者: 五月笙 | 来源:发表于2020-11-15 18:58 被阅读0次

    在演示生产与消费消息之前,需要创建一个主题作为消息载体。Kafka提供了许多实用的脚本工具,存放在$KAFKA_HOME/bin目录下,其中与主题有关的就是kafka-topics.sh。

    创建主题

    [#] bin/kafka-topics.sh --zookeeper localhost:2181/kafka --create --topic topic-demo --replication-factor 1 --partitions 4
    Created topic topic-demo.
    

    上面为单机模式下创建了一个分区数为4、副本因子为1的主题topic-demo,关键参数说明:

    --zookeeper:Kafka所连接的ZooKeeper服务地址
    --topic:所要创建主题的名称
    --replication-factor:指定副本因子
    --partitions:指定分区个数
    --create:创建主题的动作指令
    --describe:展示主题更多信息的动作指令
    

    展示主题信息指令:

    [#] bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic topic-demo
    Topic: topic-demo   PartitionCount: 4   ReplicationFactor: 1    Configs:
        Topic: topic-demo   Partition: 0    Leader: 0   Replicas: 0 Isr: 0
        Topic: topic-demo   Partition: 1    Leader: 0   Replicas: 0 Isr: 0
        Topic: topic-demo   Partition: 2    Leader: 0   Replicas: 0 Isr: 0
        Topic: topic-demo   Partition: 3    Leader: 0   Replicas: 0 Isr: 0
    

    生产和消费

    创建主题topic-demo后,需要检验Kafka是否可以正常的发送和消费消息。相应的,在$KAFKA_HOME/bin目录下提供了收发消息的脚本工具:kafka-console-consumer.sh和kafka-console-producer.sh。
    启动生产者(producer)生产消息:

    [#] bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-demo
    >Hello,Kafka!
    >
    

    启动消费者(consumer)订阅主题:

    [#] bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic-demo
    
    Hello,Kafka!
    

    发现消费者shell终端出现了刚刚生产者生产的消息 “Hello,Kafka!”,其中关键参数:

    --broker-list:指定连接的Kafka地址
    --bootstrap-server:同样为指定连接的Kafka地址
    --topic:发送或者消费的主题
    

    代码示例

    Kafka的脚本工具一般用来做一些测试类的工作;在实际工作中,复杂的、与业务逻辑相关的消息生产与消费,则需要通过编程手段来实现。

    spring init -b=2.2.7.RELEASE -j=1.8 -d=web,kafka --build=gradle my-kafka && cd my-kafka
    

    项目配置修改:

    vim src/main/resources/application.properties
    
    spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
    server.port=8001 
    

    代码如下:

    vim src/main/java/com/example/mykafka/DemoApplication.java
    
    package com.example.mykafka;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    /**
     * Entry point of the demo application.
     */
    @SpringBootApplication
    public class DemoApplication {

        /**
         * Starts the Spring application with this class as the primary source.
         *
         * @param args command-line arguments, passed through to Spring unchanged
         */
        public static void main(String[] args) {
            final Class<?> primarySource = DemoApplication.class;
            SpringApplication.run(primarySource, args);
        }
    }
    

    增加一个接口,方便消息发送:

    vim src/main/java/com/example/mykafka/DemoController.java
    
    package com.example.mykafka;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RestController;
    
    /**
     * REST controller exposing one endpoint that publishes a {@link Message} to Kafka.
     *
     * @author by Remer
     * @date on 2020-11-17
     */
    @RestController
    public class DemoController {

        /** Name of the topic every message is sent to. */
        private static final String TOPIC = "topic1";

        @Autowired
        private KafkaTemplate<Object, Object> template;

        /**
         * Handles {@code GET /send/msg/{what}}: wraps the path segment in a
         * {@link Message} and sends it to the topic.
         *
         * @param what text taken from the request path, used as the message content
         */
        @GetMapping(path = "/send/msg/{what}")
        public void sendMessage(@PathVariable String what) {
            final Message payload = new Message(what);
            template.send(TOPIC, payload);
        }
    }
    

    消息消费相关逻辑:

    vim src/main/java/com/example/mykafka/KafkaConsumer.java
    
    package com.example.mykafka;
    
    import org.apache.kafka.clients.admin.NewTopic;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.context.annotation.Bean;
    import org.springframework.core.task.SimpleAsyncTaskExecutor;
    import org.springframework.core.task.TaskExecutor;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
    import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
    import org.springframework.kafka.support.converter.RecordMessageConverter;
    import org.springframework.kafka.support.converter.StringJsonMessageConverter;
    import org.springframework.stereotype.Component;
    import org.springframework.util.backoff.FixedBackOff;
    
    /**
     * Consumer side of the demo: listens on {@code topic1}, and declares the
     * error handler, message converter, topics and startup runner as beans.
     *
     * @author by Remer
     * @date on 2020-11-17
     */
    @Component
    public class KafkaConsumer {

        // One logger per class is enough; a per-instance logger adds nothing.
        private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
        private final TaskExecutor exec = new SimpleAsyncTaskExecutor();

        /**
         * Logs every record received from {@code topic1}. A message whose content
         * starts with {@code "fail"} is rejected with an exception so that the
         * error handler takes over.
         *
         * @param msg the converted record payload
         * @throws RuntimeException if the message content starts with {@code "fail"}
         */
        @KafkaListener(id = "messageGroup", topics = "topic1")
        public void listen(Message msg) {
            // Parameterized form: no string concatenation when INFO is disabled.
            logger.info("Received: {}", msg);
            // A payload without content is not a "fail" message; guard against the
            // NullPointerException that would otherwise be treated as a failure.
            final String content = msg.getContent();
            if (content != null && content.startsWith("fail")) {
                throw new RuntimeException("failed");
            }
            this.exec.execute(() -> System.out.println("Hit Enter to terminate..."));
        }

        /**
         * Error handler built from a {@link DeadLetterPublishingRecoverer} and a
         * {@link FixedBackOff} constructed with {@code (1000L, 2)}.
         * NOTE(review): presumably failed records end up in {@code topic1.DLT}
         * (the topic declared by {@link #dlt()}) — confirm against spring-kafka docs.
         *
         * @param template template the recoverer publishes with
         * @return the configured error handler
         */
        @Bean
        public SeekToCurrentErrorHandler errorHandler(KafkaTemplate<Object, Object> template) {
            return new SeekToCurrentErrorHandler(
                new DeadLetterPublishingRecoverer(template), new FixedBackOff(1000L, 2));
        }

        /**
         * @return a {@link StringJsonMessageConverter} used as the record converter
         */
        @Bean
        public RecordMessageConverter converter() {
            return new StringJsonMessageConverter();
        }

        /**
         * @return declaration of {@code topic1}, created with arguments {@code (1, 1)}
         */
        @Bean
        public NewTopic topic() {
            return new NewTopic("topic1", 1, (short) 1);
        }

        /**
         * @return declaration of {@code topic1.DLT}, created with arguments {@code (1, 1)}
         */
        @Bean
        public NewTopic dlt() {
            return new NewTopic("topic1.DLT", 1, (short) 1);
        }

        /**
         * @return a runner that prints a prompt and then reads from standard input
         */
        @Bean
        public ApplicationRunner runner() {
            return args -> {
                System.out.println("Hit Enter to terminate...");
                System.in.read();
            };
        }
    }
    

    消息主体:

    vim src/main/java/com/example/mykafka/Message.java
    
    package com.example.mykafka;
    
    /**
     * Message payload holding a single text field.
     *
     * @author by Remer
     * @date on 2020-11-17
     */
    public class Message {

        private String content;

        /** Creates a message whose content is left unset ({@code null}). */
        public Message() {
            // nothing to initialise
        }

        /**
         * Creates a message with the given content.
         *
         * @param content text carried by this message
         */
        public Message(String content) {
            this.content = content;
        }

        /**
         * @return the text carried by this message, or {@code null} if never set
         */
        public String getContent() {
            return content;
        }

        /**
         * @return a string of the form {@code Message{content='...'}}
         */
        @Override
        public String toString() {
            return String.format("Message{content='%s'}", content);
        }
    }
    
    

    运行:

    ./gradlew bootrun
    curl localhost:8001/send/msg/HelloWorld
    

    查看消息接收:

    com.example.mykafka.KafkaConsumer        : Received: Message{content='HelloWorld'}
    Hit Enter to terminate...
    

    参考

    Spring Boot CLI
    spring-kafka

    相关文章

      网友评论

          本文标题:Kafka生产和消费

          本文链接:https://www.haomeiwen.com/subject/vhvebktx.html