美文网首页
3.kafka生产者

3.kafka生产者

作者: 呦丶耍脾气 | 来源:发表于2022-12-20 20:45 被阅读0次

一、原理

  • Producer首先调用send方法进行发送
  • 会经过拦截器(一般情况不使用),可以对数据进行一些加工处理
  • 随后会经过序列化

kafka并没有采用Java提供的序列化器,而是自己实现的序列化器,因为Java提供的序列化器,会在原有数据的基础上,增加很多的用于安全校验的数据,在大数据的场景下,每次传输的数据量很大,如果在此基础上还要加入大量用于安全校验的数据,严重的影响了效率,所以kafka等中间件,自己实现了序列化器,仅仅进行简单的校验,增加了效率。


  • 随后经过分区器。

分区器实际上是将数据发送到了缓冲队列中,缓冲队列是一个双端队列,其内部包含内存池(当通过分区器创建数据后,申请内存,发送到集群后再释放),避免频繁的申请和释放内存。
因为kafka可以对topic进行分区,所以发送时就需要确定向哪个分区发送信息,就由分区器定义的规则来发送,一个分区对应一个队列,这些队列都是在内存中创建的,总大小默认32M,每一批次默认大小16K

  • sender线程帮助我们将缓冲队列中的数据,发送到kafka集群中。

batch.size:只有数据累积到batch.size之后,sender才会发送数据。默认16K
linger.ms:如果数据迟迟未达到batch.size,sender等待linger.ms设置的时间到了之后就会发送信息。单位ms,默认值是0ms(即默认到了就发送,不等待到达batch.size阈值)
生产环境中上面两个参数都需要调整

  • 发送时,以分区节点为key,即broker1,broker2为key,请求为value进行发送,形成一个请求。请求发送到某个broker中,如果第一个请求发送到broker1,broker1没有即使的应答,允许继续发送第二个请求,直到五个请求都没有得到应答,后续的请求不会再发送,直到得到了请求的应答才继续发送。

  • kafka集群收到请求之后会涉及到一个应答机制,应答级别分为0、1、-1

0:生产者发送过来的数据,不需要等待数据落盘应答
1:生产者发送过来的数据,Leader(数据落盘)收到后应答,副本有没有无所谓
-1(all) :生产者发送过来的数据,Leader和ISR里面的所有节点收齐数据后应答,-1和all等价。
(1) Leader维护了一个动态的in-sync replica set(ISR),意为和Leader保持同步的Follow + Leader集合(leader:0,ISR:0,1,2)
(2) 如果Follower长时间未向Leader发送通信请求或同步数据,则该Follow将被踢出ISR。改时间阈值由replica.lag.time.max.ms参数设定,默认30s。例如如果2超时,(leader:0,ISR:0,1)
在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=1,一般用于传输和钱相关的数据,对可靠性要求比较高的场景

  • kafka集群应答之后,如果成功,进行数据的清理,如果失败,进行重试,默认重试次数是int的最大值

二、Kafka事务原理

  • 开启事务,必须开启幂等性

幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了数据不重复
重复数据的评判标准:具有<PID,Partition,SeqNumber>相同主键的信息提交时,Broker只会持久化一条。其中PID是Kafka每次重启都会分配一个新的;Partition表示分区号;Sequence Number是单调自增的。
所以幂等性只能保证的是在单分区单会话内不重复。如果想保证数据一定不重复,就需要开启事务
使用幂等性:开启参数enable.idempotence默认为true,即默认为开启

事务并不是直接存储在磁盘中,而是存储在一个特殊的topic的分区中。


三、数据乱序

生产端的InFilghtRequests,默认每个broker最多缓存五个请求,当第一个数据发送过去,第二个数据没有发送成功,这时第二波数据就要进行重试,但是此时第三波数据发送,发送成功了,然后第二波数据的重试才发送成功,本来的数据顺序是123,但是现在被改为了132,发生了数据乱序


max.in.flight.requests-per.connection设置为1,即不缓存request请求,自然不会发生数据乱序的情况。

开启幂等性以后,因为SeqNumber是单调递增的,所以当数据是顺序的时候,不需要排序就可以发送,但是当发生上面的情况之后,服务端发现数据的SeqNumber是132,不是单调递增了,会对数据进行缓存,攒到5个以后会进行重新排序,之后再进行发送。

相关文章

  • 3.kafka生产者

    一、原理 Producer首先调用send方法进行发送 会经过拦截器(一般情况不使用),可以对数据进行一些加工处理...

  • MQ深入讲解

    问题导读1.为什么使用消息队列?2.消息队列有什么优点和缺点?3.Kafka、ActiveMQ、RabbitMQ、...

  • 【转载】消息中间件MQ初步

    面试题 1.为什么使用消息队列? 2.消息队列有什么优点和缺点? 3.Kafka、ActiveMQ、RabbitM...

  • Dubbo起步

    目录 定义接口 生产者生产者依赖生产者实现 消费者消费者依赖消费者实现 初步印象 定义接口 生产者 生产者依赖 生...

  • JAVA-每日一面 2022-01-07

    问:1.kafka数据分区和消费者的关系,2.kafka的数据offset读取流程,3.kafka内部如何保证顺序...

  • KAFKA生产者

    生产者生产消息过程 生产者生产(多线程) 生产者常用配置(producer.properties) acksack...

  • Kafka生产者

    目录 生产者是如何向kafka发送消息的 生产者发送消息有几种方式 生产者怎么通过配置进行调优 生产者是如何序列化...

  • Kafka专题:3.kafka协调者

    主消费者负责计算分区的分配,将结果发送给协调者。所有消费者从协调者处获取自己的分区信息。每个消费者需要发送两种请求...

  • 智米面试总结

    1.sql优化 2.介绍下springcloud 3.kafka的消费者重复了怎么解决 4.斐波那契数列 用递归写...

  • Kafka中的服务端

    阅读以下内容你将了解到:1.Kafka的协议2.Kafka的时间轮实现(作用、原理、多级时间轮)3.Kafka中的...

网友评论

      本文标题:3.kafka生产者

      本文链接:https://www.haomeiwen.com/subject/rcxuqdtx.html