美文网首页
kafka使用的一些坑

kafka使用的一些坑

作者: 楼兰King | 来源:发表于2021-04-21 23:24 被阅读0次

    在安装及测试过程中,可能会发生些错误,那是因为新版本的kafka的一些命令发生了改变。

    本人使用的是2.5.0版本

    创建消费者报错
    错误一:disconnected
    WARN [Consumer clientId=consumer-console-consumer-47753-1, groupId=console-consumer-47753] Bootstrap broker 127.0.0.1:2181 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
    在做kafka测试的时候,使用命令bin/kafka-console-consumer.sh --zookeeper

    原因及解决办法
    新版本不适用端口2181,而是9092

    错误二:zookeeper is not a recognized option
    zookeeper is not a recognized option

    原因及解决办法
    bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --tpoic Rising-topic --from-beginning
    启动消费者,发现一直报错consumer zookeeper is not a recognized option,发现在启动的时候说使用 --zookeeper是一个过时的方法,此时,才知道原来在最新的版本中,这种启动方式已经被删除了,

    0.90版本之后启动消费者的方法

    bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning
    zookeeper如何查看kafka的节点信息及查看方式
    详细可阅览这篇博文

    如何知道zk是否正常启动
    有时启动zk,显示started,但其实zk并不成功启动,可采用如下方式:
    telnet 127.0.0.1 2181
    然后输入stat,出现下面这些信息即正常启动


    image.png

    遇到一些问题可阅览这篇博文

    如何远程连接kafka
    kafka部署在阿里云服务器,本地java代码如何连接测试呢?
    问题一:阿里云服务器安全组问题
    阿里云有自己的安全组,禁止访问某些端口,明明防火墙已经关了,但除了22端口其他依然无法访问。
    可以参考下这篇博文

    问题二:无法解析主机名:java.net.UnknownHostException: xxxxxxx
    打开linux下的etc目录下的hosts文件,另起一行添加ip+域名(ip和域名之间使用空格分开),这里的域名就是阿里云的主机名。
    修改了可能不会立即生效,我是重启服务器之后才行的。

    问题三:远程连接问题
    本地java测试显示
    Connection refused: no further information

    这是需要修改配置文件/config/server.properties
    advertised.listener=PLAINTEXT://阿里云外网IP:9092

    把这一项注释打开,就可以了
    其他博文说创建host.name=外网地址,老版本是可以,但新版本没有host.name这一项,加上去kafka会无法启动,在新属性advertised.listener修改就可以了。

    Springboot集成Kafka
    引入依赖

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.3.0.RELEASE</version>
    </dependency>
    
    

    注意这里的依赖是有对应关系的,随便对应有可能会出错,对应关系可见Kafka官网,截图如下:


    image.png

    配置文件application.yml

    server:
      port: 8081
    
    spring:
      application:
        name: producer
      kafka:
        producer:
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
        consumer:
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          group-id: Springboot-groupid
          auto-offset-reset: earliest
          enable-auto-commit: true
        bootstrap-servers: IP地址:9092
    

    可使用KafkaTemplate,producer举例:

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Component;
    
    /**
     * @author Rising
     * @date 2020/4/23
     */
    @Component
    public class Producer {
    
        @Autowired
        KafkaTemplate kafkaTemplate;
    
        public void send(){
            kafkaTemplate.send("topic", "msgKey", "msgData");
        }
    }
    
    

    consumer举例:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    import java.util.Optional;
    
    /**
     * @author Rising
     * @date 2020/4/23
     */
    @Component
    public class Consumer {
        @KafkaListener(topics = {"topic"})
        public void listener(ConsumerRecord record){
            Optional msg = Optional.ofNullable(record.value());
            if (msg.isPresent()){
                System.out.println(msg.get());
            }
        }
    }
    

    启动application即可

    /**
     * @author Administrator
     */
    @SpringBootApplication
    public class ProducerApplication {
        public static void main(String[] args) throws InterruptedException {
            ConfigurableApplicationContext context = SpringApplication.run(ProducerApplication.class, args);
            Producer producer = context.getBean(Producer.class);
            for (int i = 0; i < 10; i++) {
                producer.send();
                TimeUnit.SECONDS.sleep(2);
            }
        }
    }
    
    

    ———————————————————华丽的分割线—————————————————————

    Kafka的一些小知识
    生成写入流程
    producer先从Zookeeper的节点找到改partition分区的Leader
    producer将消息发送给leader
    leader将消息写入本地log
    followers从leader pull消息
    写入本地log后向leader发送ack
    leader收到所有replication的ack后并向producer发送ack

    broker保存消息及存储策略
    存储方式

    物理上把topic分成一个或多个paticition

    存储策略

    无论消息是否被消费,kafka都会保存所有消息,有两种策略可以删除旧数据:

    1.基于时间:168小时
    2.基于大小:1G
    kafka读取特定消息复杂度为O(1),删除过期文件并不能提高Kafka性能。

    tips:producer不在zk中注册,消费者在zk注册

    消费流程
    高级API
    不能自己控制offset,不能重复读

    低级API
    可以自己控制offset,但过于复杂

    相关文章

      网友评论

          本文标题:kafka使用的一些坑

          本文链接:https://www.haomeiwen.com/subject/xubtrltx.html