90 kafka

作者: 滔滔逐浪 | 来源:发表于2021-05-12 22:15 被阅读0次

    1,kafka在大数据的应用场景;
    2,kafka为什么性能非常高
    2,kafka常见名词和概念
    4,Partition/offset/Replica作用;
    5,win与linux 环境安装kafka注意事项;
    6,Springboot整合kafka注意事项
    7,kafka生产者投递与消费消息原理
    8,消费者对offset偏移量有哪些作用
    9 kafka消费者组消费的概念
    10 offsetReset 三种模式 复位
    11 为什么kafka需要设计Partition
    12 kafka 如何保证消息顺序一致性问题。

    什么是消息中间件:
    消息中间件基于队列模型实现异步/同步传输数据
    作用: 可以实现支撑高并发,异步解耦,流量消峰,降低耦合度,

    传统的http请求存在哪些缺点:

    1,http请求基于请求响应的模型,在高并发的情况下,客户端发送大量的请求达到服务器端有可能会导致我们的服务器端处理请求堆积;
    2 Tomcat服务器处理每个请求都有自己独立的线程,如果超过最大线程数会将请求到缓存到队列里,如果请求堆积过多的情况下,有可能会导致tomcat服务器奔溃的问题。
    所以一般都会在nginx入口实现限流,整合服务保护框架

    image.png

    3,http请求处理业务逻辑如果比较耗时的情况下,容易造成客户端一直等待,阻塞等待过程中会导致客户端超时发生重试策略,有可能引发幂等性问题。

    注意事项:接口是为http协议的情况下,最好不要处理比较耗时的业务逻辑,耗时的业务逻辑单独交给多线程或者是mq处理。
    mq应用场景有哪些?

    1,异步发送短息
    2,异步发送新人优惠券;
    3,处理一些比较耗时的操作

    1,异步发送短信,
    2,异步发送优惠券
    3,比较耗时的操作。
    为什么需要使用mq

    可以实现支撑高并发,异步解耦,流量消峰,降低耦合度

    同步发送http请求:


    image.png

    客户端发送请求到达服务器端,服务器端实现会员注册业务逻辑;
    1.insertMember() --插入会员数据 1s
    2.sendSms()----发送登陆短信提醒 3s
    3.sendCoupons()----发送新人优惠券 3s
    总共响应需要6s时间,可能会导致客户端阻塞6s时间,对用户体验
    不是很好。

    多线程与MQ方式实现异步?
    互联网项目
    客户端 安卓/ios
    最好使用mq实现异步;

    多线程处理业务逻辑:

    image.png

    用户向数据库中插入一条数据之后,在单独开启一个线程异步发送短信和优惠操作。
    客户端只需要等待1s时间
    优点:适合于小项目 实现异步
    缺点:有可能会消耗服务器cpu资源资源

    Mq处理业务逻辑


    image.png

    Mq与多线程之间的区别
    MQ可以实现异步/解耦/流量消峰问题;

    多线程也可以实现异步,但是消耗到cpu资源,没有实现解耦。

    Mq消息中间件名词:
    Producer生产者: 投递消息到MQ服务器端。
    Consumer 消费者: 从MQ服务器端获取消息处理业务逻辑;
    Broker MQ服务器端
    Topic 主题: 分类业务逻辑发送短息主题,发送优惠券主题;
    Queue 存放消息模型 队列 先进先出,后进后出原则 数组/链表
    Message生产者投递消息报文:json

    MQ设计基础知识
    多线程版本mq
    基于网络通信版本mq netty实现;

    
    
    package com.mayikt.day01;
    
    import com.alibaba.fastjson.JSONObject;
    
    import java.util.concurrent.LinkedBlockingDeque;
    
    /**
    *@title: ThreadMQ
    *@description; 
    *@author wangj
    *@date 2021/5/12 22:02
    */  
    
    public class ThreadMQ {
        private  static LinkedBlockingDeque<JSONObject> broker=new LinkedBlockingDeque<JSONObject>();
    
        public static void main(String[] args) {
            Thread producer=new Thread(new Runnable() {
                @Override
                public void run() {
                    while (true){
                        try {
                            Thread.sleep(1000);
                            JSONObject data=new JSONObject();
                            data.put("phone","18747547454");
                            broker.offer(data);
                        }catch (Exception e){
                            e.getCause();
                        }
                    }
                }
            },"生产者");
    
            producer.start();
            Thread consumer=new Thread(new Runnable() {
                @Override
                public void run() {
                    while (true){
                        try {
    
                            JSONObject data=broker.poll();
                            if(data !=null){
                                System.out.println(Thread.currentThread().getName()+",获取到数据:"+data.toJSONString());
                            }
                        }catch (Exception e){
                            e.getCause();
                        }
    
                    }
                }
            },"消费者");
            consumer.start();
    
    
        }
    
    
    
    
    }
    
    
    

    基于netty实现mq
    消费者netty 客户端与 nettyServer 端MQ 服务器端保持长连接。MQ服务器端保存消费者连接
    生产者netty客户端发送请求给 nettyServer端MQ服务器,MQ服务器端在将该消息内容发送给消费者:

    image.png

    body:{"msg":{"userId":"123456","age":"23"},"type":"producer",”topic”:””}

    生产者投递消息给MQ服务器端,MQ服务器端需要缓存该消息
    如果mq服务器端宕机后,消息如何保证不丢失
    1,持久化机制
    如果mq接收到生产者投递消息,如果消费者不在的情况下,该消息是否会丢失?
    不丢失,消息确认机制 必须要消费者消费消息成功后,在通知给mq服务器端,删除该消息。
    MQ服务器端将该消息推送给消费者;
    消费者已经和mq服务器保持长连接。
    消费者主动拉取消息;
    消费者第一次启动的时候
    MQ如何实现高并发思想
    MQ消费者根据自身能力情况,拉取mq服务器端消息消费;
    默认情况下是拉取一条消息。
    缺点: 存在延迟的问题
    需要考虑mq消费者提高速度的问题

    如何消费者提高速率:消费者实现集群、消费者批量获取消息即可。

    Kafak基本的概念:
    Kafka是由Apache软件基金会开发的一个开源处理平台,由Scala和java编写。kafka是一种高吞吐量的分布式发布订阅消息系统,他可以处理消费者在网站中的所有动作流数据。
    这种动作(网页浏览,搜索和其他用户的行动)是在现在网络上的许多社会功能的一个关键因素,这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。对于像handop 一样的日志数据和离线分析系统,但又要求时时处理的限制,这是一个可行的解决方案。kafka的目的是通过handoop的并行加载机制来统一线下和离线的消息处理,也是为了通过集群来提供时时的消息

    百度百科:https://baike.baidu.com/item/Kafka/17930165

    为什么kafka性能非常高

    1,顺序读写;
    2,零拷贝属于linux操作系统内核自带
    3,消息压缩
    4,分批发送;
    5,生产者投递消息,批量投递,‘

    kafak架构设计:

    kafka 使用场景:
    1,活动跟踪
    kafka可以用来跟踪用户行为,比如我们经常回去淘宝购物,你打开淘宝的那一刻,你的登录信息,登录次数都会作为消息传输到kafka,当你浏览购物的时候,你的浏览信息,你的搜素指数,你的购物爱好都会作为一个个消息传递给kafka,这样可以生成报告,可以作为智能推荐,购买喜好等,

    2,传递数据;
    异步实现发送优惠券,短信操作。
    3,度量指标:
    Kafka 也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。

    4,日志记录:
    整合ek;
    5,流程处理
    6,流量消峰;
    Kafka 设计原理:
    1,高吞吐,低延迟:kafka 最大的特点就是收发消息非常快,kafka每秒可以处理几十万条消息,他的最低延迟只有几毫秒。
    2高伸缩性: 每个主题(topic)包含多个分区(partition),主题中的分区可以分布在不同的主机(broker)中;
    3,持久性,可靠性:kafka能够允许数据的持久化存储,消息被持久到磁盘,并支持数据备份防止数据丢失,kafka 底层的数据存储是基于Zookeeper存储的,Zoookeeper 我们知道他的数据能够持久存储。

    4,容错性:允许集群中的节点失败,某个节点宕机,kafka 集群能够正常工作;
    5,高并发: 支持数个客户端同时读写。

    Kafka 消息队列模型

    kafka 的消息队列分为俩种模式: 点对点模式和发布模式。

    kafka 是支持消费者群组的,也就是说,kafka 中 会有一个或者多个消费者,如果一个生产者生产的消息有一个消费的话, 那么这种模式就是点对点模式。

    image.png

    如果一个生产者或者多个生产者的消息能够被多个消费者同时消费的情况,这样的消息队列成为发布订阅式的消息队列。

    image.png

    队列模型非常简单:

    kafka 常见名词:
    Broker MQ 服务器端 构建一个kafka 服务,多个broker 多个kafka集群。
    Producer: 生产者: 向Broker中投递消息,
    Consumer:消费者: 从mq服务器端Broker 当中获取消息;
    ConsumerGroup:消费者组 在同一个消费者组中是不能够消费同一个分区中消息的。
    在多个不同的消费者组中。多个不同的消费者可以消费到同一个消息。
    Topic: 主题: 存放消息的队列分类 根据业务来实现分类,比如邮件主题,短信主题,优惠券主题,逻辑概念。
    Pattern: 分区存放消息,一个topic分成多个不同的区域存放起来 目的是为了后期消费者能够实现扩展性。
    Replica: 副本数据。
    Offset: 标记我们消费者消费的位置
    在kafka中不管消费者消费成功还是失败,实际上该消息不会立马被删除都是后期通过定时任务删除消息,痛offset值可以灵活获取到以前已经被消费的消息。
    Zookeeper: brokers, topics 信息都会存放在zk 上。

    leader:
    Follower:

    kafka 环境安装:

    安装前的准备
    由于Kafka 是用Scala语言开发的,运行在JVM上,因此在安装KAfka之前需要先安装jdk

    Win安装kafka
    安装前的环境要求;
    需要有jdk的环境:
    先安装zk 环境:

    1, 先安装zk解压\zookeeper-3.4.14
    2, 进入到\conf 修改zoo_sample.cfg 为zoo.cfg
    修改配置文件: dataDir=/tmp/zookeeper

    3 进入bin目录; zookeeper-3.4.14\bin
    4 zkServer.cmd;


    image.png

    5 ,使用zktols 连接到zk


    image.png

    先安装kafka环境:
    1,解压kafak 安装包

    1. kafka_2.13-2.5\kafka_2.13-2.5.0
      3启动和停止kafka
      .\bin\windows\kafka-server-start.bat .\config\server.properties


      image.png

    注意解压文件名称:去除 版本名称,不然再启动的时候有可能
    报如下错误:
    The syntax of the command is incorrect.
    查看zk节点信息

    image.png

    创建topic:

    kafka-topics.bat --create --bootstrap-server 127.0.0.1:9092 --topic msntopic --partitions 3 --replication-factor 1 --config cleanup.policy=compact --config retention.ms=500
    
    

    查看主题:

    kafka-topics.bat --list --bootstrap-server 127.0.0.1:9092
    

    删除主题:

    kafka-topics.bat --delete --bootstrap-server 127.0.0.1:9092 --topic mayikttopic
    
    ``
    #### Linux环境安装kafka
    
    ##### 单机版本安装
    
    1、下载地址: [https://mirrors.bfsu.edu.cn/apache/kafka/2.5.0/kafka_2.13-2.5.0.tgz](https://mirrors.bfsu.edu.cn/apache/kafka/2.5.0/kafka_2.13-2.5.0.tgz)
    
    2.后台启动kafka
    
    nohup bin/kafka-server-start.sh config/server.properties > kafka-run.log 2>&1 &
    
    3.验证kafka是否启动成功
    
    ps aux | grep 'kafka' 或者jps
    
    ![image.png](https://img.haomeiwen.com/i12197462/552d18414e5cffda.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
    停止kafka 
    ./kafka-server-stop.sh
    
    创建topic
    

    ./kafka-topics.sh --create --bootstrap-server 192.168.31.128:9092 --topic mayikttopic --partitions 1 --replication-factor 1 --config cleanup.policy=compact --config retention.ms=500

    查看主题:
    ./kafka-topics.sh --list --bootstrap-server 192.168.31.128:9092
    
    PartitionCount:partition 个数。
     ReplicationFactor:副本个数。
     Partition:partition 编号,从 0 开始递增。
     Leader:当前 partition 起作用的 breaker.id。
     Replicas: 当前副本数据所在的 breaker.id,是一个列表,排在最前面的其作用。
     Isr:当前 kakfa 集群中可用的 breaker.id 列表。
    
    
    删除主题
    

    ./kafka-topics.sh --delete --bootstrap-server 192.168.31.128:9092 --topic mayikttopic

    
    ##### 集群版本安装
    
    #### 演示生产者与消费者
    
    1.创建一个该topic分区为3 ,副本也是为1
    
    kafka-topics.bat --create --bootstrap-server 127.0.0.1:9092 --topic mayikttopic --partitions 3 --replication-factor 1 --config cleanup.policy=compact --config retention.ms=500
    
    2.创建消费者监听
    
    kafka-console-consumer.bat --bootstrap-server 127.0.0.1:9092 --topic mayikttopic --from-beginning
    
    1. 生产者投递消息
    
    kafka-console-producer.bat --broker-list 127.0.0.1:9092 --topic mayikttopic
    
    #### [kafkamanager](https://www.cnblogs.com/toutou/p/kafkamanager.html)
    
    ### Kafka核心配置
    
    ##### Topic 级别配置属性
    
     当如下所示的属性配置到 Topic 上时,将会覆盖 server.properties 上对应的属性
    
    
    Springboot 整合Kafka
    Maven依赖
    
    代码解释:
    原理细节
    1.生产者投递消息的时候,指明是三个参数;
    Topic 主题名称,
    Key: 处理我们的消息应该在那个分区存放:解决消息顺序一致性问题
    Data 数据;
    ---生产者采用批量的形式投递
    2,生产者有一个缓冲区,生产者先将消息投递到缓冲区中,等缓冲区man了以后,在将缓冲区的消息批量的形式投递到mq中,从而可以减少mq客户端与服务端之间的通讯次数,可以提高效率。
    ---mq 服务器如何处理消息存放过程 集群同步 副本数据原理。
    
    3,消费者如何处理我们的消息
    kafka 和其他的mq不一样的,消费者消费完了以后,实际上不会立即被mq服务器端给移除。
    offset概念: 记录该消费者消费我们分区中的消息位置,方便下一次获取消息从该foffset位置开始‘
    同一个消费组中,多个不同的消费者是不能够消费同一条消息的。
    消费者获取消息的时候,根据当前消费者对应自己的offset值获取分区中的消息,
    如果消费者消费消息成功以后,commit offset值,下次获取该消息从offset+1开始
    commit offset 类似于rabbitmq 中 ack消息确认机制
    
    消息既然是没有被删除,那如果容量满了以后如何处理?
    生产者:在投递消息的时候会传递一个
    topic名称
    key 可以根据该key 实现扩展功能,比如解决快消息顺序一致性的问题
    Data数据
    消费者:
    topic 名称
    消息的key
    分区的名称
    offset
    消费者内容:
    #kafka设计原理分析
    1生产者投递消息,首先会将消息投递到kafka客户端缓冲区中,缓冲区满了,在将该消息投递到kafka服务器中,
    2,为什么要设计kafka客户端缓冲区,假设在1s内发送1000条信息,如果每次以一条一条发送对kafak 服务器端压力非常大,所以在这时设计一个缓冲区,缓冲区满了,在批量将消息投递到kafak中,从未减少网络传输的次数
    
    3,kafka 消费者订阅我们的分区,获取消息,获取消息成功或失败,消息不会立马被kafka删除
    采用定时删除策略 或者当数据满自动清除历史的数据。
    log.retention.hours=48 #数据最多保存48小时
    log.retention.bytes=1073741824 #数据最多1G
    
    4,注意,在同一个消费者组中不允许该组中有多个消费者消费同一个消息,多个不同的分组中可以实现不同组中消费者消息
    
    
    ![image.png](https://img.haomeiwen.com/i12197462/548ed88af21ee861.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
    
    手动commit offset
    
    新增配置:
    

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.config.KafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.listener.AbstractMessageListenerContainer;
    import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
    import org.springframework.kafka.listener.config.ContainerProperties;

    /**

    • @ClassName KafKaConfig

    • @Author

    • @Version V1.0
      **/
      @Configuration
      public class KafKaConfig {

      @Bean
      public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
      ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
      factory.setConsumerFactory(consumerFactory);
      factory.getContainerProperties().setPollTimeout(1500);
      //配置手动提交offset
      factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
      return factory;
      }
      }

    
    
    ```
    
    # kafka
    spring:
      kafka:
        # kafka服务器地址(可以多个)
        bootstrap-servers: 127.0.0.1:9092
        consumer:
          # 指定一个默认的组名
          group-id: kafka2
          # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
          # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
          # none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
          auto-offset-reset: earliest
          # key/value的反序列化
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          enable-auto-commit: false
    
    ```
    ```
    
    
    package com.taotao.kafka.controller;
    
    import lombok.extern.slf4j.Slf4j;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.Acknowledgment;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    /**
     * @author wangj
     * @title: KafakController
     * @description;
     * @date 2021/5/16 9:33
     */
    @RestController
    @Slf4j
    public class KafakController {
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;
    
    /*
        @KafkaListener(topics = "mmtopic")
        public void receive(ConsumerRecord<?, ?> consumer) {
            String offset = consumer.offset() + "";
            log.info(">topic名称:{},key:{},offset{},,value{}<", consumer.topic(), consumer.key(), consumer.partition(), offset, consumer.value());
        }
    */
    /*
            @KafkaListener(topics = "mmtopic", groupId = "consumer01")
            public void receive01(ConsumerRecord<?, ?> consumer, Acknowledgment ack) {
                String offset = consumer.offset() + "";
                log.info("分组1的消费者1>topic名称:{},,key:{},分区位置:{},offset{},数据:{}<",
                        consumer.topic(), consumer.key(), consumer.partition(), offset, consumer.value());
                //手动提交
                ack.acknowledge();
            }
    
        @KafkaListener(topics = "mmtopic", groupId = "consumer01")
        public void receive02(ConsumerRecord<?, ?> consumer, Acknowledgment ack) {
            String offset = consumer.offset() + "";
            log.info("分组1的消费者1>topic名称:{},,key:{},分区位置:{},offset{},数据:{}<",
                    consumer.topic(), consumer.key(), consumer.partition(), offset, consumer.value());
            //手动提交
            ack.acknowledge();
        }*/
    
        @KafkaListener(topics = "mmtopic", groupId = "consumer03")
        public void receive03(ConsumerRecord<?, ?> consumer, Acknowledgment ack) {
            String offset = consumer.offset() + "";
            log.info("分组1的消费者1>topic名称:{},,key:{},分区位置:{},offset{},数据:{}<",
                    consumer.topic(), consumer.key(), consumer.partition(), offset, consumer.value());
            //手动提交
            ack.acknowledge();
        }
    
        @KafkaListener(topics = "mmtopic", groupId = "consumer04")
        public void receive04(ConsumerRecord<?, ?> consumer, Acknowledgment ack) {
            String offset = consumer.offset() + "";
            log.info("分组1的消费者1>topic名称:{},,key:{},分区位置:{},offset{},数据:{}<",
                    consumer.topic(), consumer.key(), consumer.partition(), offset, consumer.value());
            //手动提交
            ack.acknowledge();
        }
        @RequestMapping("kafka")
        public String testKafka() {
            int max = 5;
            for (int i = 0; i < max; i++) {
                send("key" + i, "data" + i);
            }
            return "success";
        }
    
        private void send(String key, String data) {
            //topic 名称, key data
            kafkaTemplate.send("mmtopic", key, data);
        }
    }
    
    ```
    Offset Reset 三种模式
    # earliest(最早):当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
    # latest(最新的):当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
    # none(没有):topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
    auto-offset-reset: earliest
    
    kafka-consumer-groups.bat  --bootstrap-server 127.0.0.1:9092 --group consumer01 --describe
    当前消费者 offset :CURRENT-OFFSET CLIENT-ID
    
    
    Kafka 每个分区只对应一个消费者消费消息。
    ```
    
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.itmayiedu</groupId>
        <artifactId>springboot2.0_kafka</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.0.1.RELEASE</version>
        </parent>
        <dependencies>
            <!-- springBoot集成kafka -->
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>
            <!-- SpringBoot整合Web组件 -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.62</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-redis</artifactId>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.18.12</version>
            </dependency>
        </dependencies>
    
    </project>
    ```
    
    为啥kafka 需要设计分区概念:
    1,提高吞吐量
    2,能够更好的支持多个消费者消费;
    3,能够更好的实现动态扩容;
    
    消息顺序一致性的背景:
    在kafak中我们会对mq实现分区,多个消费者集群消费消息,生产者投递的消息会给对个不同的消费者分摊消费者消息,无法保证每个消息先还是后执行的问题;
    
    解决消息顺序一致性问题核心思想:就是生产者投递消息,一定要存放在同一个分区中,最终被同一个消费者消费消息。
    既能够保证消息顺序一致性的问题,又能提高效率,在kafak中就存在分区的架构模型实现一个消费者对应一个分区的消费消息。
    Kafka中已经帮你实现好了,只需要投递消息都设置相同同一个消息key的情况下都会存放到同一个分区中,最终被同一个消费者消费消息
    
    Kafka投递消息如何存放的?
    
    1.Kafka实际上就是日志消息存储系统, 根据offset获取对应的消息,消费者获取到消息之后
    该消息不会立即从mq中移除。
    2.  将topic分成多个不同的分区、每个分区中拆分成多个不同的segment文件存储日志。
    每个segment文件会有
    .index 消息偏移量索引文件
    .log文件 消息物理存放的位置
    
    在默认的情况下,每个segment文件容量最大是为500mb,如果超过500mb的情况下依次内推,产生一个新的segment文件
    
    Segment01  segment500 segment1000
    
    解决消息顺序一致性问题如何提高效率
    1.  增加多个不同的Partition分区  实现 一个消费者对应一个Partition分区
    绝大多数的项目使用到mq时候,是不需要保证消息顺序一致性问题。
    
    
    
    
    
    
    
    1.  消息顺序一致性
    消息顺序一致性产生:
    Kafka的时候 多个不同的分区对应不同的消费者消费消息的时候,需要遵循严格消费顺序。
    每个消费者获取到消息之后实现消费的时候先后执行顺序问题很难保证的。
    
    
    1,kafka消息存储底层结构原理
    2.Kafka消息为何使用稀疏索引结构存放
    3 kafak 如何查找指定offset的message
    4 kafka 如何保证生产消息可靠性
    5,kafakACk 的三种模式之间的区别
    6,kafak, ISR,LEQ,HW,结构设计原理
    7kafak 副本故障机制
    6,kafak 能够严格意义上保证消息不丢失吗
    
    Kafka 投递消息如何存放的?
    1,kafka 实际上就是日消息存储系统,根据offset 获取对应的消息,消费者获取到消息之后, 该消息不会立刻从mq删除
    2,将topic 分成多个不同的分区,每个分区中拆分成多个不同的segment 文件存储日志,每个segment 文件都会有:
    .index 消息偏移量索引文件
    .log 文件 消息物理存放的位置
    
    在默认的情况下,每个segment 文件容量最大是500md,如果超过500md的情况下一次内推,产生一个新的segment文件。
    
    Segment01  segment500 segment1000
    
    1, Kafka mq(日志存储系统)服务端存储消息的时候,不管该消息是否有消费成功,最终该消息不会立即删除
    2,Kafka 分区模式存放消息,存储位置:log.dirs=F:/path/kafkaandzk/kafka/kafka-log, 底层持久化我们消息设计到: segment
    1,在一个分区中,会将一个大的分区拆分n多个不同大小segment 文件,每个segment文件存放我们该分区日志消息;
    2,在每个segment 中会有index,log
    .index----- 消息偏移量索引文件
    .log----消息持久化内容;
    
    
    Message 结构
    log 文件就是实际存储message 的地方,在producer 往kafka 写入的也是一条一条的message,消息主要包含消息体,消息大小,offset,压缩类型。。等主要是下面三个。
    1,offset: offset是一个占8byte的有序id号, 他可以唯一确定每条在parition 内的位置;
    2,消息大小: 消息大小占用4byte,用于描述消息的大小;
    3,消息体: 消息体存放的是实际的消息数据(被压缩过),占用的空间根据具体的消息而不一样。
    存储策略
    ```
    无论消息是否被消费,kafka 都会保存所有的消息,旧消息删除策略:
    1,基于时间,默认配置是168小时(7天);
    2,基于大小,默认配置是1073741824
    
    
    ```
    kafka如何查找指定offset的message
    1,kafak 实际上是一个日志存储系统,采用分区的模式存放消息,日志存放位置:
    log.dirs=F:/path/kafkaandzk/kafka/kafka-log
    2,Topic是一个逻辑概念,Partition 分区为物理存放消息,每个partition 分区对应一个存放消息的log 文件,生产者投递消息不断追加到该log文件未端,且每条消息有自己独立的offset。由于生产者生产的消息不断追加log文件未尾。为了防止log文件过大导致后期查询或者删除部分消息效率偏低,kafak 采用分段和索引机制查询消息。
    3,每个partition 分成n多个不同的segment file,segment file 默认大小是为500MB,当如果达到该大小容量之后,开始创建一个新的segment file,依次内推,类似于ConcurrentHashMap1.7成将一个大的ConcurrentHashMap集合拆分成多个小的HashTable,每个segment对应两个文件,分别为.index索引和.log数据文件,
    .log存储消息文件
    .index存储消息的索引
    .timeIndex,时间索引文件,通过时间戳做索引
    
    ![image.png](https://img.haomeiwen.com/i12197462/aba6b5bf7b62cafe.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
    segment file 命名规则:
    每个segment file 也有自己的命名规则,每个名字20个字符,不够用0填充,每个名字从0 开始命名,下一个segment file 文件的名字就是,上一个segment file中最后一条消息的索引的值。在.index文件中,存储的是key-value格式的,key代表在.log中按顺序开始顺序消费的offset值,value代表该消息的物理消息存放位置,但是在.index中不是每条消息都做记录,在已有的记录中查找,范围也大大缩小了。
    
    每个log 文件的大小是一样的,但是存储的message数量是不一定相等(每条的message 大小不一致),文件的命名是以该segment最小 offset来命名的,如果000.index 存储offset 为0~368795的消息,kafka 就是利用分段+索引的方式来解决查找效率的问题。
    
    
    index 偏移量索引文件
    key=offset value 物理位置存放。
    
    原理分析:
    一个分区中拆分n多个不同的小的segment 文件
    Segment0
    容量满的情况下 500条消息 offset 500
    Segment500
    容量满的情况下 500条消息 offset 1000
    Segment1000
    
    
    ![image.png](https://img.haomeiwen.com/i12197462/2a5094994477db27.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
    
    1, offset=7的情况下 对应的segemnet文件;
    二分查找算法
    查找到该分区所有的segment文件list排序 每个segement 文件都是一个命名规范。
    offset=7 在我们的Segment0 文件中。
    2,先访问该index 文件, 根据offset值查询到物理存放位置。
    二分算法:
    Offset=7>6>9 所以定位到offset=6 获取到物理存放位置9807
    
    3,根据物理存放位置9807 去对应的log文件查找消息,依次向下查找+1次,获取到offset=7 的消息。
    为什么kafka中的  索引文件没有对每个消息建立索引呢?
    目的是为了节约我们空间的资源。
    稀疏索引算法+二分查找算法,定位到令居,在根据顺序遍历查找。
    
    如果offset消息,没有对应的索引的索引的情况下,时间复杂度是多少: oN
    如果offset消息 有对应的索引的情况下,时间复杂度为为多少:(01);
    
    kafka 如何查找指定offset 的Message的。
    ![image.png](https://img.haomeiwen.com/i12197462/48ec8681d10eefe5.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
    
    算法手写:
    一个分区分成n多个不同的segment
    
    Segment01
    00000000000000000000.index
    容量满了 offset 500=
    Segment02
    00000000000000000500.index
    容量满了 offset 1000=
    Segment03
    000000000000000001000.index
    
    
    Offset=580>500<1000 Segment02 中
    
    ![image.png](https://img.haomeiwen.com/i12197462/e861e75186644c45.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
    
    
    1,遍历获取到所有的segment文件, 拿到所有的segment 文件之后,获取名称。
    2,在使用二分查找算法获取该offset 在那个Segment 文件中。
    3,在index 文件中:
     key=offset 值value就是物理存放位置---log文件。
    原理
    查询offset=7,
    利用二分查找算法查找到9>offset=7>6 直接定位offset=6,获取该对应的物理位置=9807.
    在根据9807 从日志文件中,依次向下遍历一次,得到offset=7的消息;
    
    为什么kafka 在设计的时候 不会所有消息建立索引?
    1,底层采用稀疏索引算法,如果该offset 没有在索引列表中的情况下,则使用二分查找算法+遍历定位物理存放位置。
    
    如何查看Kafka日志和index文件
    kafka-run-class.bat kafka.tools.DumpLogSegments --files 00000000000000000000.log
    kafka-run-class.bat kafka.tools.DumpLogSegments --files 00000000000000000000.index
    
    
    kafka 如何保证生产消息可靠性:
    副本机制的概念:
    1,所谓的副本机制(Replication),也可以称之为备份机制,通常指分布式系统在多台互联网的机器上保存相同的数据拷贝。副本机制有什么好处?
    2,提供数据冗余: 即使系统部分组件失效,系统依然能够继续运转,因而增加了整体可用性以及数据持久性
    3,提高高缩性;支持横向扩展,能够通过添加机器的方式来提升读的性能,进而提高操作吞吐量。
    3,改善数据局部性,允许将数据放入与用户地理位置相近的地方,从而降低系统延时。
    
    kafka 是有主题概念的,而每一个主题又进一步划分成若干个分区。副本的概念实际上是在分区层级下定义的,每个分区配置有若干个副本。
    所谓的副本,
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    

    相关文章

      网友评论

          本文标题:90 kafka

          本文链接:https://www.haomeiwen.com/subject/sjrzdltx.html