kafka

作者: husky_1 | 来源:发表于2022-02-22 14:55 被阅读0次

kafka 是一个分布式消息中间件,支持多分区,多副本,多订阅者的,基于zookeeper协调的分布式消息系统
特点
1 . 高吞吐量,低延迟: 每秒处理几十万数据,延迟最低只有几毫秒
2 . 可扩展性:kafka集群支持热扩展
3 . 持久性,可靠性: 消息被持久化道本地磁盘,并且支持数据备份
4 .容错性: 允许集群中节点失败(若副本数位n,允许n-1个节点失败)
5 . 高并发: 支持数千个客户端同时读写

主要应用于大数据实时处理领域

1 架构

基本架构
  • producer: 消息生产者,向kafka集群发送消息的客户端

  • consumer: 消息消费者,向kafka集群取消息的客户端

  • consumer group:消费者组,由多个消费者组成。 消费者组内的每个消费者消费不同partiton的数据,一个partition只能由一个组内的消费者消费,消费者组之间互不影响 消费者组在逻辑上就是i一个订阅者

  • broker: 一台kafka服务器就是一个broker。 一个集群由多个broker组成, 一个broker可以容纳多个topic

  • topic: Kafka将消息分门别类,每一类的消息称之为一个主题(Topic)

  • partition: 一个topic可以分不到多个broker上, 此时每个broker 上的topic 就是对应的一个partition(分区), 每个partition是一个有序的队列

  • replica: 副本, 为了防止集群中的某个broker故障,导致对应的partition的数据丢失 而提供的副本机制,一个topic 每一个partition可以有多个副本,即一个leader 对应若干个follower

  • leader : 每个partition 的副本对应的主副本,生产者和消费者通信的对象都是leader

  • follower: 每个partition 的副本对应的从副本, 实时同步leader数据,当leader发生故障时,某个follower就会成为新的leader

  • offset: 偏移量, 每条消息都有自己的偏移量,是消息数据在对应partition中的唯一标识, 也是该消息的索引号。每个consumer都会保存自己消费到的offset+1,consumer 的消费的offset 保存在broker集群中专属的topic中(_consumer_offsets); (0.10.x 版本之前保存在zookeeper中), 在kafka 中提交的offset 都是指的下一条待消费的数据, 即已消费的offset+1

  • message : 消息, 简单来说kafka 中的每个message 由一对key-value 组成, 消息结构如下:

    message

2. 生产者(producer)

关键参数:

参数名 描述
bootstrap.servers 生产者连接集群所需的broker地址清单,可以是一个或者多个,用逗号隔开
key.serializer和value.serializer 指定key和value的序列化类型
buffer.memory RecordAccumlator 缓冲区的总大小,默认32M
batch.size 缓冲区一批次数据的最大值,默认16k,适当增加该值,可以提高吞吐量
linger.ms 如果数据未达到batch.size, 在设置的linger.ms 设置的等待时间到来后,就会发送数据,默认是0ms, 表示没有延迟,一般设置为5~10ms
ack broker 接收生产数据后的应答机制。 0: 生产者只管发送,不等待broker的应答;1: 生产者发送完数据,只等待leader 节点的应答;-1(all): 生产者发送完数据,等待leader节点和ISR队列中的所有节点同步完后应答
max.in.flight.requests.per.connection 指定了生产者在收到服务器响应之前可以发送多少个消息(发送请求的缓存,一个batch 一个request)。它的值越高,就会占用越多的内存,不过也会提升吞吐量。把它设为 1 可以保证消息是按照发送的顺序写入服务器的,即使发生了重试。默认是5
retries 消息发送失败后的重发次数,默认是int的最大值 2147483647
retry.backoff.ms 两次重试之间的时间间隔,默认是100ms
enable.idempotence 是否开启幂等性,默认是true
compression.type 生产者发送数据的压缩方式,默认是none,不压缩,支持的压缩方式:gzip,snappy,lz4,zstd
2.1 发送流程
流程

在消息发送过程中,涉及到了两个线程--main线程和sender线程,在main线程中流程如下:
1 . 将消息数据发送给Interceptors 预处理( 可选),然后通过Serializer 进行序列化处理
2 . 序列化之后的数据通过分区选择器, 将消息发送给对应的双端队列(RecordAccumulator, 默认是32M)

sender 流程如下:
1 . 从队列(RecordAccumulator)中拉取消息, 有两种拉取策略

  • .batch.size:只有队列中积累的数据量达到batch.size大小后,sender才会发送数据,默认是16K
  • . linger.ms:如果数据没达到数据量batch.size, sender会根据linger.ms设置的时间发送数据,默认是0ms, 即无延迟发送

2 . 当拉取到数据后,就会将数据通过selector 发送给对应分区的leder 副本的broker
3 . broker接收到数据后,通过发送ack应答,表明数据发送成功。 ack 应答机制有如下的三种:

  • .0: 生产者只管发送,不等待broker的应答
  • .1: 生产者发送完数据,只等待leader 节点的应答
  • .-1(all): 生产者发送完数据,等待leader节点和ISR队列中的所有节点同步完后应答

4 . 如果生产者没收到ack应答,就将重试发送

2.2 partition 分区
2.2.1 分区的作用
  • .便于合理使用存储资源,每个partition在一个broker上存储,切割海量数据到不同的的broker上, 合理的控制分区任务,可以实现负载均衡
  • . 提高并行度,生产者可以以分区为单位发送数据,消费者可以以分区为单位进行消费
2.2.2 分区策略
  1. 默认的分区策略:

1 . 如果生产者指定了发送的分区,则按照指定的分区发送
2 .如果没指定分区,则按照消息中的key的hash值对设置的分区数取模
3 . 如果分区和key 都没指定,则选择粘性分区(sticky partition,随机选择分区并不变),直到对应的recordAccumulator的batch.size满了或者linger.ms 时间到了,再随机选择其他分区

  1. 自定义分区器:
2.3 数据可靠保证
2.3.1 Ack应答机制

Ack 的应答机制确保数据不会丢失,Ack 应答机制有如下的三种, 可靠程度依次提高

0 : 生产者只管发送,不等待broker的应答(不推荐)

缺点: 无法保证broker 是否收到了数据

1 : 生产者发送完数据,只等待leader 节点的应答

缺点: 有可能leader 节点ack 应答后,还没同步就挂了,导致后续新选举的leader 节点丢失了该条消息

-1(all): 生产者发送完数据,等待leader节点和ISR队列中的所有follower节点同步完后应答

ISR队列(in-sync replica set): 和leader 保持同步的follower和leader节点的集合(leader:1,isr:1,2,3)
例如: 主题为test的topic 创建了一个分区,总共有两个副本,其中leader 副本对应的broker id=2, follower副本对应的broker id=1,此时isr队列中的集合就是(2,1)

kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --topic test --describe

Topic: test     TopicId: MPY45pufQ_m-bW7h_cSULQ PartitionCount: 1       ReplicationFactor: 2    Configs: segment.bytes=1073741824
Topic: test     Partition: 0    Leader: 2       Replicas: 2,1   Isr: 2,1

如果follewer 节点长时间没有发送通信请求或者同步数据,则将被踢出ISR 队列, 该时间阈值由参数replica.lag.time.max.ms 设定,默认是30s

总结 三种应答机制的可靠性依次提高,但是在特殊情况下,-1的应答机制也没办法完全保证数据不丢失。例如: 分区副本只有一个,或者isr 中的节点数只有一个, 此时相当于ack=1, 仍然有丢失数据的风险

确保数据不丢失的条件: ack=-1 + 分区副本>1 + isr队列中节点数>1

2.3.2 数据幂等性

procuder 不论向broker 发送多少次重复的数据, Broker 端都只会持久化一条数据,保证数据不重复(去重)

去重标准:具有<PID,Partition,SeqNumber> 相同主键的消息提交时,Broker只会持持久化一条,其中PID 是每次producer分配的一个新的id,Patition 分区号,Sequence Number 单调自增

幂等性

幂等性只能保证在单分区单会话内的不重复
幂等性的开启通过参数:enable.idempoyence= true 设置

2.3.3 生产者事务

开启事务必须要开启幂等性

事务流程
######## 待补充################
2.3.4 数据有序
  1. 在kafka 1.x版本之前 ,为了保证数据的单分区有序,条件如下:max.in.flight.requests.per.connection=1(不需要考虑是否开启幂等性)

  2. kafka 1.x及以后版本为了保证单分区有序,条件如下:

  • 未开启幂等性
    max.in.flight.requests.per.connection=1
  • 开启幂等性
    max.in.flight.requests.per.connection<5 即可
    原因说明, 在kafka1.x以后,kafka broker 端会缓存producer 发送的最近5个request 的元数据, 同时在幂等性的前提下, 通过Sequence Number对请求进行排序,此时就保证了最近5个request的数据有序
2.3.3 数据传递语义:
  • 至少一次(At Least Once):
    保证数据不丢失,但是不能保证数据不重复

ack=-1 + 分区副本>1 + isr队列中节点数>1

  • 最多一次 (At Most Once):
    保证数据不重复,但是不能保证数据不丢失

幂等性

  • 精确一次 (Exactly Once)
    确保数据不丢失,也不重复

幂等性 + ack=-1 + 分区副本>1 + isr队列中节点数>1

4 Broker

重要参数:

参数名 描述
replica.lag.time.max.ms ISR中,follower 由于长时间未与leader 通信而导致被踢出isr的时间阈值,默认30s
auto.leader.rebalance.enable 自动leader partiiton平衡,默认是trrue,建议关闭
leader.imbalance.per.broker.percentage 默认是10%,每个broker 允许的不平衡的leader的比例,超过这个值,会触发leader自动平衡
leader.imbalance.check.interval.seconds 默认300s,检查leader负载平衡的时间间隔
log.segment.bytes kafka中切割为每一块数据文件的大小,默认1g
log.index.interval,bytes 默认4k,每当写入4kb大小的数据后,就往index文件记录索引(稀疏索引的数据大小阈值)
log.retention.hours kafka 数据保存时间。默认7天
log.retention.minutes kafka 数据保存时间,分钟级别,默认关闭
log.retention.ms kafka 数据保存时间,毫秒级别,默认关闭
log.retention.check.interval.ms 检查数据是否超过保存时间的间隔,默认5min
log.rentention.bytes 默认-1,表示无穷大,超时设置的所有日志的总大小后,删除最早的segment
log.cleanup.policy 数据文件删除策略,默认是delete,如果是compact,表示启用压缩
num.io.threads 默认8,负责写磁盘的线程数,参数值占总核数的50%
num.replica.fetchers 默认1, 副本拉取线程数,参数值占总核数的1/3
num.network.threads 默认3,数据传输线程数,参数值占总核数的2/3
log.flush.interval.messages 强制页缓存刷写到磁盘的条数,默认是log的最大值,一般不建议修改,系统自己管理
log.flush.interva.ms 没隔多久刷写数据到磁盘,默认是null,不建议修改
4.1 工作流程
4.1.1 zookeeper 存储的kafka信息

启动zookeeper 终端客户端:

[root@iZuf6g3hri8hvnuqng6id7Z apache-zookeeper-3.5.7-bin]# ./bin/zkCli.sh

通过ls 命令 查看kafka 相关信息

[zk: localhost:2181(CONNECTED) 0] ls /kafka
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification]
zk 信息
4.2分区副本
4.2.1 副本基本信息
  1. 副本作用: 提高数据的可靠性
  2. 默认一个副本,生产环境一般配置为2 个,保证数据可靠性, 副本太多会增加磁盘存储空间,增加网络数据传输,降低效率
  3. kafka 中副本分为: leader和follower, 生产者将数据发送给leader, follower 和leader 进行同步数据
  4. kafka 分区所有的副本统称为AR(Assigned Replicas)
    AR= ISR + OSR
    ISR: 表示和leader 保持同步的Follower集合,如果follower长时间没有和leader 通信或者同步数据,则该follower将被踢出ISR, 该时间戳由: replica.lag.time.max.ms参数设定,默认30s。 leader发生故障后,会从ISR中选举新的leader
    OSR: 表示和follower 与leader同步时,延时过多的副本集合
4.2.2 副本leader选举流程

kafka 集群中每个broker都有对应的controller, 其中有一个controller 会被选举为controller leader( 通过查看zk的/controller 节点,可以知道选举出来的leader 节点), 负责管理集群broker的上下线和 所有topic的分区副本leader选举和分配
controller的信息同步依赖于zookeeper

  1. broker 启动后就会在zookeeper中注册
  2. controller leader 的选举机制是,先注册先成为,同时由选举出来的leader 通过zookeeper 去监听所有broker节点变化
  3. controller leader同时负责topic的分区副本leader的选择,选举策略如下:

在ISR中的存活为前提,按照AR中排在前的为优先,例如:AR[1,0,2], ISR[1,0,2],那么leader 就会按照[1,0,2]的顺序轮训

  1. controller leader 将选举的信息上传到zk 中
  2. 其他broker 的controller 去zk 中同步呢信息


    broker运行流程

测试:
kafka 集群集群中有三个broker ,id分别为1,2,3, 此时创建主题名为test1, 分区数3,副本数3的topic,查看此时topic的详细信息如下:

test1分区0,1,2的leader 节点id为2,3,1
AR 分别为[2,3,4],[3,1,2],[1,2,3], ISR分别为[2,3,1],[3,1,2],[1,2,3]

[root@iZuf6g3hri8hvnuqng6id7Z bin]# ./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --topic test1 --partitions 3 --replication-factor 3
Created topic test1.

# 查看topic详细信息
[root@iZuf6g3hri8hvnuqng6id7Z bin]# ./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic test1 
Topic: test1    TopicId: pfPkdWsKTyCCsoektoTayQ PartitionCount: 3       ReplicationFactor: 3    Configs: segment.bytes=1073741824
        Topic: test1    Partition: 0    Leader: 2       Replicas: 2,3,1 Isr: 2,3,1
        Topic: test1    Partition: 1    Leader: 3       Replicas: 3,1,2 Isr: 3,1,2
        Topic: test1    Partition: 2    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3

下线节点broker 3,此时分区1 的leader 节点挂了,需要重新选举leader,安装选举规则,应该选举 brokerid为1 的节点作为leader

[root@iZuf6g3hri8hvnuqng6id7Z bin]# ./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic test1 
Topic: test1    TopicId: pfPkdWsKTyCCsoektoTayQ PartitionCount: 3       ReplicationFactor: 3    Configs: segment.bytes=1073741824
        Topic: test1    Partition: 0    Leader: 2       Replicas: 2,3,1 Isr: 2,1
        Topic: test1    Partition: 1    Leader: 1       Replicas: 3,1,2 Isr: 1,2
        Topic: test1    Partition: 2    Leader: 1       Replicas: 1,2,3 Isr: 1,2
4.2.3 broker 节点故障处理

概念理解:
LEO(Log End Offset):每个腹部的最后一个offset,其实就是最新的offset +1
HW (High WaterMark): 所有副本中最小的LEO

1 . topic 分区的follower 对应的一个节点挂了


2 . topic 分区的leader 节点挂了


4.3 文件存储

4.3.1 文件存储机制

每个topic 对应的每个分区(partition)都对应一个log 文件,该log文件中的存储的就是kafka的生产的数据,producer生产的数据会不断的追加到文件末尾, 为了防止log 文件过大,导致数据查询效率低下,kafka采用分片索引机制。
每个partition分为多个segment,每个segment 文件包括:".index"文件,".log"文件, ".timeindex"等文件, 这些文件位于一个文件夹下,文件夹命名规则:topic名+分区序号

分区数据存储
例如 : 查看test-0 分区的数据文件内容
[root@iZuf6g3hri8hvnuqng6id7Z test-0]# ls 
00000000000000000000.index  00000000000000000000.timeindex  leader-epoch-checkpoint
00000000000000000000.log    00000000000000000001.snapshot   partition.metadata
log-segment 数据查询定位
4.3.2 文件删除机制

kafka 中的默认数据保存时间为7天, 可以通过如下参数修改:

  • log.retention.hours : 最低优先级,默认7天
  • log.retention.minutes: 优先级次之 ,分钟
  • log.retention.ms: 最高优先级,毫秒
  • log.retention.check.interval.ms : 负责设置检查周期,默认5min
    kafka 中的数据文件清理策略有 deletecompact
  1. delete: 将过期数据删除
    设置参数: log.cleanup.policy=delete
    基于时间:默认打开,以segment 中所有记录中的最大时间戳作为该文件的时间戳, 即当一个segment中的部分数据超期了, 此时会等待该segment所有数据超期后,再删除
    基于数据大小:默认关闭,超过设置的所有日志的总大小,删除最早的segment。log.retention.byte=-1 (表示无穷大, 即关闭基于数据大小)
  2. compact: 日志压缩
    对于相同key的不同value值, 只保留最后一个版本
    设置参数:log.cleanup.policy=comapct
    数据压缩
4.4 高效读写数据

1 . kafka本身是分布式的集群,采用分区策略,并行度高
2 . 数据存储采用稀疏索引,可以快速定位要消费的数据
3 . 顺序写磁盘: kafka 的数据写入过程是文件末尾追加的方式

  1. 页缓存 + 零拷贝的方式

5 消费者(consumer)

kafka 中的消费者采用pull(拉)模式
重要参数

参数名 描述
bootstrap.servers 生产者连接集群所需的broker地址清单,可以是一个或者多个,用逗号隔开
key.deserializer和value.deserializer 指定key和value的反序列化类型
gruop.id 消费者所属的消费者组id
enable.auto.commit 默认为true,自动周期性的提交offset
auto.commit.interval.ms 自动提交offset的时间间隔
auto.offset.reset 初始化时偏移量的设置策略,默认latest。1. earliest:自动重置偏移量到最早的偏移量;2. latest:自动重置偏移量到最新的; 3. none:如果原来的消费者组偏移量捕存在,则向消费者抛出异常
offsets.topic.num.partitions _consumer_offsets的分区数,默认50 ,不建议修改
heartbeat.interval.ms kafka 消费者和coordinator之间的心跳时间,默认3s
session.timeout.ms kafka和coodinator之间的连接超时时间,默认45s,超过该时间,该消费者被移除消费者组,消费者组会进行再平衡
max.poll.interval.ms 消费者处理消息的最大时长,默认5min,超过该时间,该消费者被移除消费者组,消费者组会进行再平衡
fetch.mxax.bytes 默认50M,消费者向broker一次拉取的最大字节数,如果服务器端一批次的数据大于该值,仍可以拉取数据,一批次的大小受到message.max.bytes(broker config)和 max.message.bytes(topic config)影响
max.poll.records 一次拉取数据的最大条数,默认500
partition.assignment.strategy 消费者分区分配策略,包括了 range,roundRobin,sticky,cooperativeSticky
5.1 总体消费流程

1 . 消费者按照partition 的offest 按顺序依次读取里面的数据
2 . 一个消费者可以消费多个主题的多个分区的数据
3 . 在消费者组中, 每个分区只能由该消费者组中的一个消费者消费,防止重复消费
4 . 每个消费者的消费offset 由消费者提交到系统的主题中保存


消费流程
5.2 消费者组

Consumer Group :由多个consumer 组成, 组内成员拥有共同的groupId

  • 消费者组内的每个消费者负责消费不同分区的数据,一个分区只能由一个组内的消费者消费
  • 消费者组之间互不影响
  • 如果消费者组中的消费者数量多于主题分区的数量,则会有一部分的消费者处于闲置状态,不消费任何数据
5.2.1 消费者组初始化流程

概念:

coordinator: broker 中的组件,负责辅助实现消费者组的初始化和分区的分配
coordinator节点选择= groupid的hashcode %50
注:50 指的是系统主题_consumer_offsets的分区数量

例如: groupId 的hashcode=1,1%50=1, 此时间_consumer_offsets主题的1号分区所在的broker 节点的coordinator就是这个消费者组的老大,此时初始化流程如下:

  1. 消费者的每个消费者发送joinGroup请求给coordinator
  2. coordinator选出一个consumer 作为leader
  3. coordinator把要消费的topic情况发送给leader消费者
  4. leader 消费者指定消费方案并发送给coordinator
  5. coordinator再分发消费方案给各个消费者
  6. 每个消费者和coordinator 保持心跳(默认3s), 一旦超时(session.timeout.ms=45s),该消费者就会从消费者组中移除,并触发再平衡;或者消费者的处理消息时间过长(max.poll.interval.ms=5min),也会触发再平衡
初始化流程

kafka 有专门的分区策略来支持在消费者组中对消费分区的分配
主要策略有 Range,RoundRobin,Sticky,CooperativeSticky(3.0 版本新增), 通过配置参数: partition.assignment.strategy来设置,默认是Range 策略

  1. Range 策略

    Range 策略是针对于每个topic 而言的
    1 首先对同一个topic 中的分区按照序号排序,并对消费者组中的消费者按照client.id字典排序
    2 通过 partition 数/ consumer 数 得到商n 和余数m,则每个消费者至少分到n个分区,然后前m 个消费者多分一个分区

    例如: 现在有7个分区(0,1,2,3,4,5,6), 消费者组中有三个消费者(c0,c1,c2), 7/3=2---1, 那么c0就会多消费一个分。此时c0 消费0,1,2 分区;c1消费3,4分区 ;c2 消费5,6分区。
    如果8个分区的话,8/3=2----2, c0,c1就会多消费一个

    range 分区
    注意: 如果只是针对一个topic,c0多消费一个分区影响不大,但是如果有N个topic, 那么c0就将多消费每一个topic的分区,容易产生数据倾斜

2 . RoundRobin策略

RoundRobin
  • 将所有订阅的topic和partition 组成topicAndPartition 列表,并按hashcode 进行排序,最后以轮训的方式分配给消费者

3 . Sticky策略
可以理解为分区的分配结果带有粘性,即在执行一次新的分配之前,会考虑上一次的分配结果,尽量减少调整改动,节省开销
Sticky策略是在kafka 0.11.x版本之后引入的,首先会尽量均匀的分配分区,类似图range策略,在消费者组中的某一消费者出现问题时,会尽量其他消费者的原有分区不变
例如 0,1,2,3,4,5,6 分区,被一消费者组的三个消费者c0,c1,c2 消费, 初始化分配时按照均匀分配的原则,将所有分区随机均匀分配给消费者, 假如c0 分配到0,2,3;c1 分配到1,4 ;c2 分配到5,6;此时c0 挂了,触发再分配策略,此时将c0分分区分配给c1,c2, 结果;c1 分配到的分区为1,2,4;c2 分配到0,3,5,6

5.2.2 消费者组详细消费流程
消费流程
5.3 offset 偏移量
5.3.1 默认维护位置

从kafka0.9版本之后,consumer默认将消费的offset 存储在kafka 内置的主题_consumer_offsets中
_consumer_offsets里采用了key和value 的形式存储数据,key=group.id+topic+partition.id, value 就是对应的offset,每隔一段时间,kafka 都会对这个内置的topic 进行compact 压缩处理,是的每个key的value值都保留最新的数据


存储位置
5.3.2 自动提交offset
自动提交offset
5.3.3 手动提交offset

手动提交offset 首先需要配置参数 enable.auto.commit=false


手动提交
5.3.4 指定offset 消费

根据配置参数可以设定offset 的初始偏移量
auto.offset.reset= earliest | latest | none(默认是latest)

  1. earlist: 自动将offset 设置为最早的偏移量, --from-beginning
  2. latest: 自动将偏移量重置为最新的偏移量
  3. none: 如果未找到消费者组的先前的偏移量,则向消费者抛出异常
5.3.5 漏消费和重复消费
消费者事务

6 压力测试

kafka 自带压力测试脚本

  • 生产者压测脚本: kafka-producer-perf-test-sh
  • 消费者压测脚本: kafka-consumer-pref-test-sh
6.1 生产者压测

例:

  1. 创建名为test的主题,设置分区3个,副本3个
kafka-topic.sh --bootstrap-server 127.0.0.1:9092 --create --replication-factor 3 --partitions 3  --topic test
  1. 开始测试:
kafka-producer-pref-test.sh --topic test --record-size 1024 --num-records 1000000 --throughput 100000 --producer-props bootstrap-server=127.0.0.1:9092 batch.size=16384 linger.ms=0

参数说明:

  • record-size : 一条消息的信息量大小,单位字节
  • num-records: 总共发送的消息的数量
  • throughput:每秒发送的消息数量, 设置为-1 表示不进行限制
  • producer-props 生产者参数的相关信息
6.2 消费者压测

例:

kafka- consumer-pref-test.sh --bootstrap-server 127.0.0.1:9092 --topic test --messages 1000000 --consumer.config  config/consumber.propertier

参数说明

  • messages : 总共消费的数据数目
  • consumer.config: 消费者的配置文件,可以修改配置文件中的参数来做压力测试,查看不同参数对压测的影响

全文参考https://www.bilibili.com/video/BV1vr4y1677k

相关文章

网友评论

      本文标题:kafka

      本文链接:https://www.haomeiwen.com/subject/epwrlrtx.html