美文网首页
SpringBoot 整合 Kafka 简明教程(Mac 下)

SpringBoot 整合 Kafka 简明教程(Mac 下)

作者: panzhangbao | 来源:发表于2018-12-28 09:15 被阅读156次

    Mac 下安装启动 kafka

    1. 安装 kafka,安装过程会同时安装其依赖 zookeeper
    brew install kafka
    

    软件位置

    /usr/local/Cellar/zookeeper
    
    /usr/local/Cellar/kafka
    

    配置文件位置

    /usr/local/etc/kafka/zookeeper.properties 
    
    /usr/local/etc/kafka/server.properties
    

    备注:后续操作均需进入 /usr/local/Cellar/kafka/xxx/bin 目录下。

    1. 启动zookeeper
    zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties &
    
    1. 启动kafka服务
    kafka-server-start /usr/local/etc/kafka/server.properties & 
    
    1. 创建 topic ,命名为test1
    kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1
    
    1. 查看创建的topic
    kafka-topics --list --zookeeper localhost:2181  
    
    1. 生产数据
    kafka-console-producer --broker-list localhost:9092 --topic test1
    
    1. 消费数据
    kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic test1 --from-beginning
    

    备注:--from-beginning 表示从第一条消息开始接收

    SpringBoot 整合 Kafka

    • pom.xml 添加 spring-kafka依赖,根据自身版本做调整
    <!--springboot 版本:2.1.1.RELEASE 对应 spring-kafka 版本 2.2.2.RELEASE 对应 kafka 版本 2.1.0-->
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
                <version>2.2.2.RELEASE</version>
            </dependency>
    
    • application.yml 配置,根据具体服务器填写 bootstrap-servers
    spring:
      # Kafka
      kafka:
        # Kafka broker address; replace with the address of your own server
        bootstrap-servers: 192.168.10.31:9092
        # Producer
        producer:
          # Failed sends are not retried
          retries: 0
          # Upper bound of one record batch, in bytes (a size, not a message count)
          batch-size: 16384
          # Memory available for buffering records not yet sent, in bytes (32 MiB)
          buffer-memory: 33554432
          # Serializers used for the message key and the message value
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
        # Consumer
        consumer:
          group-id: consumer-group
          # Start from the earliest available offset when the group has no committed offset
          auto-offset-reset: earliest
          enable-auto-commit: true
          # NOTE(review): "100s" presumably binds as 100 seconds; if a 100 ms commit
          # interval was intended, this should be "100" or "100ms" - confirm
          auto-commit-interval: 100s
          # Deserializers used for the message key and the message value
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    
    • KafkaProducer.java 生产者
    package tc.smartlockapplet.utils.http.kafka;
    
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.scheduling.annotation.EnableScheduling;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;
    import tc.smartlockapplet.utils.http.json.JSONUtils;
    
    import javax.annotation.Resource;
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * Kafka producer demo.
     *
     * <p>Publishes a fixed sample message to {@code personalInfoTopic} on a schedule so the
     * consumer side of the tutorial has something to receive.
     *
     * Created by panzhangbao on 2018-12-27 16:57:54
     * Copyright © 2018 panzhangbao. All rights reserved.
     */
    @Component
    @EnableScheduling
    public class KafkaProducer {
        /** Topic the sample message is published to. */
        private static final String TOPIC = "personalInfoTopic";

        /** Record key attached to every sample message. */
        private static final String MESSAGE_KEY = "personalInfo";

        @Resource
        private KafkaTemplate<String, String> kafkaTemplate;

        /**
         * Scheduled smoke test: builds a small sample map, prints it, and sends it to
         * {@code personalInfoTopic}.
         *
         * <p>The cron expression fires at every third second (0, 3, 6, ...) of each minute.
         * A failed send is reported on {@code System.err} instead of being silently dropped.
         */
        @Scheduled(cron = "0/3 * * * * *")
        public void schedulingTest() {
            Map<String, String> messageMap = new HashMap<>();
            messageMap.put("name", "JasonPan");
            messageMap.put("motto", "If not now, when? if not me, who?");

            System.out.println("\n----------   producer   ----------\n");
            System.out.println("topic is " + TOPIC + "\nmessage is " + messageMap + "\n");

            // send() is asynchronous; with spring-kafka 2.2.x it returns a ListenableFuture.
            // Register a failure callback so broker/serialization errors become visible.
            // JSONUtils.mapToJson is a project helper - presumably renders the map as a JSON string.
            kafkaTemplate.send(TOPIC, MESSAGE_KEY, JSONUtils.mapToJson(messageMap))
                    .addCallback(
                            result -> { },
                            ex -> System.err.println("failed to send message to " + TOPIC + ": " + ex));
        }
    }
    
    • KafkaConsumer.java 消费者
    package tc.smartlockapplet.utils.http.kafka;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    
    import java.util.Optional;
    
    /**
     * Kafka consumer demo.
     *
     * <p>Listens on {@code personalInfoTopic} and prints every record that carries a
     * non-null value.
     *
     * Created by panzhangbao on 2018-12-27 19:03:19
     * Copyright © 2018 panzhangbao. All rights reserved.
     */
    @Component
    public class KafkaConsumer {
        /**
         * Prints the received record and its value; records whose value is {@code null}
         * are ignored.
         *
         * @param record the record delivered by the listener container
         */
        @KafkaListener(topics = {"personalInfoTopic"})
        public void listen(ConsumerRecord<?, ?> record) {
            Optional.ofNullable(record.value()).ifPresent(payload -> {
                System.out.println("----------   consumer   ----------");
                System.out.println("record is " + record + "\nmessage is " + payload);
            });
        }
    }
    
    • 运行项目,结果如下:


      SpringBoot 整合 Kafka 运行结果

    参考

    1. DevinShuai 的博客:Mac 安装使用kafka
    2. tzs_ 的博客:SpringBoot Kafka 整合使用

    相关文章

      网友评论

          本文标题:SpringBoot 整合 Kafka 简明教程(Mac 下)

          本文链接:https://www.haomeiwen.com/subject/qdddlqtx.html