美文网首页
kafka 学习笔记 3 - Java 使用 kafka 收发消

kafka 学习笔记 3 - Java 使用 kafka 收发消

作者: 张云飞Vir | 来源:发表于2021-07-18 09:54 被阅读0次

    1. 背景

    本文简述 kafka 的相关内容。

    2.知识

    更多基础知识见:https://www.jianshu.com/p/bee2152f476c
    如何安装 kafka 见:https://www.jianshu.com/p/8a076052a9ad

    3. 示例

    3.1 配置一个“生产者”

    1、添加依赖
    新建一个项目,并添加依赖:

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    

    2、配置kafka的服务地址
    在配置文件 application.yml 中配置。

    server:
      port: 8081
    
    spring:
      application:
        name: "producer"
      kafka:
        bootstrap-servers: "localhost:9092"
    
    

    3、创建topic
    我使用 java 来创建 topic ,注入一个 NewTopic 对象即可。

    @Component
    public class KafkaConfig {
        private static final String TOPIC_NAME = "topic2";
    
        // 创建一个主题 topic
        @Bean
        public NewTopic topic1() {
            return TopicBuilder.name(TOPIC_NAME)
                    .partitions(1)
                    .replicas(1)
                    .compact()
                    .build();
        }
    
    }
    

    4、发送消息
    首先需要注入一个 kafkaTemplate 对象。这个是个 kafka 基础操作的模板方法类。springboot 框架已经帮忙配置好了,直接注入即可。

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    
    

    然后直接 send 发送消息

    private static final String TOPIC_NAME = "topic2";
    
    kafkaTemplate.send(TOPIC_NAME, data);
    
    

    就是这么简单省事。

    3.2 配置一个“消费者 ”

    1、添加依赖
    新建一个项目,并添加依赖同上。

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    

    2、配置kafka服务器地址
    修改 application.yml ,示例:

    spring:
      kafka:
        bootstrap-servers: "localhost:9092"
        consumer:
          group-id: "myGroup1"
          client-id: "myGroup1"
    
    
    

    3、监听消息

    KafkaListener 这个注解 指示到一个方法上即可。
    格式:

    @KafkaListener(topics = TOPIC_NAME)
    public void someOne(String content){
      ....
    }
    

    我的示例:

    @Component
    public class MyKafkaConsumer {
        private static final String TOPIC_NAME = "topic2";
    
        @KafkaListener(topics = TOPIC_NAME)
        public void processMessage(ConsumerRecord<String, String> record) {
            System.out.println(String.format("# record: %s", record));
            System.out.println(String.format("\t\t# 收到消息: %s", record.value()));
        }
    
    }
    
    

    4. 扩展

    Spring-kafka 的文件值得一下看:https://docs.spring.io/spring-kafka/docs/current/reference/html/#configuring-topics

    我的代码示例见:https://github.com/vir56k/java_demo/tree/master/kafka_demo1

    5. 参考

    Springboot 官网文档介绍
    https://docs.spring.io/spring-boot/docs/current/reference/html/features.html#features.spring-application

    相关文章

      网友评论

          本文标题:kafka 学习笔记 3 - Java 使用 kafka 收发消

          本文链接:https://www.haomeiwen.com/subject/nzpopltx.html