Installing Kafka on macOS

Author: matthewfly | Published: 2020-11-12 10:37

    Install directly with Homebrew; ZooKeeper is included as a dependency:

    brew install kafka
    

    When the installation finishes, Homebrew prints the startup commands:

    // Start ZooKeeper
    To have launchd start zookeeper now and restart at login:
      brew services start zookeeper
    Or, if you don't want/need a background service you can just run:
      zkServer start
    // Start Kafka
    To have launchd start kafka now and restart at login:
      brew services start kafka
    Or, if you don't want/need a background service you can just run:
      zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties
    

    Installation path:

    ==> Summary
    🍺  /usr/local/Cellar/kafka/2.6.0
    

    Configuration files:

    /usr/local/etc/kafka/zookeeper.properties
    /usr/local/etc/kafka/server.properties
    

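    Run brew services start zookeeper first to start ZooKeeper, then brew services start kafka to start Kafka. The configuration files show the default ports: 2181 for ZooKeeper and 9092 for Kafka.
    Change to the Kafka installation directory /usr/local/Cellar/kafka/2.6.0/bin and create a topic: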

    kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test1
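
If the command above cannot connect, it may help to confirm that both services are actually accepting connections on the default ports named in the text. The following is a minimal sketch using only the Java standard library; the class name `PortCheck` and the one-second timeout are assumptions for illustration, not part of Kafka or Homebrew.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper (not part of Kafka or Homebrew):
// checks whether a TCP port accepts connections.
final class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMs milliseconds.
    static boolean isOpen(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Default ports from the configuration files: ZooKeeper 2181, Kafka 9092.
        System.out.println("ZooKeeper (2181) reachable: " + isOpen("localhost", 2181, 1000));
        System.out.println("Kafka (9092) reachable: " + isOpen("localhost", 9092, 1000));
    }
}
```

With Java 11 or later this can be run directly with `java PortCheck.java`. An open port only shows that something is listening there; it does not prove that the listener is ZooKeeper or Kafka.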
    

    List the topics to confirm that it was created:

    bin % kafka-topics --list --zookeeper localhost:2181
    test1
    

    Start a console producer:

    kafka-console-producer --broker-list localhost:9092 --topic test1
    

    Start a console consumer:

    kafka-console-consumer --bootstrap-server localhost:9092 --topic test1 --from-beginning
    

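    Type a message in the producer terminal and press Enter. The message appears in the consumer terminal, which confirms that producing and consuming both work.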

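    Using Kafka in Spring Boot.
    Add the Kafka dependency to pom.xml (no version is given, so this assumes the Spring Boot parent or BOM manages it):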

           <!-- kafka -->
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>
    

    Add the Kafka configuration to the yml file:

    spring:
        kafka:
            consumer:
                bootstrap-servers: localhost:9092
                client-id: test-client
                group-id: test-consumer
                key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
                value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    

    Add a producer endpoint:

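    import javax.annotation.Resource;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;

    // @RestController rather than @Controller: with a void return type, a plain
    // @Controller would make Spring MVC try to resolve a view for the request.
    @RestController
    public class TestController {
        @Resource
        private KafkaTemplate<String, Object> kafkaTemplate;
    
        // GET /test/kafka?msg=... publishes msg to topic test1 without a key.
        @GetMapping("/test/kafka")
        public void send(String msg) {
            kafkaTemplate.send("test1", msg);
        }
    }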
    

    Add the consumer listeners:

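    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.annotation.PartitionOffset;
    import org.springframework.kafka.annotation.TopicPartition;
    import org.springframework.stereotype.Component;

    // One listener per partition of test1, each with initial offset 0.
    @Component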
    public class TestKafkaListener {
        @KafkaListener(id = "c_1", topicPartitions = {@TopicPartition(topic = "test1", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))})
        public void partition0(String msgData) {
            System.out.println("demo3 receive : " + msgData + ", partition: 0" );
        }
    
        @KafkaListener(id = "c2", topicPartitions = {@TopicPartition(topic = "test1", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "0"))})
        public void partition1(String msgData) {
            System.out.println("demo3 receive : " + msgData + ", partition: 1" );
        }
    
        @KafkaListener(id = "c3", topicPartitions = {@TopicPartition(topic = "test1", partitionOffsets = @PartitionOffset(partition = "2", initialOffset = "0"))})
        public void listenPartitionOnly(String msgData) {
            System.out.println("demo3 receive : " + msgData + ", partition: 2" );
        }
    }
    

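    The annotation must specify the topic; it can optionally specify the partition and the offset from which to start consuming.
    Start the project and open http://localhost:8080/test/kafka?msg=123 in a browser. The consumer prints the data it received: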

    demo3 receive : 123, partition: 2
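
The same request can be sent from code instead of a browser, which is convenient for messages that contain spaces or other characters needing URL encoding. This is a sketch under stated assumptions: it uses the JDK's `java.net.http.HttpClient` (Java 11 or later), the class name `SendTestMessage` and the helper `buildUri` are hypothetical, and the application is assumed to be running on localhost:8080 as in the URL above.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Hypothetical client for the /test/kafka endpoint shown earlier.
final class SendTestMessage {

    // Builds the request URI, URL-encoding the message so it is safe as a query parameter.
    static URI buildUri(String msg) {
        String encoded = URLEncoder.encode(msg, StandardCharsets.UTF_8);
        return URI.create("http://localhost:8080/test/kafka?msg=" + encoded);
    }

    public static void main(String[] args) throws Exception {
        // Use the first command-line argument as the message, defaulting to "123".
        String msg = args.length > 0 ? args[0] : "123";
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(buildUri(msg)).GET().build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println("HTTP status: " + response.statusCode());
    }
}
```

Only the HTTP status is printed here; whether the message actually arrived is still confirmed by the listener output in the application console.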
    
