美文网首页
Kafka学习

Kafka学习

作者: 帆子_8c3a | 来源:发表于2020-07-13 00:48 被阅读0次

    Kafka 核心概念

    Message

    Key+Value ,根据路由规则发往Broker

    Topic和Log

    image.png
    • 同一个topic会在不同Broker上分成多个partition
    • 每个partition对应一个Log,每个Log存储时候分成多个Segment。
    • 写操作都是追加写
    • 每个Segment默认1G,写满以写下一个Segment
    • 用一个索引文件对Log进行索引,不是每条记录都有索引,而是稀疏记录的。

    Topic的保存策略

    • 根据retention
    • 根据日志文件大小的阈值
    • Log Compact策略

    Broker

    Replica

    • 每个partition可以有1个或多个replica
    • 其中一个选做lead,其实为follower
    • Follower主动去从lead拉取最新数据,更新自己本地Log


      image.png

    ISR Table

    • ISR - In Sync Replica, 和Lead进度接近的Follower
      • 需要和zookeeper保持连接
      • 本地offset和Lead的offset的差值小于阈值
    • 每个Lead维护ISR Table,对于失去连接的,或者catch up慢的replica,从ISR Table中删掉

    HW和LEO

    • HW : High Watermark, 在ISR Table中最慢的那个offset决定了HW
    • LEO : Log end offset, 消费者提交的职位
    • 消费者只能拉取HW标记之前的数据, 之后的数据对消费者不可见
    • HW之前的数据被称作committed的数据


      image.png

      offset为11的数据最终变成commit状态

    Cluster和Controller

    • 多个Broker组成Cluster
    • 每个Cluster有一个Controller,管理replica状态,监听zookeeper上数据变化。Controller的竞选由Zookeeper帮忙选择。

    Producer

    Properties props = new Properties();
    props.put("client.id", InetAddress.getLocalHost().getHostName());
    props.put("bootstrap.servers", "host1:9092,host2:9092");
    props.put("key.serializer", ...);
    props.put("value.serializer", ...);
    
    producer = new KafkaProducer<K, V>(props);
    
    ProducerRecord<K, V> record = new ProducerRecord<>(topic, key, value);
    Future<RecordMetadata> future = producer.send(record);
    

    Consumer

    • offset可以由消费者自己维护,也可以由Kafka集群帮忙维护


      image.png

    Consumer Group

    image.png
    • 每个Consumer只能属于一个Consumer Group
    • 每个partition只能被Consumer Group中的一个Consumer消费
    Properties props = new Properties();
    props.put("bootstrap.servers", "host1:9092,host2:9092");
    props.put("group.id", "test"); //group id
    props.put("enable.auto.commit", "true"); 
    props.put("auto.commit.interval.ms", "1000"); 
    
    consumer = new KafkaConsumer<K, V>(props);
    consumer.subscribe(Arrays.asList("test1","test2"));
    
    ConsumerRecords<K, V> records = consumer.poll(100);
    

    Rebalance

    image.png image.png

    协议

    • Kafka自定义了进程间通信API的格式,大约有40多个API。有的是Broker之间发送的,有的是Producer和Broker之间的,有的是Consumer和Broker之间。
    • 每个API包括了Request和Response,每个API会自带版本信息

    Produce过程

    • Producer Client会定期(每隔5分钟)MetaData请求给Broker(选择负载最小的一个),获取Broker的分布,Partition的分布情况
    • Producer Client按照路由规则,将消息路由到相应partition上

    Produce过程的传递保证语义(Delivery guarantee semantic)

    • 默认时,Kafka的Produce保证了At Least Once语义。
    • 由于网络原因没有响应,导致Client会重发消息。不会丢数据,但可以能会有重复数据。
    • 在0.11版本中实现了幂等性, 幂等性 + At Least Once = Exactly Once

    Produce的幂等性的实现

    • 每次建立连接的时候,向Broker发送一个InitProducerId命令,获取Cluster全局唯一的Produce ID
    • 每次发送消息的时候,每条消息会自带sequence number,Broker判断Produce ID+sequence number出现过,即认定已经发过此条消息。
    • 添加InitProducerId命令,修改Produce命令
    • 下面代码打开Produce的幂等性
    props.put("enable.idempotence", "true");
    

    Consume过程的传递保证语义(Delivery guarantee semantic)

    • 先消息处理,然后commit offset,但commit失败,会导致At Least Once
      • 如果消息处理实现幂等性,可以认为整体是Exactly Once的
    • 先commit offset,然后处理消息,但处理消息失败,会导致At Most Once
    • 高版本Kafka已经通过2PC实现了Exactly Once

    Consume使用Transaction

    Properties props = new Properties();
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("client.id", "ProducerTranscationnalExample");
    props.put("bootstrap.servers", "localhost:9092");
    props.put("transactional.id", "test-transactional");
    props.put("acks", "all");
    KafkaProducer producer = new KafkaProducer(props);
    producer.initTransactions();
    
    String msg = "matt test";
    producer.beginTransaction();
    producer.send(new ProducerRecord(topic, "0", msg.toString()));
    producer.send(new ProducerRecord(topic, "1", msg.toString()));
    producer.send(new ProducerRecord(topic, "2", msg.toString()));
    producer.commitTransaction();
    
    image.png

    Consume的具体过程

    • 默认每间隔5分钟会发送一次 MetaData 命令
    • 每个Consumer member向Cluster中一台(选择负载最低的一个),发送GroupCoordinator命令,获取哪个broker是Coordinator。
    • 每个Consumer member向Coordinator发送JoinGroup命令, GroupCoordinator在给所有的member返回时,只有一个member被标记为Lead
    • 每个Consumer member定期向Coordinator发送HeartBeat命令
    • 每个Consumer member向Coordinator发送SyncGroup命令,其中Lead会发送如何分组的信息
    • 当Coordinator发现HeartBeat停止,或者由member发送LeaveGroup命令,触发reblance操作,重新分配分组
    • Member发送OffsetCommit命令
    • Member发送OffsetFetch命令
    • Member发送Fetch命令消费message

    Log的存储

    image.png

    Log的Index

    image.png

    reading list

    rd_kafka_subscribe => RD_KAFKA_OP_SUBSCRIBE
      rd_kafka_cgrp_subscribe
        rd_kafka_cgrp_join
          set state to RD_KAFKA_CGRP_JOIN_STATE_WAIT_JOIN
          rd_kafka_JoinGroupRequest
    
    
    
    
    rd_kafka_cgrp_handle_JoinGroup
    rd_kafka_SyncGroupRequest
    rd_kafka_handle_SyncGroup
    

    相关文章

      网友评论

          本文标题:Kafka学习

          本文链接:https://www.haomeiwen.com/subject/pbpncktx.html